Статьи

Создать simpe RESTful сервис с vert.x 2.0, RxJava и mongoDB

Новая статья после перерыва почти пол года. В этой статье мы кратко рассмотрим, как вы можете начать работу с vert.x, и более интересно, как вы можете использовать RxJava для упрощения программирования асинхронных систем. Мы рассмотрим следующие темы:

  • Создайте пустой проект vert.x, используя maven
  • Импортируйте в IntelliJ и создайте простой HTTP-сервер
  • Загрузка данных из mongoDB с помощью модуля постоянных данных vert.x mongoDB
  • Выставить почтовые индексы через интерфейс REST
  • Замените обратные вызовы наблюдателями RxJava

Первое, что нужно сделать, это очень просто, мы просто используем стандартный архетип Maven для создания проекта vert.x. (обратите внимание, что полный окончательный пример можно скачать с github: https://github.com/josdirksen/smartjava/tree/master/vertx-demo-1 )

Создайте пустой проект vert.x, используя maven

Перейдите в каталог, в котором вы хотите создать свой проект vert.x, введите следующее и нажмите enter:

1
jos@Joss-MacBook-Pro.local:~/Dev/playground$ mvn archetype:generate -Dfilter=io.vertx:

Здесь показаны все доступные архетипы io.vertx (в данном случае только 1)

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
[INFO] Scanning for projects...
[INFO]                                                                        
[INFO] ------------------------------------------------------------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom <<<
[INFO]
[INFO] --- maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Interactive mode
[INFO] No archetype defined. Using maven-archetype-quickstart (org.apache.maven.archetypes:maven-archetype-quickstart:1.0)
Choose archetype:
1: remote -> io.vertx:vertx-maven-archetype (-)
Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): :

Так как есть только один, просто введите «1» и нажмите Enter. Далее он покажет вам версии, которые вы можете выбрать. Для этого примера я выбрал 2.0.1-финальную версию.

1
2
3
4
5
6
7
8
9
Choose io.vertx:vertx-maven-archetype version:
1: 1.0.0-beta1
2: 1.0.0-beta2
3: 1.0.0-beta3
4: 1.0.0-CR1
5: 1.0.0-CR2
6: 2.0.0-final
7: 2.0.1-final
Choose a number: 7:

Введите «7» и нажмите Enter. Следующие шаги позволяют вам определить имя и версию вашего проекта:

01
02
03
04
05
06
07
08
09
10
Define value for property 'groupId': : org.smartjava
Define value for property 'artifactId': : vertx-demo-1
Define value for property 'version':  1.0-SNAPSHOT: :
Define value for property 'package':  org.smartjava: :
Confirm properties configuration:
groupId: org.smartjava
artifactId: vertx-demo-1
version: 1.0-SNAPSHOT
package: org.smartjava
 Y: : Y

Введите значения, которые вы видите выше (или используйте свои собственные), и, наконец, введите «Y», чтобы подтвердить свой выбор. Теперь будет создан проект:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: vertx-maven-archetype:2.0.1-final
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.smartjava
[INFO] Parameter: artifactId, Value: vertx-demo-1
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: org.smartjava
[INFO] Parameter: packageInPathFormat, Value: org/smartjava
[INFO] Parameter: package, Value: org.smartjava
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: groupId, Value: org.smartjava
[INFO] Parameter: artifactId, Value: vertx-demo-1
[INFO] project created from Archetype in dir: /Users/jos/Dev/playground/vertx-demo-1
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5:37.710s
[INFO] Finished at: Sun Nov 24 14:55:12 CET 2013
[INFO] Final Memory: 9M/24M
[INFO] ------------------------------------------------------------------------

Чтобы проверить, все ли прошло правильно, просто перейдите в только что созданный каталог и запустите mvn install. это загрузит все необходимые библиотеки, запустит несколько тестов и установит ваш проект в локальное хранилище maven. Теперь, когда у нас есть проект maven, мы можем загрузить его в нашу любимую среду IDE. В моем случае я использую IntelliJ, но Eclipse работает примерно так же.

