Любой, кто пытался внедрить полностью совместимую с ACID систему, знает, что вам нужно учитывать множество факторов. Необходимо убедиться, что объекты базы данных можно свободно создавать, изменять и удалять без риска ошибок, и в большинстве случаев решение будет стоить производительности. Одна из методологий, которая может быть использована для решения этой проблемы, — это проектирование системы на основе ряда событий, а не изменяемых состояний. Обычно это называется Event Sourcing.
В этой статье я продемонстрирую демонстрационное приложение, которое использует набор инструментов « Быстрый доступ» с открытым исходным кодом, чтобы быстро запустить и запустить масштабируемое приложение базы данных на основе событий. Полный исходный код для примера доступен здесь .
Что такое Event Sourcing?
В типичной системе реляционных баз данных состояние объекта сохраняется в виде строки в базе данных. Когда состояние изменяется, приложение изменяет строку с помощью оператора UPDATE или DELETE. Проблема этого метода заключается в том, что он добавляет множество требований к базе данных, когда необходимо убедиться, что ни одна строка не изменена таким образом, что система переводится в недопустимое состояние. Вы не хотите, чтобы кто-либо снимал больше денег, чем было на их счете, или предлагал цену на аукционе, который уже был закрыт.
В системе, основанной на событиях, мы применяем к этому другой подход. Вместо сохранения состояния объекта в базе данных вы сохраняете серию изменений, которые привели к этому состоянию. Событие является неизменным после его создания, а это означает, что вам нужно реализовать только две операции, CREATE и READ. Если объект обновляется или удаляется, это реализуется с помощью создания события «обновление» или «удаление».
Система источников событий может быть легко масштабирована для повышения производительности, поскольку любой узел может просто загрузить журнал событий и воспроизвести текущее состояние. Вы также получаете лучшую производительность благодаря тому, что написание и запросы обрабатываются на разных машинах. Это называется CQRS (сегрегация ответственности команд-запросов). Как вы увидите в примерах, мы можем получить в конечном итоге непротиворечивое материализованное представление и запустить его за очень короткое время, используя инструментарий Speedment.
Заказать сауну
Чтобы продемонстрировать рабочий процесс создания системы событий, мы создадим небольшое приложение для обработки бронирования общей сауны в жилом комплексе. У нас есть несколько арендаторов, заинтересованных в бронировании сауны, но мы должны гарантировать, что застенчивые арендаторы никогда не закажут ее дважды. Мы также хотим поддерживать несколько саун в одной системе.
Чтобы упростить связь с базой данных, мы собираемся использовать инструментарий Speedment . Speedment — это Java-инструмент, который позволяет нам генерировать полную модель домена из базы данных, а также позволяет легко запрашивать базу данных, используя оптимизированные потоки Java 8. Ускорение доступно под лицензией Apache 2, и на странице Github есть много отличных примеров для различных применений.
Шаг 1: Определите схему базы данных
Первым шагом является определение нашей базы данных (MySQL). У нас просто есть одна таблица под названием «бронирование», где мы храним события, связанные с бронированием сауны. Обратите внимание, что бронирование — это событие, а не организация. Если мы хотим отменить бронирование или внести в него изменения, нам нужно будет опубликовать дополнительные события с изменениями в виде новых строк. Нам не разрешено изменять или удалять опубликованную строку.
|
01
02
03
04
05
06
07
08
09
10
11
12
|
CREATE DATABASE `sauna`;CREATE TABLE `sauna`.`booking` ( `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, `booking_id` BIGINT NOT NULL, `event_type` ENUM('CREATE', 'UPDATE', 'DELETE') NOT NULL, `tenant` INT NULL, `sauna` INT NULL, `booked_from` DATE NULL, `booked_to` DATE NULL, PRIMARY KEY (`id`)); |
Столбец «id» — это увеличивающееся целое число, которое назначается автоматически каждый раз, когда в журнале публикуется новое событие. «Booking_id» сообщает нам, на какое бронирование мы ссылаемся. Если два события имеют один и тот же идентификатор бронирования, они относятся к одному и тому же объекту. У нас также есть перечисление с именем «event_type», которое описывает, какую операцию мы пытались выполнить. После этого поступает информация, которая относится к бронированию. Если столбец равен NULL, мы будем считать его неизмененным по сравнению с любым предыдущим значением.
Шаг 2: Генерация кода с использованием ускорения
Следующим шагом является создание кода для проекта с использованием Speedment. Просто создайте новый проект maven и добавьте следующий код в файл pom.xml.
pom.xml
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <speedment.version>3.0.0-EA2</speedment.version> <mysql.version>5.1.39</mysql.version></properties><build> <plugins> <plugin> <groupId>com.speedment</groupId> <artifactId>speedment-maven-plugin</artifactId> <version>${speedment.version}</version> <dependencies> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.version}</version> </dependency> </dependencies> </plugin> </plugins></build><dependencies> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.version}</version> </dependency> <dependency> <groupId>com.speedment</groupId> <artifactId>runtime</artifactId> <version>${speedment.version}</version> <type>pom</type> </dependency></dependencies> |
Если вы создаете проект, в среде IDE должна появиться новая задача maven, называемая speedment: tool . Запустите его, чтобы запустить интерфейс пользователя Speedment. Там подключитесь к базе данных Sauna и сгенерируйте код, используя настройки по умолчанию. Теперь проект должен быть заполнен исходными файлами.
Совет: если вы вносите изменения в базу данных, вы можете загрузить новую конфигурацию, используя speedment: reload -goal и регенерировать источники, используя speedment: generate . Нет необходимости перезапускать инструмент!
Шаг 3: Создание материализованного представления
Материализованное представление — это компонент, который регулярно опрашивает базу данных, чтобы увидеть, были ли добавлены какие-либо новые строки, и, если это так, загружает и объединяет их в представление в правильном порядке. Поскольку опрос иногда может занимать много времени, мы хотим, чтобы этот процесс выполнялся в отдельном потоке. Мы можем сделать это с помощью Java Timer и TimerTask.
Опрашивать базу данных? В самом деле? Что ж, важно принять во внимание то, что опросить базу данных будет только сервер, а не клиенты. Это дает нам очень хорошую масштабируемость, поскольку у нас может быть несколько серверов, опрашивающих базу данных, которые в свою очередь обслуживают сотни тысяч арендаторов. Сравните это с обычной системой, где каждый клиент запрашивает ресурс у сервера, который, в свою очередь, связывается с базой данных.
BookingView.java
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
public final class BookingView { ... public static BookingView create(BookingManager mgr) { final AtomicBoolean working = new AtomicBoolean(false); final AtomicLong last = new AtomicLong(); final AtomicLong total = new AtomicLong(); final String table = mgr.getTableIdentifier().getTableName(); final String field = Booking.ID.identifier().getColumnName(); final Timer timer = new Timer(); final BookingView view = new BookingView(timer); final TimerTask task = ...; timer.scheduleAtFixedRate(task, 0, UPDATE_EVERY); return view; }} |
Задача таймера определяется анонимно, и именно там будет находиться логика опроса.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
final TimerTask task = new TimerTask() { @Override public void run() { boolean first = true; // Make sure no previous task is already inside this block. if (working.compareAndSet(false, true)) { try { // Loop until no events was merged // (the database is up to date). while (true) { // Get a list of up to 25 events that has not yet // been merged into the materialized object view. final List added = unmodifiableList( mgr.stream() .filter(Booking.ID.greaterThan(last.get())) .sorted(Booking.ID.comparator()) .limit(MAX_BATCH_SIZE) .collect(toList()) ); if (added.isEmpty()) { if (!first) { System.out.format( "%s: View is up to date. A total of " + "%d rows have been loaded.%n", System.identityHashCode(last), total.get() ); } break; } else { final Booking lastEntity = added.get(added.size() - 1); last.set(lastEntity.getId()); added.forEach(view::accept); total.addAndGet(added.size()); System.out.format( "%s: Downloaded %d row(s) from %s. " + "Latest %s: %d.%n", System.identityHashCode(last), added.size(), table, field, Long.parseLong("" + last.get()) ); } first = false; } // Release this resource once we exit this block. } finally { working.set(false); } } }}; |
Иногда задача объединения может занять больше времени, чем интервал таймера. Чтобы избежать этой проблемы, мы используем AtomicBoolean, чтобы проверить и убедиться, что только одна задача может выполняться одновременно. Это похоже на семафор, за исключением того, что мы хотим, чтобы задачи, для которых у нас нет времени, отбрасывались, а не помещались в очередь, поскольку нам не нужно выполнять каждую задачу, новая будет приходить всего за секунду.
Конструктор и базовые методы-члены довольно легко реализовать. Мы сохраняем таймер, переданный классу, в качестве параметра в конструкторе, чтобы мы могли отменить этот таймер, если нам когда-нибудь понадобится его остановить. Мы также храним карту, которая сохраняет текущий вид всех заказов в памяти.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
private final static int MAX_BATCH_SIZE = 25;private final static int UPDATE_EVERY = 1_000; // Millisecondsprivate final Timer timer;private final Map<Long, Booking> bookings;private BookingView(Timer timer) { this.timer = requireNonNull(timer); this.bookings = new ConcurrentHashMap<>();}public Stream<Booking> stream() { return bookings.values().stream();}public void stop() { timer.cancel();} |
Последним отсутствующим элементом класса BookingView является метод accept (), использованный выше в процедуре слияния. Это где новые события принимаются во внимание и объединяются в представлении.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
private boolean accept(Booking ev) { final String type = ev.getEventType(); // If this was a creation event switch (type) { case "CREATE" : // Creation events must contain all information. if (!ev.getSauna().isPresent() || !ev.getTenant().isPresent() || !ev.getBookedFrom().isPresent() || !ev.getBookedTo().isPresent() || !checkIfAllowed(ev)) { return false; } // If something is already mapped to that key, refuse the // event. return bookings.putIfAbsent(ev.getBookingId(), ev) == null; case "UPDATE" : // Create a copy of the current state final Booking existing = bookings.get(ev.getBookingId()); // If the specified key did not exist, refuse the event. if (existing != null) { final Booking proposed = new BookingImpl(); proposed.setId(existing.getId()); // Update non-null values proposed.setSauna(ev.getSauna().orElse( unwrap(existing.getSauna()) )); proposed.setTenant(ev.getTenant().orElse( unwrap(existing.getTenant()) )); proposed.setBookedFrom(ev.getBookedFrom().orElse( unwrap(existing.getBookedFrom()) )); proposed.setBookedTo(ev.getBookedTo().orElse( unwrap(existing.getBookedTo()) )); // Make sure these changes are allowed. if (checkIfAllowed(proposed)) { bookings.put(ev.getBookingId(), proposed); return true; } } return false; case "DELETE" : // Remove the event if it exists, else refuse the event. return bookings.remove(ev.getBookingId()) != null; default : System.out.format( "Unexpected type '%s' was refused.%n", type); return false; }} |
В системе источников событий правила применяются не при получении событий, а при их материализации. По сути, любой может вставить новые события в систему, если они это делают в конце таблицы. Именно в этом методе мы выбираем сброс событий, которые не следуют настройке правил.
Шаг 4: Пример использования
В этом примере мы будем использовать стандартный API-интерфейс Speedment для вставки трех новых заказов в базу данных, две из которых действительны, а третья пересекает одну из предыдущих. Затем мы дождемся обновления вида и распечатаем каждое сделанное бронирование.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
public static void main(String... params) { final SaunaApplication app = new SaunaApplicationBuilder() .withPassword("password") .build(); final BookingManager bookings = app.getOrThrow(BookingManager.class); final SecureRandom rand = new SecureRandom(); rand.setSeed(System.currentTimeMillis()); // Insert three new bookings into the system. bookings.persist( new BookingImpl() .setBookingId(rand.nextLong()) .setEventType("CREATE") .setSauna(1) .setTenant(1) .setBookedFrom(Date.valueOf(LocalDate.now().plus(3, DAYS))) .setBookedTo(Date.valueOf(LocalDate.now().plus(5, DAYS))) ); bookings.persist( new BookingImpl() .setBookingId(rand.nextLong()) .setEventType("CREATE") .setSauna(1) .setTenant(2) .setBookedFrom(Date.valueOf(LocalDate.now().plus(1, DAYS))) .setBookedTo(Date.valueOf(LocalDate.now().plus(2, DAYS))) ); bookings.persist( new BookingImpl() .setBookingId(rand.nextLong()) .setEventType("CREATE") .setSauna(1) .setTenant(3) .setBookedFrom(Date.valueOf(LocalDate.now().plus(2, DAYS))) .setBookedTo(Date.valueOf(LocalDate.now().plus(7, DAYS))) ); final BookingView view = BookingView.create(bookings); // Wait until the view is up-to-date. try { Thread.sleep(5_000); } catch (final InterruptedException ex) { throw new RuntimeException(ex); } System.out.println("Current Bookings for Sauna 1:"); final SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd"); final Date now = Date.valueOf(LocalDate.now()); view.stream() .filter(Booking.SAUNA.equal(1)) .filter(Booking.BOOKED_TO.greaterOrEqual(now)) .sorted(Booking.BOOKED_FROM.comparator()) .map(b -> String.format( "Booked from %s to %s by Tenant %d.", dt.format(b.getBookedFrom().get()), dt.format(b.getBookedTo().get()), b.getTenant().getAsInt() )) .forEachOrdered(System.out::println); System.out.println("No more bookings!"); view.stop();} |
Если мы запустим его, мы получим следующий вывод:
|
1
2
3
4
5
6
|
677772350: Downloaded 3 row(s) from booking. Latest id: 3.677772350: View is up to date. A total of 3 rows have been loaded.Current Bookings for Sauna 1:Booked from 2016-10-11 to 2016-10-12 by Tenant 2.Booked from 2016-10-13 to 2016-10-15 by Tenant 1.No more bookings! |
Полный исходный код этого демонстрационного приложения доступен на моей странице GitHub . Там вы также можете найти много других примеров того, как использовать Speedment в различных сценариях для быстрой разработки приложений баз данных.
Резюме
В этой статье мы разработали материализованное представление таблицы базы данных, которая оценивает события при материализации, а не при вставке. Это позволяет раскручивать несколько экземпляров приложения, не беспокоясь о синхронизации их, поскольку в конечном итоге они будут согласованными. Затем мы закончили, показав, как материализованное представление может быть запрошено с использованием API ускорения для создания списка текущих заказов.
Спасибо за чтение и, пожалуйста, ознакомьтесь с другими примерами Speedment на странице Github!
| Ссылка: | Event-Sourcing и CQRS на практике от нашего партнера JCG Эмиля Форслунда из блога Age of Java . |