Реактивная или неблокирующая обработка пользуется большим спросом, но прежде чем принять ее, вы должны понять ее модель потока Для модели потока очень важны две вещи: знание потоковой связи и потоков выполнения. В этой статье я попытаюсь углубиться в оба эти аспекта.
Что такое реактивное программирование?
В Интернете есть несколько определений. Определение Wiki немного теоретическое и общее. С точки зрения многопоточности моя версия « Реактивное программирование — это обработка асинхронного потока событий, на котором вы можете наблюдать»
Вы можете найти гораздо больше дискуссий о реактивном программировании в Интернете, но сейчас давайте придерживаться нашей темы «Реактивные модели потоков». Давайте начнем с очень простого реактивного варианта использования, где мы хотим вернуть сумму целочисленного массива. Это означает, что наш основной поток запросов не должен блокироваться при обработке суммы целочисленного массива.
Вам также может понравиться: Понимание шаблона Reactor: на основе потоков и событий
Давайте начнем с создания простого WebServer и попытаемся изобразить то же самое.
Джава
xxxxxxxxxx
1
ServerSocket server = new ServerSocket(9090);
2
while (true) {
3
try (Socket socket = server.accept()) {
4
Consumer<Integer> response = a -> {
5
String responseStr = "HTTP/1.1 200 OK\r\n\r\n"+"Result= "+ a + " and Thread: "+Thread.currentThread();
6
try {
7
socket.getOutputStream().write(responseStr.getBytes("UTF-8"));
8
} catch (IOException e) {
9
e.printStackTrace();
10
}
11
};
12
Random random = new Random();
13
ctx.getBean(ReactorComponent.class).nonBlockingSum(new Integer[] {
14
random.nextInt(), random.nextInt(), random.nextInt()
15
}).subscribe(response);
16
TimeUnit.MILLISECONDS.sleep(200);
17
18
}
Здесь мы создаем сервер сокетов, открываем сокет и поддерживаем сокет, пока асинхронная обработка не будет завершена. Асинхронная обработка происходит путем вызова nonBlockingSum и передачи функции потребителя или lambada как наблюдаемой. Как только сумма будет готова, наша функция / lambada получит обратный вызов. От обратного вызова мы возвращаем значение суммы клиенту через сокет.
Поэтому, если вы вызовете URL-адрес, http: // localhost: 9090, параллельно / sequence, вы получите следующий ответ:
Джава
xxxxxxxxxx
1
Result= 83903382 and Thread: Thread[ReactiveScheduler-2,5,main]
2
Result= -1908131554 and Thread: Thread[ReactiveScheduler-3,5,main]
Выше, просто чтобы изобразить реактивные вещи. Вы должны использовать netty / undertow / servlet 3.1 в качестве реактивного веб-сервера. Теперь давайте углубимся и попытаемся понять следующие потоки:
- Блокировка вызова
- Неблокирующий звонок
- Неблокирующий вызов с выполнением потока
- Последовательная обработка бизнес-потоков
- Параллельная обработка бизнес-потоков
Мы собираемся использовать Spring WebFlux, который построен поверх платформы Reactor для реактивного программирования. Давайте рассмотрим разделы 1 и 2 в этой статье и другие разделы в части 2, чтобы ее было очень легко понять.
Мы собираемся написать простой метод суммирования и сделать его реактивным, используя функцию поставщика.
Джава
xxxxxxxxxx
1
public Integer getSum(final Integer arr[]) {
2
Integer count = 0;
3
for (int i = 0; i < arr.length; i++) {
4
count += arr[i];
5
}
6
return count;
7
}
Джава
xxxxxxxxxx
1
public Mono<Integer> nonBlockingSum(final Integer arr[]) throws InterruptedException {
2
3
Mono<Integer> m = Mono.fromSupplier(() ->
4
this.computationService.getSum(arr)).subscribeOn(this.scheduler);
5
return m;
6
}
1) Блокировка вызова
Джава
xxxxxxxxxx
1
Integer t = ctx.getBean(ReactorComponent.class).nonBlockingSum(new Integer[] {4,78,4,676,3,45}).block();
Как показано на диаграмме, поток запросов блокируется до тех пор, пока не будет завершено вычисление суммы. Если мы выполним, код получит следующий ответ:
Джава
xxxxxxxxxx
1
In ReactiveApplication.blockingCall: Thread[main,5,main]
2
In ReactorComponent.nonBlockingSum: Thread[main,5,main]
3
Returning form ReactorComponent.nonBlockingSum: Thread[main,5,main]
4
In ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
5
Returning from ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
6
Returning from ReactiveApplication.blockingCall result= 810
Это ясно показывает, что блокирующий вызов ждал, пока выполнение суммы не будет завершено.
2) Неблокирующий звонок
Джава
xxxxxxxxxx
1
public static void nonBlockingCall(ApplicationContext ctx) throws InterruptedException{
2
3
Consumer<Integer> display = a -> {
4
System.out.println("In Consumer/Lambada result= "+ a + " and Thread:
5
"+Thread.currentThread());
6
};
7
ctx.getBean(ReactorComponent.class).nonBlockingSum(new Integer[]
8
{4,78,4,676,3,45}).subscribe(display);
9
}
Здесь поток запроса не блокируется, и выполнение суммы переносится на поток, выделенный из пула потоков. Обратный вызов и функция / лямбада также выполняются в том же потоке. Если мы выполним, код получит следующий ответ:
Джава
xxxxxxxxxx
1
In ReactiveApplication.nonBlockingCall: Thread[main,5,main]
2
In ReactorComponent.nonBlockingSum: Thread[main,5,main]
3
Returning form ReactorComponent.nonBlockingSum: Thread[main,5,main]
4
Returning from ReactiveApplication.nonBlockingCall: Thread[main,5,main]
5
In ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
6
Returning from ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
7
In Consumer/Lambada result= 810 and Thread: Thread[ReactiveScheduler-2,5,main]
Это ясно показывает, что поток запроса не ожидал, пока сумма не будет вычислена. Также потребитель и сумма обрабатывались в одном потоке.
Разделы 3, 4 и 5 будут рассмотрены в части 2, и вы можете получить код на GitHub .
Дальнейшее чтение
Является ли Spring Reactive уже устаревшим? Взгляд на инверсию резьбового соединения