Статьи

Реактивный PostgreSQL: прослушивание уведомлений с помощью Scala

В прошлом я написал пару статей ( Создание службы REST в Scala с помощью Akka HTTP, Akka Streams и реактивного монго и ReactiveMongo с Akka, Scala и веб-сокетами) который использовал MongoDB для отправки обновлений непосредственно из базы данных в приложение Scala. Это очень хорошая функция, если вы просто хотите подписать свое приложение на список потоковых событий, где действительно не имеет значения, пропустите ли вы его, когда ваше приложение не работает. Хотя MongoDB — отличная база данных, она не подходит для всех целей. Иногда вам нужна реляционная база данных с четко определенной схемой или база данных, которая может объединять миры SQL и noSQL. Лично мне всегда очень нравился Postgresql. Это одна из лучших реляционных баз данных, отличная поддержка ГИС (которая мне действительно очень нравится), и все больше и больше поддержки без JSON / Schema (в которую мне нужно углубиться когда-нибудь). Одна из особенностей, о которых я не знал в Postgresql, заключалась в том, что он предоставляет своего рода механизм подписки.Я узнал об этом, когда читалПрослушивание общих JSON-уведомлений из PostgreSQL in Go », в которой показано, как использовать это из Go. В этой статье мы попытаемся увидеть, что вам нужно сделать, чтобы получить что-то подобное, работающее в Scala (подход к Java в значительной степени одно и тоже).

Как это работает в PostgreSQL?

На самом деле очень легко слушать уведомления в PostgreSQL. Все, что вам нужно сделать, это следующее:

LISTEN virtual;
NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448.
NOTIFY virtual, 'This is the payload';
Asynchronous notification "virtual" with payload "This is the payload" received from server process with PID 8448.

Соединение, которое хочет прослушивать события, вызывает LISTEN с названием канала, по которому он хочет прослушивать. И отправляющее соединение просто запускает NOTIFY с названием канала и возможной полезной нагрузкой.

Подготовка базы данных

Крутая вещь из статьи о Go, о которой я упоминал во введении, состоит в том, что она предоставляет хранимую процедуру, которая автоматически отправляет уведомление всякий раз, когда строка таблицы вставляется, UPDATEd или DELETEd. Следующее, взятое из Прослушивания общих уведомлений JSON из PostgreSQL в Go, создает хранимую процедуру, которая отправляет уведомления при вызове.

Действительно интересная вещь в этой хранимой процедуре заключается в том, что данные преобразуются в JSON, поэтому мы можем легко обработать их в нашем приложении. В этом примере я буду использовать те же таблицы и данные, что и в статье Go, поэтому сначала создайте таблицу:

CREATE TABLE products (
  id SERIAL,
  name TEXT,
  quantity FLOAT
);

И создавать триггер всякий раз, когда что-то происходит с таблицей.

CREATE TRIGGER products_notify_event
AFTER INSERT OR UPDATE OR DELETE ON products
    FOR EACH ROW EXECUTE PROCEDURE notify_event();

На этом этапе, когда строка вставляется, обновляется или удаляется в таблице продуктов, создается событие уведомления. Мы можем просто проверить это с помощью командной строки pgsql:

triggers=# LISTEN events;
LISTEN
triggers=# INSERT INTO products(name, quantity) VALUES ('Something', 99999);
INSERT 0 1
Asynchronous notification "events" with payload "{"table" : "products", "action" : "INSERT", "data" : {"id":50,"name":"Something","quantity":99999}}" received from server process with PID 24131.
triggers=# 

Как видите, INSERT привел к асинхронному событию, которое содержит данные. Итак, до сих пор мы в значительной степени следовали шагам, также изложенным в статье Go. Теперь давайте посмотрим, как мы можем получить доступ к уведомлениям из Scala.

Доступ к уведомлениям из Scala

Сначала давайте настроим зависимости нашего проекта. Как всегда мы используем SBT. Build.sbt для этого проекта выглядит так:

