Статьи

Синхронизация транзакций с асинхронными событиями в Spring

Сегодня в качестве примера мы возьмем очень простой сценарий: размещение заказа сохраняет его и отправляет электронное письмо об этом заказе:

1
2
3
4
5
6
7
8
9
@Service
class OrderService @Autowired() (orderDao: OrderDao, mailNotifier: OrderMailNotifier) {
  
    @Transactional
    def placeOrder(order: Order) {
        orderDao save order
        mailNotifier sendMail order
    }
}

Пока все хорошо, но функциональность электронной почты не имеет ничего общего с размещением заказа. Это просто побочный эффект, который отвлекает, а не часть бизнес-логики. Кроме того, отправка электронного письма излишне продлевает транзакцию и приводит к задержке. Поэтому мы решили разделить эти два действия с помощью событий. Для простоты я воспользуюсь встроенными пользовательскими событиями Spring, но наше обсуждение одинаково актуально для JMS или другой библиотеки / очереди производителя-потребителя.

01
02
03
04
05
06
07
08
09
10
11
12
case class OrderPlacedEvent(order: Order) extends ApplicationEvent
  
@Service
class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {
  
    @Transactional
    def placeOrder(order: Order) {
        orderDao save order
        eventPublisher publishEvent OrderPlacedEvent(order)
    }
  
}

Как вы можете видеть, вместо OrderMailNotifier доступа к компоненту OrderMailNotifier мы отправляем OrderPlacedEvent обертку вновь созданного заказа. ApplicationEventPublisher необходим для отправки события. Конечно, мы также должны реализовать на стороне клиента получение сообщений:

1
2
3
4
5
6
7
8
@Service
class OrderMailNotifier extends ApplicationListener[OrderPlacedEvent] {
  
    def onApplicationEvent(event: OrderPlacedEvent) {
        //sending e-mail...
    }
  
}

ApplicationListener[OrderPlacedEvent] указывает, какой тип событий нас интересует. Это работает, однако по умолчанию Spring ApplicationEvent является синхронным, что означает, что publishEvent() фактически блокирует. Зная Spring, не составит труда перевести трансляцию событий в асинхронный режим. На самом деле есть два способа: один предложен в JavaDoc, а другой я обнаружил, потому что сначала мне не удалось прочитать JavaDoc… Согласно документации, если вы хотите, чтобы ваши события доставлялись асинхронно, вы должны определить компонент с именем applicationEventMulticaster типа SimpleApplicationEventMulticaster и определить taskExecutor :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
@Bean
def applicationEventMulticaster() = {
    val multicaster = new SimpleApplicationEventMulticaster()
    multicaster.setTaskExecutor(taskExecutor())
    multicaster
}
  
@Bean
def taskExecutor() = {
    val pool = new ThreadPoolTaskExecutor()
    pool.setMaxPoolSize(10)
    pool.setCorePoolSize(10)
    pool.setThreadNamePrefix("Spring-Async-")
    pool
}

Spring уже поддерживает трансляцию событий с использованием специального TaskExecutor . Я не знал об этом, поэтому сначала я просто аннотировал onApplicationEvent() с помощью @Async :

1
2
@Async
def onApplicationEvent(event: OrderPlacedEvent) {  //...

дальнейших изменений нет, как только Spring обнаруживает метод @Async он запускает его в другом потоке асинхронно. Период. Ну, вам все равно нужно включить поддержку @Async если вы ее еще не используете:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
@Configuration
@EnableAsync
class ThreadingConfig extends AsyncConfigurer {
    def getAsyncExecutor = taskExecutor()
  
    @Bean
    def taskExecutor() = {
        val pool = new ThreadPoolTaskExecutor()
        pool.setMaxPoolSize(10)
        pool.setCorePoolSize(10)
        pool.setThreadNamePrefix("Spring-Async-")
        pool
    }
  
}

Технически @EnableAsync достаточно. Однако по умолчанию Spring использует SimpleAsyncTaskExecutor который создает новый поток при каждом вызове @Async . Немного прискорбно по умолчанию для корпоративной среды, к счастью, ее легко изменить. Несомненно, @Async кажется чище, чем определение магических бобов.

Все вышеизложенное было просто подставой, чтобы раскрыть реальную проблему. Теперь мы отправляем асинхронное сообщение, которое обрабатывается в другом потоке. К сожалению, мы ввели состояние гонки, которое проявляется под большой нагрузкой или, может быть, только в какой-то конкретной операционной системе. Вы можете это заметить? Чтобы дать вам подсказку, вот что происходит:

  1. Начальная транзакция
  2. Хранение order в базе данных
  3. Отправка order переноса сообщений
  4. совершить

Тем временем какой-то асинхронный поток выбирает OrderPlacedEvent и начинает его обрабатывать. Вопрос в том, происходит ли это сразу после пункта (3), но до пункта (4) или, возможно, после (4)? Это имеет большое значение! В первом случае транзакция еще не совершена, поэтому Order еще не находится в базе данных. С другой стороны, отложенная загрузка может работать с этим объектом, поскольку он все еще привязан к PersistenceContext (в случае, если мы используем JPA). Однако если исходная транзакция уже зафиксирована, order будет вести себя по-другому. Если вы полагаетесь на одно или другое поведение из-за состояния гонки, ваш слушатель событий может внезапно потерпеть неудачу в сложных обстоятельствах.

Конечно, есть решение 1 : использование не очень известного TransactionSynchronizationManager . В основном это позволяет нам регистрировать произвольное количество слушателей TransactionSynchronization . Затем каждый такой слушатель будет уведомлен о различных событиях, таких как фиксация транзакции и откат. Вот базовый API:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
@Transactional
def placeOrder(order: Order) {
    orderDao save order
    afterCommit {
        eventPublisher publishEvent OrderPlacedEvent(order)
    }
}
  
private def afterCommit[T](fun: => T) {
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter {
        override def afterCommit() {
            fun
        }
    })
}

