В первой части мы рассмотрели реактивную сеть и простой блокирующий и неблокирующий вызов. В этой части мы подробно рассмотрим выполнение потоков и бизнес-процессы.
3) Неблокирующий вызов с выполнением потока
Диаграмма выглядит сложной, но мы запускаем новый поток из потока запроса и затем вызываем реактивную функцию. Опять же, создаем новый поток из предыдущего потока, чтобы выполнить некоторую консольную печать. Здесь мы хотим протестировать или добиться того, чтобы и реактивная функция sum (), и консольная печать выполнялись параллельно. Выполнение sum () не блокирует начало и печать нового потока.
Джава
xxxxxxxxxx
1
public static void nonBlockingCallAndStartNewThread(ApplicationContext ctx) throws InterruptedException {
2
Runnable r = new Runnable() {
4
5
public void run() {
6
System.out.println("In New Thread Run method:
7
"+Thread.currentThread());
8
try {
9
Consumer<Integer> consumer = C -> print(C);
10
ctx.getBean(ReactorComponent.class).nonBlockingSum(
11
new Integer[]
12
{400000,780,40,6760,30,3456450}).subscribe(consumer);
13
Runnable r1 = new Runnable() {
14
15
public void run() {
16
System.out.println("Started Another Thread:
17
"+Thread.currentThread());
18
for(int i =0; i <5; i++){
19
i++;
20
}
21
System.out.println("End of Another Thread:
22
"+Thread.currentThread());
23
}
24
};
25
new Thread(r1).start();
26
System.out.println("End of New Thread: "+Thread.currentThread());
27
} catch (InterruptedException e) {
28
e.printStackTrace();
29
}
30
}
31
};
32
new Thread(r).start();
33
}
34
Когда мы выполним следующее, оно будет напечатано на консоли:
xxxxxxxxxx
1
In ReactiveApplication.nonBlockingCallAndStartNewThread: Thread[main,5,main]
2
...
3
In New Thread Run method: Thread[Thread-3,5,main]
4
...
5
End of New Thread: Thread[Thread-3,5,main]
6
Started Another Thread: Thread[Thread-10,5,main]
7
End of Another Thread: Thread[Thread-10,5,main]
8
...
9
In Consumer/Lambada Result= 3864060 Thread: Thread[ReactiveScheduler-2,5,main]
Вышеуказанный вывод консоли ясно показывает, что потребитель / лямбада был выполнен последним. Оба потока (Thread-3 и Thread-10) не ожидали вычисления метода sum ().
На очень высокопроизводительном ПК вы не сможете получить такой же вывод на консоль. Поместите немного сна в метод sum (), чтобы вы могли точно связать происходящее.
Вы также можете быть заинтересованы в: Многопоточность в Spring Boot Использование CompletableFuture
4) Обработка последовательных бизнес-потоков
До сих пор мы обсуждали поток и шаблоны его выполнения. Я надеюсь, что вы получили четкое представление о реактивных обратных вызовах и о том, как они выполняются иначе, чем запрос / основной поток. Без каких-либо задержек мы обсудим обработку бизнес-потоков и то, как реактивные структуры могут быть полезны в этом случае.
Давайте продолжим с нашим методом sum () и создадим фиктивный бизнес-пример вычисления суммы, удалив первый и нижний элементы после сортировки.
Самый простой способ сделать это - получить максимум, минимум и вычесть из суммы. Формула будет выглядеть так: result = sum - max -min . Обратите внимание, что мы не заинтересованы в граничных условиях здесь. Давайте сначала посмотрим модель потока:
Это определенно выглядит сложным, но мы делаем то, что мы вызываем последующий бизнес-метод из функции customer / lambada и собираем результат атомарным способом. Ниже приведен фрагмент кода:
Джава
xxxxxxxxxx
1
public void businessService(Integer arr[], TestLambada <Temp> testLambada) throws InterruptedException {
2
System.out.println("In BusinessService.businessService:
4
"+Thread.currentThread());
5
// Start the execution in sequence.
6
Consumer<Integer> consumerForSum = sum -> {
7
AtomicInteger result = new AtomicInteger();
8
System.out.println("In consumerForSum: "+Thread.currentThread());
9
result.addAndGet(sum);
10
Consumer<Integer> consumerForMin = min -> {
11
System.out.println("In consumerForMin: "+Thread.currentThread());
12
result.set(result.intValue()-min);
13
Consumer <Integer> consumerForMax = max -> {
14
System.out.println("In consumerForMax: "+Thread.currentThread());
15
result.set(result.intValue()-max);
16
testLambada.get(new Temp(result.get()));
17
System.out.println("End of consumerForMax:
18
"+Thread.currentThread());
19
};
20
legacyComponent.getMax(arr).subscribe(consumerForMax);
21
System.out.println("End of consumerForMin: "+Thread.currentThread());
22
};
23
legacyComponent.getMin(arr).subscribe(consumerForMin);
24
System.out.println("End of consumerForSum: "+Thread.currentThread());
25
};
26
legacyComponent.nonBlockingSum(arr).subscribe(consumerForSum);
27
System.out.println("Returning from BusinessService.businessService:
28
"+Thread.currentThread());
29
}
При выполнении вы получите следующий ответ:
Джава
xxxxxxxxxx
1
In ReactiveApplication.callBusinessWorkflowSerial: Thread[main,5,main]
2
…
3
In consumerForSum: Thread[ReactiveScheduler-2,5,main]
4
…
5
In consumerForMin: Thread[ReactiveScheduler-3,5,main]
6
…
7
In consumerForMax: Thread[ReactiveScheduler-2,5,main]
8
Printing result in tempTestLambada= 131
9
…
Если вы видите, что все обратные вызовы были вызваны в отдельных потоках, и есть также повторное использование потока, как это происходит из пула исполнителей.
Код невероятно сложен, в нем много асинхронных исполнений, и это очень анти-образец ада обратного вызова . Следовательно, вам нужно быть особенно осторожным в отношении того, какая часть вашего кода должна обрабатываться как обратный вызов, а какая часть должна находиться в одном и том же блоке потока. Я предлагаю вам создать баланс между реактивным и не реактивным.
5) Параллельная обработка бизнес-потоков
Если вы видите предыдущий бизнес-пример, то было выполнено последовательное выполнение бизнес-потока, и при этом общее время ответа будет представлять собой сумму каждого времени выполнения. Таким образом, во время последовательного выполнения потоки находились в пуле бездействующими, и в итоге ЦП не будет использоваться в достаточной степени.
Давайте попробуем заставить наш бизнес-поток выполняться параллельно. В этом месте мы можем выполнить вычисление суммы, минимума и максимума для параллельного выполнения, а затем применить нашу формулу. Мы собираемся использовать завершаемое будущее, заключенное в метод Mono.zip каркаса реактора. Обратите внимание, что не всегда возможно выполнить бизнес-поток параллельно, поскольку между потоками могут быть зависимости. Ниже приведена модель потока:
Фрагмент кода:
Джава
xxxxxxxxxx
1
public void busServiceInParallel(Integer arr[], TestLambada <Temp> testLambada) throws InterruptedException {
2
System.out.println("In BusinessService.busServiceInParallel:
4
"+Thread.currentThread());
5
Consumer<Tuple3<Integer,Integer,Integer>> consumer= a -> {
6
System.out.println("In consumer For Parallel: "+Thread.currentThread());
7
testLambada.get(new Temp(((Integer)a.get(0)-(Integer)a.get(1)-
8
(Integer)a.get(2))));
9
};
10
Mono.zip(legacyComponent.nonBlockingSum(arr),
11
legacyComponent.getMin(arr),
12
legacyComponent.getMax(arr)).subscribe(consumer);
13
System.out.println("Returning from BusinessService.busServiceInParallel:
14
"+Thread.currentThread());
15
}
Выполнение кода выведет следующее:
Джава
xxxxxxxxxx
1
In ReactiveApplication.callBusinessWorkflowParallel: Thread[main,5,main]
2
…
3
In ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
4
In ComputationService.getMin: Thread[ReactiveScheduler-3,5,main]
5
In ComputationService.getMax: Thread[ReactiveScheduler-4,5,main]
6
…
7
In consumer For Parallel: Thread[ReactiveScheduler-2,5,main]
8
In Parallel Business Workflow Lambada Result: 131 and Thread: Thread[ReactiveScheduler-2,5,main]
Если вы видите, сумма, мин и макс были выполнены в отдельных потоках. Потребитель / lambada, который выполняет агрегацию, также выполняется в отдельных потоках.
Примечания к сведению:
- В случае блокирующего вызова возможна потеря ресурсов, так как текущий поток будет ожидать операции ввода-вывода. В реактивном режиме этого можно избежать. Следовательно, с точки зрения масштабирования будет выгодно перейти к реактивному с учетом коэффициента нагрузки.
- Миф о том, что синхронный код медленный, а реактивный код быстрый. Синхронный код будет выполняться в одном процессе и одном потоке, в то время как в реактивном, один поток будет выполнять несколько вещей, возможно, зацикливаясь на возможных задачах, которые он может выполнить. На самом деле синхронный режим может быть быстрым, поскольку переключение потоков происходит не так сильно, как в случае реактивного, когда контекст потока часто переключается. Помните, что переключение контекста потока стоит дорого. Но реактивный может ускорить, если вы выполняете параллельно.
- Баланс между синхронной и реактивной обработкой необходимо поддерживать. Напомним, реактивные работы на рабочих потоках исполнителя и, безусловно, существует ограничение для каждого типа исполнителя. Если вы сформулируете свой код со слишком большим количеством реактивов, то потокам исполнителей может не хватить доступности. Следовательно, баланс между синхронными и реактивными потоками становится критическим.
- В случае реактивной обработки потребуется больше потоков, больше памяти, так как параллельно будет много обработки. Даже переключение контекста потока также займет несколько килобайт оперативной памяти. Больше памяти, больше циклов сборки мусора (GC), которые могут снизить производительность.
Мой опыт работы с реактивным программированием:
- Поскольку код будет не в последовательном порядке, это будет трудно с точки зрения читаемости. Это будет выглядеть сложно, пока вы не победите его.
- Прежде чем начать писать реактивный код, нужно много думать; даже простые вызовы нуждаются в глубоком размышлении.
- Множество новых испытаний будет брошено на юнит-тестирование, к которому нужно подготовиться.
- Поскольку бизнес-потоки охватывают несколько потоков, потребуется переосмыслить стратегию ведения журнала, чтобы добавить идентификатор корреляции, поскольку многие потоки будут регистрировать один и тот же поток запросов.
- Отладка будет сложной задачей, так как необходимо добавить несколько точек останова и лучше следить за процессом. Просто делать шаг не будет работать.
Заключение
Несмотря на предыдущие пункты и факты, реактивное программирование - отличное место, чтобы начать, и я предлагаю начать, если это - новая разработка. Существует определенная кривая обучения, и вы можете чувствовать себя потерянным. Овладев этим, вы увидите огромные преимущества с точки зрения ресурсов. Добро пожаловать на новый способ программирования.
Вы можете получить полный код на GitHub .
Удачного кодирования!
Дальнейшее чтение
Является ли Spring Reactive уже устаревшим? Взгляд на инверсию резьбового соединения