Краткое описание зависимостей:

  • scalikeJDBC : Этот проект предоставляет простую в использовании оболочку вокруг JDBC, поэтому нам не нужно использовать Java-способ обработки соединений и прочее.
  • akka : мы используем инфраструктуру Akka для управления соединением с базой данных. Поскольку драйвер JDBC не асинхронный, он может выдавать данные, нам нужно установить интервал.
  • json4s : это простая библиотека Scala JSON. Мы используем это для быстрого преобразования поступающих данных в простой класс дел.

Сначала мы покажем вам полный исходный код для этого примера, а затем объясним различные части:

Если вы знакомы с Akka и scalikeJDBC, код будет выглядеть знакомо. Мы начнем с некоторых общих настроек:

Здесь мы определяем наш класс case, в который мы преобразуем входящий JSON, настраиваем пул соединений, определяем систему Akka и запускаем наш актер Poller. Здесь нет ничего особенного, единственное, что особенное — в строке 23. Чтобы добавить слушателя из Scala, нам нужен доступ к базовому соединению JDBC. Поскольку scalikeJDBC использует пул соединений, нам нужно явно вызвать setAccessToUnderlyingConnectionAllowed, чтобы убедиться, что нам разрешен доступ к реальному соединению, когда мы вызываем getInnerMostDelegate, а не просто обернуть одно из пула соединений. Интересно отметить, что если мы не установим это, мы не получим сообщение об ошибке или что-то еще, мы просто получим Null из этого вызова метода ….

Теперь, когда наш Актер начал, давайте посмотрим, что он делает:

Первое, что мы делаем в нашем актере, это устанавливаем некоторые свойства, необходимые для scalikeJDBC, и устанавливаем таймер, который запускает сообщение каждые 500 мс. Также обратите внимание на функции preStart и postStop. В предварительном запуске мы выполняем небольшой фрагмент SQL, который сообщает postgres, что это соединение будет прослушивать уведомления с именем «events». Мы также установили DB.autoClose на падение, чтобы избежать механизма объединения сеансов, закрывающего сеанс и соединение. Мы хотим сохранить их живыми, чтобы мы могли получать события. Когда актер заканчивается, мы убираем таймер и соединение.

В функции receive мы сначала получаем реальное PGConnection, а затем получаем уведомления от соединения:

Если уведомление Noll не будет возвращено, мы оборачиваем это в Option и просто возвращаем пустой массив в случае Null. Если есть какие-либо уведомления, мы просто обрабатываем их в цикле foreach и выводим результат:

Здесь вы также можете увидеть, что мы просто получаем элемент «data» из уведомления и преобразуем его в наш класс Product для дальнейшей обработки. Все, что вам нужно сделать сейчас, это запустить приложение и с того же терминала pgsql добавить несколько событий. Если все прошло хорошо, вы увидите вывод, похожий на этот в вашей консоли:

Received for: events from process with PID: 24131
Received data: {"table" : "products", "action" : "INSERT", "data" : {"id":47,"name":"pen","quantity":10200}} 
Received as object: Product(47,pen,10200)
Received for: events from process with PID: 24131
Received data: {"table" : "products", "action" : "INSERT", "data" : {"id":48,"name":"pen","quantity":10200}} 
Received as object: Product(48,pen,10200)
Received for: events from process with PID: 24131
Received data: {"table" : "products", "action" : "INSERT", "data" : {"id":49,"name":"pen","quantity":10200}} 
Received as object: Product(49,pen,10200)
Received for: events from process with PID: 24131
Received data: {"table" : "products", "action" : "INSERT", "data" : {"id":50,"name":"Something","quantity":99999}} 
Received as object: Product(50,Something,99999)

Теперь, когда эта базовая конструкция работает, тривиально использовать ее, например, в качестве источника для реактивных потоков или просто использовать веб-сокеты для дальнейшего распространения этих событий.