Статьи

Обработка сообщений SQS с использованием Spring Boot и Project Reactor

Недавно я работал над проектом, в котором мне нужно было эффективно обрабатывать большое количество сообщений, передаваемых через очередь AWS SQS. В этом посте (и, возможно, еще одном) я расскажу о подходе, который я использовал для обработки сообщений с использованием отличного Project Reactor

Следующее — тип установки, к которой я стремлюсь:

Настройка локальной среды AWS

Прежде чем перейти к коду, позвольте мне получить некоторые предварительные сведения. Во-первых, как вы получаете локальную версию SNS и SQS. Один из самых простых способов — использовать локальный стек . Я использую докер-составную версию, описанную здесь

Вторая утилита, которую я буду использовать, — это CLI AWS. Этот сайт содержит подробную информацию о том, как установить его локально.

Как только обе эти утилиты будут установлены, быстрый тест должен подтвердить настройку:

1
2
3
4
5
6
7
8
# Create a queue
aws --endpoint http://localhost:4576 sqs create-queue --queue-name test-queue
 
# Send a sample message
aws --endpoint http://localhost:4576 sqs send-message --queue-url http://localhost:4576/queue/test-queue --message-body "Hello world"
 
# Receive the message
aws --endpoint http://localhost:4576 sqs receive-message --queue-url http://localhost:4576/queue/test-queue

Основы проекта Реактор

Project Reactor реализует спецификацию Reactive Streams и предоставляет способ обработки потоков данных через асинхронные границы с учетом противодавления. Здесь много слов, но по сути это так:

1. SQS производит данные

2. Приложение будет использовать и обрабатывать его как поток данных.

3. Приложение должно потреблять данные в устойчивом темпе — слишком много данных не должно быть закачано. Формально это называется
«Обратное давление»

AWS SDK 2

Библиотека, которую я буду использовать для получения данных AWS SQS,
AWS SDK 2 . Библиотека использует неблокирующие IO под обложками.

Библиотека предлагает как синхронизированную версию совершения звонков, так и асинхронную версию. Рассмотрим синхронный способ извлечения записей из очереди SQS:

01
02
03
04
05
06
07
08
09
10
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest
import software.amazon.awssdk.services.sqs.SqsClient
 
val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
    .queueUrl(queueUrl)
    .maxNumberOfMessages(5)
    .waitTimeSeconds(10)
    .build()
 
val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()

Здесь «software.amazon.awssdk.services.sqs.SqsClient» используется для запроса sqs и синхронного получения пакета результатов. Асинхронный результат, с другой стороны, выглядит следующим образом:

1
2
3
4
5
6
7
8
9
val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
    .queueUrl(queueUrl)
    .maxNumberOfMessages(5)
    .waitTimeSeconds(10)
    .build()
 
val messages: CompletableFuture<List<Message>> = sqsAsyncClient
    .receiveMessage(receiveMessageRequest)
    .thenApply { result -> result.messages() }

Результат теперь является «CompletableFuture»

Бесконечный цикл и отсутствие противодавления

Моя первая попытка создания потока ( Flux ) сообщения довольно проста — бесконечный цикл, который опрашивает AWS sqs и создает из него Flux, используя оператор «Flux.create» , следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
fun listen(): Flux<Pair<String, () -> Unit>> {
    return Flux.create { sink: FluxSink<List<Message>> ->
            while (running) {
                try {
                    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .maxNumberOfMessages(5)
                        .waitTimeSeconds(10)
                        .build()
 
                    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
                    LOGGER.info("Received: $messages")
                    sink.next(messages)
                } catch (e: InterruptedException) {
                    LOGGER.error(e.message, e)
                } catch (e: Exception) {
                    LOGGER.error(e.message, e)
                }
            }
        }
        .flatMapIterable(Function.identity())
        .doOnError { t: Throwable -> LOGGER.error(t.message, t) }
        .retry()
        .map { snsMessage: Message ->
            val snsMessageBody: String = snsMessage.body()
            val snsNotification: SnsNotification = readSnsNotification(snsMessageBody)
            snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) }
        }
}

Это работает так, что существует бесконечный цикл, который проверяет новые сообщения с помощью длинного опроса . Сообщения могут быть недоступны при каждом опросе, и в этом случае пустой список добавляется в поток.

Этот список максимум из 5 сообщений затем сопоставляется с потоком отдельных сообщений с помощью оператора «flatMapIterable» , который дополнительно сопоставляется путем извлечения сообщения из оболочки SNS (когда сообщение пересылается из SNS в SQS, SNS добавляет оболочку к сообщение) и способ удалить сообщение (deleteHandle), как только сообщение успешно обработано, возвращается как пара.

Этот подход прекрасно работает … но представьте себе случай, когда поступило огромное количество сообщений, так как цикл на самом деле не знает о пропускной способности в нисходящем направлении, он будет продолжать перекачивать данные в поток. Поведение по умолчанию для промежуточных операторов, чтобы буферизовать эти данные, поступающие в зависимости от того, как конечный потребитель потребляет данные. Поскольку этот буфер не ограничен, возможно, что система может достичь неустойчивого состояния.

Поток с противодавлением

Исправление заключается в использовании другого оператора для генерации потока данных —
Flux.Generate .

Используя этот оператор, код выглядит так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
fun listen(): Flux<Pair<String, () -> Unit>> {
    return Flux.generate { sink: SynchronousSink<List<Message>> ->
            val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
                .queueUrl(queueUrl)
                .maxNumberOfMessages(5)
                .waitTimeSeconds(10)
                .build()
 
            val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
            LOGGER.info("Received: $messages")
            sink.next(messages)
        }
        .flatMapIterable(Function.identity())
        .doOnError { t: Throwable -> LOGGER.error(t.message, t) }
        .retry()
        .map { snsMessage: Message ->
            val snsMessageBody: String = snsMessage.body()
            val snsNotification: SnsNotification = readSnsNotification(snsMessageBody)
            snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) }
        }
}

Это работает так, что блок, переданный оператору «Flux.generate», неоднократно вызывается — аналогично циклу while, в каждом цикле ожидается добавление одного элемента в поток. В этом случае элемент, добавленный в поток, оказывается списком, который, как и прежде, разбивается на отдельные сообщения.

Как работает противодавление в этом сценарии —

Итак, еще раз рассмотрим случай, когда нижестоящий потребитель обрабатывает с более медленной скоростью, чем генерирующий конец. В этом случае сам Flux будет замедляться со скоростью, с которой вызывается оператор генерации, и, таким образом, учитывает пропускную способность нисходящей системы.

Вывод

Это должно создать хороший конвейер для обработки сообщений из SQS, здесь есть еще несколько нюансов для параллельной обработки сообщений в потоке, о котором я расскажу в следующем посте.

Кодовая база этого примера доступна в моем репозитории github
здесь — https://github.com/bijukunjummen/boot-with-sns-sqs. Код имеет полный конвейер, который включает в себя обработку сообщения и удаление его после обработки.

См. Оригинальную статью здесь: Обработка сообщений SQS с использованием Spring Boot и Project Reactor

Мнения, высказанные участниками Java Code Geeks, являются их собственными.