Статьи

Создание службы REST в Scala с помощью Akka HTTP, Akka Streams и Reactive Mongo

В конце прошлого года я написал пару статей, в которых показано, как можно использовать Spray.io для создания службы REST на основе Scala () и как создать сервер веб-сокетов с помощью Scala, Akka и responsetivemongo (). Я хотел немного больше изучить серверную часть REST, но обнаружил, что в конце 2013 года Spray.io был приобретен typesafe и будет интегрирован со стеком Akka. Итак, в этой статье мы рассмотрим, как вы можете использовать функциональные возможности HTTP Akka для создания простого веб-сервера, и в дальнейшем мы рассмотрим, как маршрутизация из Spray.io была перенесена в Akka.

В этой статье мы предпримем следующие шаги:

  • Получите несколько фиктивных данных в mongoDB для тестирования.
  • Создайте сервер с помощью Akka Http, который использует простой асинхронный обработчик для обработки запросов.
  • Создайте сервер, который использует пользовательский потоковый график для обработки входящих запросов.
  • Протестируйте оба этих сервера с помощью http-клиента, также созданного с помощью Akka-Http.

Итак, давайте начнем с некоторой подготовительной работы и перенесем некоторые данные в mongoDB, чтобы мы могли с ними работать.

Загрузка данных в mongoDB

Для этого примера мы используем некоторую информацию, касающуюся акций, которую вы можете скачать здесь ( http://jsonstudio.com/wp-content/uploads/2014/02/stocks.zip ). Вы можете легко сделать это, выполнив следующие шаги:

Сначала получите данные:

Запустите mongodb в другом терминале

   mongod --dbpath ./data/

И, наконец, используйте mongoimport для импорта данных.

    unzip -c stocks.zip | mongoimport --db akka --collection stocks

И в качестве быстрой проверки запустите запрос, чтобы увидеть, все ли работает:

[email protected]:~$ mongo akka      
MongoDB shell version: 2.4.8
connecting to: akka
> db.stocks.findOne({},{Company: 1, Country: 1, Ticker:1 } )
{
        "_id" : ObjectId("52853800bb1177ca391c17ff"),
        "Ticker" : "A",
        "Country" : "USA",
        "Company" : "Agilent Technologies Inc."
}
> 

На данный момент у нас есть наши тестовые данные и мы можем посмотреть на код, необходимый для запуска сервера.

Создайте сервер, который использует простой асинхронный обработчик для обработки запросов

Для работы с Akka Http и доступа к данным в монго нам понадобятся дополнительные библиотеки. Итак, прежде чем что-то делать, давайте сначала посмотрим на файл сборки sbt, который мы использовали для этой статьи:

import com.typesafe.sbt.SbtAspectj._

name := "http-akka"

version := "1.0"

scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http-core-experimental" % "1.0-M2",
  "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23",
  "org.reactivemongo" %% "play2-reactivemongo" % "0.10.5.0.akka23",
  "com.typesafe.play" % "play-json_2.11" % "2.4.0-M2",
  "ch.qos.logback" % "logback-classic" % "1.1.2"
)

resolvers += "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/"

resolvers += "Typesafe" at "https://repo.typesafe.com/typesafe/releases/"

mainClass in (Compile, run) := Some("Boot")

Когда вы просматриваете зависимости, вы видите обычных подозреваемых:

  • akka-http-core-экспериментальный содержит весь http-сервер и клиентский материал, который мы собираемся использовать. Эта библиотека зависит от akka-stream, поэтому мы также получим эту библиотеку в нашем пути к классам.
  • «Реактимонго» позволяет нам реагировать на монго.
  • Я также включил play2-реагирующий mongo и play-json, что значительно упрощает преобразование BSON, возвращенного из монго в JSON.
  • Наконец, для регистрации мы добавляем logback.

Теперь, прежде чем мы рассмотрим код, необходимый для запуска сервера, давайте быстро рассмотрим, как мы будем запрашивать монго. Для этого мы создали простой вспомогательный объект с креативным именем Database:

