Статьи

Асинхронные события CDI

Несколько дней назад, во время нашего регулярного обзора кода, один из моих коллег задал вопрос, что произойдет — и если это вообще возможно — когда CDI Observer (так, метод с параметром @Observes ) будет вызываться несколько раз в одно и то же время. для разных случаев. Другими словами, после создания нескольких событий возможно ли, что следующий метод будет обрабатываться более чем одним потоком одновременно:

1
public void observe(@Observes MyEvent myEvent) { ... }

Подумав об этом, я решил провести несколько тестов и описать результаты в этом посте.

Первые результаты: оказалось, что события CDI запускаются в синхронном режиме, что было для меня немного неожиданным. Почему?

До этого времени я видел это так: наблюдатели CDI позволяют мне очень четко отделить производителя событий от потребителя событий, поэтому у меня нет жестко запрограммированной регистрации слушателей, ведение списка слушателей и информирование их вручную. , Контейнер CDI делает все для меня.

Поэтому, если мы четко отделили производителей от потребителей, я подумал, что существует какая-то шина событий, работающая в специализированном пуле исполнителей потоков, который отвечает за посредничество между зарегистрированными событиями и вызываемыми методами наблюдателей. Я предполагаю, что я основал это предположение на других решениях для событий / слушателей, таких как Google Guava EventBus . Они дают вам возможность определить, хотите ли вы использовать синхронные (по умолчанию, EventBus ) или асинхронные диспетчеры событий ( AsyncEventBus .)

Более того, если EJB-компоненты — и производитель, и потребитель, я предполагаю, что он будет иметь те же функции, что и асинхронные вызовы EJB, когда речь идет о транзакциях. Единственный возможный атрибут транзакции JTA для наблюдателя асинхронного события: REQUIRES_NEW , REQUIRES_NEW или NOT_SUPPORTED .

Вот и все, как я ожидал, это сработает, что, похоже, сильно отличается от текущего состояния . Реальная жизнь показывает, что события CDI являются синхронными.

Существует проблема с тем, чтобы сделать асинхронные события доступными в CDI 1.1, но я не уверен, каково текущее состояние этой функции, и я не нашел ни слова об этом в CDI 1.1 (часть Java EE 7).

Давайте посмотрим, как мы можем справиться с этим самостоятельно.

Содержание

  1. Синхронные события по умолчанию
  2. Решение 1 — CDI Producer и Singleton EJB в качестве приемника
  3. Решение 2. Используйте Singleton EJB в качестве приемника с блокировкой чтения
  4. Решение 3 — EJB Producer и CDI Consumer
  5. Решение 4 — EJB Producer и EJB Consumer
  6. Решение 4 против решения 2
  7. Решение 5 — EJB Producer и CDI Consumer II
  8. Решение 6 — CDI с JMS
  9. Вывод

Синхронные события по умолчанию

Давайте начнем с основного примера, показывающего проблему. Взгляните на код — во-первых, производитель CDI Bean:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
@Path("/produce")
public class EventGenerator {
 
    @Inject
    private Logger logger;
 
    @Inject
    private Event<MyEvent> events;
 
    @Path("/cdiBean/{eventsNum}")
    @GET
    public String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) {
        for (int i = 0; i < numberOfEventsToGenerate; i++) {
            MyEvent event = new MyEvent(i);
 
            logger.info("Generating Event: " + event);
            events.fire(event);
        }
        return "Finished. Generated " + numberOfEventsToGenerate + " events.";
    }
}

MyEvent — это просто некоторый объект события, который здесь не очень важен. Он хранит порядковый номер события, который мы передаем при создании экземпляра.

Потребитель — это довольно простой компонент CDI:

01
02
03
04
05
06
07
08
09
10
11
public class EventConsumer {
 
    @Inject
    private Logger logger;
 
    public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException {
        logger.info("Receiving event: " + myEvent);
 
        TimeUnit.MILLISECONDS.sleep(500);
    }
}

Обратите внимание, что я вставил спящий поток, чтобы имитировать некоторый длительный процесс получения события.

Теперь давайте запустим этот пример, вызвав команду REST, предоставляемую этим EventProducer . Результат (запуск JBoss EAP 6.1 Alpha ) будет примерно таким:

