Статьи

Divided We Win: источник событий / CQRS, предполагающий разделение моделей записи и чтения.

В сегодняшнем посте мы расскажем о некоторых очень интересных (на мой взгляд) архитектурных стилях: выделение событий и разделение ответственности по командным запросам ( CQRS ). По сути, в обоих из них события лежат в основе проектирования системы и отражают любые изменения состояния, которые происходят. Он сильно отличается от традиционной архитектуры CRUD, где обычно хранится только последнее известное состояние, при этом любой исторический фон фактически теряется.

Еще одна привлекательная опция, которую CQRS вносит в таблицу, — это естественное разделение модели записи (изменение состояния, инициируемое командами и результатом событий), и модели чтения (запрос, в большинстве случаев, последнего известного состояния). Это дает вам большую свободу в выборе правильного хранилища данных (или хранилищ) для обслуживания различных вариантов использования приложений и требований. Но, как всегда, нет бесплатного обеда: цена, которую нужно заплатить, — это повышенная сложность системы.

Чтобы публикация была достаточно короткой, все обсуждение разделено на две части: команды и события (эта) и запросы ( готовится к публикации). Используя в качестве примера очень простую модель пользовательского объекта, мы собираемся разработать приложение, которое использует архитектуру CQRS . На платформе JVM доступно несколько библиотек, но мы собираемся использовать Akka , точнее два его недавно добавленных компонента Akka Persistence и Akka HTTP . Пожалуйста, не беспокойтесь, если Akka является для вас чем-то новым, примеры будут действительно простыми и (надеюсь) простыми для подражания. Итак, давайте начнем!

Как мы упоминали ранее, изменения в нашей системе происходят в результате выполнения команды и могут привести к нулю или большему количеству событий. Давайте смоделируем это, определив две черты:

1
2
trait Event
trait Command

В свою очередь, эти события могут вызывать изменения состояния внутри сущностей приложения, поэтому давайте смоделируем это с общей чертой:

1
2
3
trait State[T] {
  def updateState(event: Event): State[T]
}

Пока все просто. Теперь, когда мы моделируем пользователя, UserAggregate будет составлять нашу модель и отвечать за применение обновлений для конкретного пользователя , идентифицируемого по его идентификатору . Кроме того, у каждого пользователя будет свойство электронной почты, которое может быть изменено после получения команды UserEmailUpdate и может привести к тому, что в результате будет создано событие UserEmailUpdated . Следующий фрагмент кода определяет все это в терминах состояния , команды и события в сопутствующем объекте UserAggregate .

01
02
03
04
05
06
07
08
09
10
object UserAggregate {
  case class User(id: String, email: String = "") extends State[User] {
    override def updateState(event: Event): State[User] = event match {
      case UserEmailUpdated(id, email) => copy(email = email)
    }
  }
 
  case class UserEmailUpdate(email: String) extends Command
  case class UserEmailUpdated(id: String, email: String) extends Event
}

Для читателей, знакомых с CQRS и источником событий , этот пример может показаться очень наивным, но я думаю, что он достаточно хорош для понимания основ.

Определив наши основополагающие блоки, настало время взглянуть на самое интересное: принимать команды, преобразовывать их в постоянные события и применять изменения состояния — все это входит в обязанности UserAggregate . В контексте CQRS и источников событий сохранение событий является важнейшей возможностью системы. События являются единственным источником истины, и состояние любого отдельного объекта можно восстановить в любой момент времени путем воспроизведения всех относящихся к нему событий. Это момент, когда Akka Persistance выходит на сцену.

Чтобы сделать шаг назад, Akka — великолепная библиотека (или даже инструментарий) для построения распределенных систем на основе актерской модели . Кроме того, Akka Persistence обогащает актеров возможностями персистентности, позволяя им сохранять свои сообщения (или события) в долговременном журнале. Можно сказать, что журнал может стать очень, очень большим, и воспроизведение всех событий для восстановления состояния может занять много времени. Это допустимая точка, поэтому Akka Persistence также добавляет возможность хранить постоянные снимки в долговременном хранилище. Это значительно ускоряет процесс, поскольку должны воспроизводиться только события, произошедшие с момента последнего снимка. По умолчанию LevelDB — это механизм длительного хранения, используемый Akka Persistence, но это подключаемая функция со многими доступными альтернативами.

Теперь давайте посмотрим на субъекта UserAggregate, который будет отвечать за управление сущностью пользователя .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class UserAggregate(id: String) extends PersistentActor with ActorLogging {
  import UserAggregate._
 
  override def persistenceId = id
  var state: State[User] = User(id)
 
  def updateState(event: Event): Unit = {
    state = state.updateState(event)
  }
   
  val receiveCommand: Receive = {
    case UserEmailUpdate(email) => {
      persist(UserEmailUpdated(id, email)) { event =>
        updateState(event)
        sender ! Acknowledged(id)
      }
    }
  }
 
  override def receiveRecover: Receive = {
    case event: Event => updateState(event)
    case SnapshotOffer(_, snapshot: User) => state = snapshot
  }
}

