Статьи

Слушайте уведомления от Postgresql с Scala

В прошлом я написал пару статей ( Создание службы REST в Scala с помощью Akka HTTP, Akka Streams и реактивного mongo и ReactiveMongo с Akka, Scala и websockets ), в которых MongoDB передавал обновления непосредственно из базы данных в приложение Scala. , Это очень хорошая функция, если вы просто хотите подписать свое приложение на список потоковых событий, где действительно не имеет значения, пропустите ли вы его, когда ваше приложение не работает. Хотя MongoDB — отличная база данных, она не подходит для всех целей. Иногда вам нужна реляционная база данных с четко определенной схемой или база данных, которая может объединять миры SQL и noSQL. Лично мне всегда очень нравился Postgresql. Это одна из лучших реляционных баз данных, отличная поддержка ГИС (которая мне действительно очень нравится), и все больше и больше поддержки без JSON / Schema (в которую мне нужно углубиться когда-нибудь). Одна из особенностей, о которых я не знал в Postgresql, заключалась в том, что он предоставляет своего рода механизм подписки. Я узнал об этом, прочитав статью « Прослушивание общих уведомлений JSON из PostgreSQL в Go », в которой показано, как использовать это в Go. В этой статье мы попытаемся увидеть, что вам нужно сделать, чтобы что-то подобное работало в Scala (подход для Java почти такой же).

Как это работает в Postgresql

На самом деле очень легко слушать уведомления в Postgresql. Все, что вам нужно сделать, это следующее:

1
2
3
4
5
LISTEN virtual;
NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448.
NOTIFY virtual, 'This is the payload';
Asynchronous notification "virtual" with payload "This is the payload" received from server process with PID 8448.

Соединение, которое хочет прослушивать события, вызывает LISTEN с названием канала, по которому он хочет прослушивать. И отправляющее соединение просто запускает NOTIFY с названием канала и возможной полезной нагрузкой.

Подготовка базы данных

Крутая вещь из статьи о Go, о которой я упоминал во введении, состоит в том, что она предоставляет хранимую процедуру, которая автоматически отправляет уведомление всякий раз, когда строка таблицы вставляется, обновляется или удаляется. Следующее, взятое из Прослушивания общих уведомлений JSON из PostgreSQL в Go, создает хранимую процедуру, которая отправляет уведомления при вызове.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
--
CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$    DECLARE
        data json;
        notification json;
     
    BEGIN
     
        -- Convert the old or new row to JSON, based on the kind of action.
        -- Action = DELETE?             -> OLD row
        -- Action = INSERT or UPDATE?   -> NEW row
        IF (TG_OP = 'DELETE') THEN
            data = row_to_json(OLD);
        ELSE
            data = row_to_json(NEW);
        END IF;
         
        -- Contruct the notification as a JSON string.
        notification = json_build_object(
                          'table',TG_TABLE_NAME,
                          'action', TG_OP,
                          'data', data);
         
                         
        -- Execute pg_notify(channel, notification)
        PERFORM pg_notify('events',notification::text);
         
        -- Result is ignored since this is an AFTER trigger
        RETURN NULL;
    END;
     
$$ LANGUAGE plpgsql;
---

Действительно интересная вещь в этой хранимой процедуре заключается в том, что данные преобразуются в JSON, поэтому мы можем легко обработать их в нашем приложении. В этом примере я буду использовать те же таблицы и данные, что и в статье Go, поэтому сначала создайте таблицу:

1
2
3
4
5
CREATE TABLE products (
  id SERIAL,
  name TEXT,
  quantity FLOAT
);

И создавать триггер всякий раз, когда что-то происходит с таблицей.

1
2
3
CREATE TRIGGER products_notify_event
AFTER INSERT OR UPDATE OR DELETE ON products
    FOR EACH ROW EXECUTE PROCEDURE notify_event();

На этом этапе, когда строка вставляется, обновляется или удаляется в таблице продуктов, создается событие уведомления. Мы можем просто проверить это с помощью командной строки pgsql:

1
2
3
4
5
6
triggers=# LISTEN events;
LISTEN
triggers=# INSERT INTO products(name, quantity) VALUES ('Something', 99999);
INSERT 0 1
Asynchronous notification "events" with payload "{"table" : "products", "action" : "INSERT", "data" : {"id":50,"name":"Something","quantity":99999}}" received from server process with PID 24131.
triggers=#

Как видите, INSERT привел к асинхронному событию, которое содержит данные. Итак, до сих пор мы в значительной степени следовали шагам, также изложенным в статье Go. Теперь давайте посмотрим, как мы можем получить доступ к уведомлениям из Scala.

Доступ к уведомлениям из Scala

Сначала давайте настроим зависимости нашего проекта. Как всегда мы используем SBT. Build.sbt для этого проекта выглядит так:

