В конце прошлого года я написал пару статей, в которых показано, как можно использовать Spray.io для создания службы REST на основе Scala () и как создать сервер веб-сокетов с помощью Scala, Akka и responsetivemongo (). Я хотел немного больше изучить серверную часть REST, но обнаружил, что в конце 2013 года Spray.io был приобретен typesafe и будет интегрирован со стеком Akka. Итак, в этой статье мы рассмотрим, как вы можете использовать функциональные возможности HTTP Akka для создания простого веб-сервера, и в дальнейшем мы рассмотрим, как маршрутизация из Spray.io была перенесена в Akka.
В этой статье мы предпримем следующие шаги:
- Получите несколько фиктивных данных в mongoDB для тестирования.
- Создайте сервер с помощью Akka Http, который использует простой асинхронный обработчик для обработки запросов.
- Создайте сервер, который использует пользовательский потоковый график для обработки входящих запросов.
- Протестируйте оба этих сервера с помощью http-клиента, также созданного с помощью Akka-Http.
Итак, давайте начнем с некоторой подготовительной работы и перенесем некоторые данные в mongoDB, чтобы мы могли с ними работать.
Загрузка данных в mongoDB
Для этого примера мы используем некоторую информацию, касающуюся акций, которую вы можете скачать здесь ( http://jsonstudio.com/wp-content/uploads/2014/02/stocks.zip ). Вы можете легко сделать это, выполнив следующие шаги:
Сначала получите данные:
Запустите mongodb в другом терминале
mongod --dbpath ./data/
И, наконец, используйте mongoimport для импорта данных.
unzip -c stocks.zip | mongoimport --db akka --collection stocks
И в качестве быстрой проверки запустите запрос, чтобы увидеть, все ли работает:
[email protected]:~$ mongo akka MongoDB shell version: 2.4.8 connecting to: akka > db.stocks.findOne({},{Company: 1, Country: 1, Ticker:1 } ) { "_id" : ObjectId("52853800bb1177ca391c17ff"), "Ticker" : "A", "Country" : "USA", "Company" : "Agilent Technologies Inc." } >
На данный момент у нас есть наши тестовые данные и мы можем посмотреть на код, необходимый для запуска сервера.
Создайте сервер, который использует простой асинхронный обработчик для обработки запросов
Для работы с Akka Http и доступа к данным в монго нам понадобятся дополнительные библиотеки. Итак, прежде чем что-то делать, давайте сначала посмотрим на файл сборки sbt, который мы использовали для этой статьи:
import com.typesafe.sbt.SbtAspectj._ name := "http-akka" version := "1.0" scalaVersion := "2.11.5" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-http-core-experimental" % "1.0-M2", "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23", "org.reactivemongo" %% "play2-reactivemongo" % "0.10.5.0.akka23", "com.typesafe.play" % "play-json_2.11" % "2.4.0-M2", "ch.qos.logback" % "logback-classic" % "1.1.2" ) resolvers += "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/" resolvers += "Typesafe" at "https://repo.typesafe.com/typesafe/releases/" mainClass in (Compile, run) := Some("Boot")
Когда вы просматриваете зависимости, вы видите обычных подозреваемых:
- akka-http-core-экспериментальный содержит весь http-сервер и клиентский материал, который мы собираемся использовать. Эта библиотека зависит от akka-stream, поэтому мы также получим эту библиотеку в нашем пути к классам.
- «Реактимонго» позволяет нам реагировать на монго.
- Я также включил play2-реагирующий mongo и play-json, что значительно упрощает преобразование BSON, возвращенного из монго в JSON.
- Наконец, для регистрации мы добавляем logback.
Теперь, прежде чем мы рассмотрим код, необходимый для запуска сервера, давайте быстро рассмотрим, как мы будем запрашивать монго. Для этого мы создали простой вспомогательный объект с креативным именем Database:
import reactivemongo.api._ import reactivemongo.api.collections.default.BSONCollection import reactivemongo.bson.BSONDocument import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future object Database { val collection = connect() def connect(): BSONCollection = { val driver = new MongoDriver val connection = driver.connection(List("localhost")) val db = connection("akka") db.collection("stocks") } def findAllTickers(): Future[List[BSONDocument]] = { val query = BSONDocument() val filter = BSONDocument("Company" -> 1, "Country" -> 1, "Ticker" -> 1) // which results in a Future[List[BSONDocument]] Database.collection .find(query, filter) .cursor[BSONDocument] .collect[List]() } def findTicker(ticker: String) : Future[Option[BSONDocument]] = { val query = BSONDocument("Ticker" -> ticker) Database.collection .find(query) .one } }
Не так много, чтобы объяснить. Здесь важно отметить, что обе функции поиска возвращают будущее, поэтому вызовы этих функций не будут блокироваться. Теперь, когда у нас есть основы, давайте рассмотрим код для первого http-сервера, который использует асинхронный обработчик.
/** * Simple Object that starts an HTTP server using akka-http. All requests are handled * through an Akka flow. */ object Boot extends App { // the actor system to use. Required for flowmaterializer and HTTP. // passed in implicit implicit val system = ActorSystem("Streams") implicit val materializer = FlowMaterializer() // start the server on the specified interface and port. val serverBinding2 = Http().bind(interface = "localhost", port = 8091) serverBinding2.connections.foreach { connection => connection.handleWith(Flow[HttpRequest].mapAsync(asyncHandler)) } }
В этом фрагменте кода мы создаем http-сервер, который прослушивает порт 8091. Мы обрабатываем каждое соединение, созданное с помощью asyncHandler. Этот обработчик должен возвращать Future [HttpResponse]. Давайте посмотрим на этот обработчик следующий:
// With an async handler, we use futures. Threads aren't blocked. def asyncHandler(request: HttpRequest): Future[HttpResponse] = { // we match the request, and some simple path checking request match { // match specific path. Returns all the avaiable tickers case HttpRequest(GET, Uri.Path("/getAllTickers"), _, _, _) => { // make a db call, which returns a future. // use for comprehension to flatmap this into // a Future[HttpResponse] for { input <- Database.findAllTickers } yield { HttpResponse(entity = convertToString(input)) } } // match GET pat. Return a single ticker case HttpRequest(GET, Uri.Path("/get"), _, _, _) => { // next we match on the query paramter request.uri.query.get("ticker") match { // if we find the query parameter case Some(queryParameter) => { // query the database val ticker = Database.findTicker(queryParameter) // use a simple for comprehension, to make // working with futures easier. for { t <- ticker } yield { t match { case Some(bson) => HttpResponse(entity = convertToString(bson)) case None => HttpResponse(status = StatusCodes.OK) } } } // if the query parameter isn't there case None => Future(HttpResponse(status = StatusCodes.OK)) } } // Simple case that matches everything, just return a not found case HttpRequest(_, _, _, _, _) => { Future[HttpResponse] { HttpResponse(status = StatusCodes.NotFound) } } } }
Как видно из этого кода, код обработчика довольно прост. Мы используем сопоставление с образцом для сопоставления с конкретным URL-адресом и используем объект базы данных, который мы видели ранее, для запроса монго. Обратите внимание на вызовы convertToString. Это пара вспомогательных методов, которые конвертируют BSON в JSON, используя библиотеки воспроизведения, которые мы включили ранее:
def convertToString(input: List[BSONDocument]) : String = { input .map(f => convertToString(f)) .mkString("[", ",", "]") } def convertToString(input: BSONDocument) : String = { Json.stringify(BSONFormats.toJSON(input)) }
Когда мы запустим этот сервер и откроем адрес в браузере, мы увидим что-то вроде этого:
Легко ли? Теперь давайте посмотрим на более сложный сценарий.
Создайте сервер, который использует пользовательский потоковый график для обработки входящих запросов.
Akka-http внутренне использует akka-streams для обработки http-соединений. Это означает, что мы можем использовать akka-streams для быстрой обработки http-запросов. Для линейного потока мы можем использовать стандартный API потока, предоставляемый akka. Для более сложных графов akka-streams предоставляет собственный DSL, с помощью которого вы можете очень легко создавать более сложные графы, в которых события потока обрабатываются параллельно.
Давайте создадим новую привязку сервера, которая прослушивает порт 8090:
object Boot extends App { // the actor system to use. Required for flowmaterializer and HTTP. // passed in implicit implicit val system = ActorSystem("Streams") implicit val materializer = FlowMaterializer() // start the server on the specified interface and port. val serverBinding1 = Http().bind(interface = "localhost", port = 8090) serverBinding1.connections.foreach { connection => connection.handleWith(broadCastMergeFlow) } }
Эта привязка к серверу создается так же, как мы делали ранее. Основное отличие состоит в том, что на этот раз мы не передаем обработку запроса обработчику, а указываем экземпляр потока с именем broadCastMergeFlow. Этот поток слияния выглядит следующим образом:
val bCast = Broadcast[HttpRequest] // some basic steps that each retrieve a different ticket value (as a future) val step1 = Flow[HttpRequest].mapAsync[String](getTickerHandler("GOOG")) val step2 = Flow[HttpRequest].mapAsync[String](getTickerHandler("AAPL")) val step3 = Flow[HttpRequest].mapAsync[String](getTickerHandler("MSFT")) // We'll use the source and output provided by the http endpoint val in = UndefinedSource[HttpRequest] val out = UndefinedSink[HttpResponse] // when an element is available on one of the inputs, take // that one, igore the rest val merge = Merge[String] // since merge doesn't output a HttpResponse add an additional map step. val mapToResponse = Flow[String].map[HttpResponse]( (inp:String) => HttpResponse(status = StatusCodes.OK, entity = inp) ) // define another flow. This uses the merge function which // takes the first available response val broadCastMergeFlow = Flow[HttpRequest, HttpResponse]() { implicit builder => bCast ~> step1 ~> merge in ~> bCast ~> step2 ~> merge ~> mapToResponse ~> out bCast ~> step3 ~> merge (in, out) }
Наиболее важной частью являются последние несколько строк в этом фрагменте кода. Здесь мы рисуем график, который определяет, как обрабатывается сообщение, когда оно обрабатывается сервером. В этом случае мы сначала транслируем входящий HTTP-запрос на три параллельных потока. В каждом потоке мы затем делаем вызов в нашу базу данных, чтобы получить билет. Затем мы объединяем результаты вместе (объединение занимает первое доступное даже восходящее направление) и создаем ответ. Таким образом, в зависимости от того, какой из шагов является самым быстрым, мы возвращаем тикер для GOOG, AAPL или MSFT. Чтобы лучше увидеть результат, мы добавили сон в getTickerHandler:
def getTickerHandler(tickName: String)(request: HttpRequest): Future[String] = { // query the database val ticker = Database.findTicker(tickName) Thread.sleep(Math.random() * 1000 toInt) // use a simple for comprehension, to make // working with futures easier. for { t <- ticker } yield { t match { case Some(bson) => convertToString(bson) case None => "" } } }
Аккуратно верно! Akka-streams предоставляет ряд базовых строительных блоков, которые вы можете использовать для создания этих потоков (дополнительную информацию см. В их документации: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2. / scala / s … ). Например, если мы хотим сжать ответы шагов вместе, мы могли бы создать поток как это:
// waits for events on the three inputs and returns a response val zip = ZipWith[String, String, String, HttpResponse] ( (inp1, inp2, inp3) => new HttpResponse(status = StatusCodes.OK,entity = inp1 + inp2 + inp3) // define a flow which broadcasts the request to the three // steps, and uses the zipWith to combine the elements before val broadCastZipFlow = Flow[HttpRequest, HttpResponse]() { implicit builder => bCast ~> step1 ~> zip.input1 in ~> bCast ~> step2 ~> zip.input2 ~> out bCast ~> step3 ~> zip.input3 (in, out) }
Мне очень нравится, как это работает и как легко визуализировать данные, проходящие через различные этапы. Если мы используем подход слияния, вы увидите результат, который выглядит примерно так (когда вызывается 10 раз):
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217 {"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217 {"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217 {"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282 {"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217 {"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217 {"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282 {"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217 {"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217 {"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282
В заключительной части я хотел бы показать, как вы можете использовать тот же подход при создании http-клиента с помощью akka-http.
Протестируйте оба этих сервера с помощью http-клиента, также созданного с помощью Akka-Http
Akka-http также предоставляет функциональные возможности для простой настройки http-клиента, который также использует потоковый / потоковый подход обработки сообщений. Следующий листинг показывает полный запущенный клиент:
import akka.actor.ActorSystem import akka.http.Http import akka.stream.FlowMaterializer import akka.http.model._ import akka.stream.scaladsl._ import akka.stream.scaladsl.Source import akka.stream.scaladsl.FlowGraphImplicits._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future /** * Simple HTTP client created with akka-http */ object Client extends App { // the actor system to use. Required for flowmaterializer and HTTP. // passed in implicit implicit val system = ActorSystem("ServerTest") implicit val materializer = FlowMaterializer() val httpClient1 = Http(system).outgoingConnection("localhost", 8090).flow val httpClient2 = Http(system).outgoingConnection("localhost", 8091).flow // define a sink that will process the answer // we could also process this as a flow val printChunksConsumer = Sink.foreach[HttpResponse] { res => if(res.status == StatusCodes.OK) { println("Recieved response : " + res); res.entity.getDataBytes().map { chunk => System.out.println("Chunk: " + chunk.decodeString(HttpCharsets.`UTF-8`.value).substring(0, 80)) }.to(Sink.ignore).run() } else println(res.status) } // we need to set allow cycles since internally the httpclient // has some cyclic flows (apparently) // we construct a sink, to which we connect a later to define source. val reqFlow2: Sink[HttpRequest] = Sink[HttpRequest]() { implicit b => b.allowCycles() val source = UndefinedSource[HttpRequest] val bcast = Broadcast[HttpRequest] val concat = Concat[HttpResponse] // simple graph. Duplicate the request, send twice. // concat the result. bcast ~> httpClient1 ~> concat.first source ~> bcast ~> httpClient1 ~> concat.second ~> printChunksConsumer source } // make two calls, both return futures, first one shows direct linked sinks and // sources. Second one makes yse if our graph. // make number of calls val res = 1 to 5 map( i => { Source.single(HttpRequest()).to(reqFlow2).run().get(printChunksConsumer) }) val f = Future.sequence(res) // make some calls with filled in request URI val f3 = Source.single(HttpRequest(uri = Uri("/getAllTickers"))).via(httpClient2).runWith(printChunksConsumer) val f4 = Source.single(HttpRequest(uri = Uri("/get?ticker=ADAT"))).via(httpClient2).runWith(printChunksConsumer) val f5 = Source.single(HttpRequest(uri = Uri("/get?tikcer=FNB"))).via(httpClient2).runWith(printChunksConsumer) for { f2Result <- f f2Result <- f3 f2Result <- f4 f2Result <- f5 } yield ({ println("All calls done") system.shutdown() system.awaitTermination() } ) }
Я не буду вдаваться в подробности, поскольку код следует тому же процессу, что и для HTTP-сервера. Вот и все для этой статьи и введения в akka-stream и akka-http. Мне очень нравится их подход к обработке сообщений и созданию читабельного, реактивного кода. В следующей статье мы рассмотрим некоторые другие аспекты akka-http (например, маршруты).