afterCommit() принимает функцию и вызывает ее после фиксации текущей транзакции. Мы используем его, чтобы скрыть сложность Spring API. Можно безопасно вызывать registerSynchronization() несколько раз — слушатели хранятся в Set и являются локальными для текущей транзакции, исчезая после фиксации.

Таким образом, метод publishEvent() будет вызван после фиксации транзакции, которая делает наш код предсказуемым и свободным условием гонки. Тем не менее, даже с функцией более высокого порядка afterCommit() она все еще кажется немного громоздкой и излишне сложной. Более того, легко забыть оборачивать каждый publishEvent() , что publishEvent() обслуживания. Можем ли мы сделать лучше? Одно из решений состоит в том, чтобы использовать publishEvent() записи пользовательских утилит класса publishEvent() или использовать AOP. Но есть гораздо более простое, проверенное решение, которое прекрасно работает с Spring — шаблон Decorator . Мы завернем оригинальную реализацию ApplicationEventPublisher предоставленную Spring, и publishEven() его publishEven() :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
class TransactionAwareApplicationEventPublisher(delegate: ApplicationEventPublisher)
    extends ApplicationEventPublisher {
  
    override def publishEvent(event: ApplicationEvent) {
        if (TransactionSynchronizationManager.isActualTransactionActive) {
            TransactionSynchronizationManager.registerSynchronization(
                new TransactionSynchronizationAdapter {
                    override def afterCommit() {
                        delegate publishEvent event
                    }
                })
        }
        else
            delegate publishEvent event
    }
  
}

Как вы можете видеть, активна ли транзакция, мы регистрируем прослушиватель фиксации и откладываем отправку сообщения до завершения транзакции. В противном случае мы просто перенаправляем событие в исходный ApplicationEventPublisher , который доставляет его немедленно. Конечно, мы как-то должны подключить эту новую реализацию вместо оригинальной. @Primary делает @Primary дело:

1
2
3
4
5
6
7
@Resource
val applicationContext: ApplicationContext = null
  
@Bean
@Primary
def transactionAwareApplicationEventPublisher() =
    new TransactionAwareApplicationEventPublisher(applicationContext)

Обратите внимание, что исходная реализация ApplicationEventPublisher предоставляется базовым классом ApplicationContext . После всех этих изменений наш код выглядит… точно так же, как и в начале:

1
2
3
4
5
6
7
8
@Service
class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {
  
    @Transactional
    def placeOrder(order: Order) {
        orderDao save order
        eventPublisher publishEvent OrderPlacedEvent(order)
    }

Однако на этот раз автоматически вводится eventPublisher — наш собственный декоратор. В конце концов нам удалось исправить проблему состояния гонки, не затрагивая бизнес-код. Наше решение безопасно, предсказуемо и надежно. Обратите внимание, что точно такой же подход можно использовать для любой другой технологии организации очередей, включая JMS (если не использовался сложный менеджер транзакций) или пользовательские очереди. Мы также обнаружили интересный низкоуровневый API для прослушивания жизненного цикла транзакции. Может быть полезным в один прекрасный день.

1 — можно утверждать, что гораздо более простым решением было бы publishEvent() вне транзакции:

1
2
3
4
5
6
7
def placeOrder(order: Order) {
    storeOrder(order)
    eventPublisher publishEvent OrderPlacedEvent(order)
}
  
@Transactional
def storeOrder(order: Order) = orderDao save order

Это правда, но это решение не «хорошо масштабируется » (что, если placeOrder() должен быть частью большой транзакции?) И, скорее всего, неверно из-за особенностей прокси .