Статьи

Учебник по Spring Reactor

В мире сервисов RESTful, где большая часть работы фактически происходит за кулисами, нам часто приходится выполнять большую часть обработки в нашем приложении, которая фактически не влияет на ответ, который необходимо отправить реальному пользователю. Эти бизнес-решения могут быть приняты реагирующим образом, чтобы они не влияли на взаимодействие пользователя с приложением. Spring Framework предоставляет нам отличный проект, называемый Spring Reactor, который позволяет нам очень хорошо управлять фоновой обработкой в ​​фоновом режиме. Прежде чем углубляться в урок, следует отметить, что реактивное программирование — это не то же самое, что параллельное программирование .

Один из сценариев использования в программировании RESTful для реактивного поведения заключается в том, что в большинстве случаев службы по сути являются блокирующими и синхронными. Реактивное программирование позволяет нам выйти за рамки синхронного потока, и сложные оркестровки могут быть выполнены без демонстрации блокирующего поведения. Давайте углубимся в урок, чтобы понять, как это реактивное поведение может быть интегрировано в приложение на основе Spring Boot.

1. Введение

На этом уроке Spring Reactor мы узнаем, как начать работу с реактивным поведением в проекте Spring Boot, и начнем создавать и потреблять сообщения в одном и том же приложении. Помимо простого проекта, мы увидим, как работает поток Spring Reactive и как обрабатываются запросы, когда есть несколько обработчиков для различных типов запросов.

С появлением микросервисов необходимость асинхронной связи между задействованными сервисами стала основным требованием. Для связи между различными задействованными сервисами мы можем использовать такие проекты, как Apache Kafka . Теперь асинхронная связь также желательна для длительных запросов в том же приложении. Это где фактический вариант использования Spring Reactor вступает в игру.

Обратите внимание, что шаблон Reactor, как показано в этом приложении, должен использоваться только тогда, когда пользователь не ожидает отклика непосредственно от приложения, поскольку мы выполняем только фоновые задания с использованием этой демонстрации Reactor. Использование Reactors — очень хороший выбор, когда разработчики могут назначить приложению немного больше динамической памяти (в зависимости от количества потоков, которые будет использовать это приложение), и они хотят выполнять задачи параллельно, а порядок выполнения задач не зависит. это не имеет значения. Этот момент действительно важен, поэтому мы повторим его, перефразировав, невозможно подтвердить порядок выполнения заданий, когда они выполняются параллельно.

2. Реактор в JVM

Reactor, как утверждает сама Spring , является фундаментальной платформой для асинхронных приложений на JVM, которая на скромном оборудовании позволяет обрабатывать более 15 000 000 событий в секунду с помощью самого быстрого неблокирующего Dispatcher . Как это звучит, платформа Reactor основана на шаблоне проектирования Reactor .

Наиболее важной особенностью Spring Reactor является уровень абстракции, которую эта среда предоставляет Java-разработчику, разрабатывающему приложения с использованием Spring. Эта абстракция делает реализацию функциональности в нашем собственном приложении очень простой. Давайте начнем с примера проекта, чтобы увидеть, как мы можем использовать этот фреймворк в приложении, близком к реальности. Проект реактора также поддерживает неблокирующую межпроцессную связь (IPC) с компонентами реактора-ipc, но его обсуждение выходит за рамки данного урока.

3. Создание проекта Spring Boot с Maven

Мы будем использовать один из многих архетипов Maven для создания примера проекта для нашего примера. Чтобы создать проект, выполните следующую команду в каталоге, который вы будете использовать в качестве рабочей области:

Создание проекта

1
mvn archetype:generate -DgroupId=com.javacodegeeks.example -DartifactId=JCG-BootReactor-Example -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

Если вы запускаете maven в первый раз, выполнение команды генерации займет несколько секунд, поскольку maven должен загрузить все необходимые плагины и артефакты, чтобы выполнить задачу генерации. Как только мы запустим этот проект, мы увидим следующий результат и проект будет создан:

Настройка проекта Spring Reactor

Настройка проекта Spring Reactor

4. Добавление зависимостей Maven

Создав проект, не стесняйтесь открывать его в своей любимой среде IDE. Следующим шагом является добавление соответствующих зависимостей Maven в проект. Мы будем работать со следующими зависимостями в нашем проекте:

  • spring-boot-starter-web : эта зависимость помечает этот проект как веб-проект и добавляет зависимости для создания контроллеров и создания веб-связанных классов
  • reactor-bus : это зависимость, которая переносит все связанные с Reactor зависимости в путь к классам проекта
  • spring-boot-starter-test : эта зависимость собирает все связанные с тестами JAR-файлы в проекте, такие как JUnit и Mockito

