Статьи

Divided We Win: источник событий / CQRS, предполагаемый при разделении моделей записи и чтения

Некоторое время назад мы начали исследовать архитектуру разделения ответственности командных запросов (CQRS) как альтернативный способ разработки распределенных систем. В прошлый раз мы рассмотрели только команды и события, но не запросы. Цель этого сообщения в блоге — заполнить пробел и обсудить способы обработки запросов в соответствии с архитектурой CQRS .

Мы начнем с того места, где остановились в прошлый раз, с примером приложения, которое могло обрабатывать команды и сохранять события в журнале. Для поддержки пути чтения или запросов мы собираемся ввести хранилище данных. Для простоты пусть это будет база данных H2 в памяти. Уровень доступа к данным будет обрабатываться потрясающей библиотекой Slick .

Для начала нам нужно создать простую модель данных для класса User , управляемую постоянным субъектом UserAggregate . В этом отношении класс Users является типичным отображением реляционной таблицы:

1
2
3
4
5
6
class Users(tag: Tag) extends Table[User](tag, "USERS") {
  def id = column[String]("ID", O.PrimaryKey)
  def email = column[String]("EMAIL", O.Length(512))
  def uniqueEmail = index("USERS_EMAIL_IDX", email, true)
  def * = (id, email) <> (User.tupled, User.unapply)
}

На этом этапе важно отметить, что мы применяем ограничение уникальности электронной почты пользователя. Мы вернемся к этой тонкой детали позже, во время интеграции с постоянным действующим лицом UserAggregate . Следующее, что нам нужно, это сервис для управления доступом к хранилищу данных, а именно сохранение и запрос пользователей . Поскольку мы находимся во вселенной Акка , очевидно, что она тоже будет актером. Вот:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
case class CreateSchema()
case class FindUserByEmail(email: String)
case class UpdateUser(id: String, email: String)
case class FindAllUsers()
 
trait Persistence {
  val users = TableQuery[Users] 
  val db = Database.forConfig("db")
}
 
class PersistenceService extends Actor with ActorLogging with Persistence {
  import scala.concurrent.ExecutionContext.Implicits.global
    
  def receive = {
    case CreateSchema => db.run(DBIO.seq(users.schema.create))
       
    case UpdateUser(id, email) => {
      val query = for { user <- users if user.id === id } yield user.email
      db.run(users.insertOrUpdate(User(id, email)))
    }
     
    case FindUserByEmail(email) => {
      val replyTo = sender
      db.run(users.filter( _.email === email.toLowerCase).result.headOption)
        .onComplete { replyTo ! _ }
    }
     
    case FindAllUsers => {
      val replyTo = sender
      db.run(users.result) onComplete { replyTo ! _ }
    }
  }
}

Пожалуйста, обратите внимание, что PersistenceService является обычным нетипичным актером Akka , а не постоянным. Чтобы все было сосредоточено, мы собираемся поддерживать только четыре вида сообщений:

  • CreateSchema для инициализации схемы базы данных
  • UpdateUser для обновления адреса электронной почты пользователя
  • FindUserByEmail для запроса пользователя по его адресу электронной почты
  • FindAllUsers для запроса всех пользователей в хранилище данных

Хорошо, сервисы хранилища данных готовы, но на самом деле ничто не наполняет их данными. Переходя к следующему шагу, мы проведем рефакторинг UserAggregate , точнее то, как он обрабатывает команду UserEmailUpdate . В его текущей реализации обновление электронной почты пользователя происходит безоговорочно. Но помните, мы наложили ограничения уникальности на электронные письма, поэтому мы собираемся изменить командную логику, чтобы учесть это: перед фактическим выполнением обновления мы запустим запрос к модели чтения (хранилище данных), чтобы удостовериться, что ни один пользователь, чье электронное письмо уже зарегистрировано ,

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
val receiveCommand: Receive = {
  case UserEmailUpdate(email) =>
    try {
      val future = (persistence ? FindUserByEmail(email)).mapTo[Try[Option[User]]]
      val result = Await.result(future, timeout.duration) match {
        case Failure(ex) => Error(id, ex.getMessage)
        case Success(Some(user)) if user.id != id => Error(id, s"Email '$email' already registered")
        case _ => persist(UserEmailUpdated(id, email)) { event =>
          updateState(event)
          persistence ! UpdateUser(id, email)
        }
        Acknowledged(id)
      }
         
      sender ! result
  } catch {
    case ex: Exception if NonFatal(ex) => sender ! Error(id, ex.getMessage)
  }
}

