Реактивная или неблокирующая обработка пользуется большим спросом, но прежде чем принимать ее, необходимо глубоко понять ее модель потоков. Для модели потока очень важно знать две вещи: связь потока и поток выполнения. В этом блоге я попытаюсь подробно объяснить обе эти темы.
Что такое реактивное программирование?
Есть много определений в Интернете; Wiki определение немного теоретический и общий характер . С точки зрения многопоточности моя версия «Реактивное программирование — это обработка потока асинхронных событий, который вы можете наблюдать».
Вы можете найти гораздо больше дискуссий о реактивном программировании в Интернете, но сейчас давайте придерживаться нашей темы о модели реактивных потоков. Давайте начнем с очень простого реактивного варианта использования, где мы хотим вернуть сумму целочисленного массива.
Наш основной поток запросов не должен блокироваться при обработке суммы целочисленного массива. Давайте начнем с создания простого WebServer.
Джава
xxxxxxxxxx
1
ServerSocket server = new ServerSocket(9090);
2
while (true) {
3
try (Socket socket = server.accept()) {
4
Consumer<Integer> response = a -> {
5
String responseStr = "HTTP/1.1 200 OK\r\n\r\n"+"Result= "+ a + " and Thread: "+Thread.currentThread();
6
try {
7
socket.getOutputStream().write(responseStr.getBytes("UTF-8"));
8
} catch (IOException e) {
9
e.printStackTrace();
10
}
11
};
12
Random random = new Random();
13
ctx.getBean(ReactorComponent.class).nonBlockingSum(new Integer[] {
14
random.nextInt(), random.nextInt(), random.nextInt()
15
}).subscribe(response);
16
TimeUnit.MILLISECONDS.sleep(200);
17
18
}
Здесь мы создаем сервер сокетов, открываем сокет и поддерживаем сокет, пока асинхронная обработка не будет завершена. Асинхронная обработка происходит путем вызова nonBlockingSum
метода и передачи функции-потребителя или лямбды в качестве наблюдаемой. Как только сумма будет готова, наша функция / лямбда получит обратный вызов. Из обратного вызова мы возвращаем значение суммы клиенту через сокет.
Итак, если вы вызовете URL-адрес, http: // localhost: 9090 параллельно / sequence, вы получите следующий ответ:
Простой текст
xxxxxxxxxx
1
Result= 83903382 and Thread: Thread[ReactiveScheduler-2,5,main]
2
Result= -1908131554 and Thread: Thread[ReactiveScheduler-3,5,main]
Выше приведен только пример. В реальном мире вы должны использовать netty / undertow / servlet 3.1 в качестве реактивного веб-сервера. Теперь давайте углубимся и попытаемся понять следующие потоки:
- Блокировка вызова.
- Неблокирующий звонок.
- Неблокирующий вызов с выполнением потока.
- Последовательная обработка бизнес-потоков.
- Параллельная обработка бизнес-потоков.
Мы собираемся использовать Spring WebFlux, который построен поверх платформы Reactor для реактивного программирования. Давайте рассмотрим разделы 1 и 2 в этом блоге и другие разделы в части 2, чтобы их было очень легко понять.
Мы собираемся написать простой метод суммирования и сделать его реактивным с помощью функции поставщика.
Джава
xxxxxxxxxx
1
public Integer getSum(final Integer arr[]) {
2
Integer count = 0;
3
for (int i = 0; i < arr.length; i++) {
4
count += arr[i];
5
}
6
return count;
7
}
Джава
xxxxxxxxxx
1
public Mono<Integer> nonBlockingSum(final Integer arr[]) throws InterruptedException {
2
3
Mono<Integer> m = Mono.fromSupplier(() ->
4
this.computationService.getSum(arr)).subscribeOn(this.scheduler);
5
return m;
6
}
Блокировка вызова
Джава
xxxxxxxxxx
1
Integer t = ctx.getBean(ReactorComponent.class).nonBlockingSum(new Integer[] {4,78,4,676,3,45}).block();
Как показано на диаграмме, поток запросов блокируется до тех пор, пока не будет завершено вычисление суммы. Если мы выполним код, мы получим следующий ответ:
Простой текст
xxxxxxxxxx
1
In ReactiveApplication.blockingCall: Thread[main,5,main]
2
In ReactorComponent.nonBlockingSum: Thread[main,5,main]
3
Returning form ReactorComponent.nonBlockingSum: Thread[main,5,main]
4
In ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
5
Returning from ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
6
Returning from ReactiveApplication.blockingCall result= 810
Это ясно показывает, что блокирующий вызов ждал, пока выполнение суммы не будет завершено.
Неблокирующий звонок
Джава
xxxxxxxxxx
1
public static void nonBlockingCall(ApplicationContext ctx) throws InterruptedException{
2
Consumer<Integer> display = a -> {
4
System.out.println("In Consumer/Lambada result= "+ a + " and Thread:
5
"+Thread.currentThread());
6
};
7
ctx.getBean(ReactorComponent.class).nonBlockingSum(new Integer[]
8
{4,78,4,676,3,45}).subscribe(display);
9
}
Здесь поток запроса не блокируется, и выполнение суммы переносится на поток, выделенный из пула потоков. Обратный вызов и функция / лямбда также выполняются в одном потоке. Если мы выполним код, мы получим следующий ответ:
Простой текст
xxxxxxxxxx
1
In ReactiveApplication.nonBlockingCall: Thread[main,5,main]
2
In ReactorComponent.nonBlockingSum: Thread[main,5,main]
3
Returning form ReactorComponent.nonBlockingSum: Thread[main,5,main]
4
Returning from ReactiveApplication.nonBlockingCall: Thread[main,5,main]
5
In ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
6
Returning from ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
7
In Consumer/Lambada result= 810 and Thread: Thread[ReactiveScheduler-2,5,main]
Это ясно показывает, что поток запроса не ожидал, пока сумма не будет вычислена. Также потребитель и сумма обрабатывались в одном потоке.
Разделы 3, 4 и 5 будут рассмотрены в части 2 , и вы можете получить код на GitHub .