Дискуссии об источнике событий и CQRS, как правило, фокусируются на общей архитектуре системы или на различных видах доменного дизайна в контексте CQRS. Тем не менее, модели чтения часто пренебрегают, хотя и с этой стороны есть некоторые интересные соображения. В этой статье мы представим пример реализации заполнения модели представления с использованием потока событий.
обзор
Идея чтения модели действительно проста. Вы берете журнал событий, применяете (воспроизводите) все события в первоначально пустой модели данных, используя соответствующие функции, и вы получаете заполненную модель. Код может выглядеть так:
1
2
3
4
5
|
List<Event> events = getEvents(); Model model = Model.empty(); for (Event event : events) { apply(model, event); } |
Мы можем сделать это еще короче с помощью функционального программирования:
1
2
3
|
Model m = reduce(getEvents(), Model.empty(), (m, e) -> apply(m, e)); |
Это суть. Обратите внимание, что это просто абстрактная схема, и реалистичная реализация, вероятно, будет отличаться, включая буферизацию, пакетную обработку (или потоковую передачу), постоянство и т. Д.
Применение событий
Фактический код Java для применения событий может выглядеть примерно так:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
EventProcessingResult processEvents() { if (getState().isRunning()) { int batchSize = getEventsPerIteration(); List<Event> events = eventStore.getEventsForAllStreams(getLastEventId(), batchSize); if (events.isEmpty()) { return NO_EVENTS_TO_PROCESS; } else { return processEvents(events); } } else { return NOT_RUNNING; } } EventProcessingResult processEvents(List<Event> events) { try { for (Event event : events) { dispatchEvent(event); } return SUCCESS; } catch (RuntimeException e) { return FAILURE; } } |
В общем, это действительно просто и понятно. Это можно улучшить с помощью хуков до и после обработки отдельных событий и всего пакета. Такие крючки могут быть использованы для:
- осуществлять транзакции,
- подключить мониторинг,
- реализовать обработку ошибок,
- рассчитать размер партии в зависимости от скорости,
- выполнять произвольные операции, например, настраивать что-либо или пересчитывать один раз за пакет.
Последний интересный фрагмент — это метод dispatchEvent
. Помимо обхода иерархии типов, обработки ошибок и того, чтобы сделать все это необязательным, он сводится к:
1
2
3
4
|
void dispatchEvent(Event e) { Method handler = projector.getClass().findMethod( "on" , e.getClass()); handler.invoke(projector, e); } |
Другими словами, для каждого типа события (например, OrderCreated
) мы ищем открытый метод, который OrderCreated
единственный аргумент соответствующего типа в объекте projector
.
Все вышеперечисленное является частью движка, частью инфраструктуры, поддерживающей множество моделей представлений. Все, что необходимо для реализации проекции, — это на самом деле предоставить проектор с обработчиками для интересных типов событий. Все остальные события будут просто игнорироваться.
Это может выглядеть так:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
public class OrderProjector { @Inject private OrderDao orders; public void on(OrderCreated e) { orders.save( new Order(e.getOrderNumber())); } public void on(OrderApproved e) { Order o = orders.find(e.getOrderNumber()); o.setApproved( true ); } } |
Проекционная нить
Давайте на минуту обсудим многопоточность. Совместное изменяемое состояние немедленно приносит многочисленные проблемы, и его следует избегать, насколько это возможно . Один из способов справиться с этим — это не иметь параллелизма, например, ограничивая записи одним потоком. В большинстве случаев однопоточного модуля записи в сочетании с транзакциями ACID более чем достаточно, чтобы не отставать от нагрузки записи. (Загрузка чтения / запроса может быть большой и использовать много потоков — все детали здесь только о записи.)
Поток отвечает за применение событий к модели чтения — от запросов к хранилищу событий до обновления базы данных модели представления. Обычно он просто загружает партии событий из магазина и применяет их. Он продолжается до тех пор, пока есть еще события для обработки, и после того, как он захвачен, засыпает. Он просыпается через определенное время или при получении уведомления о новых событиях в хранилище событий.
У нас также есть некоторый контроль над жизненным циклом этого потока. Например, у нас есть способ программно приостанавливать и возобновлять каждый поток проекции, даже отображаемый в интерфейсе администратора.
Тяни или Тяни?
Благодаря хранилищу событий, поддерживаемому базой данных, очень легко многократно запрашивать новые события. Это модель тяги . К сожалению, это также означает, что вы можете закончить опрос слишком часто и генерировать ненужную нагрузку или опрос слишком редко и, таким образом, возможно, потребуется больше времени для распространения изменений в модели представления.
Вот почему в дополнение к опросу хранилища событий рекомендуется добавить уведомления, которые пробуждают прочитанные модели, как только сохраняются новые события. Это эффективно превращается в пуш- модель с минимальными задержками и нагрузкой. Мы обнаружили, что JGroups — это очень хороший инструмент для работы — он поддерживает несколько протоколов и очень прост в настройке, что требует гораздо меньше хлопот, чем полноценная очередь сообщений.
Уведомления могут содержать или не содержать фактические события.
В последнем (и более простом) проекте они распространяют только информацию о том, что новое событие было сохранено, вместе с его последовательным идентификатором (так, чтобы все проекции могли оценить, насколько они отстали). После пробуждения исполнитель может продолжить свой обычный путь, начиная с запроса к хранилищу событий.
Почему? Поскольку обработка событий, поступающих из одного источника, проще, но более важно, потому что хранилище событий, поддерживаемое БД, тривиально гарантирует порядок и не имеет проблем с потерянными или дублирующимися сообщениями. Запросы к базе данных очень быстрые, учитывая, что мы последовательно читаем одну таблицу по первичному ключу, и в большинстве случаев данные все равно находятся в кэш-памяти RAM. Узкое место в потоке проектирования обновляет свою базу данных для чтения моделей.
Тем не менее, нет никаких препятствий для размещения данных о событиях в уведомлениях (за исключением, возможно, размера или соображений сетевого трафика). Скорее всего, это уменьшит нагрузку на хранилище событий и сохранит некоторые поездки в базу данных. Проектор должен поддерживать буфер и возвращаться к запросам в хранилище событий, когда это необходимо. Или система может использовать более надежную очередь сообщений.
Перезапуск прогнозов
Помимо паузы / возобновления, на приведенном выше снимке экрана показано еще одно действие: перезагрузка. Безобидно, как это выглядит, это действительно хорошая и мощная функция.
Поскольку модель представления полностью получена из журнала событий, в любой момент ее можно выбросить и заново создать с самого начала (или из некоторого начального состояния / достаточно старого снимка). Данные в журнале событий находятся в безопасности, что является основным источником правды.
Это полезно, когда что-то в представлении изменяется: добавляется поле или таблица, исправляется ошибка, что-то вычисляется по-другому. Когда это происходит, часто проще (или необходимо) просто начать с самого начала, чем, например, реализовать скрипт масштабной миграции SQL.
Можно даже полностью автоматизировать его, чтобы при запуске системы и обнаружении, что схема БД не соответствует соответствующей модели Java, она может автоматически воссоздать схему и повторно обработать журнал событий. Это похоже на работу с политикой создания-отбрасывания Hibernate, за исключением того, что она не теряет данные.
Производительность
Решение может показаться довольно ограниченным в отношении производительности.
Один момент, который может поднять бровь — это однопоточный писатель . В действительности один поток обычно достаточно быстр, чтобы легко справиться с нагрузкой. Параллелизм не только сложнее реализовать и поддерживать, но и вносит конкуренцию. Чтения (запросы) могут быть многопоточными и легко масштабируемыми.
Мы также много выиграем, имея несколько моделей чтения, например, отделяя аналитику от административных и транзакционных данных. Каждая модель однопоточная (для записи), но несколько моделей потребляют события параллельно. Наконец, решение может быть модифицировано для использования шардинга или какой-либо обработки разветвления.
Еще один интересный момент — перезапуск прогнозов с нуля.
Хорошее решение — это что-то вроде архитектуры каппа :
- Поддерживайте устаревший прогноз и отвечайте на все вопросы.
- Начать новый прогноз, например, в другую базу данных. Просто позвольте ему обрабатывать события, не направляйте на него никакой трафик.
- Когда новая проекция догонит вас, перенаправьте трафик и закройте старый.
В очень маленьком случае, особенно для разработки, может быть даже возможно сделать перезапуск в сети, в том же случае. Это зависит от ответов на следующие вопросы: Сколько времени занимает повторная обработка всех событий? Допустимо ли, чтобы этот прогноз был устаревшим в течение 30 минут? Можем ли мы развернуть систему ночью или в выходные дни, когда никто не использует систему? Нужно ли переигрывать всю историю?
Еще один фактор, который необходимо учитывать, — это настойчивость. Если это слишком узкое место и не может быть дополнительно оптимизировано, рассмотрите возможность использования моделей представления в памяти.
Подводя итоги
По сути, это все, что нужно для реализации модели чтения, использующей хранилище событий. Он приобретает большую простоту благодаря линейному хранению событий и обработке всего в одном потоке. Настолько, что в итоге это просто цикл, реализующий сокращение, показанное в начале.
В следующих статьях я собираюсь углубиться в практические вопросы реализации прогнозов.
Ссылка: | Написание модели чтения событий из CQRS от нашего партнера JCG Конрада Гаруса в блоге Белки . |