01
02
03
04
05
06
07
08
09
10
11
12
13
name := "postgresql-notifications"
 
version := "1.0"
 
scalaVersion := "2.11.7"
 
libraryDependencies ++= Seq("org.postgresql" % "postgresql" % "9.4-1200-jdbc41",
  "org.scalikejdbc" %% "scalikejdbc"       % "2.2.8",
  "com.typesafe.akka" %% "akka-actor" % "2.4-SNAPSHOT",
  "org.json4s" %% "json4s-native" % "3.2.10"
  )
 
resolvers += "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"

Краткое описание зависимостей:

  • scalikeJDBC : Этот проект предоставляет простую в использовании оболочку вокруг JDBC, поэтому нам не нужно использовать Java-способ обработки соединений и прочее.
  • akka : мы используем инфраструктуру Akka для управления соединением с базой данных. Поскольку драйвер JDBC не асинхронный, он может выдавать данные, нам нужно установить интервал.
  • json4s : это простая библиотека Scala JSON. Мы используем это для быстрого преобразования поступающих данных в простой класс дел.

Сначала мы покажем вам полный исходный код для этого примера, а затем объясним различные части:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import akka.actor.{Props, ActorSystem, Actor}
import org.apache.commons.dbcp.{PoolingDataSource, DelegatingConnection}
import org.json4s.DefaultFormats
import org.postgresql.{PGNotification, PGConnection}
import scalikejdbc._
import org.json4s.native.JsonMethods._
import scala.concurrent.duration._
 
/**
 * Simple case class to marshall to from received event.
 */
case class Product(id : Long, name: String, quantity: Long)
 
/**
 * Main runner. Just setups the connection pool and the actor system
 */
object PostgresNotifications extends App {
 
  // initialize JDBC driver & connection pool
  Class.forName("org.postgresql.Driver")
  ConnectionPool.singleton("jdbc:postgresql://localhost:5432/triggers", "jos", "######")
  ConnectionPool.dataSource().asInstanceOf[PoolingDataSource].setAccessToUnderlyingConnectionAllowed(true)
 
  // initialize the actor system
  val system = ActorSystem("Hello")
  val a = system.actorOf(Props[Poller], "poller")
 
  // wait for the user to stop the server
  println("Press <enter> to exit.")
  Console.in.read.toChar
  system.terminate
}
 
class Poller extends Actor {
 
  // execution context for the ticks
  import context.dispatcher
 
  val connection = ConnectionPool.borrow()
  val db: DB = DB(connection)
  val tick = context.system.scheduler.schedule(500 millis, 1000 millis, self, "tick")
 
  override def preStart() = {
    // make sure connection isn't closed when executing queries
    // we setup the
    db.autoClose(false)
    db.localTx { implicit session =>
      sql"LISTEN events".execute().apply()
    }
  }
 
  override def postStop() = {
    tick.cancel()
    db.close()
  }
 
  def receive = {
    case "tick" => {
      db.readOnly { implicit session =>
        val pgConnection = connection.asInstanceOf[DelegatingConnection].getInnermostDelegate.asInstanceOf[PGConnection]
        val notifications = Option(pgConnection.getNotifications).getOrElse(Array[PGNotification]())
 
        notifications.foreach( not => {
          println(s"Received for: ${not.getName} from process with PID: ${not.getPID}")
          println(s"Received data: ${not.getParameter} ")
 
          // convert to object
          implicit val formats = DefaultFormats
          val json = parse(not.getParameter) \\ "data"
          val prod = json.extract[Product]
          println(s"Received as object: $prod\n")
        }
        )
      }
    }
  }
}

Если вы знакомы с Akka и scalikeJDBC, код будет выглядеть знакомо. Мы начнем с некоторых общих настроек:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
 * Simple case class to marshall to from received event.
 */
case class Product(id : Long, name: String, quantity: Long)
 
/**
 * Main runner. Just setups the connection pool and the actor system
 */
object PostgresNotifications extends App {
 
  // initialize JDBC driver & connection pool
  Class.forName("org.postgresql.Driver")
  ConnectionPool.singleton("jdbc:postgresql://localhost:5432/triggers", "jos", "######")
  ConnectionPool.dataSource().asInstanceOf[PoolingDataSource].setAccessToUnderlyingConnectionAllowed(true)
 
  // initialize the actor system
  val system = ActorSystem("Hello")
  val a = system.actorOf(Props[Poller], "poller")
 
  // wait for the user to stop the server
  println("Press <enter> to exit.")
  Console.in.read.toChar
  system.terminate
}

