Статьи

RxJava + Java8 + Java EE 7 + Arquillian = Блаженство

Пожарный-реактивно-management_DigitalStorm_226x150

Микросервисы — это архитектурный стиль, в котором каждый сервис реализован как независимая система. Они могут использовать свою собственную систему персистентности (хотя это не обязательно), развертывание, язык,…

Поскольку система состоит из более чем одной службы, каждая служба будет взаимодействовать с другими службами, обычно используя легкий протокол, такой как HTTP, и следуя подходу Restful Web . Вы можете прочитать больше о микросервисах здесь: http://martinfowler.com/articles/microservices.html

Давайте посмотрим на действительно простой пример. Предположим, у нас есть магазин бронирования, где пользователи могут перемещаться по каталогу, и когда они находят книгу, в которой они хотят видеть больше информации, они нажимают на isbn, а затем открывается новый экран с подробной информацией о книге и комментариями о ней. написано читателями.

Эта система может состоять из двух служб:

  • Один сервис, чтобы получить детали книги. Их можно извлечь из любой устаревшей системы, например, СУБД.
  • Один сервис для получения всех комментариев, записанных в книге, и в этом случае эта информация может храниться в базе данных базы документов.

Проблема здесь заключается в том, что для каждого запроса пользователя нам нужно открыть два соединения, по одному для каждой службы. Конечно, нам нужен способ выполнять эту работу параллельно, чтобы повысить производительность. И здесь кроется одна проблема, как мы можем справиться с этими асинхронными запросами? Первая идея — использовать класс Future . Для двух служб это может быть хорошо, но если вам требуется четыре или пять служб, код будет становиться все более и более сложным, или, например, вам может потребоваться получить данные из одной службы и использовать их в других службах или адаптировать результат одной службы для ввод другого. Таким образом, существует стоимость управления потоками и синхронизации.

Было бы здорово иметь какой-то способ решить эту проблему простым и понятным способом. И это именно то, что делает RxJava . RxJava — это Java-реализация Reactive Extensions: библиотека для составления асинхронных программ и программ, основанных на событиях, с использованием наблюдаемых последовательностей.

При использовании RxJava вместо извлечения данных из структуры данные передаются в нее, которая реагирует на событие, которое прослушивается подписчиком, и действует соответствующим образом. Вы можете найти больше информации в https://github.com/Netflix/RxJava .

Поэтому в данном случае мы собираемся реализовать пример, описанный здесь с использованием RxJava , Java EE 7 , Java 8 и Arquillian для тестирования.

В этом посте предполагается, что вы знаете, как писать службы Rest, используя спецификацию Java EE .

Итак, начнем с двух сервисов:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
@Singleton
@Path("bookinfo")
public class BookInfoService {
 
    @GET
    @Path("{isbn}")
    @Produces(MediaType.APPLICATION_JSON)
    @Consumes(MediaType.APPLICATION_JSON)
    public JsonObject findBookByISBN(@PathParam("isbn") String isbn) {
 
        return Json.createObjectBuilder()
            .add("author", "George R.R. Martin")
            .add("isbn", "1111")
            .add("title", "A Game Of Thrones").build();
    }
 
}
01
02
03
04
05
06
07
08
09
10
11
12
13
14
@Singleton
@Path("comments")
public class CommentsService {
 
    @GET
    @Path("{isbn}")
    @Produces(MediaType.APPLICATION_JSON)
    public JsonArray bookComments(@PathParam("isbn") String isbn) {
 
        return Json.createArrayBuilder().add("Good Book").add("Awesome").build();
 
    }
 
}
1
2
3
@ApplicationPath("rest")
public class ApplicationResource extends Application {
}

И, наконец, пришло время создать третью службу фасадов, которая получает сообщение от клиента, параллельно отправляет обеим службам запрос и, наконец, упаковывает оба ответа. zip — это процесс объединения наборов элементов, которые выводятся вместе через указанную функцию и отправляются обратно клиенту (не путать со сжатием!).

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Singleton
@Path("book")
public class BookService {
 
    private static final String BOOKSERVICE = "http://localhost:8080/bookservice";
    private static final String COMMENTSERVICE = "http://localhost:8080/bookcomments";
 
    @Resource(name = "DefaultManagedExecutorService")
    ManagedExecutorService executor;
 
    Client bookServiceClient;
    WebTarget bookServiceTarget;
 
    Client commentServiceClient;
    WebTarget commentServiceTarget;
 
