Статьи

Реактивная разработка с использованием Vert.x

В последнее время кажется, что мы слышим о новейших и лучших фреймворках для Java. Инструменты, такие как Ninja , SparkJava и Play ; но каждый из них самоуверен и заставляет вас чувствовать, что вам нужно изменить дизайн всего приложения, чтобы использовать их замечательные функции. Вот почему я так обрадовался, когда обнаружил Vert.x. Vert.x — это не фреймворк, это инструментарий, и он неубедителен, и он раскрепощает. Vert.x не хочет, чтобы вы перепроектировали все приложение, чтобы использовать его, он просто хочет сделать вашу жизнь проще. Можете ли вы написать все ваше приложение в Vert.x? Конечно! Можете ли вы добавить возможности Vert.x в ваши существующие приложения Spring / Guice / CDI? Ага! Можете ли вы использовать Vert.x внутри существующих приложений JavaEE? Абсолютно! И это то, что делает это удивительным.

Фон

Vert.x родился, когда Тим Фокс решил, что ему нравится многое из того, что разрабатывается в экосистеме NodeJS, но ему не нравятся некоторые компромиссы работы в V8: однопоточность, ограниченная поддержка библиотек и Сам JavaScript Тим решил написать инструментарий, в котором не было ни единого мнения о том, как и где он используется, и он решил, что лучшее место для его реализации — это JVM. Итак, Тим и сообщество решили создать управляемый событиями, неблокирующий, реактивный инструментарий, который во многом отражал то, что можно сделать в NodeJS, но также использовал преимущества, доступные внутри JVM. Node.x родился и позже стал Vert.x.

обзор

Vert.x разработан для реализации шины событий, которая позволяет различным частям приложения взаимодействовать неблокирующим / потокобезопасным способом. Части этого были смоделированы после методологии Актера, выставленной Eralng и Akka. Он также предназначен для того, чтобы в полной мере использовать преимущества современных многоядерных процессоров и требования одновременного программирования. Таким образом, по умолчанию все Vert.x VERTICLES по умолчанию реализованы как однопоточные. В отличие от NodeJS, Vert.x может выполнять МНОГО вершин в МНОГИХ потоках. Кроме того, вы можете указать, что некоторые статьи являются рабочими и МОГУТ быть многопоточными. И чтобы действительно добавить некоторую изюминку в торт, Vert.x имеет низкоуровневую поддержку многоузловой кластеризации шины событий с помощью Hazelcast. Он включает в себя множество других удивительных функций, которых слишком много, чтобы перечислять здесь, но вы можете прочитать больше в официальных документах Vert.x.

Первое, что вам нужно знать о Vert.x, как и NodeJS, никогда не блокировать текущий поток. Все в Vert.x по умолчанию настроено на использование обратных вызовов / фьючерсов / обещаний. Вместо синхронных операций Vert.x предоставляет асинхронные методы для выполнения большинства операций ввода-вывода и интенсивной работы процессора, которые могут блокировать текущий поток. Теперь обратные вызовы могут быть уродливыми и болезненными для работы, поэтому Vert.x дополнительно предоставляет API на основе RxJava, который реализует те же функциональные возможности с использованием шаблона Observer . Наконец, Vert.x упрощает использование существующих классов и методов, предоставляя метод executeBlocking (Function f) во многих его асинхронных API. Это означает, что вы можете выбрать, как вы предпочитаете работать с Vert.x, вместо того, чтобы инструментарий подсказывал вам, как его использовать.

Второе, что нужно знать о Vert.x, это то, что он состоит из вершин, модулей и узлов. Verты являются наименьшей единицей логики в Vert.x и обычно представлены одним классом. Статьи должны быть простыми и однозначными в соответствии с философией UNIX . Группа статей может быть собрана в модуль, который обычно упакован в один файл JAR. Модуль представляет группу связанных функциональных возможностей, которые в совокупности могут представлять целое приложение или только часть более крупного распределенного приложения. Наконец, узлы — это отдельные экземпляры JVM, на которых запущен один или несколько модулей / статей. Поскольку Vert.x имеет встроенную кластеризацию с самого начала, приложения Vert.x могут охватывать узлы либо на одном компьютере, либо на нескольких компьютерах в нескольких географических точках (хотя задержка может снизить производительность).

Пример проекта

