Статьи

Создание службы REST в Scala с помощью Akka HTTP, Akka Streams и реактивного монго

В конце прошлого года я написал пару статей, в которых показано, как можно использовать Spray.io для создания службы REST на основе Scala () и как создать сервер веб-сокетов с помощью Scala, Akka и responsetivemongo (). Я хотел немного больше изучить серверную часть REST, но обнаружил, что в конце 2013 года Spray.io был приобретен typesafe и будет интегрирован со стеком Akka. Итак, в этой статье мы рассмотрим, как вы можете использовать функциональные возможности HTTP Akka для создания простого веб-сервера, и в дальнейшем мы рассмотрим, как маршрутизация из Spray.io была перенесена в Akka.

В этой статье мы предпримем следующие шаги:

  • Получите несколько фиктивных данных в mongoDB для тестирования.
  • Создайте сервер с помощью Akka Http, который использует простой асинхронный обработчик для обработки запросов.
  • Создайте сервер, который использует пользовательский потоковый график для обработки входящих запросов.
  • Протестируйте оба этих сервера с помощью http-клиента, также созданного с помощью Akka-Http.

Итак, давайте начнем с некоторой подготовительной работы и перенесем некоторые данные в mongoDB, чтобы мы могли с ними работать.

Загрузка данных в mongoDB

Для этого примера мы используем некоторую информацию, касающуюся акций, которую вы можете скачать здесь ( http://jsonstudio.com/wp-content/uploads/2014/02/stocks.zip ). Вы можете легко сделать это, выполнив следующие шаги:

Сначала получите данные:

1
wget http://jsonstudio.com/wp-content/uploads/2014/02/stocks.zip

Запустите mongodb в другом терминале

1
mongod --dbpath ./data/

И, наконец, используйте mongoimport для импорта данных.

1
unzip -c stocks.zip | mongoimport --db akka --collection stocks

И в качестве быстрой проверки запустите запрос, чтобы увидеть, все ли работает:

01
02
03
04
05
06
07
08
09
10
11
jos@Joss-MacBook-Pro.local:~$ mongo akka     
MongoDB shell version: 2.4.8
connecting to: akka
> db.stocks.findOne({},{Company: 1, Country: 1, Ticker:1 } )
{
        "_id" : ObjectId("52853800bb1177ca391c17ff"),
        "Ticker" : "A",
        "Country" : "USA",
        "Company" : "Agilent Technologies Inc."
}
>

На данный момент у нас есть наши тестовые данные и мы можем посмотреть на код, необходимый для запуска сервера.

Создайте сервер, который использует простой асинхронный обработчик для обработки запросов

Для работы с Akka Http и доступа к данным в монго нам понадобятся дополнительные библиотеки. Поэтому, прежде чем мы что-то еще сделаем, давайте сначала посмотрим на файл сборки sbt, который мы использовали для этой статьи:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
import com.typesafe.sbt.SbtAspectj._
 
name := "http-akka"
 
version := "1.0"
 
scalaVersion := "2.11.5"
 
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http-core-experimental" % "1.0-M2",
  "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23",
  "org.reactivemongo" %% "play2-reactivemongo" % "0.10.5.0.akka23",
  "com.typesafe.play" % "play-json_2.11" % "2.4.0-M2",
  "ch.qos.logback" % "logback-classic" % "1.1.2"
)
 
resolvers += "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/"
 
 
mainClass in (Compile, run) := Some("Boot")

Когда вы просматриваете зависимости, вы видите обычных подозреваемых:

  • akka-http-core-экспериментальный содержит весь http-сервер и клиентский материал, который мы собираемся использовать. Эта библиотека зависит от akka-stream, поэтому мы также получим эту библиотеку в нашем пути к классам.
  • «Реактимонго» позволяет нам реагировать на монго.
  • Я также включил play2-реагирующий mongo и play-json, что значительно упрощает преобразование BSON, возвращенного из монго в JSON.
  • Наконец, для регистрации мы добавляем logback.

Теперь, прежде чем мы рассмотрим код, необходимый для запуска сервера, давайте быстро рассмотрим, как мы будем запрашивать монго. Для этого мы создали простой вспомогательный объект с креативным именем Database:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import reactivemongo.api._
import reactivemongo.api.collections.default.BSONCollection
import reactivemongo.bson.BSONDocument
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
 