import reactivemongo.api._
import reactivemongo.api.collections.default.BSONCollection
import reactivemongo.bson.BSONDocument
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object Database {

  val collection = connect()


  def connect(): BSONCollection = {

    val driver = new MongoDriver
    val connection = driver.connection(List("localhost"))

    val db = connection("akka")
    db.collection("stocks")
  }

  def findAllTickers(): Future[List[BSONDocument]] = {
    val query = BSONDocument()
    val filter = BSONDocument("Company" -> 1, "Country" -> 1, "Ticker" -> 1)

    // which results in a Future[List[BSONDocument]]
    Database.collection
      .find(query, filter)
      .cursor[BSONDocument]
      .collect[List]()
  }

  def findTicker(ticker: String) : Future[Option[BSONDocument]] = {
    val query = BSONDocument("Ticker" -> ticker)

    Database.collection
      .find(query)
      .one
  }

}

Не так много, чтобы объяснить. Здесь важно отметить, что обе функции поиска возвращают будущее, поэтому вызовы этих функций не будут блокироваться. Теперь, когда у нас есть основы, давайте рассмотрим код для первого http-сервера, который использует асинхронный обработчик.

/**
 * Simple Object that starts an HTTP server using akka-http. All requests are handled
 * through an Akka flow.
 */
object Boot extends App {
 
  // the actor system to use. Required for flowmaterializer and HTTP.
  // passed in implicit
  implicit val system = ActorSystem("Streams")
  implicit val materializer = FlowMaterializer()
 
  // start the server on the specified interface and port.
  val serverBinding2 = Http().bind(interface = "localhost", port = 8091)
  serverBinding2.connections.foreach { connection =>
    connection.handleWith(Flow[HttpRequest].mapAsync(asyncHandler))
  }
}

В этом фрагменте кода мы создаем http-сервер, который прослушивает порт 8091. Мы обрабатываем каждое соединение, созданное с помощью asyncHandler. Этот обработчик должен возвращать Future [HttpResponse]. Давайте посмотрим на этот обработчик следующий:

// With an async handler, we use futures. Threads aren't blocked.
  def asyncHandler(request: HttpRequest): Future[HttpResponse] = {
 
    // we match the request, and some simple path checking
    request match {
 
      // match specific path. Returns all the avaiable tickers
      case HttpRequest(GET, Uri.Path("/getAllTickers"), _, _, _) => {
 
        // make a db call, which returns a future.
        // use for comprehension to flatmap this into
        // a Future[HttpResponse]
        for {
          input <- Database.findAllTickers
        } yield {
          HttpResponse(entity = convertToString(input))
        }
      }
 
      // match GET pat. Return a single ticker
      case HttpRequest(GET, Uri.Path("/get"), _, _, _) => {
 
        // next we match on the query paramter
        request.uri.query.get("ticker") match {
 
            // if we find the query parameter
            case Some(queryParameter) => {
 
              // query the database
              val ticker = Database.findTicker(queryParameter)
 
              // use a simple for comprehension, to make
              // working with futures easier.
              for {
                t <- ticker
              } yield  {
                t match {
                  case Some(bson) => HttpResponse(entity = convertToString(bson))
                  case None => HttpResponse(status = StatusCodes.OK)
                }
              }
            }
 
            // if the query parameter isn't there
            case None => Future(HttpResponse(status = StatusCodes.OK))
          }
      }
 
      // Simple case that matches everything, just return a not found
      case HttpRequest(_, _, _, _, _) => {
        Future[HttpResponse] {
          HttpResponse(status = StatusCodes.NotFound)
        }
      }
    }
  }

Как видно из этого кода, код обработчика довольно прост. Мы используем сопоставление с образцом для сопоставления с конкретным URL-адресом и используем объект базы данных, который мы видели ранее, для запроса монго. Обратите внимание на вызовы convertToString. Это пара вспомогательных методов, которые конвертируют BSON в JSON, используя библиотеки воспроизведения, которые мы включили ранее:

 def convertToString(input: List[BSONDocument]) : String = {
    input
      .map(f => convertToString(f))
      .mkString("[", ",", "]")
  }
 
  def convertToString(input: BSONDocument) : String = {
    Json.stringify(BSONFormats.toJSON(input))
  }

Когда мы запустим этот сервер и откроем адрес в браузере, мы увидим что-то вроде этого:

Акка-поток-1.png.png