Довольно просто, но не совсем идиоматично: Await.result не выглядит так, как принадлежит этому коду. Моей первой попыткой было использование будущего конвейера / map / recovery / pipeTo, чтобы сохранить поток полностью асинхронным. Однако побочный эффект, который я наблюдал, заключается в том, что в этом случае блок persist (UserEmailUpdated (id, email)) {event =>…} должен выполняться в другом потоке большую часть времени (если результат не готов), но это не очень вероятно из-за переключения контекста потока. Таким образом, Await.result был здесь на помощь.

Теперь каждый раз, когда происходит обновление электронной почты пользователя, наряду с сохранением события, мы собираемся записать этот факт и в хранилище данных. Хорошо, мы приближаемся на один шаг.

Последнее, что мы должны рассмотреть, это как заполнить хранилище данных из журнала событий? Здесь очень полезен еще один экспериментальный модуль из портфолио Akka Persistence , Akka Persistence Query . Среди других функций он предоставляет возможность запрашивать журнал событий по идентификаторам персистентности, и это то, что мы собираемся сделать, чтобы заполнить хранилище данных из журнала. Не удивительно, что за это отвечает актер UserJournal .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
case class InitSchema()
 
class UserJournal(persistence: ActorRef) extends Actor with ActorLogging {
  def receive = {
    case InitSchema => {
      val journal = PersistenceQuery(context.system)
        .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
      val source = journal.currentPersistenceIds()
       
      implicit val materializer = ActorMaterializer()
      source
        .runForeach { persistenceId =>
          journal.currentEventsByPersistenceId(persistenceId, 0, Long.MaxValue)
            .runForeach { event =>
              event.event match {
                case UserEmailUpdated(id, email) => persistence ! UpdateUser(id, email)
              }
            }
        }
    }
  }
}

По сути, код достаточно прост, но давайте немного повторим, что он делает. Прежде всего, мы спрашиваем журнал обо всех идентификаторах постоянства, которые он имеет, используя метод currentPersistenceIds . Во-вторых, для каждого идентификатора постоянства мы запрашиваем все события из журнала. Поскольку в нашем случае существует только одно событие UserEmailUpdated , мы просто напрямую преобразуем его в сообщение UpdateUser службы хранилища данных.

Круто, мы в основном сделали! Самое простое в конце — добавить еще одну конечную точку в UserRoute, которая возвращает список существующих пользователей, запрашивая модель чтения.

01
02
03
04
05
06
07
08
09
10
11
12
pathEnd {
  get {
    complete {
      (persistence ? FindAllUsers).mapTo[Try[Vector[User]]] map {
        case Success(users) =>
          HttpResponse(status = OK, entity = users.toJson.compactPrint)
        case Failure(ex) =>
          HttpResponse(status = InternalServerError, entity = ex.getMessage)
      }
    }
  }
}

Мы готовы протестировать наше обновленное приложение-образец CQRS ! После того, как он будет запущен, убедитесь, что наш журнал и хранилище данных пусты.

1
2
$ curl -X GET http://localhost:38080/api/v1/users
[]

Имеет смысл, так как мы еще не создали ни одного пользователя. Давайте сделаем это, обновив двух пользователей с разными адресами электронной почты и снова запросив модель чтения.

01
02
03
04
05
06
07
08
09
10
11
$ curl -X PUT http://localhost:38080/api/v1/users/123 -d [email protected]
Email updated: a@b.com
 
$ curl -X PUT http://localhost:38080/api/v1/users/124 -d [email protected]
Email updated: a@c.com
 
$ curl -X GET http://localhost:38080/api/v1/users
[
  {"id":"123","email":"[email protected]"},
  {"id":"124","email":"[email protected]"}
]

Как и ожидалось, на этот раз результат отличается, и мы видим, что возвращаются два пользователя. Момент истины, давайте попробуем обновить электронную почту пользователя со 124 до почты с 123 .

1
2
$ curl -X PUT http://localhost:38080/api/v1/users/123 -d [email protected]
Email '[email protected]' already registered

Потрясающе, это то, что мы хотели! Модель чтения (или запроса) очень полезна и прекрасно работает. Обратите внимание, что когда мы перезапускаем приложение, хранилище прочитанных данных должно быть заново заполнено из журнала и возвращено ранее созданных пользователей:

1
2
3
4
5
$ curl -X GET http://localhost:38080/api/v1/users
[
  {"id":"123","email":"[email protected]"},
  {"id":"124","email":"[email protected]"}
]

Завершая этот пост, мы завершаем введение в архитектуру CQRS . Хотя можно сказать, что многие вещи остались за рамками, я надеюсь, что примеры, представленные в нашем путешествии, были полезны для иллюстрации идеи, и CQRS мог бы стать интересным вариантом для рассмотрения в следующих проектах.

  • Как всегда, полный исходный код доступен на GitHub .

Большое спасибо Regis Leray и Esfandiar Amirrahimi , двум замечательным разработчикам, за помощь в этой серии постов в блоге.