Сегодня в качестве примера мы возьмем очень простой сценарий: размещение заказа сохраняет его и отправляет электронное письмо об этом заказе:
1
2
3
4
5
6
7
8
9
|
@Service class OrderService @Autowired () (orderDao: OrderDao, mailNotifier: OrderMailNotifier) { @Transactional def placeOrder(order: Order) { orderDao save order mailNotifier sendMail order } } |
Пока все хорошо, но функциональность электронной почты не имеет ничего общего с размещением заказа. Это просто побочный эффект, который отвлекает, а не часть бизнес-логики. Кроме того, отправка электронного письма излишне продлевает транзакцию и приводит к задержке. Поэтому мы решили разделить эти два действия с помощью событий. Для простоты я воспользуюсь встроенными пользовательскими событиями Spring, но наше обсуждение одинаково актуально для JMS или другой библиотеки / очереди производителя-потребителя.
01
02
03
04
05
06
07
08
09
10
11
12
|
case class OrderPlacedEvent(order: Order) extends ApplicationEvent @Service class OrderService @Autowired () (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) { @Transactional def placeOrder(order: Order) { orderDao save order eventPublisher publishEvent OrderPlacedEvent(order) } } |
Как вы можете видеть, вместо OrderMailNotifier
доступа к компоненту OrderMailNotifier
мы отправляем OrderPlacedEvent
обертку вновь созданного заказа. ApplicationEventPublisher
необходим для отправки события. Конечно, мы также должны реализовать на стороне клиента получение сообщений:
1
2
3
4
5
6
7
8
|
@Service class OrderMailNotifier extends ApplicationListener[OrderPlacedEvent] { def onApplicationEvent(event: OrderPlacedEvent) { //sending e-mail... } } |
ApplicationListener[OrderPlacedEvent]
указывает, какой тип событий нас интересует. Это работает, однако по умолчанию Spring ApplicationEvent
является синхронным, что означает, что publishEvent()
фактически блокирует. Зная Spring, не составит труда перевести трансляцию событий в асинхронный режим. На самом деле есть два способа: один предложен в JavaDoc, а другой я обнаружил, потому что сначала мне не удалось прочитать JavaDoc… Согласно документации, если вы хотите, чтобы ваши события доставлялись асинхронно, вы должны определить компонент с именем applicationEventMulticaster
типа SimpleApplicationEventMulticaster
и определить taskExecutor
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
@Bean def applicationEventMulticaster() = { val multicaster = new SimpleApplicationEventMulticaster() multicaster.setTaskExecutor(taskExecutor()) multicaster } @Bean def taskExecutor() = { val pool = new ThreadPoolTaskExecutor() pool.setMaxPoolSize( 10 ) pool.setCorePoolSize( 10 ) pool.setThreadNamePrefix( "Spring-Async-" ) pool } |
Spring уже поддерживает трансляцию событий с использованием специального TaskExecutor
. Я не знал об этом, поэтому сначала я просто аннотировал onApplicationEvent()
с помощью @Async
:
1
2
|
@Async def onApplicationEvent(event: OrderPlacedEvent) { //... |
дальнейших изменений нет, как только Spring обнаруживает метод @Async
он запускает его в другом потоке асинхронно. Период. Ну, вам все равно нужно включить поддержку @Async
если вы ее еще не используете:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
@Configuration @EnableAsync class ThreadingConfig extends AsyncConfigurer { def getAsyncExecutor = taskExecutor() @Bean def taskExecutor() = { val pool = new ThreadPoolTaskExecutor() pool.setMaxPoolSize( 10 ) pool.setCorePoolSize( 10 ) pool.setThreadNamePrefix( "Spring-Async-" ) pool } } |
Технически @EnableAsync
достаточно. Однако по умолчанию Spring использует SimpleAsyncTaskExecutor
который создает новый поток при каждом вызове @Async
. Немного прискорбно по умолчанию для корпоративной среды, к счастью, ее легко изменить. Несомненно, @Async
кажется чище, чем определение магических бобов.
Все вышеизложенное было просто подставой, чтобы раскрыть реальную проблему. Теперь мы отправляем асинхронное сообщение, которое обрабатывается в другом потоке. К сожалению, мы ввели состояние гонки, которое проявляется под большой нагрузкой или, может быть, только в какой-то конкретной операционной системе. Вы можете это заметить? Чтобы дать вам подсказку, вот что происходит:
- Начальная транзакция
- Хранение
order
в базе данных - Отправка
order
переноса сообщений - совершить
Тем временем какой-то асинхронный поток выбирает OrderPlacedEvent
и начинает его обрабатывать. Вопрос в том, происходит ли это сразу после пункта (3), но до пункта (4) или, возможно, после (4)? Это имеет большое значение! В первом случае транзакция еще не совершена, поэтому Order
еще не находится в базе данных. С другой стороны, отложенная загрузка может работать с этим объектом, поскольку он все еще привязан к PersistenceContext
(в случае, если мы используем JPA). Однако если исходная транзакция уже зафиксирована, order
будет вести себя по-другому. Если вы полагаетесь на одно или другое поведение из-за состояния гонки, ваш слушатель событий может внезапно потерпеть неудачу в сложных обстоятельствах.
Конечно, есть решение 1 : использование не очень известного TransactionSynchronizationManager
. В основном это позволяет нам регистрировать произвольное количество слушателей TransactionSynchronization
. Затем каждый такой слушатель будет уведомлен о различных событиях, таких как фиксация транзакции и откат. Вот базовый API:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
@Transactional def placeOrder(order: Order) { orderDao save order afterCommit { eventPublisher publishEvent OrderPlacedEvent(order) } } private def afterCommit[T](fun: => T) { TransactionSynchronizationManager.registerSynchronization( new TransactionSynchronizationAdapter { override def afterCommit() { fun } }) } |
afterCommit()
принимает функцию и вызывает ее после фиксации текущей транзакции. Мы используем его, чтобы скрыть сложность Spring API. Можно безопасно вызывать registerSynchronization()
несколько раз — слушатели хранятся в Set
и являются локальными для текущей транзакции, исчезая после фиксации.
Таким образом, метод publishEvent()
будет вызван после фиксации транзакции, которая делает наш код предсказуемым и свободным условием гонки. Тем не менее, даже с функцией более высокого порядка afterCommit()
она все еще кажется немного громоздкой и излишне сложной. Более того, легко забыть оборачивать каждый publishEvent()
, что publishEvent()
обслуживания. Можем ли мы сделать лучше? Одно из решений состоит в том, чтобы использовать publishEvent()
записи пользовательских утилит класса publishEvent()
или использовать AOP. Но есть гораздо более простое, проверенное решение, которое прекрасно работает с Spring — шаблон Decorator . Мы завернем оригинальную реализацию ApplicationEventPublisher
предоставленную Spring, и publishEven()
его publishEven()
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
class TransactionAwareApplicationEventPublisher(delegate: ApplicationEventPublisher) extends ApplicationEventPublisher { override def publishEvent(event: ApplicationEvent) { if (TransactionSynchronizationManager.isActualTransactionActive) { TransactionSynchronizationManager.registerSynchronization( new TransactionSynchronizationAdapter { override def afterCommit() { delegate publishEvent event } }) } else delegate publishEvent event } } |
Как вы можете видеть, активна ли транзакция, мы регистрируем прослушиватель фиксации и откладываем отправку сообщения до завершения транзакции. В противном случае мы просто перенаправляем событие в исходный ApplicationEventPublisher
, который доставляет его немедленно. Конечно, мы как-то должны подключить эту новую реализацию вместо оригинальной. @Primary
делает @Primary
дело:
1
2
3
4
5
6
7
|
@Resource val applicationContext: ApplicationContext = null @Bean @Primary def transactionAwareApplicationEventPublisher() = new TransactionAwareApplicationEventPublisher(applicationContext) |
Обратите внимание, что исходная реализация ApplicationEventPublisher
предоставляется базовым классом ApplicationContext
. После всех этих изменений наш код выглядит… точно так же, как и в начале:
1
2
3
4
5
6
7
8
|
@Service class OrderService @Autowired () (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) { @Transactional def placeOrder(order: Order) { orderDao save order eventPublisher publishEvent OrderPlacedEvent(order) } |
Однако на этот раз автоматически вводится eventPublisher
— наш собственный декоратор. В конце концов нам удалось исправить проблему состояния гонки, не затрагивая бизнес-код. Наше решение безопасно, предсказуемо и надежно. Обратите внимание, что точно такой же подход можно использовать для любой другой технологии организации очередей, включая JMS (если не использовался сложный менеджер транзакций) или пользовательские очереди. Мы также обнаружили интересный низкоуровневый API для прослушивания жизненного цикла транзакции. Может быть полезным в один прекрасный день.
1 — можно утверждать, что гораздо более простым решением было бы publishEvent()
вне транзакции:
1
2
3
4
5
6
7
|
def placeOrder(order: Order) { storeOrder(order) eventPublisher publishEvent OrderPlacedEvent(order) } @Transactional def storeOrder(order: Order) = orderDao save order |
Это правда, но это решение не «хорошо масштабируется » (что, если placeOrder()
должен быть частью большой транзакции?) И, скорее всего, неверно из-за особенностей прокси .