Статьи

Синхронизация транзакций с асинхронными событиями в Spring

 Сегодня в качестве примера мы возьмем очень простой сценарий: размещение заказа сохраняет его и отправляет электронное письмо об этом заказе:

@Service
class OrderService @Autowired() (orderDao: OrderDao, mailNotifier: OrderMailNotifier) {
 
    @Transactional
    def placeOrder(order: Order) {
        orderDao save order
        mailNotifier sendMail order
    }
}

Пока все хорошо, но функциональность электронной почты не имеет ничего общего с размещением заказа. Это просто побочный эффект, который отвлекает, а не часть бизнес-логики. Кроме того, отправка электронного письма излишне продлевает транзакцию и приводит к задержке. Поэтому мы решили разделить эти два действия с помощью событий. Для простоты я воспользуюсь встроенными пользовательскими событиями Spring,
но наше обсуждение одинаково актуально для JMS или другой библиотеки / очереди производителя-потребителя.

case class OrderPlacedEvent(order: Order) extends ApplicationEvent
 
@Service
class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {
 
    @Transactional
    def placeOrder(order: Order) {
        orderDao save order
        eventPublisher publishEvent OrderPlacedEvent(order)
    }
 
}

Как видите, вместо OrderMailNotifierпрямого доступа к
бину мы отправляем
OrderPlacedEventупаковку вновь созданного заказа.
ApplicationEventPublisherнеобходимо отправить событие. Конечно, мы также должны реализовать на стороне клиента получение сообщений:

@Service
class OrderMailNotifier extends ApplicationListener[OrderPlacedEvent] {
 
    def onApplicationEvent(event: OrderPlacedEvent) {
        //sending e-mail...
    }
 
}

ApplicationListener[OrderPlacedEvent]указывает, какой тип событий нас интересует. Это работает, однако по умолчанию Spring
ApplicationEvents является синхронным, что означает
publishEvent()фактически блокирование. Зная Spring, не составит труда перевести трансляцию событий в асинхронный режим. На самом деле есть два пути: один предложен в JavaDoc, а другой я обнаружил, потому что сначала мне не удалось прочитать JavaDoc … Согласно документации, если вы хотите, чтобы ваши события доставлялись асинхронно, вы должны определить bean-имя
applicationEventMulticasterтипа
SimpleApplicationEventMulticasterи определить
taskExecutor:

@Bean
def applicationEventMulticaster() = {
    val multicaster = new SimpleApplicationEventMulticaster()
    multicaster.setTaskExecutor(taskExecutor())
    multicaster
}
 
@Bean
def taskExecutor() = {
    val pool = new ThreadPoolTaskExecutor()
    pool.setMaxPoolSize(10)
    pool.setCorePoolSize(10)
    pool.setThreadNamePrefix("Spring-Async-")
    pool
}

Spring уже поддерживает трансляцию событий с использованием пользовательских
TaskExecutor. Я не знал об этом , сначала я просто аннотированный
onApplicationEvent()с
@Async:

@Async
def onApplicationEvent(event: OrderPlacedEvent) {  //...

дальнейших изменений нет, как только Spring обнаруживает
@Asyncметод, он запускает его в другом потоке асинхронно. Период. Ну, вам все равно нужно включить
@Asyncподдержку, если вы ее еще не используете:

@Configuration
@EnableAsync
class ThreadingConfig extends AsyncConfigurer {
    def getAsyncExecutor = taskExecutor()
 
    @Bean
    def taskExecutor() = {
        val pool = new ThreadPoolTaskExecutor()
        pool.setMaxPoolSize(10)
        pool.setCorePoolSize(10)
        pool.setThreadNamePrefix("Spring-Async-")
        pool
    }
 
}

Технически
@EnableAsyncдостаточно. Однако по умолчанию Spring использует,
SimpleAsyncTaskExecutorкоторый создает новый поток при каждом
@Asyncвызове. Немного прискорбно по умолчанию для корпоративной среды, к счастью, ее легко изменить. Несомненно,
@Asyncкажется чище, чем определение некоторых волшебных бобов.


Все вышеизложенное было просто подставой, чтобы раскрыть реальную проблему. Теперь мы отправляем асинхронное сообщение, которое обрабатывается в другом потоке. К сожалению, мы ввели
состояние гонки, которое проявляется под большой нагрузкой или, может быть, только в какой-то конкретной операционной системе. Вы можете это заметить? Чтобы дать вам подсказку, вот что происходит:

  1. Начальная транзакция
  2. Хранение orderв базе данных
  3. Отправка сообщения упаковкой order
  4. совершить

Тем временем какой-то асинхронный поток берет на себя
OrderPlacedEventи начинает обрабатывать его. Вопрос в том, происходит ли это сразу после пункта (3), но до пункта (4) или, возможно, после (4)? Это имеет большое значение! В первом случае транзакция еще не совершена, поэтому
Orderее еще нет в базе данных. С другой стороны, отложенная загрузка
может работать с этим объектом, поскольку он все еще связан с aa
PersistenceContext(в случае, если мы используем JPA). Однако если исходная транзакция уже зафиксирована, она
orderбудет вести себя по-другому. Если вы полагаетесь на одно или другое поведение из-за состояния гонки, ваш слушатель событий может внезапно потерпеть неудачу в сложных обстоятельствах.

Конечно, есть решение
1 : использование не широко известных
TransactionSynchronizationManager, В основном это позволяет нам регистрировать произвольное количество
TransactionSynchronizationслушателей . Затем каждый такой слушатель будет уведомлен о различных событиях, таких как фиксация транзакции и откат. Вот базовый API:

@Transactional
def placeOrder(order: Order) {
    orderDao save order
    afterCommit {
        eventPublisher publishEvent OrderPlacedEvent(order)
    }
}
 
private def afterCommit[T](fun: => T) {
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter {
        override def afterCommit() {
            fun
        }
    })
}

