Сегодня в качестве примера мы возьмем очень простой сценарий: размещение заказа сохраняет его и отправляет электронное письмо об этом заказе:
@Service class OrderService @Autowired() (orderDao: OrderDao, mailNotifier: OrderMailNotifier) { @Transactional def placeOrder(order: Order) { orderDao save order mailNotifier sendMail order } }
Пока все хорошо, но функциональность электронной почты не имеет ничего общего с размещением заказа. Это просто побочный эффект, который отвлекает, а не часть бизнес-логики. Кроме того, отправка электронного письма излишне продлевает транзакцию и приводит к задержке. Поэтому мы решили разделить эти два действия с помощью событий. Для простоты я воспользуюсь встроенными пользовательскими событиями Spring,
но наше обсуждение одинаково актуально для JMS или другой библиотеки / очереди производителя-потребителя.
case class OrderPlacedEvent(order: Order) extends ApplicationEvent @Service class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) { @Transactional def placeOrder(order: Order) { orderDao save order eventPublisher publishEvent OrderPlacedEvent(order) } }
Как видите, вместо OrderMailNotifier
прямого доступа к
бину мы отправляем
OrderPlacedEvent
упаковку вновь созданного заказа.
ApplicationEventPublisher
необходимо отправить событие. Конечно, мы также должны реализовать на стороне клиента получение сообщений:
@Service class OrderMailNotifier extends ApplicationListener[OrderPlacedEvent] { def onApplicationEvent(event: OrderPlacedEvent) { //sending e-mail... } }
ApplicationListener[OrderPlacedEvent]
указывает, какой тип событий нас интересует. Это работает, однако по умолчанию Spring
ApplicationEvent
s является синхронным, что означает
publishEvent()
фактически блокирование. Зная Spring, не составит труда перевести трансляцию событий в асинхронный режим. На самом деле есть два пути: один предложен в JavaDoc, а другой я обнаружил, потому что сначала мне не удалось прочитать JavaDoc … Согласно документации, если вы хотите, чтобы ваши события доставлялись асинхронно, вы должны определить bean-имя
applicationEventMulticaster
типа
SimpleApplicationEventMulticaster
и определить
taskExecutor
:
@Bean def applicationEventMulticaster() = { val multicaster = new SimpleApplicationEventMulticaster() multicaster.setTaskExecutor(taskExecutor()) multicaster } @Bean def taskExecutor() = { val pool = new ThreadPoolTaskExecutor() pool.setMaxPoolSize(10) pool.setCorePoolSize(10) pool.setThreadNamePrefix("Spring-Async-") pool }
Spring уже поддерживает трансляцию событий с использованием пользовательских
TaskExecutor
. Я не знал об этом , сначала я просто аннотированный
onApplicationEvent()
с
@Async
:
@Async def onApplicationEvent(event: OrderPlacedEvent) { //...
дальнейших изменений нет, как только Spring обнаруживает
@Async
метод, он запускает его в другом потоке асинхронно. Период. Ну, вам все равно нужно включить
@Async
поддержку, если вы ее еще не используете:
@Configuration @EnableAsync class ThreadingConfig extends AsyncConfigurer { def getAsyncExecutor = taskExecutor() @Bean def taskExecutor() = { val pool = new ThreadPoolTaskExecutor() pool.setMaxPoolSize(10) pool.setCorePoolSize(10) pool.setThreadNamePrefix("Spring-Async-") pool } }
Технически
@EnableAsync
достаточно. Однако по умолчанию Spring использует,
SimpleAsyncTaskExecutor
который создает новый поток при каждом
@Async
вызове. Немного прискорбно по умолчанию для корпоративной среды, к счастью, ее легко изменить. Несомненно,
@Async
кажется чище, чем определение некоторых волшебных бобов.
Все вышеизложенное было просто подставой, чтобы раскрыть реальную проблему. Теперь мы отправляем асинхронное сообщение, которое обрабатывается в другом потоке. К сожалению, мы ввели
состояние гонки, которое проявляется под большой нагрузкой или, может быть, только в какой-то конкретной операционной системе. Вы можете это заметить? Чтобы дать вам подсказку, вот что происходит:
- Начальная транзакция
- Хранение
order
в базе данных - Отправка сообщения упаковкой
order
- совершить
Тем временем какой-то асинхронный поток берет на себя
OrderPlacedEvent
и начинает обрабатывать его. Вопрос в том, происходит ли это сразу после пункта (3), но до пункта (4) или, возможно, после (4)? Это имеет большое значение! В первом случае транзакция еще не совершена, поэтому
Order
ее еще нет в базе данных. С другой стороны, отложенная загрузка
может работать с этим объектом, поскольку он все еще связан с aa
PersistenceContext
(в случае, если мы используем JPA). Однако если исходная транзакция уже зафиксирована, она
order
будет вести себя по-другому. Если вы полагаетесь на одно или другое поведение из-за состояния гонки, ваш слушатель событий может внезапно потерпеть неудачу в сложных обстоятельствах.
Конечно, есть решение
1 : использование не широко известных
TransactionSynchronizationManager
, В основном это позволяет нам регистрировать произвольное количество
TransactionSynchronization
слушателей . Затем каждый такой слушатель будет уведомлен о различных событиях, таких как фиксация транзакции и откат. Вот базовый API:
@Transactional def placeOrder(order: Order) { orderDao save order afterCommit { eventPublisher publishEvent OrderPlacedEvent(order) } } private def afterCommit[T](fun: => T) { TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter { override def afterCommit() { fun } }) }
afterCommit()
берет функцию и вызывает ее после фиксации текущей транзакции. Мы используем его, чтобы скрыть сложность Spring API. Можно безопасно вызывать
registerSynchronization()
несколько раз — слушатели хранятся в a
Set
и являются локальными для текущей транзакции, исчезая после фиксации.
Таким образом,
publishEvent()
метод будет вызван
после фиксации транзакции, что делает наш код предсказуемым и свободным условием гонки. Однако, даже с функцией более высокого порядка
afterCommit()
это все еще кажется немного громоздким и излишне сложным. Кроме того, легко забыть обернуть каждый
publishEvent()
, таким образом, ремонтопригодность страдает. Можем ли мы сделать лучше? Одно из решений заключается в использовании записи пользовательских утилит класса
publishEvent()
или нанять АОП. Но есть гораздо более простое, проверенное решение, которое прекрасно работает с Spring —
шаблон Decorator . Мы завернем оригинальную реализацию
ApplicationEventPublisher
предоставленной Spring и украсим ее
publishEven()
:
class TransactionAwareApplicationEventPublisher(delegate: ApplicationEventPublisher) extends ApplicationEventPublisher { override def publishEvent(event: ApplicationEvent) { if (TransactionSynchronizationManager.isActualTransactionActive) { TransactionSynchronizationManager.registerSynchronization( new TransactionSynchronizationAdapter { override def afterCommit() { delegate publishEvent event } }) } else delegate publishEvent event } }
Как вы можете видеть, активна ли транзакция, мы регистрируем прослушиватель фиксации и откладываем отправку сообщения до завершения транзакции. В противном случае мы просто перенаправляем событие на оригинал
ApplicationEventPublisher
, который доставляет его немедленно. Конечно, мы как-то должны подключить эту новую реализацию вместо оригинальной.
@Primary
делает трюк:
@Resource val applicationContext: ApplicationContext = null @Bean @Primary def transactionAwareApplicationEventPublisher() = new TransactionAwareApplicationEventPublisher(applicationContext)
Обратите внимание, что исходная реализация
ApplicationEventPublisher
предоставляется базовым
ApplicationContext
классом. После всех этих изменений наш код выглядит … точно так же, как и в начале:
@Service class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) { @Transactional def placeOrder(order: Order) { orderDao save order eventPublisher publishEvent OrderPlacedEvent(order) }
Однако на этот раз автоматически вводится
eventPublisher
наш собственный декоратор. В конце концов нам удалось исправить проблему состояния гонки, не затрагивая бизнес-код. Наше решение безопасно, предсказуемо и надежно. Обратите внимание, что точно такой же подход можно использовать для любой другой технологии организации очередей, включая JMS (если не использовался сложный менеджер транзакций) или пользовательские очереди. Мы также обнаружили интересный низкоуровневый API для прослушивания жизненного цикла транзакции. Может быть полезным в один прекрасный день.
1 — можно утверждать, что гораздо более простым решением было бы
publishEvent()
вне транзакции:def placeOrder(order: Order) { storeOrder(order) eventPublisher publishEvent OrderPlacedEvent(order) } @Transactional def storeOrder(order: Order) = orderDao save order
Это правда, но это решение не «
хорошо масштабируется » (что, если
placeOrder()
должно быть частью большой транзакции?) И, скорее всего, неверно из-за
особенностей прокси .