Статьи

Написание модели чтения событий из CQRS

Дискуссии об источнике событий и CQRS, как правило, фокусируются на общей архитектуре системы или на различных видах доменного дизайна в контексте CQRS. Тем не менее, модели чтения часто пренебрегают, хотя и с этой стороны есть некоторые интересные соображения. В этой статье мы представим пример реализации заполнения модели представления с использованием потока событий. es_cqrs_projection_funnel_300_2

обзор

Идея чтения модели действительно проста. Вы берете журнал событий, применяете (воспроизводите) все события в первоначально пустой модели данных, используя соответствующие функции, и вы получаете заполненную модель. Код может выглядеть так:

1
2
3
4
5
List<Event> events = getEvents();
Model model = Model.empty();
for (Event event : events) {
    apply(model, event);
}

Мы можем сделать это еще короче с помощью функционального программирования:

1
2
3
Model m = reduce(getEvents(),
                 Model.empty(),
                 (m, e) -> apply(m, e));

Это суть. Обратите внимание, что это просто абстрактная схема, и реалистичная реализация, вероятно, будет отличаться, включая буферизацию, пакетную обработку (или потоковую передачу), постоянство и т. Д.

Применение событий

Фактический код Java для применения событий может выглядеть примерно так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
EventProcessingResult processEvents() {
    if (getState().isRunning()) {
        int batchSize = getEventsPerIteration();
        List<Event> events = eventStore.getEventsForAllStreams(getLastEventId(),
                                                               batchSize);
        if (events.isEmpty()) {
            return NO_EVENTS_TO_PROCESS;
        } else {
            return processEvents(events);
        }
    } else {
        return NOT_RUNNING;
    }
}
 
EventProcessingResult processEvents(List<Event> events) {
    try {
        for (Event event : events) {
            dispatchEvent(event);
        }
        return SUCCESS;
    } catch (RuntimeException e) {
        return FAILURE;
    }
}

В общем, это действительно просто и понятно. Это можно улучшить с помощью хуков до и после обработки отдельных событий и всего пакета. Такие крючки могут быть использованы для:

  • осуществлять транзакции,
  • подключить мониторинг,
  • реализовать обработку ошибок,
  • рассчитать размер партии в зависимости от скорости,
  • выполнять произвольные операции, например, настраивать что-либо или пересчитывать один раз за пакет.

Последний интересный фрагмент — это метод dispatchEvent . Помимо обхода иерархии типов, обработки ошибок и того, чтобы сделать все это необязательным, он сводится к:

1
2
3
4
void dispatchEvent(Event e) {
    Method handler = projector.getClass().findMethod("on", e.getClass());
    handler.invoke(projector, e);
}

Другими словами, для каждого типа события (например, OrderCreated ) мы ищем открытый метод, который OrderCreated единственный аргумент соответствующего типа в объекте projector .

Все вышеперечисленное является частью движка, частью инфраструктуры, поддерживающей множество моделей представлений. Все, что необходимо для реализации проекции, — это на самом деле предоставить проектор с обработчиками для интересных типов событий. Все остальные события будут просто игнорироваться.

Это может выглядеть так:

01
02
03
04
05
06
07
08
09
10
11
12
13
public class OrderProjector {
    @Inject
    private OrderDao orders;
 
    public void on(OrderCreated e) {
        orders.save(new Order(e.getOrderNumber()));
    }
 
    public void on(OrderApproved e) {
        Order o = orders.find(e.getOrderNumber());
        o.setApproved(true);
    }
}

Проекционная нить

Давайте на минуту обсудим многопоточность. Совместное изменяемое состояние немедленно приносит многочисленные проблемы, и его следует избегать, насколько это возможно . Один из способов справиться с этим — это не иметь параллелизма, например, ограничивая записи одним потоком. В большинстве случаев однопоточного модуля записи в сочетании с транзакциями ACID более чем достаточно, чтобы не отставать от нагрузки записи. (Загрузка чтения / запроса может быть большой и использовать много потоков — все детали здесь только о записи.)

Поток отвечает за применение событий к модели чтения — от запросов к хранилищу событий до обновления базы данных модели представления. Обычно он просто загружает партии событий из магазина и применяет их. Он продолжается до тех пор, пока есть еще события для обработки, и после того, как он захвачен, засыпает. Он просыпается через определенное время или при получении уведомления о новых событиях в хранилище событий.

У нас также есть некоторый контроль над жизненным циклом этого потока. Например, у нас есть способ программно приостанавливать и возобновлять каждый поток проекции, даже отображаемый в интерфейсе администратора.

projection_admin

Тяни или Тяни?

Благодаря хранилищу событий, поддерживаемому базой данных, очень легко многократно запрашивать новые события. Это модель тяги . К сожалению, это также означает, что вы можете закончить опрос слишком часто и генерировать ненужную нагрузку или опрос слишком редко и, таким образом, возможно, потребуется больше времени для распространения изменений в модели представления.