afterCommit()берет функцию и вызывает ее после фиксации текущей транзакции. Мы используем его, чтобы скрыть сложность Spring API. Можно безопасно вызывать
registerSynchronization()несколько раз — слушатели хранятся в a
Setи являются локальными для текущей транзакции, исчезая после фиксации.

Таким образом,
publishEvent()метод будет вызван
после фиксации транзакции, что делает наш код предсказуемым и свободным условием гонки. Однако, даже с функцией более высокого порядка
afterCommit()это все еще кажется немного громоздким и излишне сложным. Кроме того, легко забыть обернуть каждый
publishEvent(), таким образом, ремонтопригодность страдает. Можем ли мы сделать лучше? Одно из решений заключается в использовании записи пользовательских утилит класса
publishEvent()или нанять АОП. Но есть гораздо более простое, проверенное решение, которое прекрасно работает с Spring —
шаблон Decorator . Мы завернем оригинальную реализацию
ApplicationEventPublisherпредоставленной Spring и украсим ее
publishEven():

class TransactionAwareApplicationEventPublisher(delegate: ApplicationEventPublisher)
    extends ApplicationEventPublisher {
 
    override def publishEvent(event: ApplicationEvent) {
        if (TransactionSynchronizationManager.isActualTransactionActive) {
            TransactionSynchronizationManager.registerSynchronization(
                new TransactionSynchronizationAdapter {
                    override def afterCommit() {
                        delegate publishEvent event
                    }
                })
        }
        else
            delegate publishEvent event
    }
 
}

Как вы можете видеть, активна ли транзакция, мы регистрируем прослушиватель фиксации и откладываем отправку сообщения до завершения транзакции. В противном случае мы просто перенаправляем событие на оригинал
ApplicationEventPublisher, который доставляет его немедленно. Конечно, мы как-то должны подключить эту новую реализацию вместо оригинальной.
@Primaryделает трюк:

@Resource
val applicationContext: ApplicationContext = null
 
@Bean
@Primary
def transactionAwareApplicationEventPublisher() =
    new TransactionAwareApplicationEventPublisher(applicationContext)

Обратите внимание, что исходная реализация
ApplicationEventPublisherпредоставляется базовым
ApplicationContextклассом. После всех этих изменений наш код выглядит … точно так же, как и в начале:

@Service
class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {
 
    @Transactional
    def placeOrder(order: Order) {
        orderDao save order
        eventPublisher publishEvent OrderPlacedEvent(order)
    }

Однако на этот раз автоматически вводится
eventPublisherнаш собственный декоратор. В конце концов нам удалось исправить проблему состояния гонки, не затрагивая бизнес-код. Наше решение безопасно, предсказуемо и надежно. Обратите внимание, что точно такой же подход можно использовать для любой другой технологии организации очередей, включая JMS (если не использовался сложный менеджер транзакций) или пользовательские очереди. Мы также обнаружили интересный низкоуровневый API для прослушивания жизненного цикла транзакции. Может быть полезным в один прекрасный день.


1 — можно утверждать, что гораздо более простым решением было бы
publishEvent()вне транзакции:

def placeOrder(order: Order) {
    storeOrder(order)
    eventPublisher publishEvent OrderPlacedEvent(order)
}
 
@Transactional
def storeOrder(order: Order) = orderDao save order

Это правда, но это решение не «
хорошо
масштабируется » (что, если
placeOrder()должно быть частью большой транзакции?) И, скорее всего, неверно из-за
особенностей прокси .