Здесь мы определяем наш класс case, в который мы преобразуем входящий JSON, настраиваем пул соединений, определяем систему Akka и запускаем наш актер Poller. Здесь нет ничего особенного, единственное, что особенное — в строке 23. Чтобы добавить слушателя из Scala, нам нужен доступ к базовому соединению JDBC. Так как scalikeJDBC использует пул соединений, нам нужно явно вызвать setAccessToUnderlyingConnectionAllowed, чтобы убедиться, что нам разрешен доступ к реальному соединению при вызове getInnerMostDelegate, а не просто обертка одного из пула соединений. Интересно отметить, что если мы не установим это, мы не получим сообщение об ошибке или что-то еще, мы просто получим Null из этого вызова метода….

Теперь, когда наш актер начал, давайте посмотрим, что он делает:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class Poller extends Actor {
 
  // execution context for the ticks
  import context.dispatcher
 
  val connection = ConnectionPool.borrow()
  val db: DB = DB(connection)
  val tick = context.system.scheduler.schedule(500 millis, 1000 millis, self, "tick")
 
  override def preStart() = {
    // make sure connection isn't closed when executing queries
    // we setup the
    db.autoClose(false)
    db.localTx { implicit session =>
      sql"LISTEN events".execute().apply()
    }
  }
 
  override def postStop() = {
    tick.cancel()
    db.close()
  }
 
  def receive = {
    case "tick" => {
      db.readOnly { implicit session =>
        val pgConnection = connection.asInstanceOf[DelegatingConnection].getInnermostDelegate.asInstanceOf[PGConnection]
        val notifications = Option(pgConnection.getNotifications).getOrElse(Array[PGNotification]())
 
        notifications.foreach( not => {
          println(s"Received for: ${not.getName} from process with PID: ${not.getPID}")
          println(s"Received data: ${not.getParameter} ")
 
          // convert to object
          implicit val formats = DefaultFormats
          val json = parse(not.getParameter) \\ "data"
          val prod = json.extract[Product]
          println(s"Received as object: $prod\n")
        }
        )
      }
    }
  }
}

Первое, что мы делаем в нашем актере, это устанавливаем некоторые свойства, необходимые для scalikeJDBC, и устанавливаем таймер, который запускает сообщение каждые 500 мс. Также обратите внимание на функции preStart и postStop. В предварительном запуске мы выполняем небольшой фрагмент SQL, который сообщает postgres, что это соединение будет прослушивать уведомления с именем «events». Мы также установили DB.autoClose на падение, чтобы избежать механизма объединения сеансов, закрывающего сеанс и соединение. Мы хотим сохранить их живыми, чтобы мы могли получать события. Когда актер заканчивается, мы убираем таймер и соединение.

В функции receive мы сначала получаем реальное PGConnection, а затем получаем уведомления от соединения:

1
2
val pgConnection = connection.asInstanceOf[DelegatingConnection].getInnermostDelegate.asInstanceOf[PGConnection]
 val notifications = Option(pgConnection.getNotifications).getOrElse(Array[PGNotification]())

Если уведомление Noll не будет возвращено, мы оборачиваем это в Option и просто возвращаем пустой массив в случае Null. Если есть какие-либо уведомления, мы просто обрабатываем их в цикле foreach и выводим результат:

1
2
3
4
implicit val formats = DefaultFormats
val json = parse(not.getParameter) \\ "data"
val prod = json.extract[Product]
println(s"Received as object: $prod\n")

Здесь вы также можете увидеть, что мы просто получаем элемент «data» из уведомления и преобразуем его в наш класс Product для дальнейшей обработки. Все, что вам нужно сделать сейчас, это запустить приложение и с того же терминала pgsql добавить несколько событий. Если все прошло хорошо, вы увидите вывод, похожий на этот в вашей консоли:

01
02
03
04
05
06
07
08
09
10
11
12
Received for: events from process with PID: 24131
Received data: {"table" : "products", "action" : "INSERT", "data" : {"id":47,"name":"pen","quantity":10200}}
Received as object: Product(47,pen,10200)
Received for: events from process with PID: 24131
Received data: {"table" : "products", "action" : "INSERT", "data" : {"id":48,"name":"pen","quantity":10200}}
Received as object: Product(48,pen,10200)
Received for: events from process with PID: 24131
Received data: {"table" : "products", "action" : "INSERT", "data" : {"id":49,"name":"pen","quantity":10200}}
Received as object: Product(49,pen,10200)
Received for: events from process with PID: 24131
Received data: {"table" : "products", "action" : "INSERT", "data" : {"id":50,"name":"Something","quantity":99999}}
Received as object: Product(50,Something,99999)

Теперь, когда эта базовая конструкция работает, тривиально использовать ее, например, в качестве источника для реактивных потоков или просто использовать веб-сокеты для дальнейшего распространения этих событий.

Ссылка: Прослушайте уведомления Postgresql со Scala от нашего партнера по JCG Йоса Дирксена в блоге Smart Java .