Импортируйте в IntelliJ и создайте простой HTTP-сервер

Запустите IntelliJ и выберите «Файл-> Импортировать проект», перейдите в каталог, созданный maven, и импортируйте проект.

Выберите файл или каталог для импорта

Просто нажмите «Далее» по всем вопросам, и вы получите проект внутри IntelliJ. Если вы создаете проект на основе этого архетипа, вы автоматически получаете ряд статей, с которыми можно поэкспериментировать. Несколько из них определены в заводной. IntelliJ автоматически пытается скомпилировать их, но так как он не может найти отличный компилятор, процесс компиляции / создания завершается неудачно. В этом примере мы сначала сконцентрируемся на Java-части файла vert.x, поэтому просто удалите файлы .groovy из каталога ‘src / main / resources’ и каталога ‘test / resources / интеграции_tests / groovy’.

Теперь мы можем запустить vert.x с предоставленными обработчиками напрямую через maven, установив модуль с помощью maven и затем вызвав цель ‘vertx: runModIDEA’. Обратите внимание, что вам нужно сначала вызвать mvn: compile, чтобы увидеть ваши изменения. Если вы не хотите использовать maven для запуска вашего проекта из IDE, вы также можете использовать другой подход, где вы используете класс org.vertx.java.platform.impl.cli.Starter для запуска vert.x прямо из IDE. В IntelliJ вы создаете следующую конфигурацию запуска для этого:

Run_Debug Configurations-2

Если вы запустите это, вы все равно увидите ошибку. Что-то вроде этого:

01
02
03
04
05
06
07
08
09
10
11
Exception in thread "main" java.lang.ClassNotFoundException: org.vertx.java.platform.impl.cli.Starter
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:190)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:113)

Причина, по которой вы это видите, заключается в том, что в файле pom.xml, созданном архетипом vert.x, библиотеки vert.x указаны как «предоставленные». В качестве быстрого решения откройте pom.xml и измените область действия трех зависимостей io.vertx с «предоставлено» на «компилировать». Теперь, когда вы запускаете этот модуль запуска из IntelliJ, vert.x будет запускаться правильно.

01
02
03
04
05
06
07
08
09
10
/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/bin/java -Didea.launcher.port=7543 "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA 12.app/bin" -Dfile.encoding=UTF-8 -classpath "..." com.intellij.rt.execution.application.AppMain org.vertx.java.platform.impl.cli.Starter runmod org.smartjava~vertx-demo-1~1.0-SNAPSHOT
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Nov 24, 2013 3:43:26 PM org.vertx.java.core.logging.impl.JULLogDelegate info
INFO: Module org.smartjava~vertx-demo-1~1.0-SNAPSHOT successfully installed
Nov 24, 2013 3:43:26 PM org.vertx.java.core.logging.impl.JULLogDelegate info
INFO: PingVerticle started
Nov 24, 2013 3:43:26 PM org.vertx.java.core.logging.impl.JULLogDelegate info
INFO: Succeeded in deploying module

Теперь, когда у нас есть настройка проекта в IntelliJ, и мы можем легко запустить его непосредственно из IDE (и перезапустить его с помощью ctrl-F5), давайте приступим к созданию простого HTTP-сервера, чтобы мы могли увидеть некоторые выходные данные из браузера для проведения тестирования. Проще (обратите внимание, что есть гораздо лучшие способы тестирования vert.x и verstitutions, чем я здесь показываю, но это кое-что для другой статьи). Откройте файл PingVerticle.java и замените метод запуска следующим кодом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
package org.smartjava;
 
import org.vertx.java.core.Handler;
import org.vertx.java.core.http.HttpServerRequest;
import org.vertx.java.platform.Verticle;
 
