В конце прошлого года я написал пару статей, в которых показано, как можно использовать Spray.io для создания службы REST на основе Scala () и как создать сервер веб-сокетов с помощью Scala, Akka и responsetivemongo (). Я хотел немного больше изучить серверную часть REST, но обнаружил, что в конце 2013 года Spray.io был приобретен typesafe и будет интегрирован со стеком Akka. Итак, в этой статье мы рассмотрим, как вы можете использовать функциональные возможности HTTP Akka для создания простого веб-сервера, и в дальнейшем мы рассмотрим, как маршрутизация из Spray.io была перенесена в Akka.
В этой статье мы предпримем следующие шаги:
- Получите несколько фиктивных данных в mongoDB для тестирования.
- Создайте сервер с помощью Akka Http, который использует простой асинхронный обработчик для обработки запросов.
- Создайте сервер, который использует пользовательский потоковый график для обработки входящих запросов.
- Протестируйте оба этих сервера с помощью http-клиента, также созданного с помощью Akka-Http.
Итак, давайте начнем с некоторой подготовительной работы и перенесем некоторые данные в mongoDB, чтобы мы могли с ними работать.
Загрузка данных в mongoDB
Для этого примера мы используем некоторую информацию, касающуюся акций, которую вы можете скачать здесь ( http://jsonstudio.com/wp-content/uploads/2014/02/stocks.zip ). Вы можете легко сделать это, выполнив следующие шаги:
Сначала получите данные:
|
1
|
wget http://jsonstudio.com/wp-content/uploads/2014/02/stocks.zip |
Запустите mongodb в другом терминале
|
1
|
mongod --dbpath ./data/ |
И, наконец, используйте mongoimport для импорта данных.
|
1
|
unzip -c stocks.zip | mongoimport --db akka --collection stocks |
И в качестве быстрой проверки запустите запрос, чтобы увидеть, все ли работает:
|
01
02
03
04
05
06
07
08
09
10
11
|
jos@Joss-MacBook-Pro.local:~$ mongo akka MongoDB shell version: 2.4.8connecting to: akka> db.stocks.findOne({},{Company: 1, Country: 1, Ticker:1 } ){ "_id" : ObjectId("52853800bb1177ca391c17ff"), "Ticker" : "A", "Country" : "USA", "Company" : "Agilent Technologies Inc."}> |
На данный момент у нас есть наши тестовые данные и мы можем посмотреть на код, необходимый для запуска сервера.
Создайте сервер, который использует простой асинхронный обработчик для обработки запросов
Для работы с Akka Http и доступа к данным в монго нам понадобятся дополнительные библиотеки. Поэтому, прежде чем мы что-то еще сделаем, давайте сначала посмотрим на файл сборки sbt, который мы использовали для этой статьи:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
import com.typesafe.sbt.SbtAspectj._name := "http-akka"version := "1.0"scalaVersion := "2.11.5"libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-http-core-experimental" % "1.0-M2", "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23", "org.reactivemongo" %% "play2-reactivemongo" % "0.10.5.0.akka23", "com.typesafe.play" % "play-json_2.11" % "2.4.0-M2", "ch.qos.logback" % "logback-classic" % "1.1.2")mainClass in (Compile, run) := Some("Boot") |
Когда вы просматриваете зависимости, вы видите обычных подозреваемых:
- akka-http-core-экспериментальный содержит весь http-сервер и клиентский материал, который мы собираемся использовать. Эта библиотека зависит от akka-stream, поэтому мы также получим эту библиотеку в нашем пути к классам.
- «Реактимонго» позволяет нам реагировать на монго.
- Я также включил play2-реагирующий mongo и play-json, что значительно упрощает преобразование BSON, возвращенного из монго в JSON.
- Наконец, для регистрации мы добавляем logback.
Теперь, прежде чем мы рассмотрим код, необходимый для запуска сервера, давайте быстро рассмотрим, как мы будем запрашивать монго. Для этого мы создали простой вспомогательный объект с креативным именем Database:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
import reactivemongo.api._import reactivemongo.api.collections.default.BSONCollectionimport reactivemongo.bson.BSONDocumentimport scala.concurrent.ExecutionContext.Implicits.globalimport scala.concurrent.Futureobject Database { val collection = connect() def connect(): BSONCollection = { val driver = new MongoDriver val connection = driver.connection(List("localhost")) val db = connection("akka") db.collection("stocks") } def findAllTickers(): Future[List[BSONDocument]] = { val query = BSONDocument() val filter = BSONDocument("Company" -> 1, "Country" -> 1, "Ticker" -> 1) // which results in a Future[List[BSONDocument]] Database.collection .find(query, filter) .cursor[BSONDocument] .collect[List]() } def findTicker(ticker: String) : Future[Option[BSONDocument]] = { val query = BSONDocument("Ticker" -> ticker) Database.collection .find(query) .one }} |
Не так много, чтобы объяснить. Здесь важно отметить, что обе функции поиска возвращают будущее, поэтому вызовы этих функций не будут блокироваться. Теперь, когда у нас есть основы, давайте рассмотрим код для первого http-сервера, который использует асинхронный обработчик.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
/** * Simple Object that starts an HTTP server using akka-http. All requests are handled * through an Akka flow. */object Boot extends App { // the actor system to use. Required for flowmaterializer and HTTP. // passed in implicit implicit val system = ActorSystem("Streams") implicit val materializer = FlowMaterializer() // start the server on the specified interface and port. val serverBinding2 = Http().bind(interface = "localhost", port = 8091) serverBinding2.connections.foreach { connection => connection.handleWith(Flow[HttpRequest].mapAsync(asyncHandler)) } } |
В этом фрагменте кода мы создаем http-сервер, который прослушивает порт 8091. Мы обрабатываем каждое соединение, созданное с помощью asyncHandler. Этот обработчик должен возвращать Future [HttpResponse]. Давайте посмотрим на этот обработчик следующий:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
// With an async handler, we use futures. Threads aren't blocked.def asyncHandler(request: HttpRequest): Future[HttpResponse] = { // we match the request, and some simple path checking request match { // match specific path. Returns all the avaiable tickers case HttpRequest(GET, Uri.Path("/getAllTickers"), _, _, _) => { // make a db call, which returns a future. // use for comprehension to flatmap this into // a Future[HttpResponse] for { input <- Database.findAllTickers } yield { HttpResponse(entity = convertToString(input)) } } // match GET pat. Return a single ticker case HttpRequest(GET, Uri.Path("/get"), _, _, _) => { // next we match on the query paramter request.uri.query.get("ticker") match { // if we find the query parameter case Some(queryParameter) => { // query the database val ticker = Database.findTicker(queryParameter) // use a simple for comprehension, to make // working with futures easier. for { t <- ticker } yield { t match { case Some(bson) => HttpResponse(entity = convertToString(bson)) case None => HttpResponse(status = StatusCodes.OK) } } } // if the query parameter isn't there case None => Future(HttpResponse(status = StatusCodes.OK)) } } // Simple case that matches everything, just return a not found case HttpRequest(_, _, _, _, _) => { Future[HttpResponse] { HttpResponse(status = StatusCodes.NotFound) } } }} |
Как видно из этого кода, код обработчика довольно прост. Мы используем сопоставление с образцом для сопоставления с конкретным URL-адресом и используем объект базы данных, который мы видели ранее, для запроса монго. Обратите внимание на вызовы convertToString. Это пара вспомогательных методов, которые конвертируют BSON в JSON, используя библиотеки воспроизведения, которые мы включили ранее:
|
1
2
3
4
5
6
7
8
9
|
def convertToString(input: List[BSONDocument]) : String = { input .map(f => convertToString(f)) .mkString("[", ",", "]") } def convertToString(input: BSONDocument) : String = { Json.stringify(BSONFormats.toJSON(input)) } |
Когда мы запустим этот сервер и откроем адрес в браузере, мы увидим что-то вроде этого:
Легко ли? Теперь давайте посмотрим на более сложный сценарий.
Создайте сервер, который использует пользовательский потоковый график для обработки входящих запросов.
Akka-http внутренне использует akka-streams для обработки http-соединений. Это означает, что мы можем использовать akka-streams для быстрой обработки http-запросов. Для линейного потока мы можем использовать стандартный API потока, предоставляемый akka. Для более сложных графов akka-streams предоставляет собственный DSL, с помощью которого вы можете очень легко создавать более сложные графы, в которых события потока обрабатываются параллельно.
Давайте создадим новую привязку сервера, которая прослушивает порт 8090:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
object Boot extends App { // the actor system to use. Required for flowmaterializer and HTTP. // passed in implicit implicit val system = ActorSystem("Streams") implicit val materializer = FlowMaterializer() // start the server on the specified interface and port. val serverBinding1 = Http().bind(interface = "localhost", port = 8090) serverBinding1.connections.foreach { connection => connection.handleWith(broadCastMergeFlow) } } |
Эта привязка к серверу создается так же, как мы делали ранее. Основное отличие состоит в том, что на этот раз мы не передаем обработку запроса обработчику, а указываем экземпляр потока с именем broadCastMergeFlow. Этот поток слияния выглядит следующим образом:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
val bCast = Broadcast[HttpRequest] // some basic steps that each retrieve a different ticket value (as a future)val step1 = Flow[HttpRequest].mapAsync[String](getTickerHandler("GOOG"))val step2 = Flow[HttpRequest].mapAsync[String](getTickerHandler("AAPL"))val step3 = Flow[HttpRequest].mapAsync[String](getTickerHandler("MSFT")) // We'll use the source and output provided by the http endpointval in = UndefinedSource[HttpRequest]val out = UndefinedSink[HttpResponse] // when an element is available on one of the inputs, take// that one, igore the restval merge = Merge[String]// since merge doesn't output a HttpResponse add an additional map step.val mapToResponse = Flow[String].map[HttpResponse]((inp:String) => HttpResponse(status = StatusCodes.OK, entity = inp)) // define another flow. This uses the merge function which // takes the first available response val broadCastMergeFlow = Flow[HttpRequest, HttpResponse]() { implicit builder => bCast ~> step1 ~> merge in ~> bCast ~> step2 ~> merge ~> mapToResponse ~> out bCast ~> step3 ~> merge (in, out) } |
Наиболее важной частью являются последние несколько строк в этом фрагменте кода. Здесь мы рисуем график, который определяет, как обрабатывается сообщение, когда оно обрабатывается сервером. В этом случае мы сначала транслируем входящий HTTP-запрос на три параллельных потока. В каждом потоке мы затем делаем вызов в нашу базу данных, чтобы получить билет. Затем мы объединяем результаты вместе (объединение занимает первое доступное даже в обратном направлении) и создаем ответ. Таким образом, в зависимости от того, какой из шагов является самым быстрым, мы возвращаем тикер для GOOG, AAPL или MSFT. Чтобы лучше увидеть результат, мы добавили сон в getTickerHandler:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
def getTickerHandler(tickName: String)(request: HttpRequest): Future[String] = { // query the database val ticker = Database.findTicker(tickName) Thread.sleep(Math.random() * 1000 toInt) // use a simple for comprehension, to make // working with futures easier. for { t <- ticker } yield { t match { case Some(bson) => convertToString(bson) case None => "" } }} |
Аккуратно верно! Akka-streams предоставляет ряд базовых строительных блоков, которые вы можете использовать для создания этих потоков (дополнительную информацию см. В документации: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2 / scala / s… ). Например, если мы хотим сжать ответы шагов вместе, мы могли бы создать поток как это:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
// waits for events on the three inputs and returns a responseval zip = ZipWith[String, String, String, HttpResponse] ( (inp1, inp2, inp3) => new HttpResponse(status = StatusCodes.OK,entity = inp1 + inp2 + inp3)// define a flow which broadcasts the request to the three// steps, and uses the zipWith to combine the elements beforeval broadCastZipFlow = Flow[HttpRequest, HttpResponse]() { implicit builder => bCast ~> step1 ~> zip.input1 in ~> bCast ~> step2 ~> zip.input2 ~> out bCast ~> step3 ~> zip.input3 (in, out)} |
Мне очень нравится, как это работает и как легко визуализировать данные, проходящие через различные этапы. Если мы используем подход слияния, вы увидите результат, который выглядит примерно так (когда вызывается 10 раз):
|
01
02
03
04
05
06
07
08
09
10
|
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282 |
В заключительной части я хотел бы показать, как вы можете использовать тот же подход при создании http-клиента с помощью akka-http.
Проверьте оба этих сервера с помощью http-клиента, также созданного с помощью Akka-Http.
Akka-http также предоставляет функциональные возможности для простой настройки http-клиента, который также использует потоковый / потоковый подход обработки сообщений. Следующий листинг показывает полный запущенный клиент:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
import akka.actor.ActorSystemimport akka.http.Httpimport akka.stream.FlowMaterializerimport akka.http.model._import akka.stream.scaladsl._import akka.stream.scaladsl.Sourceimport akka.stream.scaladsl.FlowGraphImplicits._import scala.concurrent.ExecutionContext.Implicits.globalimport scala.concurrent.Future/** * Simple HTTP client created with akka-http */object Client extends App { // the actor system to use. Required for flowmaterializer and HTTP. // passed in implicit implicit val system = ActorSystem("ServerTest") implicit val materializer = FlowMaterializer() val httpClient1 = Http(system).outgoingConnection("localhost", 8090).flow val httpClient2 = Http(system).outgoingConnection("localhost", 8091).flow // define a sink that will process the answer // we could also process this as a flow val printChunksConsumer = Sink.foreach[HttpResponse] { res => if(res.status == StatusCodes.OK) { println("Recieved response : " + res); res.entity.getDataBytes().map { chunk => System.out.println("Chunk: " + chunk.decodeString(HttpCharsets.`UTF-8`.value).substring(0, 80)) }.to(Sink.ignore).run() } else println(res.status) } // we need to set allow cycles since internally the httpclient // has some cyclic flows (apparently) // we construct a sink, to which we connect a later to define source. val reqFlow2: Sink[HttpRequest] = Sink[HttpRequest]() { implicit b => b.allowCycles() val source = UndefinedSource[HttpRequest] val bcast = Broadcast[HttpRequest] val concat = Concat[HttpResponse] // simple graph. Duplicate the request, send twice. // concat the result. bcast ~> httpClient1 ~> concat.first source ~> bcast ~> httpClient1 ~> concat.second ~> printChunksConsumer source } // make two calls, both return futures, first one shows direct linked sinks and // sources. Second one makes yse if our graph. // make number of calls val res = 1 to 5 map( i => { Source.single(HttpRequest()).to(reqFlow2).run().get(printChunksConsumer) }) val f = Future.sequence(res) // make some calls with filled in request URI val f3 = Source.single(HttpRequest(uri = Uri("/getAllTickers"))).via(httpClient2).runWith(printChunksConsumer) val f4 = Source.single(HttpRequest(uri = Uri("/get?ticker=ADAT"))).via(httpClient2).runWith(printChunksConsumer) val f5 = Source.single(HttpRequest(uri = Uri("/get?tikcer=FNB"))).via(httpClient2).runWith(printChunksConsumer) for { f2Result <- f f2Result <- f3 f2Result <- f4 f2Result <- f5 } yield ({ println("All calls done") system.shutdown() system.awaitTermination() } )} |
Я не буду вдаваться в подробности, поскольку код следует тому же процессу, что и для HTTP-сервера. Вот и все для этой статьи и введения в akka-stream и akka-http. Мне очень нравится их подход к обработке сообщений и созданию читабельного, реактивного кода. В следующей статье мы рассмотрим некоторые другие аспекты akka-http (например, маршруты).
| Ссылка: | Создание службы REST в Scala с помощью Akka HTTP, Akka Streams и реактивного монго от нашего партнера JCG Йоса Дирксена в блоге Smart Java . |