Вот файл pom.xml с добавленными соответствующими зависимостями:

pom.xml

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>1.5.10.RELEASE</version>
  <relativePath/> <!-- lookup parent from repository -->
</parent>
 
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  <java.version>1.8</java.version>
</properties>
 
<dependencies>
 
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
 
  <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-bus</artifactId>
    <version>2.0.8.RELEASE</version>
  </dependency>
 
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
 
</dependencies>
 
<build>
  <plugins>
    <plugin>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
  </plugins>
</build>

Найти последние зависимости Maven от Maven Central . Мы также добавили плагин Maven для загрузочных проектов Spring, который помогает нам превратить этот проект в работающий JAR-файл, чтобы его можно было легко развернуть без помощи каких-либо современных инструментов и зависимостей. JAR, который мы получаем от этого плагина, полностью готов к развертыванию в виде исполняемого файла.

Наконец, чтобы понять все JAR-файлы, которые добавляются в проект при добавлении этой зависимости, мы можем запустить простую команду Maven, которая позволяет нам видеть полное дерево зависимостей для проекта, когда мы добавляем в него некоторые зависимости. Это дерево зависимостей также покажет, сколько зависимостей было добавлено, когда мы добавили некоторые из наших собственных зависимостей в правильном иерархическом порядке. Вот команда, которую мы можем использовать для того же:

Проверьте дерево зависимостей

1
mvn dependency:tree

Когда мы запустим эту команду, она покажет нам следующее дерево зависимостей:

Maven Dependency Tree

Maven Dependency Tree

Заметили что-то? Так много зависимостей было добавлено, просто добавив три зависимости в проект. Spring Boot сама собирает все связанные зависимости и ничего не оставляет для нас в этом вопросе. Наибольшим преимуществом является то, что все эти зависимости гарантированно совместимы друг с другом, поскольку эти зависимости управляются и предоставляются самим файлом pom проекта Spring Boot.

5. Структура проекта

Прежде чем мы продолжим работу и начнем работать над кодом для проекта, давайте представим здесь структуру проекта, которая у нас будет, как только мы закончим добавление всего кода в проект, чтобы мы знали, где размещать классы, которые мы сделаем в этом проекте:

Структура проекта Spring Reactor

Структура проекта Spring Reactor

Мы разделили проект на несколько пакетов, чтобы следовать принципу разделения интересов, а код остается модульным, что делает расширение проекта довольно простым.

6. Понимание примера приложения

Чтобы приложение было простым для понимания и приближалось к реальной ситуации, мы рассмотрим сценарий логистического приложения, которое управляет доставкой различных грузов, размещенных в системе.

Это приложение получает обновления от внешнего поставщика о местонахождении груза, доставляемого клиенту по указанному адресу. Как только наше приложение получает это обновление, оно выполняет различные операции, такие как:

  • Обновить местоположение для отгрузки в базе данных
  • Отправка уведомления на мобильное устройство пользователя
  • Отправить уведомление по электронной почте
  • Отправить смс пользователю

Мы решили демонстрировать реактивное поведение для этих операций, поскольку пользователь не зависит от того, чтобы эти операции выполнялись в реальном времени, поскольку они в основном являются фоновыми задачами, которые также могут занимать немного больше времени, и пользователь не будет сильно подвержен влиянию, если Обновление статуса для отправки задерживается на несколько минут. Давайте начнем с создания модели в первую очередь.

7. Определение модели POJO

Мы начнем с определения нашего POJO, который представляет отправку, отправляемую клиенту, которая имеет такие поля, как shipmentId , currentLocation и т. Д. Давайте определим этот POJO здесь:

Shipment.java

01
02
03
04
05
06
07
08
09
10
11
12
package com.javacodegeeks.example.model;
 
public class Shipment {
     
    private String shipmentId;
    private String name;
    private String currentLocation;
    private String deliveryAddress;
    private String status;
 
    //standard setters and getters
}

Мы определили некоторые основные поля здесь. Для краткости мы опускаем стандартные методы получения и установки, но их необходимо создавать, поскольку Джексон использует их во время сериализации и десериализации объекта.

8. Определение сервиса

Мы определим базовый интерфейс, который определяет контракт для функциональности, которую мы будем использовать далее, которая определит бизнес-логику, которая должна выполняться, когда приложение использует четное число.

