Статьи

Event-Sourcing и CQRS на практике

Любой, кто пытался внедрить полностью совместимую с ACID систему, знает, что вам нужно учитывать множество факторов. Необходимо убедиться, что объекты базы данных можно свободно создавать, изменять и удалять без риска ошибок, и в большинстве случаев решение будет стоить производительности. Одна из методологий, которая может быть использована для решения этой проблемы, — это проектирование системы на основе ряда событий, а не изменяемых состояний. Обычно это называется Event Sourcing.

В этой статье я продемонстрирую демонстрационное приложение, которое использует набор инструментов « Быстрый доступ» с открытым исходным кодом, чтобы быстро запустить и запустить масштабируемое приложение базы данных на основе событий. Полный исходный код для примера доступен здесь .

Что такое Event Sourcing?

В типичной системе реляционных баз данных состояние объекта сохраняется в виде строки в базе данных. Когда состояние изменяется, приложение изменяет строку с помощью оператора UPDATE или DELETE. Проблема этого метода заключается в том, что он добавляет множество требований к базе данных, когда необходимо убедиться, что ни одна строка не изменена таким образом, что система переводится в недопустимое состояние. Вы не хотите, чтобы кто-либо снимал больше денег, чем было на их счете, или предлагал цену на аукционе, который уже был закрыт.

В системе, основанной на событиях, мы применяем к этому другой подход. Вместо сохранения состояния объекта в базе данных вы сохраняете серию изменений, которые привели к этому состоянию. Событие является неизменным после его создания, а это означает, что вам нужно реализовать только две операции, CREATE и READ. Если объект обновляется или удаляется, это реализуется с помощью создания события «обновление» или «удаление».

Система источников событий может быть легко масштабирована для повышения производительности, поскольку любой узел может просто загрузить журнал событий и воспроизвести текущее состояние. Вы также получаете лучшую производительность благодаря тому, что написание и запросы обрабатываются на разных машинах. Это называется CQRS (сегрегация ответственности команд-запросов). Как вы увидите в примерах, мы можем получить в конечном итоге непротиворечивое материализованное представление и запустить его за очень короткое время, используя инструментарий Speedment.

Заказать сауну

Чтобы продемонстрировать рабочий процесс создания системы событий, мы создадим небольшое приложение для обработки бронирования общей сауны в жилом комплексе. У нас есть несколько арендаторов, заинтересованных в бронировании сауны, но мы должны гарантировать, что застенчивые арендаторы никогда не закажут ее дважды. Мы также хотим поддерживать несколько саун в одной системе.

Чтобы упростить связь с базой данных, мы собираемся использовать инструментарий Speedment . Speedment — это Java-инструмент, который позволяет нам генерировать полную модель домена из базы данных, а также позволяет легко запрашивать базу данных, используя оптимизированные потоки Java 8. Ускорение доступно под лицензией Apache 2, и на странице Github есть много отличных примеров для различных применений.

Шаг 1: Определите схему базы данных

Первым шагом является определение нашей базы данных (MySQL). У нас просто есть одна таблица под названием «бронирование», где мы храним события, связанные с бронированием сауны. Обратите внимание, что бронирование — это событие, а не организация. Если мы хотим отменить бронирование или внести в него изменения, нам нужно будет опубликовать дополнительные события с изменениями в виде новых строк. Нам не разрешено изменять или удалять опубликованную строку.

01
02
03
04
05
06
07
08
09
10
11
12
CREATE DATABASE `sauna`;
 