В последнее время я был на нескольких встречах и конференциях, где первое, что они показывают вам, когда говорят о реактивном программировании, — это создание приложения для чата. Это все хорошо, но на самом деле это не поможет вам полностью понять силу реактивного развития. Приложения чата просты и просты. Мы можем сделать лучше. В этом уроке мы собираемся взять унаследованное приложение Spring и преобразовать его, чтобы использовать Vert.x. Это имеет несколько целей: это показывает, что инструментарий легко интегрировать с существующими проектами Java, он позволяет нам использовать преимущества существующих инструментов, которые могут быть укоренившимися частями нашей экосистемы, а также позволяет нам следовать принципу СУХИ в том смысле, что мы не используем не нужно переписывать большие массивы кода, чтобы получить преимущества Vert.x.

Наше унаследованное приложение Spring представляет собой надуманный простой пример REST API с использованием Spring Boot, Spring Data JPA и Spring REST. Исходный код можно найти в «основной» ветке ЗДЕСЬ . Существуют и другие ветви, которые мы будем использовать для демонстрации прогресса по ходу работы, поэтому для любого, кто имеет небольшой опыт работы с git и Java 8, будет просто следовать. Давайте начнем с изучения класса Spring Configuration для стандартного приложения Spring.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@SpringBootApplication
@EnableJpaRepositories
@EnableTransactionManagement
@Slf4j
public class Application {
    public static void main(String[] args) {
        ApplicationContext ctx = SpringApplication.run(Application.class, args);
 
        System.out.println("Let's inspect the beans provided by Spring Boot:");
 
        String[] beanNames = ctx.getBeanDefinitionNames();
        Arrays.sort(beanNames);
        for (String beanName : beanNames) {
            System.out.println(beanName);
        }
    }
 
    @Bean
    public DataSource dataSource() {
        EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
        return builder.setType(EmbeddedDatabaseType.HSQL).build();
    }
 
    @Bean
    public EntityManagerFactory entityManagerFactory() {
        HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
        vendorAdapter.setGenerateDdl(true);
 
        LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
        factory.setJpaVendorAdapter(vendorAdapter);
        factory.setPackagesToScan("com.zanclus.data.entities");
        factory.setDataSource(dataSource());
        factory.afterPropertiesSet();
 
        return factory.getObject();
    }
 
    @Bean
    public PlatformTransactionManager transactionManager(final EntityManagerFactory emf) {
        final JpaTransactionManager txManager = new JpaTransactionManager();
        txManager.setEntityManagerFactory(emf);
        return txManager;
    }
}

Как вы можете видеть в верхней части класса, у нас есть довольно стандартные аннотации Spring Boot. Вы также увидите аннотацию @ Slf4j, являющуюся частью библиотеки lombok , которая предназначена для уменьшения количества кода. У нас также есть аннотированные методы @Bean для предоставления доступа к JPA EntityManager, TransactionManager и DataSource. Каждый из этих элементов предоставляет инъецируемые объекты для использования другими классами. Остальные классы в проекте аналогично упрощены. Существует POJO клиента, который является типом сущности, используемым в сервисе. Существует CustomerDAO, который создается через Spring Data. Наконец, существует класс CustomerEndpoints, который представляет собой аннотированный контроллер REST JAX-RS.

Как объяснялось ранее, это все стандартные тарифы в приложении Spring Boot. Проблема с этим приложением состоит в том, что по большей части оно имеет ограниченную масштабируемость. Вы могли бы запустить это приложение внутри контейнера сервлета или со встроенным сервером, таким как Jetty или Undertow. В любом случае каждый запрос связывает поток и, таким образом, тратит ресурсы, ожидая операций ввода-вывода.

Переключившись на ветку Convert-To-Vert.x-Web , мы видим, что класс Application немного изменился. Теперь у нас есть несколько новых аннотированных методов @Bean для внедрения самого экземпляра Vertx , а также экземпляр ObjectMapper (часть библиотеки JSON Джексона). Мы также заменили класс CustomerEnpoints новым CustomerVerticle . Практически все остальное тоже самое.

Класс CustomerVerticle аннотируется @Component, что означает, что Spring будет создавать экземпляр этого класса при запуске. Он также имеет метод запуска, аннотированный @PostConstruct, так что Verticle запускается при запуске. Глядя на фактическое содержание кода, мы видим наши первые биты кода Vert.x: Маршрутизатор .