Легко ли? Теперь давайте посмотрим на более сложный сценарий.

Создайте сервер, который использует пользовательский потоковый график для обработки входящих запросов.

Akka-http внутренне использует akka-streams для обработки http-соединений. Это означает, что мы можем использовать akka-streams для быстрой обработки http-запросов. Для линейного потока мы можем использовать стандартный API потока, предоставляемый akka. Для более сложных графов akka-streams предоставляет собственный DSL, с помощью которого вы можете очень легко создавать более сложные графы, в которых события потока обрабатываются параллельно.

Давайте создадим новую привязку сервера, которая прослушивает порт 8090:

object Boot extends App {
 
  // the actor system to use. Required for flowmaterializer and HTTP.
  // passed in implicit
  implicit val system = ActorSystem("Streams")
  implicit val materializer = FlowMaterializer()
 
  // start the server on the specified interface and port.
  val serverBinding1 = Http().bind(interface = "localhost", port = 8090)
  serverBinding1.connections.foreach { connection =>
    connection.handleWith(broadCastMergeFlow)
  }
}

Эта привязка к серверу создается так же, как мы делали ранее. Основное отличие состоит в том, что на этот раз мы не передаем обработку запроса обработчику, а указываем экземпляр потока с именем broadCastMergeFlow. Этот поток слияния выглядит следующим образом:

 val bCast = Broadcast[HttpRequest]
 
  // some basic steps that each retrieve a different ticket value (as a future)
  val step1 = Flow[HttpRequest].mapAsync[String](getTickerHandler("GOOG"))
  val step2 = Flow[HttpRequest].mapAsync[String](getTickerHandler("AAPL"))
  val step3 = Flow[HttpRequest].mapAsync[String](getTickerHandler("MSFT"))
 
  // We'll use the source and output provided by the http endpoint
  val in = UndefinedSource[HttpRequest]
  val out = UndefinedSink[HttpResponse]
  // when an element is available on one of the inputs, take
  // that one, igore the rest
  val merge = Merge[String]
  // since merge doesn't output a HttpResponse add an additional map step.
  val mapToResponse = Flow[String].map[HttpResponse](
    (inp:String) => HttpResponse(status = StatusCodes.OK, entity = inp)
  )
  // define another flow. This uses the merge function which
  // takes the first available response
  val broadCastMergeFlow = Flow[HttpRequest, HttpResponse]() {
    implicit builder =>
 
            bCast ~> step1 ~> merge
      in ~> bCast ~> step2 ~> merge ~> mapToResponse ~> out
            bCast ~> step3 ~> merge
 
      (in, out)
  }

Наиболее важной частью являются последние несколько строк в этом фрагменте кода. Здесь мы рисуем график, который определяет, как обрабатывается сообщение, когда оно обрабатывается сервером. В этом случае мы сначала транслируем входящий HTTP-запрос на три параллельных потока. В каждом потоке мы затем делаем вызов в нашу базу данных, чтобы получить билет. Затем мы объединяем результаты вместе (объединение занимает первое доступное даже восходящее направление) и создаем ответ. Таким образом, в зависимости от того, какой из шагов является самым быстрым, мы возвращаем тикер для GOOG, AAPL или MSFT. Чтобы лучше увидеть результат, мы добавили сон в getTickerHandler:

 def getTickerHandler(tickName: String)(request: HttpRequest): Future[String] = {
    // query the database
    val ticker = Database.findTicker(tickName)
 
    Thread.sleep(Math.random() * 1000 toInt)
 
    // use a simple for comprehension, to make
    // working with futures easier.
    for {
      t <- ticker
    } yield  {
      t match {
        case Some(bson) => convertToString(bson)
        case None => ""
      }
    }
  }

Аккуратно верно! Akka-streams предоставляет ряд базовых строительных блоков, которые вы можете использовать для создания этих потоков (дополнительную информацию см. В их документации:  http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2. / scala / s … ). Например, если мы хотим сжать ответы шагов вместе, мы могли бы создать поток как это:

