В первой части этой серии мы рассмотрели реактивную сеть и простой блокирующий и неблокирующий вызов. В этой статье мы подробно рассмотрим выполнение потоков и бизнес-процессы.
Неблокирующий вызов с выполнением потока.
Диаграмма выглядит сложной, но мы запускаем новый поток из потока запроса и затем вызываем реактивную функцию. Опять же, мы создаем новый поток из предыдущего потока, чтобы выполнить некоторую консольную печать. Здесь мы хотим протестировать или достичь того, чтобы как реактивная sum()
функция, так и консольная печать происходили параллельно. Выполнение sum()
не блокирует начало и печать нового потока.
Джава
xxxxxxxxxx
1
public static void nonBlockingCallAndStartNewThread(ApplicationContext ctx) throws InterruptedException {
2
Runnable r = new Runnable() {
4
5
public void run() {
6
System.out.println("In New Thread Run method:
7
"+Thread.currentThread());
8
try {
9
Consumer<Integer> consumer = C -> print(C);
10
ctx.getBean(ReactorComponent.class).nonBlockingSum(
11
new Integer[]
12
{400000,780,40,6760,30,3456450}).subscribe(consumer);
13
Runnable r1 = new Runnable() {
14
15
public void run() {
16
System.out.println("Started Another Thread:
17
"+Thread.currentThread());
18
for(int i =0; i <5; i++){
19
i++;
20
}
21
System.out.println("End of Another Thread:
22
"+Thread.currentThread());
23
}
24
};
25
new Thread(r1).start();
26
System.out.println("End of New Thread: "+Thread.currentThread());
27
} catch (InterruptedException e) {
28
e.printStackTrace();
29
}
30
}
31
};
32
new Thread(r).start();
33
}
34
Когда мы выполняем приведенный выше код, в нашей консоли выводится следующее:
xxxxxxxxxx
1
In ReactiveApplication.nonBlockingCallAndStartNewThread: Thread[main,5,main]
2
...
3
In New Thread Run method: Thread[Thread-3,5,main]
4
...
5
End of New Thread: Thread[Thread-3,5,main]
6
Started Another Thread: Thread[Thread-10,5,main]
7
End of Another Thread: Thread[Thread-10,5,main]
8
...
9
In Consumer/Lambada Result= 3864060 Thread: Thread[ReactiveScheduler-2,5,main]
Вышеуказанный вывод консоли ясно показывает, что потребитель / лямбада были выполнены в последний раз. Оба потока (Thread-3 и Thread-10) не ожидали вычисления sum()
метода.
На очень высокопроизводительном ПК вы не сможете получить такой же вывод на консоль. Положите немного сна в sum()
методе, чтобы вы могли связать точно, что происходит.
Обработка последовательных бизнес-потоков
До сих пор мы обсуждали поток и шаблоны его выполнения. Я надеюсь, что у вас есть четкое понимание реактивных обратных вызовов и того, как они выполняются иначе, чем запрос / основной поток. Без каких-либо задержек мы обсудим, как реагирующая структура может быть полезна в контексте обработки бизнес-потоков.
Давайте продолжим с нашим sum()
методом и создадим фиктивный бизнес-пример вычисления суммы, удалив первый и нижний элементы после сортировки. Простой способ сделать это - получить максимум и минимум и вычесть их из суммы. Формула будет выглядеть следующим образом : result = sum - max - min
. Обратите внимание, нас здесь не интересуют граничные условия. Давайте сначала посмотрим модель потока:
Определенно, это выглядит сложно. В этом примере мы вызываем последующий бизнес-метод из функции customer / lambada и собираем результат атомарным способом. Ниже приведен соответствующий фрагмент кода.
Джава
xxxxxxxxxx
1
public void businessService(Integer arr[], TestLambada <Temp> testLambada) throws InterruptedException {
2
System.out.println("In BusinessService.businessService:
4
"+Thread.currentThread());
5
// Start the execution in sequence.
6
Consumer<Integer> consumerForSum = sum -> {
7
AtomicInteger result = new AtomicInteger();
8
System.out.println("In consumerForSum: "+Thread.currentThread());
9
result.addAndGet(sum);
10
Consumer<Integer> consumerForMin = min -> {
11
System.out.println("In consumerForMin: "+Thread.currentThread());
12
result.set(result.intValue()-min);
13
Consumer <Integer> consumerForMax = max -> {
14
System.out.println("In consumerForMax: "+Thread.currentThread());
15
result.set(result.intValue()-max);
16
testLambada.get(new Temp(result.get()));
17
System.out.println("End of consumerForMax:
18
"+Thread.currentThread());
19
};
20
legacyComponent.getMax(arr).subscribe(consumerForMax);
21
System.out.println("End of consumerForMin: "+Thread.currentThread());
22
};
23
legacyComponent.getMin(arr).subscribe(consumerForMin);
24
System.out.println("End of consumerForSum: "+Thread.currentThread());
25
};
26
legacyComponent.nonBlockingSum(arr).subscribe(consumerForSum);
27
System.out.println("Returning from BusinessService.businessService:
28
"+Thread.currentThread());
29
}
Когда вы выполните приведенный выше код, вы получите следующий ответ:
Простой текст
xxxxxxxxxx
1
In ReactiveApplication.callBusinessWorkflowSerial: Thread[main,5,main]
2
…
3
In consumerForSum: Thread[ReactiveScheduler-2,5,main]
4
…
5
In consumerForMin: Thread[ReactiveScheduler-3,5,main]
6
…
7
In consumerForMax: Thread[ReactiveScheduler-2,5,main]
8
Printing result in tempTestLambada= 131
9
…
Если вы видите, что все обратные вызовы были вызваны в отдельных потоках, существует также повторное использование потока, так как оно поступает из пула исполнителей.
Код это невероятно сложный; Есть много асинхронных исполнений, и это очень классический анти-образец ада обратного вызова . Следовательно, вам нужно быть особенно осторожным в отношении того, какая часть вашего кода должна обрабатываться как обратный вызов, а какая часть должна находиться в одном и том же блоке потока. Я предлагаю вам создать баланс между реактивными и нереактивными компонентами.
Параллельная обработка бизнес-потоков
Если вы видите предыдущее экономическое обоснование, то было выполнено последовательное выполнение бизнес-потока. При этом общее время ответа будет представлять собой сумму каждого времени выполнения. Таким образом, во время последовательного выполнения потоки находились в пуле бездействующими и, в конечном итоге, ЦП не будет использоваться в достаточной степени.
Давайте попробуем заставить наш бизнес-поток выполняться параллельно. Здесь мы можем выполнить вычисление суммы, минимума и максимума для параллельного выполнения, а затем применить нашу формулу. Мы собираемся использовать завершаемое будущее, завернутое в Mono.zip
методе каркаса реактора. Обратите внимание, что не всегда возможно выполнить бизнес-поток параллельно, поскольку между потоками могут быть зависимости. Ниже приведена модель потока:
Фрагмент кода:
Джава
xxxxxxxxxx
1
public void busServiceInParallel(Integer arr[], TestLambada <Temp> testLambada) throws InterruptedException {
2
System.out.println("In BusinessService.busServiceInParallel:
4
"+Thread.currentThread());
5
Consumer<Tuple3<Integer,Integer,Integer>> consumer= a -> {
6
System.out.println("In consumer For Parallel: "+Thread.currentThread());
7
testLambada.get(new Temp(((Integer)a.get(0)-(Integer)a.get(1)-
8
(Integer)a.get(2))));
9
};
10
Mono.zip(legacyComponent.nonBlockingSum(arr),
11
legacyComponent.getMin(arr),
12
legacyComponent.getMax(arr)).subscribe(consumer);
13
System.out.println("Returning from BusinessService.busServiceInParallel:
14
"+Thread.currentThread());
15
}
Выполнение приведенного выше кода выведет следующее:
Простой текст
xxxxxxxxxx
1
In ReactiveApplication.callBusinessWorkflowParallel: Thread[main,5,main]
2
…
3
In ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
4
In ComputationService.getMin: Thread[ReactiveScheduler-3,5,main]
5
In ComputationService.getMax: Thread[ReactiveScheduler-4,5,main]
6
…
7
In consumer For Parallel: Thread[ReactiveScheduler-2,5,main]
8
In Parallel Business Workflow Lambada Result: 131 and Thread: Thread[ReactiveScheduler-2,5,main]
Если вы видите, сумма, мин и макс были выполнены в отдельных потоках. Потребитель / lambada, который выполняет агрегацию, также выполняется в отдельных потоках.
Указывает на заметку
- В случае блокирующего вызова могут быть растраты ресурсов, так как ваш текущий поток будет ожидать операции ввода-вывода. В реактивном вызове этого можно избежать. Следовательно, с точки зрения масштабирования будет выгодно перейти к реактивному с учетом коэффициента нагрузки.
- Миф, что синхронный код медленный, а реактивный код быстрый. Синхронный код будет выполняться в одном процессе и одном потоке, в то время как в реактивном режиме один поток будет выполнять несколько задач, возможно, циклически повторяя возможные задачи, которые он может выполнить. На самом деле синхронный код может быть быстрым, так как нет большого переключения потоков, как в случае реактивного, где контекст потока будет часто переключаться. Помните, переключение контекста потока стоит дорого. Но реактивный может ускорить, если вы выполняете параллельно.
- Баланс между синхронной и реактивной обработкой необходимо поддерживать. Напомним, реактивные работы на рабочих потоках исполнителя, и существует ограничение для каждого типа исполнителя. Если вы сформулируете свой код слишком активно, реактивные потоки исполнителя могут не хватать доступности. Поэтому поддержание баланса между синхронными и реактивными потоками имеет решающее значение.
- Чем больше потоков, тем больше памяти потребуется в случае реактивной обработки, так как параллельно будет много обработки. Даже переключение контекста потока также займет несколько килобайт оперативной памяти. Больше памяти, больше циклов сборки мусора (GC), которые могут снизить производительность вашего приложения.
Мой опыт с реактивным программированием
- Поскольку код будет не в последовательном порядке, это будет трудно с точки зрения читаемости. Это будет выглядеть сложно, пока вы не победите его.
- Прежде чем вы начнете писать реактивный код, нужно много думать о вашем подходе; даже простые взаимные вызовы потребуют глубокого обдумывания.
- Для юнит-тестирования будет создано множество новых испытаний, которые нужно подготовить.
- Поскольку ваш бизнес-поток распространяется на несколько потоков, потребуется переосмыслить стратегию ведения журнала, чтобы добавить идентификатор корреляции, поскольку многие потоки будут регистрировать один и тот же поток запросов.
- Отладка будет сложной задачей, так как потребуется добавить несколько точек останова и лучше следить за процессом. Просто перешагнуть не получится.
Заключение
Несмотря на предыдущие пункты и факты, реактивное программирование полезно для изучения, и я предлагаю вам начать. Там определенно кривая обучения, и вы можете чувствовать себя потерянным. Овладев этим, вы увидите огромные преимущества с точки зрения ресурсов. Добро пожаловать на новый взгляд на программирование.
Вы можете получить полный код в github .
Удачного кодирования.