object Database {
 
  val collection = connect()
 
 
  def connect(): BSONCollection = {
 
    val driver = new MongoDriver
    val connection = driver.connection(List("localhost"))
 
    val db = connection("akka")
    db.collection("stocks")
  }
 
  def findAllTickers(): Future[List[BSONDocument]] = {
    val query = BSONDocument()
    val filter = BSONDocument("Company" -> 1, "Country" -> 1, "Ticker" -> 1)
 
    // which results in a Future[List[BSONDocument]]
    Database.collection
      .find(query, filter)
      .cursor[BSONDocument]
      .collect[List]()
  }
 
  def findTicker(ticker: String) : Future[Option[BSONDocument]] = {
    val query = BSONDocument("Ticker" -> ticker)
 
    Database.collection
      .find(query)
      .one
  }
 
}

Не так много, чтобы объяснить. Здесь важно отметить, что обе функции поиска возвращают будущее, поэтому вызовы этих функций не будут блокироваться. Теперь, когда у нас есть основы, давайте рассмотрим код для первого http-сервера, который использует асинхронный обработчик.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
/**
 * Simple Object that starts an HTTP server using akka-http. All requests are handled
 * through an Akka flow.
 */
object Boot extends App {
 
  // the actor system to use. Required for flowmaterializer and HTTP.
  // passed in implicit
  implicit val system = ActorSystem("Streams")
  implicit val materializer = FlowMaterializer()
 
  // start the server on the specified interface and port.
  val serverBinding2 = Http().bind(interface = "localhost", port = 8091)
  serverBinding2.connections.foreach { connection =>
    connection.handleWith(Flow[HttpRequest].mapAsync(asyncHandler))
   }
 }

В этом фрагменте кода мы создаем http-сервер, который прослушивает порт 8091. Мы обрабатываем каждое соединение, созданное с помощью asyncHandler. Этот обработчик должен возвращать Future [HttpResponse]. Давайте посмотрим на этот обработчик следующий:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// With an async handler, we use futures. Threads aren't blocked.
def asyncHandler(request: HttpRequest): Future[HttpResponse] = {
 
  // we match the request, and some simple path checking
  request match {
 
    // match specific path. Returns all the avaiable tickers
    case HttpRequest(GET, Uri.Path("/getAllTickers"), _, _, _) => {
 
      // make a db call, which returns a future.
      // use for comprehension to flatmap this into
      // a Future[HttpResponse]
      for {
        input <- Database.findAllTickers
      } yield {
        HttpResponse(entity = convertToString(input))
      }
    }
 
    // match GET pat. Return a single ticker
    case HttpRequest(GET, Uri.Path("/get"), _, _, _) => {
 
      // next we match on the query paramter
      request.uri.query.get("ticker") match {
 
          // if we find the query parameter
          case Some(queryParameter) => {
 
            // query the database
            val ticker = Database.findTicker(queryParameter)
 
            // use a simple for comprehension, to make
            // working with futures easier.
            for {
              t <- ticker
            } yield  {
              t match {
                case Some(bson) => HttpResponse(entity = convertToString(bson))
                case None => HttpResponse(status = StatusCodes.OK)
              }
            }
          }
 
          // if the query parameter isn't there
          case None => Future(HttpResponse(status = StatusCodes.OK))
        }
    }
 
    // Simple case that matches everything, just return a not found
    case HttpRequest(_, _, _, _, _) => {
      Future[HttpResponse] {
        HttpResponse(status = StatusCodes.NotFound)
      }
    }
  }
}

Как видно из этого кода, код обработчика довольно прост. Мы используем сопоставление с образцом для сопоставления с конкретным URL-адресом и используем объект базы данных, который мы видели ранее, для запроса монго. Обратите внимание на вызовы convertToString. Это пара вспомогательных методов, которые конвертируют BSON в JSON, используя библиотеки воспроизведения, которые мы включили ранее:

1
2
3
4
5
6
7
8
9
def convertToString(input: List[BSONDocument]) : String = {
   input
     .map(f => convertToString(f))
     .mkString("[", ",", "]")
 }
 
 def convertToString(input: BSONDocument) : String = {
   Json.stringify(BSONFormats.toJSON(input))
 }