    @PostConstruct
    void initializeRestClients() {
 
        bookServiceClient = ClientBuilder.newClient();
        bookServiceTarget = bookServiceClient.target(BOOKSERVICE + "/rest/bookinfo");
 
        commentServiceClient = ClientBuilder.newClient();
        commentServiceTarget = commentServiceClient.target(COMMENTSERVICE + "/rest/comments");
 
    }
 
    @GET
    @Path("{isbn}")
    @Produces(MediaType.APPLICATION_JSON)
    public void bookAndComment(@Suspended final AsyncResponse asyncResponse, @PathParam("isbn") String isbn) {
    //RxJava code shown below
    }
}

В основном мы создаем новый сервис. В этом случае URL-адреса обеих служб, которые мы собираемся подключить, жестко закодированы. Это делается для академических целей, но в производственном коде вы будете внедрять его из класса производителя, файла свойств или любой системы, которую вы будете использовать для этой цели. Затем мы создаем javax.ws.rs.client.WebTarget для использования веб-службы Restful .

После этого нам нужно реализовать метод bookAndComment, используя RxJava API .

Основным классом, используемым в RxJava, является rx.Observabl e. Этот класс является наблюдаемым, как следует из его названия, и он отвечает за запуск событий для толкания объектов. По умолчанию события являются синхронными, и разработчик должен сделать их асинхронными.

Итак, нам нужен один асинхронный наблюдаемый экземпляр для каждого сервиса:

01
02
03
04
05
06
07
08
09
10
11
12
public Observable<JsonObject> getBookInfo(final String isbn) {
        return Observable.create((Observable.OnSubscribe<JsonObject>) subscriber -> {
 
            Runnable r = () -> {
                subscriber.onNext(bookServiceTarget.path(isbn).request().get(JsonObject.class));
                subscriber.onCompleted();
            };
 
            executor.execute(r);
 
        });
}

По сути, мы создаем Observable, который будет выполнять указанную функцию, когда подписчик подписывается на нее. Функция создается с использованием лямбда-выражения, чтобы избежать создания вложенных внутренних классов. В этом случае мы возвращаем JsonObject в результате вызова сервиса bookinfo . Результат передается в метод onNext, чтобы подписчики могли получить результат. Поскольку мы хотим выполнить эту логику асинхронно, код помещается в блок Runnable .

Также необходимо вызывать метод onCompleted, когда вся логика выполнена.

Обратите внимание, что поскольку мы хотим сделать наблюдаемую асинхронной помимо создания Runnable , мы используем Executor для запуска логики в отдельном потоке. Одним из замечательных дополнений в Java EE 7 является управляемый способ создания потоков внутри контейнера. В этом случае мы используем ManagedExecutorService, предоставляемый контейнером, для асинхронного выполнения задачи в другом потоке текущего потока.

01
02
03
04
05
06
07
08
09
10
11
12
public Observable<JsonArray> getComments(final String isbn) {
        return Observable.create((Observable.OnSubscribe<JsonArray>) subscriber -> {
 
            Runnable r = () -> {
                subscriber.onNext(commentServiceTarget.path(isbn).request().get(JsonArray.class));
                subscriber.onCompleted();
            };
 
            executor.execute(r);
 
        });
}

Аналогично предыдущему, но вместо получения информации о книге мы получаем массив комментариев.

Затем нам нужно создать наблюдаемую функцию, отвечающую за сжатие обоих ответов, когда они оба доступны. И это делается с помощью метода zip в классе Observable, который получает два Observables и применяет функцию для объединения результатов обоих. В этом случае лямбда-выражение, которое создает новый объект json, добавляя оба ответа.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@GET
@Path("{isbn}")
@Produces(MediaType.APPLICATION_JSON)
public void bookAndComment(@Suspended final AsyncResponse asyncResponse, @PathParam("isbn") String isbn) {
    //Calling previous defined functions
    Observable<JsonObject> bookInfo = getBookInfo(isbn);
    Observable<JsonArray> comments = getComments(isbn);
 
    Observable.zip(bookInfo, comments, (JsonObject book, JsonArray bookcomments) ->
                    Json.createObjectBuilder().add("book", book).add("comments", bookcomments).build()
                  )
                  .subscribe(new Subscriber<JsonObject>() {
                        @Override
                        public void onCompleted() {
                        }
                 
                        @Override
                        public void onError(Throwable e) {
                            asyncResponse.resume(e);
                        }
 
                        @Override
                        public void onNext(JsonObject jsonObject) {
                            asyncResponse.resume(jsonObject);
                        }
                    });
}

