Статьи

Источник событий + бесплатные монады = бесплатный источник?

Недавно я писал о Event Sourcing и Free Monads . Следующим естественным шагом является объединение двух! Как бы выглядела бесплатная монада, настроенная на источник событий?

Пожалуйста, имейте в виду, что ниже приведен только черновик, а не полное решение. Поэтому любые предложения по улучшению приветствуются! Но давайте рассмотрим основные предположения. Наше приложение генерирует события определенного типа E Обычно это будет иерархия классов дел. Поскольку мы хотим быть свободными от какой-либо конкретной интерпретации побочных эффектов, мы будем использовать данные типа A[_] для описания всех действий, которые могут происходить в системе (опять же, обычно с использованием классов дел) Действия включают чтение и запись модели, отправку электронных писем, индексацию данных в поисковых подсистемах и т. Д. Каждое действие возвращает некоторый результат; записи обычно приводят в Unit , а чтение — в Option[SomeData] .

События записывают то, что произошло в системе, и управляют всей «бизнес-логикой». Все события сохраняются, однако мы не указываем, как заранее; поток событий формирует основной «источник истины».

Есть две основные функции, которые будут интерпретировать события:

  • обновление модели : как следует из названия, на основании события обновляется модель. Например, для события UserRegistered это должно записать пользователя в базу данных или сохранить его в памяти в памяти. Не может генерировать новые события, только выполнять действия.
  • слушатели событий : запустить некоторую логику на основе событий. Например, для события UserRegistered это может вызвать действие, отправляющее электронное письмо. Здесь мы можем генерировать новые события, такие как создание некоторых начальных данных для нового пользователя (в нашем примере это будет ключ API).

Основная идея этого различия заключается в том, что при наличии списка событий обновление модели можно использовать для перестройки модели. Мы можем сделать это несколько раз; например, мы можем хранить события в постоянном хранилище, а модель — в памяти. Затем мы могли бы использовать функцию обновления модели, чтобы заново создать модель на новом узле. Или мы можем запустить функции обновления модели, чтобы создать новое представление данных.

С другой стороны, прослушиватели событий должны запускаться только один раз, и они должны выполнять основную «бизнес-логику» для события. Отправка приветственного сообщения по электронной почте должна производиться только один раз, даже если событие воспроизводится для повторного создания модели. Здесь мы также можем генерировать новые события, и они будут рекурсивно обрабатываться с использованием двух функций.

Точки входа верхнего уровня — это команды , параметризованные предоставленными пользователем данными (например, из веб-формы или конечной точки REST), которые на основе модели проверяют входные данные, генерируют некоторые события и возвращают значение, которое затем возвращается пользователю.

Чтобы описать программы, которые могут генерировать события, выполнять действия и где события могут обрабатываться с использованием функций обновления модели и прослушивателя событий, мы будем использовать тип данных ES[E, A, R] .

ES[E, A, R] — это описание программы, которая генерирует события типа E , содержит действия типа A и выдает результат типа R

Три компонента, описанные выше, должны иметь следующие подписи:

  • команда: AnyUserData => ES[E, A, R]
  • обновление модели: PartialFunction[E, ES[Nothing, A, Unit]] (не может генерировать события; частичное, поскольку нам не требуется обновление модели для каждого события)
  • прослушиватель событий: PartialFunction[E, ES[E, A, Unit]]

Что такое ES ?

ES является расширением свободной монады над A , которая помимо обычных конструкторов ( FlatMap , Pure и Suspend — используется для действий), содержит дополнительный конструктор Emit (его значение не должно вызывать удивления). Очень похоже на бесплатную монаду, мы можем определить метод для интерпретации ES используя любую монаду, учитывая интерпретацию действий и событий (функция foldMap ).

Вот основная структура ES :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
sealed trait ES[E, A[_], R] {
    def flatMap[R2](f: R => ES[E, A, R2]): ES[E, A, R2] = FlatMap(this, f)
    def map[R2](f: R => R2): ES[E, A, R2] = FlatMap(this, f andThen (x => Pure(x)))
    ...
}
  
case class Pure[E, A[_], R](r: R) extends ES[E, A, R] { … }
case class Emit[E, A[_]](e: E) extends ES[E, A, Unit] { … }
case class Suspend[E, A[_], R](a: A[R]) extends ES[E, A, R] { … }
case class FlatMap[E, A[_], R1, R2](
   c: ES[E, A, R1],
   f: R1 => ES[E, A, R2]) extends ES[E, A, R2] { … }
  
