Несколько дней назад, во время нашего регулярного обзора кода, один из моих коллег задал вопрос, что произойдет — и если это вообще возможно — когда CDI Observer (так, метод с параметром @Observes
) будет вызываться несколько раз в одно и то же время. для разных случаев. Другими словами, после создания нескольких событий возможно ли, что следующий метод будет обрабатываться более чем одним потоком одновременно:
1
|
public void observe( @Observes MyEvent myEvent) { ... } |
Подумав об этом, я решил провести несколько тестов и описать результаты в этом посте.
Первые результаты: оказалось, что события CDI запускаются в синхронном режиме, что было для меня немного неожиданным. Почему?
До этого времени я видел это так: наблюдатели CDI позволяют мне очень четко отделить производителя событий от потребителя событий, поэтому у меня нет жестко запрограммированной регистрации слушателей, ведение списка слушателей и информирование их вручную. , Контейнер CDI делает все для меня.
Поэтому, если мы четко отделили производителей от потребителей, я подумал, что существует какая-то шина событий, работающая в специализированном пуле исполнителей потоков, который отвечает за посредничество между зарегистрированными событиями и вызываемыми методами наблюдателей. Я предполагаю, что я основал это предположение на других решениях для событий / слушателей, таких как Google Guava EventBus . Они дают вам возможность определить, хотите ли вы использовать синхронные (по умолчанию, EventBus ) или асинхронные диспетчеры событий ( AsyncEventBus .)
Более того, если EJB-компоненты — и производитель, и потребитель, я предполагаю, что он будет иметь те же функции, что и асинхронные вызовы EJB, когда речь идет о транзакциях. Единственный возможный атрибут транзакции JTA для наблюдателя асинхронного события: REQUIRES_NEW
, REQUIRES_NEW
или NOT_SUPPORTED
.
Вот и все, как я ожидал, это сработает, что, похоже, сильно отличается от текущего состояния . Реальная жизнь показывает, что события CDI являются синхронными.
Существует проблема с тем, чтобы сделать асинхронные события доступными в CDI 1.1, но я не уверен, каково текущее состояние этой функции, и я не нашел ни слова об этом в CDI 1.1 (часть Java EE 7).
Давайте посмотрим, как мы можем справиться с этим самостоятельно.
Содержание
- Синхронные события по умолчанию
- Решение 1 — CDI Producer и Singleton EJB в качестве приемника
- Решение 2. Используйте Singleton EJB в качестве приемника с блокировкой чтения
- Решение 3 — EJB Producer и CDI Consumer
- Решение 4 — EJB Producer и EJB Consumer
- Решение 4 против решения 2
- Решение 5 — EJB Producer и CDI Consumer II
- Решение 6 — CDI с JMS
- Вывод
Синхронные события по умолчанию
Давайте начнем с основного примера, показывающего проблему. Взгляните на код — во-первых, производитель CDI Bean:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
@Path ( "/produce" ) public class EventGenerator { @Inject private Logger logger; @Inject private Event<MyEvent> events; @Path ( "/cdiBean/{eventsNum}" ) @GET public String generateEvents( @PathParam ( "eventsNum" ) int numberOfEventsToGenerate) { for ( int i = 0 ; i < numberOfEventsToGenerate; i++) { MyEvent event = new MyEvent(i); logger.info( "Generating Event: " + event); events.fire(event); } return "Finished. Generated " + numberOfEventsToGenerate + " events." ; } } |
MyEvent
— это просто некоторый объект события, который здесь не очень важен. Он хранит порядковый номер события, который мы передаем при создании экземпляра.
Потребитель — это довольно простой компонент CDI:
01
02
03
04
05
06
07
08
09
10
11
|
public class EventConsumer { @Inject private Logger logger; public void consumeEvent( @Observes MyEvent myEvent) throws InterruptedException { logger.info( "Receiving event: " + myEvent); TimeUnit.MILLISECONDS.sleep( 500 ); } } |
Обратите внимание, что я вставил спящий поток, чтобы имитировать некоторый длительный процесс получения события.
Теперь давайте запустим этот пример, вызвав команду REST, предоставляемую этим EventProducer
. Результат (запуск JBoss EAP 6.1 Alpha ) будет примерно таким:
14: 15: 59,196 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [ seqNo = 0 ] 14: 15: 59,197 [com.piotrnowicki.EventConsumer] (http- / 127.0 .0.1: 8080-1) Получение события: MyEvent [ seqNo = 0 ] 14: 15: 59,697 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [ seqNo = 1 ] 14 : 15: 59,698 [com.piotrnowicki.EventConsumer] (http- / 127.0.0.1: 8080-1) Событие получения: MyEvent [ seqNo = 1 ] 14: 16: 00,199 [com.piotrnowicki.EventGenerator] (http- / 127.0. 0.1: 8080-1) Создание события: MyEvent [ seqNo = 2 ] 14: 16: 00,200 [com.piotrnowicki.EventConsumer] (http- / 127.0.0.1: 8080-1) Получение события: MyEvent [ seqNo = 2 ]
Он показывает синхронный характер событий CDI — создание и использование событий происходит в одном и том же потоке и один за другим.
Итак, как добиться асинхронных событий с CDI?
Решение 1 — CDI Producer и Singleton EJB в качестве приемника
Продюсер остается на это был — чистый боб CDI:
1
2
3
4
5
6
|
@Path ( "/produce" ) public class EventGenerator { @Path ( "/cdiBean/{eventsNum}" ) @GET public String generateEvents( @PathParam ( "eventsNum" ) int numberOfEventsToGenerate) { ... } } |
Теперь, если вы превратите свой приемник в EJB @Singleton и пометите метод наблюдаемых как @Asynchronous следующим образом:
1
2
3
4
5
6
|
@Singleton public class EventConsumer { @Asynchronous public void consumeEvent( @Observes MyEvent myEvent) throws InterruptedException { ... } } |
Вы получите следующие результаты:
14: 21: 19,341 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [seqNo = 0] 14: 21: 19,343 [com.piotrnowicki.EventGenerator] (http- / 127.0 .0.1: 8080-1) Создание события: MyEvent [seqNo = 1] 14: 21: 19,343 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [seqNo = 2] 14 : 21: 19,347 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 2) Событие получения: MyEvent [seqNo = 1] 14:21: 19,848 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 1) Событие получения: MyEvent [seqNo = 0] 14:21: 20 350 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 3) Событие получения: MyEvent [seqNo = 2]
События создаются один за другим, и в отдельных потоках Singleton EJB обслуживает их один за другим (взгляните на время обработки событий). Это связано с неявной блокировкой записи для всех бизнес-методов Singleton EJB. Так что это:
Асинхронный: да
Потокобезопасный метод наблюдателя: да
Решение 2. Используйте Singleton EJB в качестве приемника с блокировкой чтения
Этот подход очень похож на Решение 1, однако он обеспечивает гораздо более высокую пропускную способность, поскольку вся обработка событий происходит параллельно.
Наш продюсер остался прежним — это боб CDI:
1
2
3
4
5
6
7
|
@Path ( "/produce" ) public class EventGenerator { @Path ( "/cdiBean/{eventsNum}" ) @GET public String generateEvents( @PathParam ( "eventsNum" ) int numberOfEventsToGenerate) { ... } } |
Наш потребитель @Lock(READ)
в свой метод наблюдений; это делает магию способности обслуживать несколько событий одновременно:
1
2
3
4
5
6
7
|
@Singleton public class EventConsumer { @Asynchronous @Lock (LockType.READ) public void consumeEvent( @Observes MyEvent myEvent) throws InterruptedException { ... } } |
Вот что вы можете получить в результате:
14: 24: 44,202 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [seqNo = 0] 14: 24: 44,204 [com.piotrnowicki.EventGenerator] (http- / 127.0 .0.1: 8080-1) Создание события: MyEvent [seqNo = 1] 14: 24: 44,205 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [seqNo = 2] 14 : 24: 44,207 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 4) Событие получения: MyEvent [seqNo = 0] 14:24: 44,207 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 6) Событие получения: MyEvent [seqNo = 2] 14:24: 44,207 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 5) Событие получения: MyEvent [seqNo = 1]
Различные потоки, обслуживающие события одновременно, дают вам большую пропускную способность. Так что это:
Асинхронный: да
Потокобезопасный метод наблюдателя: нет
Решение 3 — EJB Producer и CDI Consumer
CDI позволяет наблюдать за событиями на определенных этапах транзакции. Вы указываете это с помощью @Observes(during=TransactionPhase...)
. В нашем случае мы хотели бы, чтобы CDI суммировал все эти события и вызывал нашего наблюдателя только после завершения транзакции. Для этого нам нужно только добавить вышеуказанный атрибут к нашему обозревателю компонентов CDI:
1
2
3
|
public class EventConsumer { public void consumeEvent( @Observes (during = TransactionPhase.AFTER_COMPLETION) MyEvent myEvent) { ... } } |
Теперь нам просто нужно убедиться, что у нас есть запущенная транзакция в методе EventGenerator
. Мы можем сделать это быстро, преобразовав наш @Stateless
CDI в EJB @Stateless
и используя его неявный REQUIRED
TransactionAttribute следующим образом:
1
2
3
4
5
6
7
8
|
@Stateless @Path ( "/produce" ) public class EventGenerator { @Path ( "/cdiBean/{eventsNum}" ) @GET public String generateEvents( @PathParam ( "eventsNum" ) int numberOfEventsToGenerate) { ... } } |
Это результат, который мы могли бы закончить с:
14: 39: 06,776 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [seqNo = 0] 14: 39: 06,776 [com.piotrnowicki.EventGenerator] (http- / 127.0 .0.1: 8080-1) Создание события: MyEvent [seqNo = 1] 14: 39: 06,776 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [seqNo = 2] 14 : 39: 06,778 [com.piotrnowicki.EventConsumer] (http- / 127.0.0.1: 8080-1) Событие получения: MyEvent [seqNo = 2] 14:39: 07,279 [com.piotrnowicki.EventConsumer] (http- / 127.0. 0.1: 8080-1) Событие получения: MyEvent [seqNo = 1] 14:39: 07,780 [com.piotrnowicki.EventConsumer] (http- / 127.0.0.1: 8080-1) Событие получения: MyEvent [seqNo = 0]
EJB EventGenerator
запускает транзакцию, и наблюдатель bean-компонента CDI будет вызываться сериализованным способом только после завершения транзакции.
Асинхронный: да
Потокобезопасный метод наблюдателя: да
Решение 4 — EJB Producer и EJB Consumer
Это очень похоже на Решение 3. Наш генератор остается тем же самым (Stateless EJB):
1
2
3
4
5
6
7
8
|
@Stateless @Path ( "/produce" ) public class EventGenerator { @Path ( "/cdiBean/{eventsNum}" ) @GET public String generateEvents( @PathParam ( "eventsNum" ) int numberOfEventsToGenerate) { ... } } |
И изменения внесены в EventConsumer
который прямо сейчас:
1
2
3
4
5
6
7
|
@Singleton public class EventConsumer { @Asynchronous @Lock (LockType.READ) public void consumeEvent( @Observes (during = TransactionPhase.AFTER_COMPLETION) MyEvent myEvent) throws InterruptedException { ... } } |
Результат может быть следующим:
14: 44: 09,363 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [seqNo = 0] 14: 44: 09,464 [com.piotrnowicki.EventGenerator] (http- / 127.0 .0.1: 8080-1) Создание события: MyEvent [seqNo = 1] 14: 44: 09,564 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [seqNo = 2] 14 : 44: 09,670 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 8) Событие получения: MyEvent [seqNo = 2] 14:44: 09,670 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 2) Событие получения: MyEvent [seqNo = 1] 14:44: 09 670 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 1) Событие получения: MyEvent [seqNo = 0]
Здесь мы использовали две функции — одна заключается в том, что метод потребителя событий является асинхронным, а вторая — в том, что потребитель не будет уведомлен до завершения транзакции производителя. Это дает нам:
Асинхронный: да
Потокобезопасный метод наблюдателя: нет
Решение 4 против решения 2
Эти два решения кажутся одинаковыми. Они отличаются только от аннотации потребителя: @Observes
vs @Observes(during = TransactionPhase.AFTER_COMPLETION)
. Более того, они действуют одинаково для нашего тестового примера: они асинхронные, и несколько потоков могут одновременно обрабатывать получатели событий . Однако между ними есть одна большая разница.
В нашем тестовом примере мы запускали события одно за другим. Представьте, что между срабатыванием событий есть некоторые другие операции. В таком случае:
- Решение 2 (
@Observes
) начнет обрабатывать события сразу после того, как будет@Observes
первое , - Решение 4 (
@Observes(during = TransactionPhase.AFTER_COMPLETION)
(@Observes(during = TransactionPhase.AFTER_COMPLETION)
) начнет обработку сразу после завершения транзакции , то есть когда все события будут запущены.
Это показывает возможный результат такой ситуации:
Решение 2 ( @Observes
)
15: 01: 34,318 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [ seqNo = 0 ] 15: 01: 34,320 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 3 ) Получение события: MyEvent [ seqNo = 0 ] 15: 01: 34,419 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [ seqNo = 1 ] 15: 01: 34,420 [com .piotrnowicki.EventConsumer] (EJB по умолчанию — 6) Событие получения: MyEvent [ seqNo = 1 ] 15: 01: 34,520 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [ seqNo = 2 ] 15: 01: 34,521 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 9) Событие получения: MyEvent [ seqNo = 2 ]
Решение 4 ( @Observes(during = TransactionPhase.AFTER_COMPLETION)
)
15: 00: 41,126 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [ seqNo = 0 ] 15: 00: 41,226 [com.piotrnowicki.EventGenerator] (http- / 127.0 .0.1: 8080-1) Создание события: MyEvent [ seqNo = 1 ] 15: 00: 41,326 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [ seqNo = 2 ] 15 : 00: 41,432 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 10) Событие получения: MyEvent [ seqNo = 2 ] 15: 00: 41,432 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 4) Событие получения: MyEvent [ seqNo = 1 ] 15: 00: 41,432 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 5) Событие получения: MyEvent [ seqNo = 0 ]
Решение 5 — EJB Producer и CDI Consumer II
До этого момента мы пытались сделать наш приемник асинхронным. Есть и обратный путь — мы можем сделать источник событий асинхронным . Мы можем добиться этого, пометив нашего производителя как @Stateless
и вызвав его собственный асинхронный метод, который просто вызовет событие:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@Stateless @Path ( "/produce" ) public class EventGenerator { // ... @Resource private SessionContext sctx; @Path ( "/cdiBean/{eventsNum}" ) @GET public String generateEvents( @PathParam ( "eventsNum" ) int numberOfEventsToGenerate) { for ( int i = 0 ; i < numberOfEventsToGenerate; i++) { sctx.getBusinessObject(EventGenerator. class ).fireEvent( new MyEvent(i)); } return "Finished. Generated " + numberOfEventsToGenerate + " events." ; } @Asynchronous public void fireEvent( final MyEvent event) { events.fire(event); } } |
Присмотритесь к SessionContext
EJB с помощью SessionContext
. В этом случае это необходимо, поскольку мы хотим, чтобы контейнер отправлял вызов нашего метода и добавлял его асинхронный характер. Мы не хотим делать это локальным вызовом, поэтому мы отказываемся использовать неявный this
объект.
Потребитель событий, с другой стороны, является простым компонентом CDI:
1
2
3
|
public class EventConsumer { public void consumeEvent( @Observes MyEvent myEvent) throws InterruptedException { ... } } |
Результат может быть следующим:
00: 40: 32,820 [com.piotrnowicki.EventGenerator] (EJB по умолчанию — 2) Создание события: MyEvent [seqNo = 1] 00: 40: 32,820 [com.piotrnowicki.EventGenerator] (EJB по умолчанию — 3) Создание события: MyEvent [ seqNo = 2] 00: 40: 32,820 [com.piotrnowicki.EventGenerator] (EJB по умолчанию — 1) Создание события: MyEvent [seqNo = 0] 00:40: 32,821 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 1) Получение событие: MyEvent [seqNo = 0] 00:40: 32,821 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 2) Получение события: MyEvent [seqNo = 1] 00:40: 32,821 [com.piotrnowicki.EventConsumer] (EJB по умолчанию) — 3) Получение события: MyEvent [seqNo = 2]
Асинхронный: да
Потокобезопасный метод наблюдателя: нет
Решение 6 — CDI с JMS
Это решение, представленное Джулиано Виана в своем блоге . Он использует JMS в качестве шины событий. Событие CDI создается, а затем выбирается некоторым классом, который отвечает за помещение этого события в тему / очередь JMS. MDB, который выбирает сообщения из темы / очереди, создает событие, которое вызывает реальный получатель. Это не только дает вам асинхронную доставку событий, но также добавляет к ней характер транзакции. Например, если получатель события не может обработать сообщение — он может откатить транзакцию, и очередь обеспечит повторную доставку сообщения (возможно, в следующий раз, когда ваш обработчик событий сможет обработать это событие?)
Вывод
CDI 1.0 не поддерживает генерацию асинхронных событий. Также не похоже, что CDI 1.1 будет иметь такую поддержку.
Это, однако, не означает, что вы не можете достичь асинхронной обработки. Уже существуют решения, основанные на EJB 3.1 или существующих атрибутах наблюдателя CDI. Вы также должны быть в состоянии написать портативное расширение CDI, которое добавляет эту функциональность в ваш код.