В конце прошлого года я написал пару статей, в которых показано, как можно использовать Spray.io для создания службы REST на основе Scala () и как создать сервер веб-сокетов с помощью Scala, Akka и responsetivemongo (). Я хотел немного больше изучить серверную часть REST, но обнаружил, что в конце 2013 года Spray.io был приобретен typesafe и будет интегрирован со стеком Akka. Итак, в этой статье мы рассмотрим, как вы можете использовать функциональные возможности HTTP Akka для создания простого веб-сервера, и в дальнейшем мы рассмотрим, как маршрутизация из Spray.io была перенесена в Akka.
В этой статье мы предпримем следующие шаги:
- Получите несколько фиктивных данных в mongoDB для тестирования.
 - Создайте сервер с помощью Akka Http, который использует простой асинхронный обработчик для обработки запросов.
 - Создайте сервер, который использует пользовательский потоковый график для обработки входящих запросов.
 - Протестируйте оба этих сервера с помощью http-клиента, также созданного с помощью Akka-Http.
 
Итак, давайте начнем с некоторой подготовительной работы и перенесем некоторые данные в mongoDB, чтобы мы могли с ними работать.
Загрузка данных в mongoDB
Для этого примера мы используем некоторую информацию, касающуюся акций, которую вы можете скачать здесь ( http://jsonstudio.com/wp-content/uploads/2014/02/stocks.zip ). Вы можете легко сделать это, выполнив следующие шаги:
Сначала получите данные:
| 
 1 
 | 
wget http://jsonstudio.com/wp-content/uploads/2014/02/stocks.zip | 
Запустите mongodb в другом терминале
| 
 1 
 | 
mongod --dbpath ./data/ | 
И, наконец, используйте mongoimport для импорта данных.
| 
 1 
 | 
unzip -c stocks.zip | mongoimport --db akka --collection stocks | 
И в качестве быстрой проверки запустите запрос, чтобы увидеть, все ли работает:
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
 | 
jos@Joss-MacBook-Pro.local:~$ mongo akka      MongoDB shell version: 2.4.8connecting to: akka> db.stocks.findOne({},{Company: 1, Country: 1, Ticker:1 } ){        "_id" : ObjectId("52853800bb1177ca391c17ff"),        "Ticker" : "A",        "Country" : "USA",        "Company" : "Agilent Technologies Inc."}> | 
На данный момент у нас есть наши тестовые данные и мы можем посмотреть на код, необходимый для запуска сервера.
Создайте сервер, который использует простой асинхронный обработчик для обработки запросов
Для работы с Akka Http и доступа к данным в монго нам понадобятся дополнительные библиотеки. Поэтому, прежде чем мы что-то еще сделаем, давайте сначала посмотрим на файл сборки sbt, который мы использовали для этой статьи:
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
 | 
import com.typesafe.sbt.SbtAspectj._name := "http-akka"version := "1.0"scalaVersion := "2.11.5"libraryDependencies ++= Seq(  "com.typesafe.akka" %% "akka-http-core-experimental" % "1.0-M2",  "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23",  "org.reactivemongo" %% "play2-reactivemongo" % "0.10.5.0.akka23",  "com.typesafe.play" % "play-json_2.11" % "2.4.0-M2",  "ch.qos.logback" % "logback-classic" % "1.1.2")mainClass in (Compile, run) := Some("Boot") | 
Когда вы просматриваете зависимости, вы видите обычных подозреваемых:
- akka-http-core-экспериментальный содержит весь http-сервер и клиентский материал, который мы собираемся использовать. Эта библиотека зависит от akka-stream, поэтому мы также получим эту библиотеку в нашем пути к классам.
 - «Реактимонго» позволяет нам реагировать на монго.
 - Я также включил play2-реагирующий mongo и play-json, что значительно упрощает преобразование BSON, возвращенного из монго в JSON.
 - Наконец, для регистрации мы добавляем logback.
 
Теперь, прежде чем мы рассмотрим код, необходимый для запуска сервера, давайте быстро рассмотрим, как мы будем запрашивать монго. Для этого мы создали простой вспомогательный объект с креативным именем Database:
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
34 
35 
36 
37 
38 
39 
40 
 | 
import reactivemongo.api._import reactivemongo.api.collections.default.BSONCollectionimport reactivemongo.bson.BSONDocumentimport scala.concurrent.ExecutionContext.Implicits.globalimport scala.concurrent.Futureobject Database {  val collection = connect()  def connect(): BSONCollection = {    val driver = new MongoDriver    val connection = driver.connection(List("localhost"))    val db = connection("akka")    db.collection("stocks")  }  def findAllTickers(): Future[List[BSONDocument]] = {    val query = BSONDocument()    val filter = BSONDocument("Company" -> 1, "Country" -> 1, "Ticker" -> 1)    // which results in a Future[List[BSONDocument]]    Database.collection      .find(query, filter)      .cursor[BSONDocument]      .collect[List]()  }  def findTicker(ticker: String) : Future[Option[BSONDocument]] = {    val query = BSONDocument("Ticker" -> ticker)    Database.collection      .find(query)      .one  }} | 
Не так много, чтобы объяснить. Здесь важно отметить, что обе функции поиска возвращают будущее, поэтому вызовы этих функций не будут блокироваться. Теперь, когда у нас есть основы, давайте рассмотрим код для первого http-сервера, который использует асинхронный обработчик.
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
 | 
/** * Simple Object that starts an HTTP server using akka-http. All requests are handled * through an Akka flow. */object Boot extends App {  // the actor system to use. Required for flowmaterializer and HTTP.  // passed in implicit  implicit val system = ActorSystem("Streams")  implicit val materializer = FlowMaterializer()  // start the server on the specified interface and port.  val serverBinding2 = Http().bind(interface = "localhost", port = 8091)  serverBinding2.connections.foreach { connection =>    connection.handleWith(Flow[HttpRequest].mapAsync(asyncHandler))   } } | 
В этом фрагменте кода мы создаем http-сервер, который прослушивает порт 8091. Мы обрабатываем каждое соединение, созданное с помощью asyncHandler. Этот обработчик должен возвращать Future [HttpResponse]. Давайте посмотрим на этот обработчик следующий:
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
34 
35 
36 
37 
38 
39 
40 
41 
42 
43 
44 
45 
46 
47 
48 
49 
50 
51 
52 
53 
54 
55 
56 
 | 
// With an async handler, we use futures. Threads aren't blocked.def asyncHandler(request: HttpRequest): Future[HttpResponse] = {  // we match the request, and some simple path checking  request match {    // match specific path. Returns all the avaiable tickers    case HttpRequest(GET, Uri.Path("/getAllTickers"), _, _, _) => {      // make a db call, which returns a future.      // use for comprehension to flatmap this into      // a Future[HttpResponse]      for {        input <- Database.findAllTickers      } yield {        HttpResponse(entity = convertToString(input))      }    }    // match GET pat. Return a single ticker    case HttpRequest(GET, Uri.Path("/get"), _, _, _) => {      // next we match on the query paramter      request.uri.query.get("ticker") match {          // if we find the query parameter          case Some(queryParameter) => {            // query the database            val ticker = Database.findTicker(queryParameter)            // use a simple for comprehension, to make            // working with futures easier.            for {              t <- ticker            } yield  {              t match {                case Some(bson) => HttpResponse(entity = convertToString(bson))                case None => HttpResponse(status = StatusCodes.OK)              }            }          }          // if the query parameter isn't there          case None => Future(HttpResponse(status = StatusCodes.OK))        }    }    // Simple case that matches everything, just return a not found    case HttpRequest(_, _, _, _, _) => {      Future[HttpResponse] {        HttpResponse(status = StatusCodes.NotFound)      }    }  }} | 
Как видно из этого кода, код обработчика довольно прост. Мы используем сопоставление с образцом для сопоставления с конкретным URL-адресом и используем объект базы данных, который мы видели ранее, для запроса монго. Обратите внимание на вызовы convertToString. Это пара вспомогательных методов, которые конвертируют BSON в JSON, используя библиотеки воспроизведения, которые мы включили ранее:
| 
 1 
2 
3 
4 
5 
6 
7 
8 
9 
 | 
def convertToString(input: List[BSONDocument]) : String = {   input     .map(f => convertToString(f))     .mkString("[", ",", "]") } def convertToString(input: BSONDocument) : String = {   Json.stringify(BSONFormats.toJSON(input)) } | 
Когда мы запустим этот сервер и откроем адрес в браузере, мы увидим что-то вроде этого:
Легко ли? Теперь давайте посмотрим на более сложный сценарий.
Создайте сервер, который использует пользовательский потоковый график для обработки входящих запросов.
Akka-http внутренне использует akka-streams для обработки http-соединений. Это означает, что мы можем использовать akka-streams для быстрой обработки http-запросов. Для линейного потока мы можем использовать стандартный API потока, предоставляемый akka. Для более сложных графов akka-streams предоставляет собственный DSL, с помощью которого вы можете очень легко создавать более сложные графы, в которых события потока обрабатываются параллельно.
Давайте создадим новую привязку сервера, которая прослушивает порт 8090:
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
 | 
object Boot extends App {  // the actor system to use. Required for flowmaterializer and HTTP.  // passed in implicit  implicit val system = ActorSystem("Streams")  implicit val materializer = FlowMaterializer()  // start the server on the specified interface and port.  val serverBinding1 = Http().bind(interface = "localhost", port = 8090)  serverBinding1.connections.foreach { connection =>    connection.handleWith(broadCastMergeFlow)   } } | 
Эта привязка к серверу создается так же, как мы делали ранее. Основное отличие состоит в том, что на этот раз мы не передаем обработку запроса обработчику, а указываем экземпляр потока с именем broadCastMergeFlow. Этот поток слияния выглядит следующим образом:
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
 | 
val bCast = Broadcast[HttpRequest]  // some basic steps that each retrieve a different ticket value (as a future)val step1 = Flow[HttpRequest].mapAsync[String](getTickerHandler("GOOG"))val step2 = Flow[HttpRequest].mapAsync[String](getTickerHandler("AAPL"))val step3 = Flow[HttpRequest].mapAsync[String](getTickerHandler("MSFT"))  // We'll use the source and output provided by the http endpointval in = UndefinedSource[HttpRequest]val out = UndefinedSink[HttpResponse]  // when an element is available on one of the inputs, take// that one, igore the restval merge = Merge[String]// since merge doesn't output a HttpResponse add an additional map step.val mapToResponse = Flow[String].map[HttpResponse]((inp:String) => HttpResponse(status = StatusCodes.OK, entity = inp))   // define another flow. This uses the merge function which  // takes the first available response  val broadCastMergeFlow = Flow[HttpRequest, HttpResponse]() {    implicit builder =>            bCast ~> step1 ~> merge      in ~> bCast ~> step2 ~> merge ~> mapToResponse ~> out            bCast ~> step3 ~> merge      (in, out)  } | 
Наиболее важной частью являются последние несколько строк в этом фрагменте кода. Здесь мы рисуем график, который определяет, как обрабатывается сообщение, когда оно обрабатывается сервером. В этом случае мы сначала транслируем входящий HTTP-запрос на три параллельных потока. В каждом потоке мы затем делаем вызов в нашу базу данных, чтобы получить билет. Затем мы объединяем результаты вместе (объединение занимает первое доступное даже в обратном направлении) и создаем ответ. Таким образом, в зависимости от того, какой из шагов является самым быстрым, мы возвращаем тикер для GOOG, AAPL или MSFT. Чтобы лучше увидеть результат, мы добавили сон в getTickerHandler:
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
 | 
def getTickerHandler(tickName: String)(request: HttpRequest): Future[String] = {  // query the database  val ticker = Database.findTicker(tickName)  Thread.sleep(Math.random() * 1000 toInt)  // use a simple for comprehension, to make  // working with futures easier.  for {    t <- ticker  } yield  {    t match {      case Some(bson) => convertToString(bson)      case None => ""    }  }} | 
Аккуратно верно! Akka-streams предоставляет ряд базовых строительных блоков, которые вы можете использовать для создания этих потоков (дополнительную информацию см. В документации: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2 / scala / s… ). Например, если мы хотим сжать ответы шагов вместе, мы могли бы создать поток как это:
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
 | 
// waits for events on the three inputs and returns a responseval zip = ZipWith[String, String, String, HttpResponse] (  (inp1, inp2, inp3) => new HttpResponse(status = StatusCodes.OK,entity = inp1 + inp2 + inp3)// define a flow which broadcasts the request to the three// steps, and uses the zipWith to combine the elements beforeval broadCastZipFlow = Flow[HttpRequest, HttpResponse]() {  implicit builder =>          bCast ~> step1 ~> zip.input1    in ~> bCast ~> step2 ~> zip.input2 ~> out          bCast ~> step3 ~> zip.input3    (in, out)} | 
Мне очень нравится, как это работает и как легко визуализировать данные, проходящие через различные этапы. Если мы используем подход слияния, вы увидите результат, который выглядит примерно так (когда вызывается 10 раз):
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
 | 
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282 | 
В заключительной части я хотел бы показать, как вы можете использовать тот же подход при создании http-клиента с помощью akka-http.
Проверьте оба этих сервера с помощью http-клиента, также созданного с помощью Akka-Http.
Akka-http также предоставляет функциональные возможности для простой настройки http-клиента, который также использует потоковый / потоковый подход обработки сообщений. Следующий листинг показывает полный запущенный клиент:
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
34 
35 
36 
37 
38 
39 
40 
41 
42 
43 
44 
45 
46 
47 
48 
49 
50 
51 
52 
53 
54 
55 
56 
57 
58 
59 
60 
61 
62 
63 
64 
65 
66 
67 
68 
69 
70 
71 
72 
73 
74 
75 
76 
77 
78 
79 
 | 
import akka.actor.ActorSystemimport akka.http.Httpimport akka.stream.FlowMaterializerimport akka.http.model._import akka.stream.scaladsl._import akka.stream.scaladsl.Sourceimport akka.stream.scaladsl.FlowGraphImplicits._import scala.concurrent.ExecutionContext.Implicits.globalimport scala.concurrent.Future/** * Simple HTTP client created with akka-http */object Client extends App {  // the actor system to use. Required for flowmaterializer and HTTP.  // passed in implicit  implicit val system = ActorSystem("ServerTest")  implicit val materializer = FlowMaterializer()  val httpClient1 = Http(system).outgoingConnection("localhost", 8090).flow  val httpClient2 = Http(system).outgoingConnection("localhost", 8091).flow  // define a sink that will process the answer  // we could also process this as a flow  val printChunksConsumer = Sink.foreach[HttpResponse] { res =>    if(res.status == StatusCodes.OK) {      println("Recieved response : " + res);      res.entity.getDataBytes().map {        chunk =>          System.out.println("Chunk: " + chunk.decodeString(HttpCharsets.`UTF-8`.value).substring(0, 80))        }.to(Sink.ignore).run()    } else      println(res.status)  }  // we need to set allow cycles since internally the httpclient  // has some cyclic flows (apparently)  // we construct a sink, to which we connect a later to define source.  val reqFlow2: Sink[HttpRequest] = Sink[HttpRequest]() { implicit b =>    b.allowCycles()    val source = UndefinedSource[HttpRequest]    val bcast = Broadcast[HttpRequest]    val concat = Concat[HttpResponse]    // simple graph. Duplicate the request, send twice.    // concat the result.              bcast ~> httpClient1 ~> concat.first    source ~> bcast ~> httpClient1 ~> concat.second ~> printChunksConsumer    source  }  // make two calls, both return futures, first one shows direct linked sinks and  // sources. Second one makes yse if our graph.  // make number of calls  val res = 1 to 5 map( i => {    Source.single(HttpRequest()).to(reqFlow2).run().get(printChunksConsumer)  })  val f = Future.sequence(res)  // make some calls with filled in request URI  val f3 = Source.single(HttpRequest(uri = Uri("/getAllTickers"))).via(httpClient2).runWith(printChunksConsumer)  val f4 = Source.single(HttpRequest(uri = Uri("/get?ticker=ADAT"))).via(httpClient2).runWith(printChunksConsumer)  val f5 = Source.single(HttpRequest(uri = Uri("/get?tikcer=FNB"))).via(httpClient2).runWith(printChunksConsumer)  for {    f2Result <- f    f2Result <- f3    f2Result <- f4    f2Result <- f5  } yield ({      println("All calls done")      system.shutdown()      system.awaitTermination()    }  )} | 
Я не буду вдаваться в подробности, поскольку код следует тому же процессу, что и для HTTP-сервера. Вот и все для этой статьи и введения в akka-stream и akka-http. Мне очень нравится их подход к обработке сообщений и созданию читабельного, реактивного кода. В следующей статье мы рассмотрим некоторые другие аспекты akka-http (например, маршруты).
| Ссылка: | Создание службы REST в Scala с помощью Akka HTTP, Akka Streams и реактивного монго от нашего партнера JCG Йоса Дирксена в блоге Smart Java . | 