CREATE TABLE `sauna`.`booking` (
  `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  `booking_id` BIGINT NOT NULL,
  `event_type` ENUM('CREATE', 'UPDATE', 'DELETE') NOT NULL,
  `tenant` INT NULL,
  `sauna` INT NULL,
  `booked_from` DATE NULL,
  `booked_to` DATE NULL,
  PRIMARY KEY (`id`)
);

Столбец «id» — это увеличивающееся целое число, которое назначается автоматически каждый раз, когда в журнале публикуется новое событие. «Booking_id» сообщает нам, на какое бронирование мы ссылаемся. Если два события имеют один и тот же идентификатор бронирования, они относятся к одному и тому же объекту. У нас также есть перечисление с именем «event_type», которое описывает, какую операцию мы пытались выполнить. После этого поступает информация, которая относится к бронированию. Если столбец равен NULL, мы будем считать его неизмененным по сравнению с любым предыдущим значением.

Шаг 2: Генерация кода с использованием ускорения

Следующим шагом является создание кода для проекта с использованием Speedment. Просто создайте новый проект maven и добавьте следующий код в файл pom.xml.

pom.xml

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <speedment.version>3.0.0-EA2</speedment.version>
  <mysql.version>5.1.39</mysql.version>
</properties>
 
<build>
  <plugins>
    <plugin>
      <groupId>com.speedment</groupId>
      <artifactId>speedment-maven-plugin</artifactId>
      <version>${speedment.version}</version>
 
      <dependencies>
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>${mysql.version}</version>
        </dependency>
      </dependencies>
    </plugin>
  </plugins>
</build>
 
<dependencies>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
  </dependency>
 
  <dependency>
    <groupId>com.speedment</groupId>
    <artifactId>runtime</artifactId>
    <version>${speedment.version}</version>
    <type>pom</type>
  </dependency>
</dependencies>

Если вы создаете проект, в среде IDE должна появиться новая задача maven, называемая speedment: tool . Запустите его, чтобы запустить интерфейс пользователя Speedment. Там подключитесь к базе данных Sauna и сгенерируйте код, используя настройки по умолчанию. Теперь проект должен быть заполнен исходными файлами.

Совет: если вы вносите изменения в базу данных, вы можете загрузить новую конфигурацию, используя speedment: reload -goal и регенерировать источники, используя speedment: generate . Нет необходимости перезапускать инструмент!

Шаг 3: Создание материализованного представления

Материализованное представление — это компонент, который регулярно опрашивает базу данных, чтобы увидеть, были ли добавлены какие-либо новые строки, и, если это так, загружает и объединяет их в представление в правильном порядке. Поскольку опрос иногда может занимать много времени, мы хотим, чтобы этот процесс выполнялся в отдельном потоке. Мы можем сделать это с помощью Java Timer и TimerTask.

Опрашивать базу данных? В самом деле? Что ж, важно принять во внимание то, что опросить базу данных будет только сервер, а не клиенты. Это дает нам очень хорошую масштабируемость, поскольку у нас может быть несколько серверов, опрашивающих базу данных, которые в свою очередь обслуживают сотни тысяч арендаторов. Сравните это с обычной системой, где каждый клиент запрашивает ресурс у сервера, который, в свою очередь, связывается с базой данных.

BookingView.java

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
public final class BookingView {
 
  ...
 
  public static BookingView create(BookingManager mgr) {
    final AtomicBoolean working = new AtomicBoolean(false);
    final AtomicLong last  = new AtomicLong();
    final AtomicLong total = new AtomicLong();
         
    final String table = mgr.getTableIdentifier().getTableName();
    final String field = Booking.ID.identifier().getColumnName();
 
    final Timer timer = new Timer();
    final BookingView view = new BookingView(timer);
    final TimerTask task = ...;
 
    timer.scheduleAtFixedRate(task, 0, UPDATE_EVERY);
    return view;
  }
}

Задача таймера определяется анонимно, и именно там будет находиться логика опроса.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
final TimerTask task = new TimerTask() {
  @Override
  public void run() {
    boolean first = true;
 
    // Make sure no previous task is already inside this block.
    if (working.compareAndSet(false, true)) {
      try {
 
        // Loop until no events was merged
        // (the database is up to date).
        while (true) {
 
          // Get a list of up to 25 events that has not yet
          // been merged into the materialized object view.
          final List added = unmodifiableList(
            mgr.stream()
              .filter(Booking.ID.greaterThan(last.get()))
              .sorted(Booking.ID.comparator())
              .limit(MAX_BATCH_SIZE)
              .collect(toList())
            );
 
          if (added.isEmpty()) {
            if (!first) {
              System.out.format(
                "%s: View is up to date. A total of " +
                "%d rows have been loaded.%n",
                System.identityHashCode(last),
                total.get()
              );
            }
 
            break;
          } else {
            final Booking lastEntity =
              added.get(added.size() - 1);
 
            last.set(lastEntity.getId());
            added.forEach(view::accept);
            total.addAndGet(added.size());
 
            System.out.format(
              "%s: Downloaded %d row(s) from %s. " +
              "Latest %s: %d.%n",
              System.identityHashCode(last),
              added.size(),
              table,
              field,
              Long.parseLong("" + last.get())
            );
          }
 
          first = false;
        }
 
        // Release this resource once we exit this block.
      } finally {
        working.set(false);
      }
    }
  }
};

Иногда задача объединения может занять больше времени, чем интервал таймера. Чтобы избежать этой проблемы, мы используем AtomicBoolean, чтобы проверить и убедиться, что только одна задача может выполняться одновременно. Это похоже на семафор, за исключением того, что мы хотим, чтобы задачи, для которых у нас нет времени, отбрасывались, а не помещались в очередь, поскольку нам не нужно выполнять каждую задачу, новая будет приходить всего за секунду.

Конструктор и базовые методы-члены довольно легко реализовать. Мы сохраняем таймер, переданный классу, в качестве параметра в конструкторе, чтобы мы могли отменить этот таймер, если нам когда-нибудь понадобится его остановить. Мы также храним карту, которая сохраняет текущий вид всех заказов в памяти.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
private final static int MAX_BATCH_SIZE = 25;
private final static int UPDATE_EVERY   = 1_000; // Milliseconds
 
private final Timer timer;
private final Map<Long, Booking> bookings;
 
private BookingView(Timer timer) {
  this.timer    = requireNonNull(timer);
  this.bookings = new ConcurrentHashMap<>();
}
 
public Stream<Booking> stream() {
  return bookings.values().stream();
}
 
public void stop() {
  timer.cancel();
}

Последним отсутствующим элементом класса BookingView является метод accept (), использованный выше в процедуре слияния. Это где новые события принимаются во внимание и объединяются в представлении.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private boolean accept(Booking ev) {
    final String type = ev.getEventType();
 
    // If this was a creation event
    switch (type) {
        case "CREATE" :
            // Creation events must contain all information.
            if (!ev.getSauna().isPresent()
            ||  !ev.getTenant().isPresent()
            ||  !ev.getBookedFrom().isPresent()
            ||  !ev.getBookedTo().isPresent()
            ||  !checkIfAllowed(ev)) {
                return false;
            }
 
            // If something is already mapped to that key, refuse the
            // event.
            return bookings.putIfAbsent(ev.getBookingId(), ev) == null;
 
        case "UPDATE" :
            // Create a copy of the current state
            final Booking existing = bookings.get(ev.getBookingId());
 
            // If the specified key did not exist, refuse the event.
            if (existing != null) {
                final Booking proposed = new BookingImpl();
                proposed.setId(existing.getId());
 
                // Update non-null values
                proposed.setSauna(ev.getSauna().orElse(
                    unwrap(existing.getSauna())
                ));
                proposed.setTenant(ev.getTenant().orElse(
                    unwrap(existing.getTenant())
                ));
                proposed.setBookedFrom(ev.getBookedFrom().orElse(
                    unwrap(existing.getBookedFrom())
                ));
                proposed.setBookedTo(ev.getBookedTo().orElse(
                    unwrap(existing.getBookedTo())
                ));
 
                // Make sure these changes are allowed.
                if (checkIfAllowed(proposed)) {
                    bookings.put(ev.getBookingId(), proposed);
                    return true;
                }
            }
 
            return false;
 
 
        case "DELETE" :
            // Remove the event if it exists, else refuse the event.
            return bookings.remove(ev.getBookingId()) != null;
 
        default :
            System.out.format(
                "Unexpected type '%s' was refused.%n", type);
            return false;
    }
}

В системе источников событий правила применяются не при получении событий, а при их материализации. По сути, любой может вставить новые события в систему, если они это делают в конце таблицы. Именно в этом методе мы выбираем сброс событий, которые не следуют настройке правил.

Шаг 4: Пример использования

В этом примере мы будем использовать стандартный API-интерфейс Speedment для вставки трех новых заказов в базу данных, две из которых действительны, а третья пересекает одну из предыдущих. Затем мы дождемся обновления вида и распечатаем каждое сделанное бронирование.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public static void main(String... params) {
  final SaunaApplication app = new SaunaApplicationBuilder()
    .withPassword("password")
    .build();
 
  final BookingManager bookings =
    app.getOrThrow(BookingManager.class);
 
  final SecureRandom rand = new SecureRandom();
  rand.setSeed(System.currentTimeMillis());
 
  // Insert three new bookings into the system.
  bookings.persist(
    new BookingImpl()
      .setBookingId(rand.nextLong())
      .setEventType("CREATE")
      .setSauna(1)
      .setTenant(1)
      .setBookedFrom(Date.valueOf(LocalDate.now().plus(3, DAYS)))
      .setBookedTo(Date.valueOf(LocalDate.now().plus(5, DAYS)))
  );
 
  bookings.persist(
    new BookingImpl()
      .setBookingId(rand.nextLong())
      .setEventType("CREATE")
      .setSauna(1)
      .setTenant(2)
      .setBookedFrom(Date.valueOf(LocalDate.now().plus(1, DAYS)))
      .setBookedTo(Date.valueOf(LocalDate.now().plus(2, DAYS)))
  );
 
  bookings.persist(
    new BookingImpl()
      .setBookingId(rand.nextLong())
      .setEventType("CREATE")
      .setSauna(1)
      .setTenant(3)
      .setBookedFrom(Date.valueOf(LocalDate.now().plus(2, DAYS)))
      .setBookedTo(Date.valueOf(LocalDate.now().plus(7, DAYS)))
  );
 
  final BookingView view = BookingView.create(bookings);
 
  // Wait until the view is up-to-date.
  try { Thread.sleep(5_000); }
  catch (final InterruptedException ex) {
    throw new RuntimeException(ex);
  }
 
  System.out.println("Current Bookings for Sauna 1:");
  final SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd");
  final Date now = Date.valueOf(LocalDate.now());
  view.stream()
    .filter(Booking.SAUNA.equal(1))
    .filter(Booking.BOOKED_TO.greaterOrEqual(now))
    .sorted(Booking.BOOKED_FROM.comparator())
    .map(b -> String.format(
      "Booked from %s to %s by Tenant %d.",
      dt.format(b.getBookedFrom().get()),
      dt.format(b.getBookedTo().get()),
      b.getTenant().getAsInt()
    ))
    .forEachOrdered(System.out::println);
 
  System.out.println("No more bookings!");
  view.stop();
}

Если мы запустим его, мы получим следующий вывод:

1
2
3
4
5
6
677772350: Downloaded 3 row(s) from booking. Latest id: 3.
677772350: View is up to date. A total of 3 rows have been loaded.
Current Bookings for Sauna 1:
Booked from 2016-10-11 to 2016-10-12 by Tenant 2.
Booked from 2016-10-13 to 2016-10-15 by Tenant 1.
No more bookings!

Полный исходный код этого демонстрационного приложения доступен на моей странице GitHub . Там вы также можете найти много других примеров того, как использовать Speedment в различных сценариях для быстрой разработки приложений баз данных.

Резюме

В этой статье мы разработали материализованное представление таблицы базы данных, которая оценивает события при материализации, а не при вставке. Это позволяет раскручивать несколько экземпляров приложения, не беспокоясь о синхронизации их, поскольку в конечном итоге они будут согласованными. Затем мы закончили, показав, как материализованное представление может быть запрошено с использованием API ускорения для создания списка текущих заказов.

Спасибо за чтение и, пожалуйста, ознакомьтесь с другими примерами Speedment на странице Github!

Ссылка: Event-Sourcing и CQRS на практике от нашего партнера JCG Эмиля Форслунда из блога Age of Java .