Класс Router является частью веб-библиотеки vertx-web и позволяет нам использовать свободный API для определения HTTP-URL, методов и фильтров заголовков для обработки нашего запроса. Добавление экземпляра BodyHandler к маршруту по умолчанию позволяет обрабатывать тело POST / PUT и преобразовывать его в объект JSON, который Vert.x может затем обработать как часть RoutingContext. Порядок маршрутов в Vert.x МОЖЕТ быть значительным. Если вы определяете маршрут, который имеет какое-либо сопоставление глобуса (* или регулярное выражение), он может проглотить запросы на маршруты, определенные после него, если вы не реализуете цепочку . Наш пример показывает 3 маршрута изначально.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
@PostConstruct
    public void start() throws Exception {
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        router.get("/v1/customer/:id")
                .produces("application/json")
                .blockingHandler(this::getCustomerById);
        router.put("/v1/customer")
                .consumes("application/json")
                .produces("application/json")
                .blockingHandler(this::addCustomer);
        router.get("/v1/customer")
                .produces("application/json")
                .blockingHandler(this::getAllCustomers);
        vertx.createHttpServer().requestHandler(router::accept).listen(8080);
    }

Обратите внимание, что метод HTTP определен, заголовок «Accept» определен (с помощью потребления) и заголовок «Content-Type» определен (с помощью продукции). Мы также видим, что мы передаем обработку запроса с помощью вызова метода blockingHandler . Блокирующий обработчик для маршрута Vert.x принимает объект RoutingContext в качестве единственного параметра. RoutingContext содержит объект запроса Vert.x, объект Response и любые параметры / данные тела POST (например, «: id»). Вы также увидите, что я использовал ссылки на методы, а не на лямбды, чтобы вставить логику в blockingHandler (я считаю, что это более читабельно). Каждый обработчик для 3-х маршрутов запроса определяется отдельным методом ниже в классе. Эти методы в основном просто вызывают методы в DAO, сериализуют или десериализуют по мере необходимости, устанавливают некоторые заголовки ответа и завершают () запрос, отправляя ответ. В целом, довольно просто и понятно.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void addCustomer(RoutingContext rc) {
        try {
            String body = rc.getBodyAsString();
            Customer customer = mapper.readValue(body, Customer.class);
            Customer saved = dao.save(customer);
            if (saved!=null) {
                rc.response().setStatusMessage("Accepted").setStatusCode(202).end(mapper.writeValueAsString(saved));
            } else {
                rc.response().setStatusMessage("Bad Request").setStatusCode(400).end("Bad Request");
            }
        } catch (IOException e) {
            rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");
            log.error("Server error", e);
        }
    }
 
    private void getCustomerById(RoutingContext rc) {
        log.info("Request for single customer");
        Long id = Long.parseLong(rc.request().getParam("id"));
        try {
            Customer customer = dao.findOne(id);
            if (customer==null) {
                rc.response().setStatusMessage("Not Found").setStatusCode(404).end("Not Found");
            } else {
                rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(dao.findOne(id)));
            }
        } catch (JsonProcessingException jpe) {
            rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");
            log.error("Server error", jpe);
        }
    }
 
    private void getAllCustomers(RoutingContext rc) {
        log.info("Request for all customers");
        List customers = StreamSupport.stream(dao.findAll().spliterator(), false).collect(Collectors.toList());
        try {
            rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(customers));
        } catch (JsonProcessingException jpe) {
            rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");
            log.error("Server error", jpe);
        }
    }

«Но это гораздо сложнее и сложнее, чем мои аннотации и классы Spring», — скажете вы. Это МОЖЕТ быть правдой, но это действительно зависит от того, как вы реализуете код. Это должен быть вводный пример, поэтому я оставил код очень простым и легким для подражания. Я мог бы использовать библиотеку аннотаций для Vert.x для реализации конечных точек способом, аналогичным JAX-RS. Кроме того, мы получили значительное улучшение масштабируемости. Внутри Vert.x Web использует Netty для низкоуровневых операций асинхронного ввода-вывода, что дает нам возможность обрабатывать МНОГИЕ более параллельные запросы (ограниченные размером пула соединений с базой данных).

