Общая проблема, которую мы обсуждаем на наших семинарах: как перезапустить программу чтения очереди после сбоя.
Ответ не так прост, как вы думаете.
Мы проводим недельный семинар на месте, чтобы помочь начать новый проект, чтобы убедиться, что инфраструктура имеет хороший баланс скорости и простоты. Часто простота является наиболее важной, и она также будет достаточно быстрой. Вы можете связаться с отделом продаж Chronicle для получения более подробной информации.
Зная, когда сообщение должно быть воспроизведено при запуске.
Недостаточно знать, какие сообщения были воспроизведены. Вам необходимо знать, какие сообщения были успешно завершены в транзакции. Это предполагает, что вы должны обрабатывать каждое сообщение ровно один раз. Более простые альтернативы:
- Повторно воспроизводите каждое сообщение и игнорируйте дубликаты.
- Воспроизведите каждое сообщение с конца (или последних N минут) и предположите, что срок его действия истек.
При обновлении базы данных одним из способов достижения этого является обновление индекса, считанного в строке в базе данных, одним из следующих способов:
- Транзакция прошла успешно, и сообщение больше не нужно воспроизводить.
- Транзакция не удалась, и сообщение необходимо воспроизвести снова.
Важной деталью является то, что нет ситуации, когда неясно, следует ли воспроизводить входное сообщение.
Перезапуск считывателя при записи в очередь в качестве вывода.
В общем, мы предлагаем вам записать свои результаты в очередь вывода. Выходная очередь может быть заменой для ведения журнала и средством мониторинга, но также может записывать происхождение каждого события. В частности, и выходная очередь может помочь:
- Воспроизведение сообщений в том же порядке, которые были получены из нескольких входных очередей.
- Убедитесь, что после обновления программного обеспечения вы соблюдаете решения, принятые ранее. т.е. новое программное обеспечение, воспроизводящее входное сообщение, может принимать другие решения. Читая из вывода, вы гарантируете, что после обновления вы знаете, в каком состоянии вы должны быть.
- Перезапустите очередь ввода из последнего сообщения, успешно выданного для ввода из этой очереди.
В этом примере он читает только одно необработанное сообщение.
Прочитайте одно сообщение, которое не было обработано.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
try (SingleChronicleQueue in = SingleChronicleQueueBuilder.binary(queuePath) .sourceId( 1 ) ( 1 ) .build(); SingleChronicleQueue out = SingleChronicleQueueBuilder.binary(queuePath2) .rollCycle(RollCycles.TEST_DAILY) .build()) { MarketDataListener mdListener = out.createAppender() .methodWriterBuilder(MarketDataListener. class ) .recordHistory( true ) ( 2 ) .get(); SidedMarketDataCombiner combiner = new SidedMarketDataCombiner(mdListener); MethodReader reader = in.createTailer() .afterLastWritten(out) ( 3 ) .methodReader(combiner); assertTrue(reader.readOne()); } |
1
2
3
|
Give the queue a unique id for tracing purposes. Write a history for timings and sources for each message processed. Read the output queue to see what the last message processed was. |
Было бы действительно неэффективно делать все это каждый раз. Единственная строка, которая требуется для каждого сообщения:
1
|
reader.readOne(); |
Вывод
Несмотря на то, что есть несколько способов реализовать перезапуск, но если вам это нужно, полезно иметь встроенную поддержку одного из самых распространенных способов сделать это.
В следующей части.
Ссылка: | Микросервисы в мире хроники — часть 4 от нашего партнера по JCG Питера Лоури из блога Vanilla Java . |