14: 15: 59,196 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [ seqNo = 0 ] 14: 15: 59,197 [com.piotrnowicki.EventConsumer] (http- / 127.0 .0.1: 8080-1) Получение события: MyEvent [ seqNo = 0 ] 14: 15: 59,697 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [ seqNo = 1 ] 14 : 15: 59,698 [com.piotrnowicki.EventConsumer] (http- / 127.0.0.1: 8080-1) Событие получения: MyEvent [ seqNo = 1 ] 14: 16: 00,199 [com.piotrnowicki.EventGenerator] (http- / 127.0. 0.1: 8080-1) Создание события: MyEvent [ seqNo = 2 ] 14: 16: 00,200 [com.piotrnowicki.EventConsumer] (http- / 127.0.0.1: 8080-1) Получение события: MyEvent [ seqNo = 2 ]

Он показывает синхронный характер событий CDI — создание и использование событий происходит в одном и том же потоке и один за другим.

Итак, как добиться асинхронных событий с CDI?

Решение 1 — CDI Producer и Singleton EJB в качестве приемника

Продюсер остается на это был — чистый боб CDI:

1
2
3
4
5
6
@Path("/produce") public class EventGenerator {
 
    @Path("/cdiBean/{eventsNum}")
    @GET
    public String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... }
}

Теперь, если вы превратите свой приемник в EJB @Singleton и пометите метод наблюдаемых как @Asynchronous следующим образом:

1
2
3
4
5
6
@Singleton
public class EventConsumer {
 
    @Asynchronous
    public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException { ...  }
}

Вы получите следующие результаты:

14: 21: 19,341 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [seqNo = 0] 14: 21: 19,343 [com.piotrnowicki.EventGenerator] (http- / 127.0 .0.1: 8080-1) Создание события: MyEvent [seqNo = 1] 14: 21: 19,343 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [seqNo = 2] 14 : 21: 19,347 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 2) Событие получения: MyEvent [seqNo = 1] 14:21: 19,848 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 1) Событие получения: MyEvent [seqNo = 0] 14:21: 20 350 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 3) Событие получения: MyEvent [seqNo = 2]

События создаются один за другим, и в отдельных потоках Singleton EJB обслуживает их один за другим (взгляните на время обработки событий). Это связано с неявной блокировкой записи для всех бизнес-методов Singleton EJB. Так что это:

Асинхронный: да
Потокобезопасный метод наблюдателя: да

Решение 2. Используйте Singleton EJB в качестве приемника с блокировкой чтения

Этот подход очень похож на Решение 1, однако он обеспечивает гораздо более высокую пропускную способность, поскольку вся обработка событий происходит параллельно.

Наш продюсер остался прежним — это боб CDI:

1
2
3
4
5
6
7
@Path("/produce")
public class EventGenerator {
 
    @Path("/cdiBean/{eventsNum}")
    @GET
    public String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... }
}

Наш потребитель @Lock(READ) в свой метод наблюдений; это делает магию способности обслуживать несколько событий одновременно:

1
2
3
4
5
6
7
@Singleton
public class EventConsumer {
 
    @Asynchronous
    @Lock(LockType.READ)
    public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException { ... }
}

Вот что вы можете получить в результате:

14: 24: 44,202 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [seqNo = 0] 14: 24: 44,204 [com.piotrnowicki.EventGenerator] (http- / 127.0 .0.1: 8080-1) Создание события: MyEvent [seqNo = 1] 14: 24: 44,205 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [seqNo = 2] 14 : 24: 44,207 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 4) Событие получения: MyEvent [seqNo = 0] 14:24: 44,207 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 6) Событие получения: MyEvent [seqNo = 2] 14:24: 44,207 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 5) Событие получения: MyEvent [seqNo = 1]

Различные потоки, обслуживающие события одновременно, дают вам большую пропускную способность. Так что это:

Асинхронный: да
Потокобезопасный метод наблюдателя: нет

Решение 3 — EJB Producer и CDI Consumer

CDI позволяет наблюдать за событиями на определенных этапах транзакции. Вы указываете это с помощью @Observes(during=TransactionPhase...) . В нашем случае мы хотели бы, чтобы CDI суммировал все эти события и вызывал нашего наблюдателя только после завершения транзакции. Для этого нам нужно только добавить вышеуказанный атрибут к нашему обозревателю компонентов CDI:

1
2
3
public class EventConsumer {
   public void consumeEvent(@Observes(during = TransactionPhase.AFTER_COMPLETION) MyEvent myEvent) { ... }
}