public class PingVerticle extends Verticle {
 
  public void start() {
 
    vertx.createHttpServer().requestHandler(new Handler<HttpServerRequest>() {
        @Override
        public void handle(HttpServerRequest httpServerRequest) {
            httpServerRequest.response().end("Hello smartjava");
        }
    }).listen(8888);
 
    container.logger().info("Webserver started, listening on port: 8888");
 
  }
}

Запустите это и откройте браузер для localhost: 8888, и вы увидите следующее.

localhost_8888

Это веб-сервер, который вы создали в vert.x и запускали прямо из вашей IDE. Кусок торта до сих пор. Теперь давайте поэкспериментируем с некоторыми данными.

Загрузка данных из mongoDB с помощью модуля постоянных данных vert.x mongoDB

Я не буду углубляться в то, как вы устанавливаете mongoDB, в Интернете достаточно статей, объясняющих это. Если вы работаете на Mac и у вас установлены macports, вы можете просто использовать следующую командную строку для установки mongoDB:

1
sudo port install mongodb

В оставшейся части этой статьи я предполагаю, что у вас установлен mongoDB, а утилиты командной строки доступны из консоли. Первое, что нам нужно сделать, это получить данные, с которыми можно поиграться. Для этого примера мы будем использовать список почтовых индексов, которые вы можете скачать с сайта mongoDB: http://media.mongodb.org/zips.json . Загрузите этот файл, откройте консоль и выполните следующую команду, чтобы сначала запустить mongoDB, а затем импортировать этот список zip-файлов в mongoDB.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
jos@Joss-MacBook-Pro.local:~/Dev/playground/vertx-demo-1$ mkdir data
jos@Joss-MacBook-Pro.local:~/Dev/playground/vertx-demo-1$ mongod --dbpath ./data/
Sun Nov 24 16:23:51.765 [initandlisten] MongoDB starting : pid=77755 port=27017 dbpath=./data/ 64-bit host=Joss-MacBook-Pro.local
Sun Nov 24 16:23:51.765 [initandlisten] db version v2.4.5
Sun Nov 24 16:23:51.765 [initandlisten] git version: nogitversion
Sun Nov 24 16:23:51.765 [initandlisten] build info: Darwin Joss-MacBook-Pro.local 12.4.0 Darwin Kernel Version 12.4.0: Wed May  1 17:57:12 PDT 2013; root:xnu-2050.24.15~1/RELEASE_X86_64 x86_64 BOOST_LIB_VERSION=1_54
Sun Nov 24 16:23:51.765 [initandlisten] allocator: tcmalloc
Sun Nov 24 16:23:51.765 [initandlisten] options: { dbpath: "./data/" }
Sun Nov 24 16:23:51.766 [initandlisten] journal dir=./data/journal
Sun Nov 24 16:23:51.766 [initandlisten] recover : no journal files present, no recovery needed
Sun Nov 24 16:23:51.779 [FileAllocator] allocating new datafile ./data/local.ns, filling with zeroes...
Sun Nov 24 16:23:51.779 [FileAllocator] creating directory ./data/_tmp
Sun Nov 24 16:23:51.812 [FileAllocator] done allocating datafile ./data/local.ns, size: 16MB,  took 0.031 secs
Sun Nov 24 16:23:51.853 [FileAllocator] allocating new datafile ./data/local.0, filling with zeroes...
Sun Nov 24 16:23:52.254 [FileAllocator] done allocating datafile ./data/local.0, size: 64MB,  took 0.4 secs
Sun Nov 24 16:23:52.260 [initandlisten] command local.$cmd command: { create: "startup_log", size: 10485760, capped: true } ntoreturn:1 keyUpdates:0  reslen:37 480ms
Sun Nov 24 16:23:52.260 [initandlisten] waiting for connections on port 27017
Sun Nov 24 16:23:52.260 [websvr] admin web console waiting for connections on port 28017