Вот почему в дополнение к опросу хранилища событий рекомендуется добавить уведомления, которые пробуждают прочитанные модели, как только сохраняются новые события. Это эффективно превращается в пуш- модель с минимальными задержками и нагрузкой. Мы обнаружили, что JGroups — это очень хороший инструмент для работы — он поддерживает несколько протоколов и очень прост в настройке, что требует гораздо меньше хлопот, чем полноценная очередь сообщений.

Уведомления могут содержать или не содержать фактические события.

В последнем (и более простом) проекте они распространяют только информацию о том, что новое событие было сохранено, вместе с его последовательным идентификатором (так, чтобы все проекции могли оценить, насколько они отстали). После пробуждения исполнитель может продолжить свой обычный путь, начиная с запроса к хранилищу событий.

Почему? Поскольку обработка событий, поступающих из одного источника, проще, но более важно, потому что хранилище событий, поддерживаемое БД, тривиально гарантирует порядок и не имеет проблем с потерянными или дублирующимися сообщениями. Запросы к базе данных очень быстрые, учитывая, что мы последовательно читаем одну таблицу по первичному ключу, и в большинстве случаев данные все равно находятся в кэш-памяти RAM. Узкое место в потоке проектирования обновляет свою базу данных для чтения моделей.

Тем не менее, нет никаких препятствий для размещения данных о событиях в уведомлениях (за исключением, возможно, размера или соображений сетевого трафика). Скорее всего, это уменьшит нагрузку на хранилище событий и сохранит некоторые поездки в базу данных. Проектор должен поддерживать буфер и возвращаться к запросам в хранилище событий, когда это необходимо. Или система может использовать более надежную очередь сообщений.

Перезапуск прогнозов

Помимо паузы / возобновления, на приведенном выше снимке экрана показано еще одно действие: перезагрузка. Безобидно, как это выглядит, это действительно хорошая и мощная функция.

Поскольку модель представления полностью получена из журнала событий, в любой момент ее можно выбросить и заново создать с самого начала (или из некоторого начального состояния / достаточно старого снимка). Данные в журнале событий находятся в безопасности, что является основным источником правды.

Это полезно, когда что-то в представлении изменяется: добавляется поле или таблица, исправляется ошибка, что-то вычисляется по-другому. Когда это происходит, часто проще (или необходимо) просто начать с самого начала, чем, например, реализовать скрипт масштабной миграции SQL.

Можно даже полностью автоматизировать его, чтобы при запуске системы и обнаружении, что схема БД не соответствует соответствующей модели Java, она может автоматически воссоздать схему и повторно обработать журнал событий. Это похоже на работу с политикой создания-отбрасывания Hibernate, за исключением того, что она не теряет данные.

Производительность

Решение может показаться довольно ограниченным в отношении производительности.

Один момент, который может поднять бровь — это однопоточный писатель . В действительности один поток обычно достаточно быстр, чтобы легко справиться с нагрузкой. Параллелизм не только сложнее реализовать и поддерживать, но и вносит конкуренцию. Чтения (запросы) могут быть многопоточными и легко масштабируемыми.

Мы также много выиграем, имея несколько моделей чтения, например, отделяя аналитику от административных и транзакционных данных. Каждая модель однопоточная (для записи), но несколько моделей потребляют события параллельно. Наконец, решение может быть модифицировано для использования шардинга или какой-либо обработки разветвления.

Еще один интересный момент — перезапуск прогнозов с нуля.

Хорошее решение — это что-то вроде архитектуры каппа :

  • Поддерживайте устаревший прогноз и отвечайте на все вопросы.
  • Начать новый прогноз, например, в другую базу данных. Просто позвольте ему обрабатывать события, не направляйте на него никакой трафик.
  • Когда новая проекция догонит вас, перенаправьте трафик и закройте старый.

В очень маленьком случае, особенно для разработки, может быть даже возможно сделать перезапуск в сети, в том же случае. Это зависит от ответов на следующие вопросы: Сколько времени занимает повторная обработка всех событий? Допустимо ли, чтобы этот прогноз был устаревшим в течение 30 минут? Можем ли мы развернуть систему ночью или в выходные дни, когда никто не использует систему? Нужно ли переигрывать всю историю?

Еще один фактор, который необходимо учитывать, — это настойчивость. Если это слишком узкое место и не может быть дополнительно оптимизировано, рассмотрите возможность использования моделей представления в памяти.

Подводя итоги

По сути, это все, что нужно для реализации модели чтения, использующей хранилище событий. Он приобретает большую простоту благодаря линейному хранению событий и обработке всего в одном потоке. Настолько, что в итоге это просто цикл, реализующий сокращение, показанное в начале.

В следующих статьях я собираюсь углубиться в практические вопросы реализации прогнозов.

Ссылка: Написание модели чтения событий из CQRS от нашего партнера JCG Конрада Гаруса в блоге Белки .