Теперь нам просто нужно убедиться, что у нас есть запущенная транзакция в методе EventGenerator . Мы можем сделать это быстро, преобразовав наш @Stateless CDI в EJB @Stateless и используя его неявный REQUIRED TransactionAttribute следующим образом:

1
2
3
4
5
6
7
8
@Stateless
@Path("/produce")
public class EventGenerator {
 
    @Path("/cdiBean/{eventsNum}")
    @GET
    public String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... }
}

Это результат, который мы могли бы закончить с:

14: 39: 06,776 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [seqNo = 0] 14: 39: 06,776 [com.piotrnowicki.EventGenerator] (http- / 127.0 .0.1: 8080-1) Создание события: MyEvent [seqNo = 1] 14: 39: 06,776 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [seqNo = 2] 14 : 39: 06,778 [com.piotrnowicki.EventConsumer] (http- / 127.0.0.1: 8080-1) Событие получения: MyEvent [seqNo = 2] 14:39: 07,279 [com.piotrnowicki.EventConsumer] (http- / 127.0. 0.1: 8080-1) Событие получения: MyEvent [seqNo = 1] 14:39: 07,780 [com.piotrnowicki.EventConsumer] (http- / 127.0.0.1: 8080-1) Событие получения: MyEvent [seqNo = 0]

EJB EventGenerator запускает транзакцию, и наблюдатель bean-компонента CDI будет вызываться сериализованным способом только после завершения транзакции.

Асинхронный: да
Потокобезопасный метод наблюдателя: да

Решение 4 — EJB Producer и EJB Consumer

Это очень похоже на Решение 3. Наш генератор остается тем же самым (Stateless EJB):

1
2
3
4
5
6
7
8
@Stateless
@Path("/produce")
public class EventGenerator {
 
    @Path("/cdiBean/{eventsNum}")
    @GET
    public String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... }
}

И изменения внесены в EventConsumer который прямо сейчас:

1
2
3
4
5
6
7
@Singleton
public class EventConsumer {
 
    @Asynchronous
    @Lock(LockType.READ)
    public void consumeEvent(@Observes(during = TransactionPhase.AFTER_COMPLETION) MyEvent myEvent) throws InterruptedException { ...  }
}

Результат может быть следующим:

14: 44: 09,363 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [seqNo = 0] 14: 44: 09,464 [com.piotrnowicki.EventGenerator] (http- / 127.0 .0.1: 8080-1) Создание события: MyEvent [seqNo = 1] 14: 44: 09,564 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [seqNo = 2] 14 : 44: 09,670 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 8) Событие получения: MyEvent [seqNo = 2] 14:44: 09,670 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 2) Событие получения: MyEvent [seqNo = 1] 14:44: 09 670 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 1) Событие получения: MyEvent [seqNo = 0]

Здесь мы использовали две функции — одна заключается в том, что метод потребителя событий является асинхронным, а вторая — в том, что потребитель не будет уведомлен до завершения транзакции производителя. Это дает нам:

Асинхронный: да
Потокобезопасный метод наблюдателя: нет

Решение 4 против решения 2

Эти два решения кажутся одинаковыми. Они отличаются только от аннотации потребителя: @Observes vs @Observes(during = TransactionPhase.AFTER_COMPLETION) . Более того, они действуют одинаково для нашего тестового примера: они асинхронные, и несколько потоков могут одновременно обрабатывать получатели событий . Однако между ними есть одна большая разница.

В нашем тестовом примере мы запускали события одно за другим. Представьте, что между срабатыванием событий есть некоторые другие операции. В таком случае:

  • Решение 2 ( @Observes ) начнет обрабатывать события сразу после того, как будет @Observes первое ,
  • Решение 4 ( @Observes(during = TransactionPhase.AFTER_COMPLETION) ( @Observes(during = TransactionPhase.AFTER_COMPLETION) ) начнет обработку сразу после завершения транзакции , то есть когда все события будут запущены.

Это показывает возможный результат такой ситуации:

Решение 2 ( @Observes )

15: 01: 34,318 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [ seqNo = 0 ] 15: 01: 34,320 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 3 ) Получение события: MyEvent [ seqNo = 0 ] 15: 01: 34,419 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [ seqNo = 1 ] 15: 01: 34,420 [com .piotrnowicki.EventConsumer] (EJB по умолчанию — 6) Событие получения: MyEvent [ seqNo = 1 ] 15: 01: 34,520 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [ seqNo = 2 ] 15: 01: 34,521 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 9) Событие получения: MyEvent [ seqNo = 2 ]