Когда мы запустим этот сервер и откроем адрес в браузере, мы увидим что-то вроде этого:

Акка-поток-1.png

Легко ли? Теперь давайте посмотрим на более сложный сценарий.

Создайте сервер, который использует пользовательский потоковый график для обработки входящих запросов.

Akka-http внутренне использует akka-streams для обработки http-соединений. Это означает, что мы можем использовать akka-streams для быстрой обработки http-запросов. Для линейного потока мы можем использовать стандартный API потока, предоставляемый akka. Для более сложных графов akka-streams предоставляет собственный DSL, с помощью которого вы можете очень легко создавать более сложные графы, в которых события потока обрабатываются параллельно.

Давайте создадим новую привязку сервера, которая прослушивает порт 8090:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
object Boot extends App {
 
  // the actor system to use. Required for flowmaterializer and HTTP.
  // passed in implicit
  implicit val system = ActorSystem("Streams")
  implicit val materializer = FlowMaterializer()
 
  // start the server on the specified interface and port.
  val serverBinding1 = Http().bind(interface = "localhost", port = 8090)
 
  serverBinding1.connections.foreach { connection =>
    connection.handleWith(broadCastMergeFlow)
   }
 }

Эта привязка к серверу создается так же, как мы делали ранее. Основное отличие состоит в том, что на этот раз мы не передаем обработку запроса обработчику, а указываем экземпляр потока с именем broadCastMergeFlow. Этот поток слияния выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
val bCast = Broadcast[HttpRequest]
  
// some basic steps that each retrieve a different ticket value (as a future)
val step1 = Flow[HttpRequest].mapAsync[String](getTickerHandler("GOOG"))
val step2 = Flow[HttpRequest].mapAsync[String](getTickerHandler("AAPL"))
val step3 = Flow[HttpRequest].mapAsync[String](getTickerHandler("MSFT"))
  
// We'll use the source and output provided by the http endpoint
val in = UndefinedSource[HttpRequest]
val out = UndefinedSink[HttpResponse]
 // when an element is available on one of the inputs, take
// that one, igore the rest
val merge = Merge[String]
// since merge doesn't output a HttpResponse add an additional map step.
val mapToResponse = Flow[String].map[HttpResponse](
(inp:String) => HttpResponse(status = StatusCodes.OK, entity = inp)
)
 
 
  // define another flow. This uses the merge function which
  // takes the first available response
  val broadCastMergeFlow = Flow[HttpRequest, HttpResponse]() {
    implicit builder =>
 
            bCast ~> step1 ~> merge
      in ~> bCast ~> step2 ~> merge ~> mapToResponse ~> out
            bCast ~> step3 ~> merge
 
      (in, out)
  }

Наиболее важной частью являются последние несколько строк в этом фрагменте кода. Здесь мы рисуем график, который определяет, как обрабатывается сообщение, когда оно обрабатывается сервером. В этом случае мы сначала транслируем входящий HTTP-запрос на три параллельных потока. В каждом потоке мы затем делаем вызов в нашу базу данных, чтобы получить билет. Затем мы объединяем результаты вместе (объединение занимает первое доступное даже в обратном направлении) и создаем ответ. Таким образом, в зависимости от того, какой из шагов является самым быстрым, мы возвращаем тикер для GOOG, AAPL или MSFT. Чтобы лучше увидеть результат, мы добавили сон в getTickerHandler:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
def getTickerHandler(tickName: String)(request: HttpRequest): Future[String] = {
  // query the database
  val ticker = Database.findTicker(tickName)
 
  Thread.sleep(Math.random() * 1000 toInt)
 
  // use a simple for comprehension, to make
  // working with futures easier.
  for {
    t <- ticker
  } yield  {
    t match {
      case Some(bson) => convertToString(bson)
      case None => ""
    }
  }
}

