В конце прошлого года я написал пару статей, в которых показано, как можно использовать Spray.io для создания службы REST на основе Scala () и как создать сервер веб-сокетов с помощью Scala, Akka и responsetivemongo (). Я хотел немного больше изучить серверную часть REST, но обнаружил, что в конце 2013 года Spray.io был приобретен typesafe и будет интегрирован со стеком Akka. Итак, в этой статье мы рассмотрим, как вы можете использовать функциональные возможности HTTP Akka для создания простого веб-сервера, и в дальнейшем мы рассмотрим, как маршрутизация из Spray.io была перенесена в Akka.
В этой статье мы предпримем следующие шаги:
- Получите несколько фиктивных данных в mongoDB для тестирования.
- Создайте сервер с помощью Akka Http, который использует простой асинхронный обработчик для обработки запросов.
- Создайте сервер, который использует пользовательский потоковый график для обработки входящих запросов.
- Протестируйте оба этих сервера с помощью http-клиента, также созданного с помощью Akka-Http.
Итак, давайте начнем с некоторой подготовительной работы и перенесем некоторые данные в mongoDB, чтобы мы могли с ними работать.
Загрузка данных в mongoDB
Для этого примера мы используем некоторую информацию, касающуюся акций, которую вы можете скачать здесь ( http://jsonstudio.com/wp-content/uploads/2014/02/stocks.zip ). Вы можете легко сделать это, выполнив следующие шаги:
Сначала получите данные:
1
|
wget http: //jsonstudio.com/wp-content/uploads/2014/02/stocks.zip |
Запустите mongodb в другом терминале
1
|
mongod --dbpath ./data/ |
И, наконец, используйте mongoimport для импорта данных.
1
|
unzip -c stocks.zip | mongoimport --db akka --collection stocks |
И в качестве быстрой проверки запустите запрос, чтобы увидеть, все ли работает:
01
02
03
04
05
06
07
08
09
10
11
|
jos @Joss -MacBook-Pro.local:~$ mongo akka MongoDB shell version: 2.4 . 8 connecting to: akka > db.stocks.findOne({},{Company: 1 , Country: 1 , Ticker: 1 } ) { "_id" : ObjectId( "52853800bb1177ca391c17ff" ), "Ticker" : "A" , "Country" : "USA" , "Company" : "Agilent Technologies Inc." } > |
На данный момент у нас есть наши тестовые данные и мы можем посмотреть на код, необходимый для запуска сервера.
Создайте сервер, который использует простой асинхронный обработчик для обработки запросов
Для работы с Akka Http и доступа к данным в монго нам понадобятся дополнительные библиотеки. Поэтому, прежде чем мы что-то еще сделаем, давайте сначала посмотрим на файл сборки sbt, который мы использовали для этой статьи:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
import com.typesafe.sbt.SbtAspectj._ name := "http-akka" version := "1.0" scalaVersion := "2.11.5" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-http-core-experimental" % "1.0-M2" , "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23" , "org.reactivemongo" %% "play2-reactivemongo" % "0.10.5.0.akka23" , "com.typesafe.play" % "play-json_2.11" % "2.4.0-M2" , "ch.qos.logback" % "logback-classic" % "1.1.2" ) mainClass in (Compile, run) := Some( "Boot" ) |
Когда вы просматриваете зависимости, вы видите обычных подозреваемых:
- akka-http-core-экспериментальный содержит весь http-сервер и клиентский материал, который мы собираемся использовать. Эта библиотека зависит от akka-stream, поэтому мы также получим эту библиотеку в нашем пути к классам.
- «Реактимонго» позволяет нам реагировать на монго.
- Я также включил play2-реагирующий mongo и play-json, что значительно упрощает преобразование BSON, возвращенного из монго в JSON.
- Наконец, для регистрации мы добавляем logback.
Теперь, прежде чем мы рассмотрим код, необходимый для запуска сервера, давайте быстро рассмотрим, как мы будем запрашивать монго. Для этого мы создали простой вспомогательный объект с креативным именем Database:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
import reactivemongo.api._ import reactivemongo.api.collections. default .BSONCollection import reactivemongo.bson.BSONDocument import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future object Database { val collection = connect() def connect(): BSONCollection = { val driver = new MongoDriver val connection = driver.connection(List( "localhost" )) val db = connection( "akka" ) db.collection( "stocks" ) } def findAllTickers(): Future[List[BSONDocument]] = { val query = BSONDocument() val filter = BSONDocument( "Company" -> 1 , "Country" -> 1 , "Ticker" -> 1 ) // which results in a Future[List[BSONDocument]] Database.collection .find(query, filter) .cursor[BSONDocument] .collect[List]() } def findTicker(ticker: String) : Future[Option[BSONDocument]] = { val query = BSONDocument( "Ticker" -> ticker) Database.collection .find(query) .one } } |
Не так много, чтобы объяснить. Здесь важно отметить, что обе функции поиска возвращают будущее, поэтому вызовы этих функций не будут блокироваться. Теперь, когда у нас есть основы, давайте рассмотрим код для первого http-сервера, который использует асинхронный обработчик.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
/** * Simple Object that starts an HTTP server using akka-http. All requests are handled * through an Akka flow. */ object Boot extends App { // the actor system to use. Required for flowmaterializer and HTTP. // passed in implicit implicit val system = ActorSystem( "Streams" ) implicit val materializer = FlowMaterializer() // start the server on the specified interface and port. val serverBinding2 = Http().bind( interface = "localhost" , port = 8091 ) serverBinding2.connections.foreach { connection => connection.handleWith(Flow[HttpRequest].mapAsync(asyncHandler)) } } |
В этом фрагменте кода мы создаем http-сервер, который прослушивает порт 8091. Мы обрабатываем каждое соединение, созданное с помощью asyncHandler. Этот обработчик должен возвращать Future [HttpResponse]. Давайте посмотрим на этот обработчик следующий:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
// With an async handler, we use futures. Threads aren't blocked. def asyncHandler(request: HttpRequest): Future[HttpResponse] = { // we match the request, and some simple path checking request match { // match specific path. Returns all the avaiable tickers case HttpRequest(GET, Uri.Path( "/getAllTickers" ), _, _, _) => { // make a db call, which returns a future. // use for comprehension to flatmap this into // a Future[HttpResponse] for { input <- Database.findAllTickers } yield { HttpResponse(entity = convertToString(input)) } } // match GET pat. Return a single ticker case HttpRequest(GET, Uri.Path( "/get" ), _, _, _) => { // next we match on the query paramter request.uri.query.get( "ticker" ) match { // if we find the query parameter case Some(queryParameter) => { // query the database val ticker = Database.findTicker(queryParameter) // use a simple for comprehension, to make // working with futures easier. for { t <- ticker } yield { t match { case Some(bson) => HttpResponse(entity = convertToString(bson)) case None => HttpResponse(status = StatusCodes.OK) } } } // if the query parameter isn't there case None => Future(HttpResponse(status = StatusCodes.OK)) } } // Simple case that matches everything, just return a not found case HttpRequest(_, _, _, _, _) => { Future[HttpResponse] { HttpResponse(status = StatusCodes.NotFound) } } } } |
Как видно из этого кода, код обработчика довольно прост. Мы используем сопоставление с образцом для сопоставления с конкретным URL-адресом и используем объект базы данных, который мы видели ранее, для запроса монго. Обратите внимание на вызовы convertToString. Это пара вспомогательных методов, которые конвертируют BSON в JSON, используя библиотеки воспроизведения, которые мы включили ранее:
1
2
3
4
5
6
7
8
9
|
def convertToString(input: List[BSONDocument]) : String = { input .map(f => convertToString(f)) .mkString( "[" , "," , "]" ) } def convertToString(input: BSONDocument) : String = { Json.stringify(BSONFormats.toJSON(input)) } |
Когда мы запустим этот сервер и откроем адрес в браузере, мы увидим что-то вроде этого:
Легко ли? Теперь давайте посмотрим на более сложный сценарий.
Создайте сервер, который использует пользовательский потоковый график для обработки входящих запросов.
Akka-http внутренне использует akka-streams для обработки http-соединений. Это означает, что мы можем использовать akka-streams для быстрой обработки http-запросов. Для линейного потока мы можем использовать стандартный API потока, предоставляемый akka. Для более сложных графов akka-streams предоставляет собственный DSL, с помощью которого вы можете очень легко создавать более сложные графы, в которых события потока обрабатываются параллельно.
Давайте создадим новую привязку сервера, которая прослушивает порт 8090:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
object Boot extends App { // the actor system to use. Required for flowmaterializer and HTTP. // passed in implicit implicit val system = ActorSystem( "Streams" ) implicit val materializer = FlowMaterializer() // start the server on the specified interface and port. val serverBinding1 = Http().bind( interface = "localhost" , port = 8090 ) serverBinding1.connections.foreach { connection => connection.handleWith(broadCastMergeFlow) } } |
Эта привязка к серверу создается так же, как мы делали ранее. Основное отличие состоит в том, что на этот раз мы не передаем обработку запроса обработчику, а указываем экземпляр потока с именем broadCastMergeFlow. Этот поток слияния выглядит следующим образом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
val bCast = Broadcast[HttpRequest] // some basic steps that each retrieve a different ticket value (as a future) val step1 = Flow[HttpRequest].mapAsync[String](getTickerHandler( "GOOG" )) val step2 = Flow[HttpRequest].mapAsync[String](getTickerHandler( "AAPL" )) val step3 = Flow[HttpRequest].mapAsync[String](getTickerHandler( "MSFT" )) // We'll use the source and output provided by the http endpoint val in = UndefinedSource[HttpRequest] val out = UndefinedSink[HttpResponse] // when an element is available on one of the inputs, take // that one, igore the rest val merge = Merge[String] // since merge doesn't output a HttpResponse add an additional map step. val mapToResponse = Flow[String].map[HttpResponse]( (inp:String) => HttpResponse(status = StatusCodes.OK, entity = inp) ) // define another flow. This uses the merge function which // takes the first available response val broadCastMergeFlow = Flow[HttpRequest, HttpResponse]() { implicit builder => bCast ~> step1 ~> merge in ~> bCast ~> step2 ~> merge ~> mapToResponse ~> out bCast ~> step3 ~> merge (in, out) } |
Наиболее важной частью являются последние несколько строк в этом фрагменте кода. Здесь мы рисуем график, который определяет, как обрабатывается сообщение, когда оно обрабатывается сервером. В этом случае мы сначала транслируем входящий HTTP-запрос на три параллельных потока. В каждом потоке мы затем делаем вызов в нашу базу данных, чтобы получить билет. Затем мы объединяем результаты вместе (объединение занимает первое доступное даже в обратном направлении) и создаем ответ. Таким образом, в зависимости от того, какой из шагов является самым быстрым, мы возвращаем тикер для GOOG, AAPL или MSFT. Чтобы лучше увидеть результат, мы добавили сон в getTickerHandler:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
def getTickerHandler(tickName: String)(request: HttpRequest): Future[String] = { // query the database val ticker = Database.findTicker(tickName) Thread.sleep(Math.random() * 1000 toInt) // use a simple for comprehension, to make // working with futures easier. for { t <- ticker } yield { t match { case Some(bson) => convertToString(bson) case None => "" } } } |
Аккуратно верно! Akka-streams предоставляет ряд базовых строительных блоков, которые вы можете использовать для создания этих потоков (дополнительную информацию см. В документации: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2 / scala / s… ). Например, если мы хотим сжать ответы шагов вместе, мы могли бы создать поток как это:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
// waits for events on the three inputs and returns a response val zip = ZipWith[String, String, String, HttpResponse] ( (inp1, inp2, inp3) => new HttpResponse(status = StatusCodes.OK,entity = inp1 + inp2 + inp3) // define a flow which broadcasts the request to the three // steps, and uses the zipWith to combine the elements before val broadCastZipFlow = Flow[HttpRequest, HttpResponse]() { implicit builder => bCast ~> step1 ~> zip.input1 in ~> bCast ~> step2 ~> zip.input2 ~> out bCast ~> step3 ~> zip.input3 (in, out) } |
Мне очень нравится, как это работает и как легко визуализировать данные, проходящие через различные этапы. Если мы используем подход слияния, вы увидите результат, который выглядит примерно так (когда вызывается 10 раз):
01
02
03
04
05
06
07
08
09
10
|
{ "_id" :{ "$oid" : "52853804bb1177ca391c2221" }, "Ticker" : "GOOG" , "Profit Margin" : 0.217 { "_id" :{ "$oid" : "52853804bb1177ca391c2221" }, "Ticker" : "GOOG" , "Profit Margin" : 0.217 { "_id" :{ "$oid" : "52853800bb1177ca391c1809" }, "Ticker" : "AAPL" , "Profit Margin" : 0.217 { "_id" :{ "$oid" : "52853807bb1177ca391c2781" }, "Ticker" : "MSFT" , "Profit Margin" : 0.282 { "_id" :{ "$oid" : "52853804bb1177ca391c2221" }, "Ticker" : "GOOG" , "Profit Margin" : 0.217 { "_id" :{ "$oid" : "52853800bb1177ca391c1809" }, "Ticker" : "AAPL" , "Profit Margin" : 0.217 { "_id" :{ "$oid" : "52853807bb1177ca391c2781" }, "Ticker" : "MSFT" , "Profit Margin" : 0.282 { "_id" :{ "$oid" : "52853804bb1177ca391c2221" }, "Ticker" : "GOOG" , "Profit Margin" : 0.217 { "_id" :{ "$oid" : "52853800bb1177ca391c1809" }, "Ticker" : "AAPL" , "Profit Margin" : 0.217 { "_id" :{ "$oid" : "52853807bb1177ca391c2781" }, "Ticker" : "MSFT" , "Profit Margin" : 0.282 |
В заключительной части я хотел бы показать, как вы можете использовать тот же подход при создании http-клиента с помощью akka-http.
Проверьте оба этих сервера с помощью http-клиента, также созданного с помощью Akka-Http.
Akka-http также предоставляет функциональные возможности для простой настройки http-клиента, который также использует потоковый / потоковый подход обработки сообщений. Следующий листинг показывает полный запущенный клиент:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
import akka.actor.ActorSystem import akka.http.Http import akka.stream.FlowMaterializer import akka.http.model._ import akka.stream.scaladsl._ import akka.stream.scaladsl.Source import akka.stream.scaladsl.FlowGraphImplicits._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future /** * Simple HTTP client created with akka-http */ object Client extends App { // the actor system to use. Required for flowmaterializer and HTTP. // passed in implicit implicit val system = ActorSystem( "ServerTest" ) implicit val materializer = FlowMaterializer() val httpClient1 = Http(system).outgoingConnection( "localhost" , 8090 ).flow val httpClient2 = Http(system).outgoingConnection( "localhost" , 8091 ).flow // define a sink that will process the answer // we could also process this as a flow val printChunksConsumer = Sink.foreach[HttpResponse] { res => if (res.status == StatusCodes.OK) { println( "Recieved response : " + res); res.entity.getDataBytes().map { chunk => System.out.println( "Chunk: " + chunk.decodeString(HttpCharsets.`UTF- 8 `.value).substring( 0 , 80 )) }.to(Sink.ignore).run() } else println(res.status) } // we need to set allow cycles since internally the httpclient // has some cyclic flows (apparently) // we construct a sink, to which we connect a later to define source. val reqFlow2: Sink[HttpRequest] = Sink[HttpRequest]() { implicit b => b.allowCycles() val source = UndefinedSource[HttpRequest] val bcast = Broadcast[HttpRequest] val concat = Concat[HttpResponse] // simple graph. Duplicate the request, send twice. // concat the result. bcast ~> httpClient1 ~> concat.first source ~> bcast ~> httpClient1 ~> concat.second ~> printChunksConsumer source } // make two calls, both return futures, first one shows direct linked sinks and // sources. Second one makes yse if our graph. // make number of calls val res = 1 to 5 map( i => { Source.single(HttpRequest()).to(reqFlow2).run().get(printChunksConsumer) }) val f = Future.sequence(res) // make some calls with filled in request URI val f3 = Source.single(HttpRequest(uri = Uri( "/getAllTickers" ))).via(httpClient2).runWith(printChunksConsumer) val f4 = Source.single(HttpRequest(uri = Uri( "/get?ticker=ADAT" ))).via(httpClient2).runWith(printChunksConsumer) val f5 = Source.single(HttpRequest(uri = Uri( "/get?tikcer=FNB" ))).via(httpClient2).runWith(printChunksConsumer) for { f2Result <- f f2Result <- f3 f2Result <- f4 f2Result <- f5 } yield ({ println( "All calls done" ) system.shutdown() system.awaitTermination() } ) } |
Я не буду вдаваться в подробности, поскольку код следует тому же процессу, что и для HTTP-сервера. Вот и все для этой статьи и введения в akka-stream и akka-http. Мне очень нравится их подход к обработке сообщений и созданию читабельного, реактивного кода. В следующей статье мы рассмотрим некоторые другие аспекты akka-http (например, маршруты).
Ссылка: | Создание службы REST в Scala с помощью Akka HTTP, Akka Streams и реактивного монго от нашего партнера JCG Йоса Дирксена в блоге Smart Java . |