Теперь мы можем использовать mongoImport для импорта загруженных почтовых индексов:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
jos@Joss-MacBook-Pro.local:~/Dev/playground/vertx-demo-1$ wget http://media.mongodb.org/zips.json
--2013-11-24 16:25:45--  http://media.mongodb.org/zips.json
Resolving media.mongodb.org... 54.230.131.14, 54.230.131.51, 54.230.128.129, ...
Connecting to media.mongodb.org|54.230.131.14|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2871006 (2.7M) [application/json]
Saving to: `zips.json'
 
100%[======================================>] 2,871,006   2.20M/s   in 1.2s   
 
2013-11-24 16:25:47 (2.20 MB/s) - `zips.json' saved [2871006/2871006]
 
jos@Joss-MacBook-Pro.local:~/Dev/playground/vertx-demo-1$ mongoimport --db vertx --collection zips --file ./zips.json
connected to: 127.0.0.1
Sun Nov 24 16:26:28.337 check 9 29470
Sun Nov 24 16:26:28.458 imported 29470 objects
jos@Joss-MacBook-Pro.local:~/Dev/playground/vertx-demo-1$

Если вы установили плагин mongoDB в IntelliJ, вы можете легко проверить, работает ли он:

PingVerticle.java - [vertx-demo-1] - vertx-demo-1 - [~ _Dev_playground_vertx-demo-1]

На данный момент нам нужно только вызвать экземпляр mongoDB из vert.x и загрузить данные. Для этого мы будем использовать библиотеку mongodb persistor. Во-первых, нам нужно добавить этот модуль в сборку maven (обратите внимание, что это в основном для случаев, когда мы хотим отладить внутренне, vert.x разрешает сам этот модуль):

1
2
3
4
5
6
<dependency>
          <groupId>io.vertx</groupId>
          <artifactId>mod-mongo-persistor</artifactId>
          <version>2.1.0-SNAPSHOT</version>
          <scope>compile</scope>
      </dependency>

Vert.x имеет очень хорошую и интересную модульную систему (также для другой статьи), чтобы иметь возможность использовать этот моно-персистор, нам сначала нужно развернуть его как модуль. Это на самом деле довольно легко сделать:

1
2
3
4
5
// load the general config object, loaded by using -config on command line
JsonObject appConfig = container.config();
 
// deploy the mongo-persistor module, which we'll use for persistence
container.deployModule("io.vertx~mod-mongo-persistor~2.1.0-SNAPSHOT", appConfig.getObject("mongo-persistor"));

Здесь мы загружаем конфигурацию для этого модуля, а затем вызываем deployModule с именем модуля и соответствующей частью конфигурации. Прежде всего, давайте посмотрим на конфигурацию, которую мы используем для этого:

1
2
3
4
5
6
7
8
9
{
    "mongo-persistor" : {
        "address": "mongodb-persistor",
        "host": "localhost",
        "port": 27017,
        "pool_size": 10,
        "db_name": "vertx"
    }
}

Ничего сложного. Мы просто указываем блок монго-персистера на наш экземпляр mongoDB. Единственный вопрос, который у вас может возникнуть, как получить этот файл внутри vert.x. Для этого нам просто нужно внести небольшое изменение в нашу программу запуска и изменить аргументы программы:

1
runmod org.smartjava~vertx-demo-1~1.0-SNAPSHOT

к этому:

1
runmod org.smartjava~vertx-demo-1~1.0-SNAPSHOT -conf src/main/resources/config.json

Файл config.json, содержит конфигурацию, которую мы только что показали. Таким образом, с этой настройкой у нас есть библиотека mongodb-persistor, прослушивающая адрес события по шине «mongodb-persistor». Теперь все, что осталось сделать, это отправить сообщения этой конечной точке в формате, понятном этому модулю. Для этого первого шага мы просто будем искать все почтовые индексы в состоянии «AL». Если вы посмотрите документацию https://github.com/vert-x/mod-mongo-persistor/, вы увидите, что мы сказали этому модулю «коллекцию», которую мы хотим найти, и тип «действия». мы хотим использовать. В зависимости от действия требуется дополнительная настройка. Для поиска всех почтовых индексов в состоянии ‘AL’ нам нужно создать следующее сообщение json:

