Лучшим подходом, чем версия с необработанным потоком, является основанная на пуле потоков версия, в которой соответствующий размер пула потоков определяется на основе системы, в которой выполняется задача — Количество процессоров / (1-коэффициент блокировки задачи). Книга Venkat Subramaniams содержит более подробную информацию:
Сначала я определил пользовательскую задачу для генерации части отчета, учитывая запрос части отчета, это реализовано как Callable :
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
public class ReportPartRequestCallable implements Callable<ReportPart> { private final ReportRequestPart reportRequestPart; private final ReportPartGenerator reportPartGenerator; public ReportPartRequestCallable(ReportRequestPart reportRequestPart, ReportPartGenerator reportPartGenerator) { this .reportRequestPart = reportRequestPart; this .reportPartGenerator = reportPartGenerator; } @Override public ReportPart call() { return this .reportPartGenerator.generateReportPart(reportRequestPart); } } |
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
public class ExecutorsBasedReportGenerator implements ReportGenerator { private static final Logger logger = LoggerFactory.getLogger(ExecutorsBasedReportGenerator. class ); private ReportPartGenerator reportPartGenerator; private ExecutorService executors = Executors.newFixedThreadPool( 10 ); @Override public Report generateReport(ReportRequest reportRequest) { List<Callable<ReportPart>> tasks = new ArrayList<Callable<ReportPart>>(); List<ReportRequestPart> reportRequestParts = reportRequest.getRequestParts(); for (ReportRequestPart reportRequestPart : reportRequestParts) { tasks.add( new ReportPartRequestCallable(reportRequestPart, reportPartGenerator)); } List<Future<ReportPart>> responseForReportPartList; List<ReportPart> reportParts = new ArrayList<ReportPart>(); try { responseForReportPartList = executors.invokeAll(tasks); for (Future<ReportPart> reportPartFuture : responseForReportPartList) { reportParts.add(reportPartFuture.get()); } } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException(e); } return new Report(reportParts); } ...... } |
Здесь пул потоков создается с помощью вызова Executors.newFixedThreadPool (10) с размером пула 10, для каждой части запроса отчета генерируется вызываемая задача, которая передается в пул потоков с использованием абстракции ExecutorService.
1
|
responseForReportPartList = executors.invokeAll(tasks); |
этот вызов возвращает список фьючерсов, которые поддерживают метод get (), который является блокирующим вызовом для ответа, который будет доступен.
Это, безусловно, намного лучшая реализация по сравнению с исходной версией потока, количество потоков ограничено управляемым числом под нагрузкой.
Внедрение Spring на основе интеграции
Подход, который мне лично нравится больше всего, заключается в использовании Spring Integration , причина в том, что в Spring Integration я сосредотачиваюсь на компонентах, выполняющих различные задачи, и оставляю его на усмотрение Spring Integration, чтобы связать поток вместе, используя конфигурацию на основе xml или аннотаций. Здесь я буду использовать конфигурацию на основе XML:
Компоненты в моем случае:
1. Компонент для генерации части отчета, учитывая запрос части отчета, который я показал ранее .
2. Компонент для разделения запроса отчета на части запроса отчета:
1
2
3
4
5
6
|
public class DefaultReportRequestSplitter implements ReportRequestSplitter{ @Override public List<ReportRequestPart> split(ReportRequest reportRequest) { return reportRequest.getRequestParts(); } } |
3. Компонент для сборки / объединения частей отчета в единый отчет:
1
2
3
4
5
6
7
8
|
public class DefaultReportAggregator implements ReportAggregator{ @Override public Report aggregate(List<ReportPart> reportParts) { return new Report(reportParts); } } |
И это весь код Java, который требуется для Spring Integration, остальная часть является проводной — здесь я использовал файл конфигурации Spring:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
<? xml version = '1.0' encoding = 'UTF-8' ?> < beans .... <int:channel id = 'report.partsChannel' /> < int:channel id = 'report.reportChannel' /> < int:channel id = 'report.partReportChannel' > < int:queue capacity = '50' /> </ int:channel > < int:channel id = 'report.joinPartsChannel' /> < int:splitter id = 'splitter' ref = 'reportsPartSplitter' method = 'split' input-channel = 'report.partsChannel' output-channel = 'report.partReportChannel' /> < task:executor id = 'reportPartGeneratorExecutor' pool-size = '10' queue-capacity = '50' /> < int:service-activator id = 'reportsPartServiceActivator' ref = 'reportPartReportGenerator' method = 'generateReportPart' input-channel = 'report.partReportChannel' output-channel = 'report.joinPartsChannel' > < int:poller task-executor = 'reportPartGeneratorExecutor' fixed-delay = '500' > </ int:poller > </ int:service-activator > < int:aggregator ref = 'reportAggregator' method = 'aggregate' input-channel = 'report.joinPartsChannel' output-channel = 'report.reportChannel' ></ int:aggregator > < int:gateway id = 'reportGeneratorGateway' service-interface = 'org.bk.sisample.springintegration.ReportGeneratorGateway' default-request-channel = 'report.partsChannel' default-reply-channel = 'report.reportChannel' /> < bean name = 'reportsPartSplitter' class = 'org.bk.sisample.springintegration.processors.DefaultReportRequestSplitter' ></ bean > < bean name = 'reportPartReportGenerator' class = 'org.bk.sisample.processors.DummyReportPartGenerator' /> < bean name = 'reportAggregator' class = 'org.bk.sisample.springintegration.processors.DefaultReportAggregator' /> < bean name = 'reportGenerator' class = 'org.bk.sisample.springintegration.SpringIntegrationBasedReportGenerator' /> </ beans > |
Spring Source Tool Suite предоставляет отличный способ визуализации этого файла:
это полностью соответствует моему первоначальному представлению о потоке пользователя:
В версии кода Spring Integration я определил различные компоненты для обработки различных частей потока:
1. Разделитель для преобразования запроса отчета в части запроса отчета:
1
2
|
< int:splitter id = 'splitter' ref = 'reportsPartSplitter' method = 'split' input-channel = 'report.partsChannel' output-channel = 'report.partReportChannel' /> |
2. Компонент активатора службы для создания части отчета из запроса части отчета:
1
2
3
4
5
|
< int:service-activator id = 'reportsPartServiceActivator' ref = 'reportPartReportGenerator' method = 'generateReportPart' input-channel = 'report.partReportChannel' output-channel = 'report.joinPartsChannel' > < int:poller task-executor = 'reportPartGeneratorExecutor' fixed-delay = '500' > </ int:poller > </ int:service-activator > |
3. Агрегатор для объединения частей отчета обратно в отчет, и он достаточно умен, чтобы соответствующим образом соотнести исходные запросы отчета без какого-либо явного кодирования:
1
2
|
< int:aggregator ref = 'reportAggregator' method = 'aggregate' input-channel = 'report.joinPartsChannel' output-channel = 'report.reportChannel' ></ int:aggregator > |
Что интересно в этом коде, так это то, что, как и в примере на основе исполнителей, количество потоков, обслуживающих каждый из этих компонентов, полностью настраивается с помощью файла XML, с помощью соответствующих каналов для соединения различных компонентов и с помощью исполнителей задач с размер пула потоков, установленный в качестве атрибута исполнителя.
В этом коде я определил канал очереди, куда входят части запроса отчета:
1
2
3
|
< int:channel id = 'report.partReportChannel' > < int:queue capacity = '50' /> </ int:channel > |
и обслуживается компонентом-активатором службы с использованием исполнителя задач с пулом потоков размером 10 и емкостью 50:
1
2
3
4
5
6
7
|
< task:executor id = 'reportPartGeneratorExecutor' pool-size = '10' queue-capacity = '50' /> < int:service-activator id = 'reportsPartServiceActivator' ref = 'reportPartReportGenerator' method = 'generateReportPart' input-channel = 'report.partReportChannel' output-channel = 'report.joinPartsChannel' > < int:poller task-executor = 'reportPartGeneratorExecutor' fixed-delay = '500' > </ int:poller > </ int:service-activator > |
Все это через конфигурацию!
Вся база кода для этого образца доступна по адресу github: https://github.com/bijukunjummen/si-sample
Ссылка: Параллелизм — Исполнители и Spring Integration от нашего партнера JCG Биджу Кунджуммен в блоге all and sundry.