Вот определение контракта, которое мы будем использовать:

ShipmentService.java

1
2
3
4
5
6
7
package com.javacodegeeks.example.service;
 
import com.javacodegeeks.example.model.Shipment;
 
public interface ShipmentService {
    void shipmentLocationUpdate(Shipment shipment);
}

У нас есть только одно определение метода в этом интерфейсе, так как это все, что нам нужно на данный момент. Давайте теперь перейдем к реализации этого сервиса, где мы фактически продемонстрируем метод сна, который просто моделирует поведение операции для этого класса:

ShipmentServiceImpl.java

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
package com.javacodegeeks.example.service;
 
import com.javacodegeeks.example.model.Shipment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
 
@Service
public class ShipmentServiceImpl implements ShipmentService {
     
    private final Logger LOG = LoggerFactory.getLogger("ShipmentService");
 
    @Override
    public void shipmentLocationUpdate(Shipment shipment) throws InterruptedException {
        LOG.info("Shipment data: {}", shipment.getShipmentId());
         
        Thread.sleep(3000);
 
        LOG.info("Shipment with ID: {} reached at javacodegeeks!!!", shipment.getShipmentId());
    }
}

В целях иллюстрации, когда эта служба вызывается с деталями отгрузки, она просто предоставляет некоторые операторы печати и использует задержку в 3000 миллисекунд, чтобы понять время, которое могли занять операции, которые мы определили в предыдущем разделе. Обратите внимание, что каждая из этих операций могла занять намного больше времени, чем всего 3 секунды, но приложение освобождается от этого (пока потоки не начнут накапливаться в куче памяти для приложения, которым необходимо управлять).

9. Определение потребителя события

В этом разделе мы наконец увидим, как мы можем определить потребителя, который будет прослушивать обновления места отгрузки событий. Вы можете вызвать этого потребителя, просто поместив событие для обновления отгрузки в EventBus SPring, который мы вскоре определим и используем.

EventHandler.java

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.javacodegeeks.example.handler;
 
import com.javacodegeeks.example.model.Shipment;
import com.javacodegeeks.example.service.ShipmentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.bus.Event;
import reactor.fn.Consumer;
 
@Service
public class EventHandler implements Consumer<Event<Shipment>> {
     
    private final ShipmentService shipmentService;
 
    @Autowired
    public EventHandler(ShipmentService shipmentService) {
        this.shipmentService = shipmentService;
    }
 
    @Override
    public void accept(Event<Shipment> shipmentEvent) {
        Shipment shipment = shipmentEvent.getData();
        try {
            shipmentService.shipmentLocationUpdate(shipment);
        } catch (InterruptedException e) {
            //do something as bad things have happened
        }
    }
}

Этот потребительский сервис принимает объект в шине событий и сообщает нашему классу обслуживания, что он может выполнять необходимые операции асинхронно. Обратите внимание, что мы также определим пул потоков, который будет использоваться для запуска этого потребителя, чтобы различные потоки могли использоваться для запуска вызова метода сервиса. Даже если мы сами не определим пул потоков, Spring Boot сделает это за нас с некоторым фиксированным числом максимальных потоков в пуле.

Преимущество этого потребительского класса в том, что он сам получает объект Shipment из шины событий, и в самом классе нет необходимости выполнять преобразование или приведение, что является распространенной областью ошибок, а также увеличивает время, затрачиваемое на бизнес-логику. выполнить.

10. Определение конфигурации Java

Мы можем определить конфигурации с Java в нашем приложении. Давайте сделаем эти определения здесь:

ReactorConfig.java

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
package com.javacodegeeks.example.config;
 
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.Environment;
import reactor.bus.EventBus;
 
@Configuration
public class ReactorConfig {
 
    @Bean
    Environment env() {
        return Environment.initializeIfEmpty().assignErrorJournal();
    }
 
    @Bean
    EventBus createEventBus(Environment env) {
        return EventBus.create(env, Environment.THREAD_POOL);
    }
}

Понятно, что здесь нет ничего особенного. Мы только что инициализировали наш пул потоков с некоторым номером (по умолчанию здесь). Мы просто хотели продемонстрировать, как вы можете изменить количество потоков, если хотите, в зависимости от варианта использования вашего приложения.

11. Определение класса Spring Boot

На последнем этапе мы создадим класс Spring Boot, с помощью которого мы сможем опубликовать сообщение, которое может использоваться обработчиком событий, который мы определили ранее. Вот определение класса для основного класса:

Application.java

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.javacodegeeks.example;
 