Решение 4 ( @Observes(during = TransactionPhase.AFTER_COMPLETION) )

15: 00: 41,126 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [ seqNo = 0 ] 15: 00: 41,226 [com.piotrnowicki.EventGenerator] (http- / 127.0 .0.1: 8080-1) Создание события: MyEvent [ seqNo = 1 ] 15: 00: 41,326 [com.piotrnowicki.EventGenerator] (http- / 127.0.0.1: 8080-1) Создание события: MyEvent [ seqNo = 2 ] 15 : 00: 41,432 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 10) Событие получения: MyEvent [ seqNo = 2 ] 15: 00: 41,432 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 4) Событие получения: MyEvent [ seqNo = 1 ] 15: 00: 41,432 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 5) Событие получения: MyEvent [ seqNo = 0 ]

Решение 5 — EJB Producer и CDI Consumer II

До этого момента мы пытались сделать наш приемник асинхронным. Есть и обратный путь — мы можем сделать источник событий асинхронным . Мы можем добиться этого, пометив нашего производителя как @Stateless и вызвав его собственный асинхронный метод, который просто вызовет событие:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Stateless
@Path("/produce")
public class EventGenerator {
    // ...
 
    @Resource
    private SessionContext sctx;
 
    @Path("/cdiBean/{eventsNum}")
    @GET
    public String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) {
        for (int i = 0; i < numberOfEventsToGenerate; i++) {
            sctx.getBusinessObject(EventGenerator.class).fireEvent(new MyEvent(i));
        }
 
        return "Finished. Generated " + numberOfEventsToGenerate + " events.";
    }
 
    @Asynchronous
    public void fireEvent(final MyEvent event) {
        events.fire(event);
    }
}

Присмотритесь к SessionContext EJB с помощью SessionContext . В этом случае это необходимо, поскольку мы хотим, чтобы контейнер отправлял вызов нашего метода и добавлял его асинхронный характер. Мы не хотим делать это локальным вызовом, поэтому мы отказываемся использовать неявный this объект.
Потребитель событий, с другой стороны, является простым компонентом CDI:

1
2
3
public class EventConsumer {
    public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException { ... }
}

Результат может быть следующим:

00: 40: 32,820 [com.piotrnowicki.EventGenerator] (EJB по умолчанию — 2) Создание события: MyEvent [seqNo = 1] 00: 40: 32,820 [com.piotrnowicki.EventGenerator] (EJB по умолчанию — 3) Создание события: MyEvent [ seqNo = 2] 00: 40: 32,820 [com.piotrnowicki.EventGenerator] (EJB по умолчанию — 1) Создание события: MyEvent [seqNo = 0] 00:40: 32,821 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 1) Получение событие: MyEvent [seqNo = 0] 00:40: 32,821 [com.piotrnowicki.EventConsumer] (EJB по умолчанию — 2) Получение события: MyEvent [seqNo = 1] 00:40: 32,821 [com.piotrnowicki.EventConsumer] (EJB по умолчанию) — 3) Получение события: MyEvent [seqNo = 2]

Асинхронный: да
Потокобезопасный метод наблюдателя: нет

Решение 6 — CDI с JMS

Это решение, представленное Джулиано Виана в своем блоге . Он использует JMS в качестве шины событий. Событие CDI создается, а затем выбирается некоторым классом, который отвечает за помещение этого события в тему / очередь JMS. MDB, который выбирает сообщения из темы / очереди, создает событие, которое вызывает реальный получатель. Это не только дает вам асинхронную доставку событий, но также добавляет к ней характер транзакции. Например, если получатель события не может обработать сообщение — он может откатить транзакцию, и очередь обеспечит повторную доставку сообщения (возможно, в следующий раз, когда ваш обработчик событий сможет обработать это событие?)

Вывод

CDI 1.0 не поддерживает генерацию асинхронных событий. Также не похоже, что CDI 1.1 будет иметь такую ​​поддержку.

Это, однако, не означает, что вы не можете достичь асинхронной обработки. Уже существуют решения, основанные на EJB 3.1 или существующих атрибутах наблюдателя CDI. Вы также должны быть в состоянии написать портативное расширение CDI, которое добавляет эту функциональность в ваш код.