Пока это самая сложная часть, поэтому давайте разберем ключевые части. Прежде всего, UserAggregate расширяет PersistentActor , который добавляет ему постоянные возможности. Во-вторых, каждый постоянный субъект должен иметь уникальный persistenceId : он используется в качестве идентификатора в журнале событий и хранилище снимков. И, наконец, в отличие от обычных акторов Akka , постоянные акторы имеют две точки входа: receiveCommand для обработки команд и receiveRecover для воспроизведения событий.

Возвращаясь к нашему примеру, как только UserAggregate получает команду UserEmailUpdate , он в первую очередь сохраняет событие UserEmailUpdated в журнале с помощью вызова persist (…) , а затем обновляет состояние агрегата с помощью вызова updateState (…) , отвечая с подтверждением отправителю. Чтобы увидеть полный пример в действии, давайте создадим простую конечную точку REST, используя другой замечательный проект из семейства Akka , Akka HTTP , вышедший из потрясающей инфраструктуры Spray .

На данный момент мы собираемся определить простой маршрут для обработки запроса PUT в / api / v1 / users / {id} , где {id} по сути является заполнителем для идентификатора пользователя. Он будет принимать один закодированный параметр электронной почты для обновления адреса электронной почты пользователя.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
object UserRoute {
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.language.postfixOps
   
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val timeout: Timeout = 5 seconds
 
  val route = {
    logRequestResult("eventsourcing-example") {
      pathPrefix("api" / "v1" / "users") {
        path(LongNumber) { id =>
          (put & formFields('email.as[String])) { email =>
            complete {
              system
                .actorSelection(s"user/user-$id")
                .resolveOne
                .recover {
                  case _: ActorNotFound =>
                    system.actorOf(Props(new UserAggregate(id.toString)), s"user-$id")
                }
                .map {                
                  _ ? UserEmailUpdate(email) map {
                    case Acknowledged(_) =>
                      HttpResponse(status = OK, entity = "Email updated: " + email)
                    case Error(_, message) =>
                      HttpResponse(status = Conflict, entity = message)
                  }
                }
            }
          }
        }
      }
    }
  }
}

Единственный пропущенный фрагмент — это исполняемый класс для подключения обработчика для этой конечной точки REST, поэтому давайте определим его, благодаря Akka HTTP это тривиально:

1
2
3
object Boot extends App with DefaultJsonProtocol {
  Http().bindAndHandle(route, "localhost", 38080)
}

Ничто не дает большей уверенности в том, что все действительно работает, кроме простых и удобочитаемых тестовых примеров, построенных с использованием инфраструктуры ScalaTest, дополненной Akka HTTP TestKit .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
class UserRouteSpec extends FlatSpec
      with ScalatestRouteTest
      with Matchers
      with BeforeAndAfterAll {
  import com.example.domain.user.UserRoute
   
  implicit def executionContext = scala.concurrent.ExecutionContext.Implicits.global
 
  "UserRoute" should "return success on email update" in {
    Put("http://localhost:38080/api/v1/users/123", FormData("email" ->  "[email protected]")) ~> UserRoute.route ~> check {
      response.status shouldBe StatusCodes.OK
      responseAs[String] shouldBe "Email updated: [email protected]"
    }
  }
}

Наконец, давайте запустим полный пример и используем curl из командной строки, чтобы выполнить реальный вызов конечной точки REST , заставив все части приложения работать вместе.

1
2
$ curl -X PUT http://localhost:38080/api/v1/users/123 -d [email protected]
Email updated: a@b.com

Ницца! Результаты соответствуют нашим ожиданиям. Прежде чем мы закончим с этой частью, есть еще одна вещь: обратите внимание, что при перезапуске приложения будет восстановлено состояние для пользователей, для которых агрегаты уже существуют. Иначе говоря, используя особенности Akka Persistence , если журнал событий имеет события / снимки, связанные с persistenceId актера (вот почему он должен быть уникальным и постоянным), процесс восстановления происходит путем применения более свежего снимка (если есть) и воспроизведения событий. ,

В этом посте мы посмотрели за кулисы источников событий и CQRS , охватывая только часть записи (или команды) архитектуры. Есть много вещей, над которыми нужно работать, например, проверка уникальности электронной почты, извлечение последнего состояния пользователя…, которые представляют поток чтения (или запроса) приложения и будут обсуждаться в последующем сообщении в блоге. На данный момент, я надеюсь, что источники событий и CQRS немного демистифицированы и могут стать частью вашего будущего или существующих приложений.

Окончательный отказ от ответственности: Akka HTTP помечен как экспериментальный компонент на данный момент (тем не менее, основные цели остаются неизменными, API может немного измениться), в то время как Akka Persistance только что вышла из экспериментального статуса с недавним выпуском Akka 2.4.0-RC1 . Пожалуйста, знайте об этом.

Полный проект доступен на GitHub . Большое спасибо Regis Leray и Esfandiar Amirrahimi , двум замечательным разработчикам, за помощь в этой серии постов в блоге.