// waits for events on the three inputs and returns a response
  val zip = ZipWith[String, String, String, HttpResponse] (
    (inp1, inp2, inp3) => new HttpResponse(status = StatusCodes.OK,entity = inp1 + inp2 + inp3)
 
  // define a flow which broadcasts the request to the three
  // steps, and uses the zipWith to combine the elements before
  val broadCastZipFlow = Flow[HttpRequest, HttpResponse]() {
    implicit builder =>
 
            bCast ~> step1 ~> zip.input1
      in ~> bCast ~> step2 ~> zip.input2 ~> out
            bCast ~> step3 ~> zip.input3
 
      (in, out)
  }

Мне очень нравится, как это работает и как легко визуализировать данные, проходящие через различные этапы. Если мы используем подход слияния, вы увидите результат, который выглядит примерно так (когда вызывается 10 раз):

{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217
{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217
{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217
{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282

В заключительной части я хотел бы показать, как вы можете использовать тот же подход при создании http-клиента с помощью akka-http.

Протестируйте оба этих сервера с помощью http-клиента, также созданного с помощью Akka-Http

Akka-http также предоставляет функциональные возможности для простой настройки http-клиента, который также использует потоковый / потоковый подход обработки сообщений. Следующий листинг показывает полный запущенный клиент:

import akka.actor.ActorSystem
import akka.http.Http
import akka.stream.FlowMaterializer
import akka.http.model._
import akka.stream.scaladsl._
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.FlowGraphImplicits._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
 
/**
 * Simple HTTP client created with akka-http
 */
object Client extends App {
 
  // the actor system to use. Required for flowmaterializer and HTTP.
  // passed in implicit
  implicit val system = ActorSystem("ServerTest")
  implicit val materializer = FlowMaterializer()
 
  val httpClient1 = Http(system).outgoingConnection("localhost", 8090).flow
  val httpClient2 = Http(system).outgoingConnection("localhost", 8091).flow
 
  // define a sink that will process the answer
  // we could also process this as a flow
  val printChunksConsumer = Sink.foreach[HttpResponse] { res =>
    if(res.status == StatusCodes.OK) {
 
      println("Recieved response : " + res);
      res.entity.getDataBytes().map {
        chunk =>
          System.out.println("Chunk: " + chunk.decodeString(HttpCharsets.`UTF-8`.value).substring(0, 80))
        }.to(Sink.ignore).run()
    } else
      println(res.status)
  }
 
  // we need to set allow cycles since internally the httpclient
  // has some cyclic flows (apparently)
  // we construct a sink, to which we connect a later to define source.
  val reqFlow2: Sink[HttpRequest] = Sink[HttpRequest]() { implicit b =>
    b.allowCycles()
    val source = UndefinedSource[HttpRequest]
    val bcast = Broadcast[HttpRequest]
    val concat = Concat[HttpResponse]
 
    // simple graph. Duplicate the request, send twice.
    // concat the result.
              bcast ~> httpClient1 ~> concat.first
    source ~> bcast ~> httpClient1 ~> concat.second ~> printChunksConsumer
    source
  }
 
  // make two calls, both return futures, first one shows direct linked sinks and
  // sources. Second one makes yse if our graph.
 
  // make number of calls
  val res = 1 to 5 map( i => {
    Source.single(HttpRequest()).to(reqFlow2).run().get(printChunksConsumer)
  })
  val f = Future.sequence(res)
 
  // make some calls with filled in request URI
  val f3 = Source.single(HttpRequest(uri = Uri("/getAllTickers"))).via(httpClient2).runWith(printChunksConsumer)
  val f4 = Source.single(HttpRequest(uri = Uri("/get?ticker=ADAT"))).via(httpClient2).runWith(printChunksConsumer)
  val f5 = Source.single(HttpRequest(uri = Uri("/get?tikcer=FNB"))).via(httpClient2).runWith(printChunksConsumer)
 
  for {
    f2Result <- f
    f2Result <- f3
    f2Result <- f4
    f2Result <- f5
  } yield ({
      println("All calls done")
      system.shutdown()
      system.awaitTermination()
    }
  )
}

Я не буду вдаваться в подробности, поскольку код следует тому же процессу, что и для HTTP-сервера. Вот и все для этой статьи и введения в akka-stream и akka-http. Мне очень нравится их подход к обработке сообщений и созданию читабельного, реактивного кода. В следующей статье мы рассмотрим некоторые другие аспекты akka-http (например, маршруты).