1
2
3
4
5
6
7
{
    "action": "find",
    "collection": "zips",
    "matcher": {
        "state": "AL"
    }
}

Давайте изменим обработчик запроса и посмотрим на метод полного запуска:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void start() {
 
        // load the general config object, loaded by using -config on command line
        JsonObject appConfig = container.config();
 
        // deploy the mongo-persistor module, which we'll use for persistence
        container.deployModule("io.vertx~mod-mongo-persistor~2.1.0-SNAPSHOT", appConfig.getObject("mongo-persistor"));
 
        // create and run the server
        vertx.createHttpServer().requestHandler(new Handler<HttpServerRequest>() {
            @Override
            public void handle(final HttpServerRequest httpServerRequest) {
 
                // we send the response from the mongo query back to the client.
                // first create the query
                JsonObject matcher = new JsonObject().putString("state", "AL");
                JsonObject json = new JsonObject().putString("collection", "zips")
                        .putString("action", "find")
                        .putObject("matcher", matcher);
 
                // send it over the bus
                vertx.eventBus().send("mongodb-persistor", json, new Handler<Message<JsonObject>>() {
 
                    @Override
                    public void handle(Message<JsonObject> message) {
                        // send the response back, encoded as string
                        httpServerRequest.response().end(message.body().encodePrettily());
                    }
                });
            }
        }).listen(8888);
 
        // output that the server is started
        container.logger().info("Webserver started, listening on port: 8888");
    }

Здесь вы можете видеть, что мы создаем правильное сообщение json, отправляем его по шине и ждем отправки ответа, пока не получим ответ от mongoDB. Мы предварительно подтверждаем этот ответ и отправляем его обратно клиенту:

localhost_8888-1

Выставить почтовые индексы через интерфейс REST

Теперь, когда у нас есть базовые бэкэнд-компоненты, давайте посмотрим, что нужно для создания простого REST-интерфейса. Мы пропустим специальную фильтрацию медиатипа (я добавлю это в более позднюю статью), а пока мы просто рассмотрим HTTP-глаголы и URL-адреса. Для этой части мы хотим поддержать следующие вызовы REST:

1
2
3
4
5
6
7
8
9
* GET /zips
 Show all the zipcode information that are stored in mongoDB
* GET /zips/:id
 Show the information belonging to the specified zip code
* GET /zips?state=:state&city=:city
 Simple search service, where you can search for zip codes per city or state
 
* POST /zips/:id
 Update existing zip code information

Очень просто, но главная цель здесь — показать, как это делается, а не как вы можете создать полноценный сервис RESTful. Для обработки этих различных URL-адресов и глаголов vert.x предоставляет средство сопоставления маршрутов: (тела методов оставлены для ясности)