object ES {
    implicit def esMonad[E, A[_]]: Monad[({type x[X] = ES[E, A, X]})#x] =
       new Monad[({type x[X] = ES[E, A, X]})#x] {
          override def pure[X](x: X) = Pure(x)
          override def flatMap[X, Y](fa: ES[E, A, X])(f: X => ES[E, A, Y]) =
             fa.flatMap(f)
       }
  
    def pure[E, A[_], R](r: R): ES[E, A, R] = Pure(r)
    def done[E, A[_]]: ES[E, A, Unit] = pure(())
    def emit[E, A[_]](e: E): ES[E, A, Unit] = Emit(e)
    def suspend[E, A[_], R](a: A[R]): ES[E, A, R] = Suspend(a)
  }

Event-Sourcing ES Особенности

В дополнение к обычным операциям монады нам нужны две операции над типом данных ES . Во-первых, нам нужен способ предоставить функции обновления модели и прослушивателя событий, которые должны применяться к описанию программы. В результате мы хотим получить программу, в которой обрабатываются все события и которая содержит соответствующие действия (для каждого события сначала действия, описанные обновлением модели, а затем действия, описанные прослушивателем событий).

Следовательно, ES содержит функцию:

1
2
3
4
5
sealed trait ES[E, A[_], R] {
   def handleEvents(
      modelUpdate: PartialFunction[E, ES[Nothing, A, Unit]],
      eventListener: PartialFunction[E, ES[E, A, Unit]]): ES[Handled[E], A, R]
}

Обратите внимание, что в возвращаемом типе события помещаются в оболочку case class Handled[E](e: E) . Это сделано для того, чтобы мы не могли просто вызвать handleEvents дважды и заново интерпретировать события. И нам нужно сохранить события в описании программы, чтобы иметь возможность сохранять их при окончательной интерпретации.

Что подводит нас к функции интерпретации. Учитывая описание программы с обработанными событиями, как описано нашими функциями обновления / прослушивания событий модели, мы хотим интерпретировать ее в любой монаде, поэтому мы получаем функцию:

1
2
3
4
implicit class ESHandled[E, A[_], R](es: ES[Handled[E], A, R]) {
   def run[M[_]](ai: A ~> M, storeEvent: E => M[Unit])(
      implicit M: Monad[M]): M[R] = ???
}

Чтобы сделать интерпретацию, нам нужна как интерпретация действий (естественное преобразование A ~> M ), так и способ хранения событий. Учитывая программу ES[E, A, R] и монаду M , мы получаем результат: M[R] со всеми обработанными событиями, сохраненными и интерпретированными действиями.

Пример использования

Как будет выглядеть простой пример использования? Мы опишем программу, в которой пользователи могут зарегистрироваться, используя уникальный адрес электронной почты, и для каждого нового пользователя создается ключ API. Вот данные, действия и события, которые мы будем использовать:

01
02
03
04
05
06
07
08
09
10
11
12
13
case class User(id: Long, email: String, password: String)
case class ApiKey(userId: Long, key: String)
  
sealed trait Action[R]
case class FindUserByEmail(email: String) extends Action[Option[User]]
case class WriteUser(u: User) extends Action[Unit]
case class FindApiKeyByUserId(userId: Long) extends Action[Option[ApiKey]]
case class WriteApiKey(ak: ApiKey) extends Action[Unit]
case class SendEmail(to: String, body: String) extends Action[Unit]
  
sealed trait Event
case class UserRegistered(u: User) extends Event
case class ApiKeyCreated(ak: ApiKey) extends Event

Точкой входа будет команда для регистрации пользователей:

01
02
03
04
05
06
07
08
09
10
11
12
def registerUserCommand(
   email: String, password: String): ES[Event, Action, Either[String, Unit]] = {
  
   action(FindUserByEmail(email)).flatMap {
      case None =>
         emit(UserRegistered(User(new Random().nextInt(), email, password)))
            .map(_ => Right(()))
  
      case Some(user) =>
         pure(Left("User with the given email already exists"))
   }
}

Обратите внимание, что мы используем конкретные типы ( Event , Action ) в качестве параметров типа для ES . Результатом команды может быть либо сообщение об ошибке (представленное левой частью либо), либо успех ( Right(()) ). В команде мы выполняем действия (поиск пользователя по электронной почте) и генерируем события (зарегистрированный пользователь), если проверка прошла успешно.

Нам также нужны функции прослушивания обновления модели / события, чтобы интерпретировать события:

01
02
03
04
05
06
07
08
09
10
11
val modelUpdate: PartialFunction[Event, ES[Nothing, Action, Unit]] = {
   case UserRegistered(u) => action(WriteUser(u))
   case ApiKeyCreated(ak) => action(WriteApiKey(ak))
}
  
val eventListeners: PartialFunction[Event, ES[Event, Action, Unit]] = {
   case UserRegistered(u) => for {
      _ <- emit(ApiKeyCreated(ApiKey(u.id, UUID.randomUUID().toString)))
      _ <- action(SendEmail(u.email, "Welcome!"))
   } yield ()
}

Для ApiKeyCreated нет прослушивателей ApiKeyCreated , а для UserRegistered мы UserRegistered другое событие и выполняем действие.

Учитывая пользовательский ввод, мы можем обработать результат команды и интерпретировать его в монаде Id :

01
02
03
04
05
06
07
08
09
10
11
12
val handledCommand = registerUserCommand("[email protected]", "1234")
   .handleEvents(modelUpdate, eventListeners)
  
val result: Either[String, Unit] = handledCommand.run[Id](new (Action ~> Id) {
   override def apply[A](fa: Action[A]) = fa match {
      case FindUserByEmail(email) => println(s"Find user by email: $email"); None
      case WriteUser(u) => println(s"Write user $u")
      case FindApiKeyByUserId(id) => println(s"Find api key by user id: $id"); None
      case WriteApiKey(ak) => println(s"Write api key: $ak")
      case SendEmail(to, body) => println(s"Send email to $to, body: $body")
   }
}, e => println("Store event: " + e))

Id отлично подходит для тестирования, в реальной жизни вы будете интерпретировать действия, используя, например, Future или Task и записывать результаты в базу данных. После выполнения вы увидите список действий, выполняемых программой.

Вспомогательный код

Код для ES в основном является расширением интерпретации свободной монады. Полный (довольно короткий) источник доступен в гисте , наряду с примером выше. Он использует кошек для монады и естественных преобразований абстракций.

Подводя итоги

Чтобы сделать это пригодным для использования в реальном сценарии, пара функций отсутствует. Прежде всего, нам нужно было бы добавить поддержку объединения нескольких различных типов событий / действий. Во-вторых, нам нужно было бы предоставить удобный способ обогащения событий, например, для захвата метки времени события, более широкого контекста (ip / id пользователя) и т. Д.