Аккуратно верно! Akka-streams предоставляет ряд базовых строительных блоков, которые вы можете использовать для создания этих потоков (дополнительную информацию см. В документации: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2 / scala / s… ). Например, если мы хотим сжать ответы шагов вместе, мы могли бы создать поток как это:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
// waits for events on the three inputs and returns a response
val zip = ZipWith[String, String, String, HttpResponse] (
  (inp1, inp2, inp3) => new HttpResponse(status = StatusCodes.OK,entity = inp1 + inp2 + inp3)
 
 
// define a flow which broadcasts the request to the three
// steps, and uses the zipWith to combine the elements before
val broadCastZipFlow = Flow[HttpRequest, HttpResponse]() {
  implicit builder =>
 
          bCast ~> step1 ~> zip.input1
    in ~> bCast ~> step2 ~> zip.input2 ~> out
          bCast ~> step3 ~> zip.input3
 
    (in, out)
}

Мне очень нравится, как это работает и как легко визуализировать данные, проходящие через различные этапы. Если мы используем подход слияния, вы увидите результат, который выглядит примерно так (когда вызывается 10 раз):

01
02
03
04
05
06
07
08
09
10
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217
{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217
{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217
{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282

В заключительной части я хотел бы показать, как вы можете использовать тот же подход при создании http-клиента с помощью akka-http.

Проверьте оба этих сервера с помощью http-клиента, также созданного с помощью Akka-Http.

Akka-http также предоставляет функциональные возможности для простой настройки http-клиента, который также использует потоковый / потоковый подход обработки сообщений. Следующий листинг показывает полный запущенный клиент:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import akka.actor.ActorSystem
import akka.http.Http
import akka.stream.FlowMaterializer
import akka.http.model._
import akka.stream.scaladsl._
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.FlowGraphImplicits._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
 
/**
 * Simple HTTP client created with akka-http
 */
object Client extends App {
 
  // the actor system to use. Required for flowmaterializer and HTTP.
  // passed in implicit
  implicit val system = ActorSystem("ServerTest")
  implicit val materializer = FlowMaterializer()
 
  val httpClient1 = Http(system).outgoingConnection("localhost", 8090).flow
  val httpClient2 = Http(system).outgoingConnection("localhost", 8091).flow
 
  // define a sink that will process the answer
  // we could also process this as a flow
  val printChunksConsumer = Sink.foreach[HttpResponse] { res =>
    if(res.status == StatusCodes.OK) {
 
      println("Recieved response : " + res);
      res.entity.getDataBytes().map {
        chunk =>
          System.out.println("Chunk: " + chunk.decodeString(HttpCharsets.`UTF-8`.value).substring(0, 80))
        }.to(Sink.ignore).run()
    } else
      println(res.status)
  }
 
  // we need to set allow cycles since internally the httpclient
  // has some cyclic flows (apparently)
  // we construct a sink, to which we connect a later to define source.
  val reqFlow2: Sink[HttpRequest] = Sink[HttpRequest]() { implicit b =>
    b.allowCycles()
    val source = UndefinedSource[HttpRequest]
    val bcast = Broadcast[HttpRequest]
    val concat = Concat[HttpResponse]
 
    // simple graph. Duplicate the request, send twice.
    // concat the result.
              bcast ~> httpClient1 ~> concat.first
    source ~> bcast ~> httpClient1 ~> concat.second ~> printChunksConsumer
    source
  }
 
  // make two calls, both return futures, first one shows direct linked sinks and
  // sources. Second one makes yse if our graph.
 
  // make number of calls
  val res = 1 to 5 map( i => {
    Source.single(HttpRequest()).to(reqFlow2).run().get(printChunksConsumer)
  })
  val f = Future.sequence(res)
 
  // make some calls with filled in request URI
  val f3 = Source.single(HttpRequest(uri = Uri("/getAllTickers"))).via(httpClient2).runWith(printChunksConsumer)
  val f4 = Source.single(HttpRequest(uri = Uri("/get?ticker=ADAT"))).via(httpClient2).runWith(printChunksConsumer)
  val f5 = Source.single(HttpRequest(uri = Uri("/get?tikcer=FNB"))).via(httpClient2).runWith(printChunksConsumer)
 
  for {
    f2Result <- f
    f2Result <- f3
    f2Result <- f4
    f2Result <- f5
  } yield ({
      println("All calls done")
      system.shutdown()
      system.awaitTermination()
    }
  )
}

Я не буду вдаваться в подробности, поскольку код следует тому же процессу, что и для HTTP-сервера. Вот и все для этой статьи и введения в akka-stream и akka-http. Мне очень нравится их подход к обработке сообщений и созданию читабельного, реактивного кода. В следующей статье мы рассмотрим некоторые другие аспекты akka-http (например, маршруты).