01
02
03
04
05
06
07
08
09
10
11
12
RouteMatcher matcher = new RouteMatcher();
 
        // the matcher for the complete list and the search
        matcher.get("/zips", new Handler<HttpServerRequest>() {...}
 
        // the matcher for a specific id
        matcher.get("/zips/:id", new Handler<HttpServerRequest>() {...}
 
        // the matcher for the update
        matcher.post("/zips/:id", new Handler<HttpServerRequest>() {...}
 
        vertx.createHttpServer().requestHandler(matcher).listen(8888);

Для тех из вас, кто работал с такими библиотеками, как sinatra или scalatra, это будет выглядеть знакомо. Мы определяем метод, который мы хотим обработать (в этом случае получаем и публикуем), URL, который нас интересует, и обработчик, который будет вызываться при получении запроса. Как вы можете видеть в последней строке, мы передаем этот обработчик для обработки запросов на сервер, который мы создали.

Теперь давайте кратко рассмотрим реализацию этих обработчиков. Здесь мы создаем сообщения монго-персистора, которые связываются с нами с помощью mongoDB. Я не буду вдаваться в подробности этих методов, поскольку они в значительной степени говорят сами за себя:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// the matcher for the complete list and the search
        matcher.get("/zips", new Handler<HttpServerRequest>() {
            public void handle(final HttpServerRequest req) {
 
                JsonObject json = new JsonObject();
                MultiMap params = req.params();
 
                if (params.size() > 0 && params.contains("state") || params.contains("city")) {
                    // create the matcher configuration
                    JsonObject matcher = new JsonObject();
                    if (params.contains("state")) matcher.putString("state", params.get("state"));
                    if (params.contains("city")) matcher.putString("city", params.get("city"));
 
                    // create the message for the mongo-persistor verticle
                    json = new JsonObject().putString("collection", "zips")
                            .putString("action", "find")
                            .putObject("matcher", matcher);
 
                } else {
                    // create the query
                    json = new JsonObject().putString("collection", "zips")
                            .putString("action", "find")
                            .putObject("matcher", new JsonObject());
                }
 
                JsonObject data = new JsonObject();
                data.putArray("results", new JsonArray());
                // and call the event we want to use
                vertx.eventBus().send("mongodb-persistor", json, new ReplyHandler(req, data));
            }
        });

В этом методе мы получаем все почтовые индексы из mongoDB. поскольку монго-персистор не возвращает все, что нам нужно, чтобы повторить ответ. Мы делаем это, используя следующий ReplyHandler:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private static class ReplyHandler implements Handler<Message<JsonObject>> {
 
        private final HttpServerRequest request;
        private JsonObject data;
 
        private ReplyHandler(final HttpServerRequest request, JsonObject data) {
            this.request = request;
            this.data = data;
        }
 
        @Override
        public void handle(Message<JsonObject> event) {
            // if the response contains more message, we need to get the rest
            if (event.body().getString("status").equals("more-exist")) {
                JsonArray results = event.body().getArray("results");
 
                for (Object el : results) {
                    data.getArray("results").add(el);
                }
 
                event.reply(new JsonObject(), new ReplyHandler(request, data));
            } else {
 
                JsonArray results = event.body().getArray("results");
                for (Object el : results) {
                    data.getArray("results").add(el);
                }
 
                request.response().putHeader("Content-Type", "application/json");
                request.response().end(data.encodePrettily());
            }
        }
    }

В этом replyHandler мы просто просматриваем результаты и продолжаем просить больше контента, пока не перестанем видеть статус «больше существовать». Я пропущу обработчик, где мы просто получим один почтовый индекс, поскольку это не так интересно. Следующий обработчик обрабатывает функцию post, с помощью которой мы обновляем существующий элемент.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
matcher.post("/zips/:id", new Handler<HttpServerRequest>() {
            public void handle(final HttpServerRequest req) {
 
                // process the body
                req.bodyHandler(new Handler<Buffer>() {
 
                    @Override
                    public void handle(Buffer event) {
                        // normally we'd validate the input, for now just assume it is correct.
                        final String body = event.getString(0,event.length());
 
                        // create the query
                        JsonObject newObject = new JsonObject(body);
                        JsonObject matcher = new JsonObject().putString("_id", req.params().get("id"));
                        JsonObject json = new JsonObject().putString("collection", "zips")
                                .putString("action", "update")
                                .putObject("criteria", matcher)
                                .putBoolean("upsert", false)
                                .putBoolean("multi",false)
                                .putObject("objNew",newObject);
 
                        // and call the event we want to use
                        vertx.eventBus().send("mongodb-persistor", json, new Handler<Message<JsonObject>>() {
                            @Override
                            public void handle(Message<JsonObject> event) {
                                // we could handle the errors here, but for now
                                // assume everything went ok, and return the original
                                // and updated json
                                req.response().end(body);
                            }
                        });
                    }
                });
            }
        });

Сам по себе код не так уж и сложен. Сначала мы используем обработчик для обработки запроса, из этого обработчика мы создаем новый обработчик, который используется для получения тела запроса, и, наконец, создается обработчик, который обновляет базу данных и отправляет ответ обратно. Несмотря на то, что он не сложный, он становится немного громоздким и трудным для чтения, когда задействовано все больше и больше обработчиков. Итак, в последнем разделе этой статьи мы рассмотрим, как вы можете заменить вложенные обработчики, используя предоставляемую rxjava функциональность.

Замените обратные вызовы наблюдателями RxJava

Для кода rxjava мы просто добавим пару дополнительных обработчиков, которые соответствуют другому URL. Таким образом, вместо / zips / 90210 URL будет / rxzips / 90210. Для начала добавьте следующую зависимость в вашу конфигурацию maven:

1
2
3
4
5
6
<dependency>
          <groupId>io.vertx</groupId>
          <artifactId>mod-rxjava</artifactId>
          <version>1.0.0-beta2-SNAPSHOT</version>
          <scope>compile</scope>
      </dependency>

Прежде чем углубиться в то, как можно использовать rxjava и vert.x, приведем краткую цитату (с сайта rxjava), которая объясняет, почему это полезно

Java Futures просты в использовании для одного уровня асинхронного выполнения, но они начинают добавлять нетривиальную сложность, когда они вложены.

Сложно использовать Futures для оптимального составления условных асинхронных потоков выполнения (или невозможно, поскольку задержки каждого запроса меняются во время выполнения). Конечно, это можно сделать, но это быстро усложняется (и поэтому подвержено ошибкам) ​​или преждевременно блокирует Future.get (), что исключает преимущество асинхронного выполнения.

Наблюдаемые RxJava, с другой стороны, предназначены для составления потоков и последовательностей асинхронных данных.

С помощью RxJava Observables легко составлять потоки и последовательности асинхронных данных. Если вы посмотрите последние пару примеров кода из нашего примера, вы поймете, почему это было бы полезно. В последнем примере «post» у нас было три вложенных обратных вызова, с помощью наблюдаемых гораздо проще составить это и позволить коду фактически рассказать, что происходит.

Теперь давайте расширим метод post, чтобы сделать следующее:

  1. сначала получить тело
  2. после того, как у нас есть тело, мы обновляем элемент в базе данных
  3. затем мы получаем последнюю версию из базы данных, после успешного обновления
  4. после получения последней версии мы возвращаем это в ответе.

Если бы мы делали это с помощью обратных вызовов, нам, вероятно, потребовалось бы четыре вложенных уровня обратных вызовов. В rxjava мы можем сделать это следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
matcher.post("/rxzips/:id", new Handler<HttpServerRequest>() {
            public void handle(final HttpServerRequest req) {
                // first access the buffer as an observable. We do this this way, since
                // we want to keep using the matchhandler and we can't do that with rxHttpServer
                Observable<Buffer> reqDataObservable = RxSupport.toObservable(req);
 
                // after we have the body, we update the element in the database
                Observable<RxMessage<JsonObject>> updateObservable = reqDataObservable.flatMap(new Func1<Buffer, Observable<RxMessage<JsonObject>>>() {
                    @Override
                    public Observable<RxMessage<JsonObject>> call(Buffer buffer) {
                        System.out.println("buffer = " + buffer);
                        // create the message
                        JsonObject newObject = new JsonObject(buffer.getString(0, buffer.length()));
                        JsonObject matcher = new JsonObject().putString("_id", req.params().get("id"));
                        JsonObject json = new JsonObject().putString("collection", "zips")
                                .putString("action", "update")
                                .putObject("criteria", matcher)
                                .putBoolean("upsert", false)
                                .putBoolean("multi", false)
                                .putObject("objNew", newObject);
 
                        // and return an observable
                        return rxEventBus.send("mongodb-persistor", json);
                    }
                });
 
                // use the previous input again, so we could see whether the update was successful.
                Observable<RxMessage<JsonObject>> getLatestObservable = updateObservable.flatMap(new Func1<RxMessage<JsonObject>, Observable<RxMessage<JsonObject>>>() {
                    @Override
                    public Observable<RxMessage<JsonObject>> call(RxMessage<JsonObject> jsonObjectRxMessage) {
                        System.out.println("jsonObjectRxMessage = " + jsonObjectRxMessage);
                        // next we get the latest version from the database, after the update has succeeded
                        // this isn't dependent on the previous one. It just has to wait till the previous
                        // one has updated the database, but we could check whether the previous one was successfully
                        JsonObject matcher = new JsonObject().putString("_id", req.params().get("id"));
                        JsonObject json2 = new JsonObject().putString("collection", "zips")
                                .putString("action", "find")
                                .putObject("matcher", matcher);
                        return rxEventBus.send("mongodb-persistor", json2);
                    }
                });
 
                // after we've got the latest version we return this in the response.
                getLatestObservable.subscribe(new Action1<RxMessage<JsonObject>>() {
                    @Override
                    public void call(RxMessage<JsonObject> jsonObjectRxMessage) {
                        req.response().end(jsonObjectRxMessage.body().encodePrettily());
                    }
                });
            }
        });

Все еще большой кусок кода, но большинство из них являются комментариями и вызваны тем фактом, что Java не поддерживает замыкания (пока). Так что здесь происходит?

  1. Сначала мы создаем наблюдателя из запроса
    1
    reqDataObservable = RxSupport.toObservable(req)

    Это означает, что мы хотим получать информацию, когда данные доступны в буфере нашего запроса.

  2. Поскольку мы хотим что-то сделать с этими данными, мы используем
    1
    reqDataObservable.flatMap

    функция. Это позволяет нам указать, что происходит, когда некоторые данные становятся доступными для ранее созданной наблюдаемой. Поэтому вместо вложенных обратных вызовов мы просто указываем поток данных через различные асинхронные вызовы. Когда данные получены, мы используем это для обновления базы данных. Обратите внимание, что мы используем

    1
    rxEventBus.send

    метод. Это еще не делает вызов, но снова возвращает наблюдаемое.

  3. В качестве третьего шага мы используем вывод предыдущего, чтобы (возможно) определить, было ли обновление успешным. Затем мы получаем последнюю версию из базы данных. Это еще раз сделано с использованием наблюдаемых.
  4. Пока у нас нет подписчика, на самом деле ничего не происходит. Поскольку мы заинтересованы в результате последнего асинхронного действия, мы используем
    1
    getLatestObservable.subscribe

    функция и ожидание (это все еще не блокирует) для результата от окончательного чтения базы данных. Как только это получено, мы отправляем ответ обратно на основе полученного сообщения.

В этом примере мы показали только пару очень маленьких частей rxjava:

  • Мы использовали наблюдаемые, чтобы упростить работу и последовательность асинхронных действий.
  • Мы используем функцию flatmap для передачи результата от одного асинхронного вызова в другой, и таким образом исключаем вложенные обратные вызовы
  • Мы использовали расширения rxSupport и rxEventbus rxJava vert.x для простого создания наблюдаемых rxJava
  • И мы начали полную последовательность, подписавшись на последнюю наблюдаемую в цепочке

В следующей статье мы немного углубимся в rxJava, как вы также можете организовать более сложные асинхронные потоки.

Справка: создайте простой сервис RESTful с vert.x 2.0, RxJava и mongoDB от нашего партнера по JCG Йоса Дирксена в блоге Smart Java .