Мы уже внесли некоторые улучшения в масштабируемость и параллелизм этого приложения с помощью веб-библиотеки Vert.x, но мы можем немного улучшить ситуацию, реализовав Vert.x EventBus . Разделяя операции базы данных по рабочим статьям вместо использования blockingHandler, мы можем более эффективно обрабатывать запросы. Это шоу в ветке Convert-to-Worker-Vertific . Класс приложения остался прежним, но мы изменили класс CustomerEndpoints и добавили новый класс с именем CustomerWorker . Кроме того, мы добавили новую библиотеку под названием Spring Vert.x Extension, которая обеспечивает поддержку инъекций Spring Dependency для Vert.x. Начните с просмотра нового класса CustomerEndpoints .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@PostConstruct
    public void start() throws Exception {
        log.info("Successfully create CustomerVerticle");
        DeploymentOptions deployOpts = new DeploymentOptions().setWorker(true).setMultiThreaded(true).setInstances(4);
        vertx.deployVerticle("java-spring:com.zanclus.verticles.CustomerWorker", deployOpts, res -> {
            if (res.succeeded()) {
                Router router = Router.router(vertx);
                router.route().handler(BodyHandler.create());
                final DeliveryOptions opts = new DeliveryOptions()
                        .setSendTimeout(2000);
                router.get("/v1/customer/:id")
                        .produces("application/json")
                        .handler(rc -> {
                            opts.addHeader("method", "getCustomer")
                                    .addHeader("id", rc.request().getParam("id"));
                            vertx.eventBus().send("com.zanclus.customer", null, opts, reply -> handleReply(reply, rc));
                        });
                router.put("/v1/customer")
                        .consumes("application/json")
                        .produces("application/json")
                        .handler(rc -> {
                            opts.addHeader("method", "addCustomer");
                            vertx.eventBus().send("com.zanclus.customer", rc.getBodyAsJson(), opts, reply -> handleReply(reply, rc));
                        });
                router.get("/v1/customer")
                        .produces("application/json")
                        .handler(rc -> {
                            opts.addHeader("method", "getAllCustomers");
                            vertx.eventBus().send("com.zanclus.customer", null, opts, reply -> handleReply(reply, rc));
                        });
                vertx.createHttpServer().requestHandler(router::accept).listen(8080);
            } else {
                log.error("Failed to deploy worker verticles.", res.cause());
            }
        });
    }

Маршруты одинаковы, но код реализации — нет. Вместо того чтобы использовать вызовы blockingHandler, мы теперь реализовали правильные асинхронные обработчики, которые отправляют события на шину событий. В этой статье больше не выполняется обработка базы данных. Мы переместили обработку базы данных в рабочую вертикалку, которая имеет несколько экземпляров для параллельной обработки нескольких запросов потокобезопасным способом. Мы также регистрируем обратный вызов для ответа на эти события, чтобы мы могли отправить соответствующий ответ клиенту, выполняющему запрос. Теперь в CustomerWorker Verticle мы реализовали логику базы данных и обработку ошибок.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Override
public void start() throws Exception {
    vertx.eventBus().consumer("com.zanclus.customer").handler(this::handleDatabaseRequest);
}
 
public void handleDatabaseRequest(Message<Object> msg) {
    String method = msg.headers().get("method");
 
    DeliveryOptions opts = new DeliveryOptions();
    try {
        String retVal;
        switch (method) {
            case "getAllCustomers":
                retVal = mapper.writeValueAsString(dao.findAll());
                msg.reply(retVal, opts);
                break;
            case "getCustomer":
                Long id = Long.parseLong(msg.headers().get("id"));
                retVal = mapper.writeValueAsString(dao.findOne(id));
                msg.reply(retVal);
                break;
            case "addCustomer":
                retVal = mapper.writeValueAsString(
                                    dao.save(
                                            mapper.readValue(
                                                    ((JsonObject)msg.body()).encode(), Customer.class)));
                msg.reply(retVal);
                break;
            default:
                log.error("Invalid method '" + method + "'");
                opts.addHeader("error", "Invalid method '" + method + "'");
                msg.fail(1, "Invalid method");
        }
    } catch (IOException | NullPointerException e) {
        log.error("Problem parsing JSON data.", e);
        msg.fail(2, e.getLocalizedMessage());
    }
}

Рабочие версии CustomerWorker регистрируют получателя сообщений на шине событий. Строка, представляющая адрес на шине событий, является произвольной, но рекомендуется использовать структуру имен в обратном стиле, чтобы было легко гарантировать, что адреса уникальны («com.zanclus.customer»). Всякий раз, когда новое сообщение отправляется на этот адрес, оно будет доставлено в одну и только одну из рабочих статей. Затем рабочая вертикаль вызывает handleDatabaseRequest для работы с базой данных, сериализации JSON и обработки ошибок.

Там у вас есть это. Вы видели, что Vert.x может быть интегрирован в ваши унаследованные приложения для повышения параллелизма и эффективности без необходимости переписывать все приложение. Мы могли бы сделать нечто подобное с существующим приложением Google Guice или JavaEE CDI. Вся бизнес-логика могла оставаться относительно нетронутой, пока мы пытались в Vert.x добавить реактивные возможности. Следующие шаги зависят от вас. Некоторые идеи о том, куда идти дальше, включают Clustering , WebSockets и VertxRx для сахара ReactiveX.