import com.javacodegeeks.example.handler.EventHandler;
import com.javacodegeeks.example.model.Shipment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.bus.Event;
import reactor.bus.EventBus;
 
import static reactor.bus.selector.Selectors.$;
 
@SpringBootApplication
public class Application implements CommandLineRunner {
 
    private final Logger LOG = LoggerFactory.getLogger("Application");
 
    private final EventBus eventBus;
    private final EventHandler eventHandler;
 
    @Autowired
    public Application(EventBus eventBus, EventHandler eventHandler) {
        this.eventBus = eventBus;
        this.eventHandler = eventHandler;
    }
 
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
 
    @Override
    public void run(String... strings) throws Exception {
        eventBus.on($("eventHandler"), eventHandler);
 
        //Publish messages here
        for (int i = 0; i < 10; i++) {
            Shipment shipment = new Shipment();
            shipment.setShipmentId(String.valueOf(i));
            eventBus.notify("eventHandler", Event.wrap(shipment));
            LOG.info("Published shipment number {}.", i);
        }
    }
}

Мы использовали интерфейс CommandLineRunner, чтобы заставить этот класс выполнять код, с помощью которого мы можем тестировать код класса производителя и конфигурации, который мы написали. В этом классе мы публикуем сообщение в указанной теме и прослушиваем его в классе потребителя, который мы определили в том же приложении. Обратите внимание, что мы используем собственную шину событий Spring для выполнения заданий, и эти задания не будут помещены на диск. Если это приложение аккуратно уничтожено с помощью привода Spring Boot, эти задания будут автоматически сохраняться на диске, чтобы их можно было ставить в очередь при повторном запуске приложения.

В следующем разделе мы запустим наш проект с помощью простой команды Maven.

12. Запуск проекта

Теперь, когда определение основного класса сделано, мы можем запустить наш проект. С приложением maven легко запустить приложение, просто используйте следующую команду:

pom.xml

1
mvn spring-boot:run

Выполнив приведенную выше команду, мы увидим, что сообщение было опубликовано, и это же приложение использовало сообщение в обработчике событий:

Запуск приложения Spring Reactor

Запуск приложения Spring Reactor

Мы видели, что события были опубликованы, когда приложение было запущено с использованием метода CommandLineRunner который мы использовали в неблокирующем режиме. Как только события были опубликованы, они обрабатывались обработчиком событий параллельно. Если вы внимательно посмотрите на потребителя, вы заметите, что Spring определяет четыре потока в пуле потоков для управления этими событиями. Это ограничение по умолчанию для количества потоков, которые Spring определяет для параллельного управления событиями.

13. Вывод

В этом уроке мы рассмотрели, как легко и быстро создать приложение Spring Boot с интегрированным в него проектом Reactor. Как мы уже говорили, хорошо спроектированная схема реактора в вашем приложении может иметь пропускную способность до 15 000 000 (это шесть нулей ) событий в секунду. Это показывает, насколько эффективно реализована внутренняя очередь для этого реактора.

В небольшом приложении, которое мы определили, мы продемонстрировали простой способ определения исполнителя пула потоков, который определил четыре потока, и потребителя, который использует этот пул потоков для параллельного управления событиями. Одна из наиболее распространенных проблем, с которыми сталкиваются приложения, использующие асинхронное поведение при выполнении операций, заключается в том, что нехватка памяти заканчивается очень скоро, так как есть несколько потоков, которые начинают занимать пространство кучи и создают объекты, когда они начинают обрабатываться. Важно убедиться, что при запуске приложения мы назначаем приложению хороший размер кучи, который напрямую зависит от размера пула потоков, определенного для приложения.

Реактивный стиль программирования является одним из наиболее распространенных стилей программирования, который сейчас находится в стадии разработки, поскольку приложения начали использовать ядра ЦП с их параллельным выполнением, что является очень хорошей идеей для использования аппаратного обеспечения на уровне приложений. Reactor обеспечивает полное основание неблокирующего программирования для JVM, а также доступно для Groovy или Kotlin. В связи с тем, что Java сама по себе не является реактивным языком, она не поддерживает сопрограммы изначально. Существует несколько языков JVM, таких как Scala и Clojure, которые лучше поддерживают реактивные модели с точки зрения нативности, но сама Java не делает этого (по крайней мере, до версии 9).

14. Загрузите исходный код

Это был пример языка программирования Java с шаблоном Spring Boot и Reactor.

Скачать
Вы можете скачать полный исходный код этого примера здесь: Пример Reactor