Давайте посмотрим на предыдущий сервис. Мы используем одно из новых дополнений в Java EE — асинхронные конечные точки REST Jax-Rs 2.0 с использованием аннотации @Suspended . В основном то, что мы делаем, — это освобождаем ресурсы сервера и генерируем ответ, когда он доступен, используя метод возобновления.

И наконец тест. Мы используем Wildfly 8.1 в качестве сервера Java EE 7 и Arquillian . Поскольку каждая служба может быть развернута на другом сервере, мы собираемся развернуть каждую службу в другой войне, но внутри одного и того же сервера.

Так что в этом случае мы собираемся развернуть три военных файла, что совершенно легко сделать в Arquillian .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@RunWith(Arquillian.class)
public class BookTest {
 
    @Deployment(testable = false, name = "bookservice")
    public static WebArchive createDeploymentBookInfoService() {
        return ShrinkWrap.create(WebArchive.class, "bookservice.war").addClasses(BookInfoService.class, ApplicationResource.class);
    }
 
    @Deployment(testable = false, name = "bookcomments")
    public static WebArchive createDeploymentCommentsService() {
        return ShrinkWrap.create(WebArchive.class, "bookcomments.war").addClasses(CommentsService.class, ApplicationResource.class);
    }
 
    @Deployment(testable = false, name = "book")
    public static WebArchive createDeploymentBookService() {
        WebArchive webArchive = ShrinkWrap.create(WebArchive.class, "book.war").addClasses(BookService.class, ApplicationResource.class)
                .addAsLibraries(Maven.resolver().loadPomFromFile("pom.xml").resolve("com.netflix.rxjava:rxjava-core").withTransitivity().as(JavaArchive.class));
        return webArchive;
    }
 
    @ArquillianResource
    URL base;
 
    @Test
    @OperateOnDeployment("book")
    public void should_return_book() throws MalformedURLException {
 
        Client client = ClientBuilder.newClient();
        JsonObject book = client.target(URI.create(new URL(base, "rest/").toExternalForm())).path("book/1111").request().get(JsonObject.class);
 
        //assertions
    }
}

В этом случае клиент запросит всю информацию из книги. В серверной части метод zip будет ожидать параллельного получения книги и комментариев, а затем объединит оба ответа в один объект и отправит их обратно клиенту.

Это очень простой пример RxJava . На самом деле в этом случае мы видели только, как использовать zip- метод, но RxJava предоставляет гораздо больше таких полезных методов, как take () , map () , merge () ,… ( https: // github .com / Netflix / RxJava / wiki / Алфавитный список наблюдаемых операторов )

Более того, в этом примере мы видели только пример подключения к двум службам и параллельного извлечения информации, и вы можете задаться вопросом, почему бы не использовать класс Future . В этом примере совершенно нормально использовать Future и Callbacks, но, вероятно, в вашей реальной жизни ваша логика не будет такой простой, как архивирование двух сервисов. Может быть, у вас будет больше услуг, может быть, вам нужно будет получить информацию от одной службы, а затем для каждого результата открыть новое соединение. Как вы можете видеть, вы можете начать с двух экземпляров Future, но закончить с кучей методов Future.get () , тайм-ауты … Так что именно в этих ситуациях RxJava действительно упрощает разработку приложения.

Кроме того, мы увидели, как использовать некоторые из новых дополнений Java EE 7, например, как разрабатывать асинхронный сервис Restful с Jax-R .

В этом посте мы узнали, как справиться с взаимосвязью между сервисами и как сделать их масштабируемыми и потреблять меньше ресурсов. Но мы не говорили о том, что происходит, когда один из этих сервисов выходит из строя. Что происходит с звонящими? У нас есть способ управлять этим? Есть ли способ не потратить ресурсы, когда один из сервисов недоступен? Мы коснемся этого в следующем посте, посвященном отказоустойчивости.

Мы продолжаем учиться,

Алекс.


Бон диа, бон диа! Bon dia al dematí! Fem fora la mandra I saltem corrents del llit. (Бон Диа! — Дамарис Гелаберт)

Ссылка: RxJava + Java8 + Java EE 7 + Arquillian = Bliss от нашего партнера JCG Алекса Сото в блоге One Jar to Rule All All .