Статьи

Реактивный доступ к базе данных — часть 2 — актеры

Мы очень рады продолжить серию гостевых постов в блоге jOOQ Мануэля Бернхардта . В этой серии блогов Мануэль расскажет о мотивах так называемых реактивных технологий и после представления концепций фьючерсов и актеров использует их для доступа к реляционной базе данных в сочетании с jOOQ.

Мануэл-Бернхард Мануэль Бернхардт (Manuel Bernhardt ) — независимый консультант по программному обеспечению со страстью к созданию веб-систем, как внутренних, так и внешних. Он является автором «Реактивных веб-приложений» (Мэннинг) и начал работать со Scala, Akka и Play Framework в 2010 году, проведя много времени с Java. Он живет в Вене, где он является соорганизатором местной группы пользователей Scala . Он с энтузиазмом относится к технологиям, основанным на Scala, и к активному сообществу, и ищет способы распространения их использования в отрасли. С 6 лет он также занимается подводным плаванием и не может привыкнуть к отсутствию моря в Австрии.

Эта серия разделена на три части, которые мы опубликуем в следующем месяце:

Вступление

В нашем последнем посте мы представили концепцию реактивных приложений , объяснили преимущества асинхронного программирования и представили Futures, инструмент для выражения и манипулирования асинхронными значениями.

В этом посте мы рассмотрим другой инструмент для построения асинхронных программ, основанных на концепции коммуникаций, управляемых сообщениями: актеры.

Модель параллелизма на основе акторов была популяризирована языком программирования Erlang, а ее наиболее популярной реализацией на JVM является инструментарий параллелизма Akka .

С одной стороны, модель актера является объектно-ориентированной, сделанной «правильно»: состояние актера может быть изменчивым, но оно никогда не подвергается непосредственному воздействию внешнего мира. Вместо этого субъекты взаимодействуют друг с другом на основе асинхронной передачи сообщений, в которой сами сообщения являются неизменными. Актер может сделать только одну из трех вещей:

  • отправлять и получать любое количество сообщений
  • изменить свое поведение или состояние в ответ на поступающее сообщение
  • начать новые детские актеры

Актер всегда должен решить, в каком состоянии он готов поделиться, а когда его мутировать. Таким образом, эта модель значительно облегчает нам, людям, создание параллельных программ, которые не пронизаны условиями гонки или взаимоблокировками, которые мы могли ввести, случайно прочитав или записав устаревшее состояние, или используя блокировки как средство избежать последнего.

Далее мы увидим, как работают актеры и как их сочетать с фьючерсами.

Основы актера

Актеры — это легкие объекты, которые общаются друг с другом, отправляя и получая сообщения. У каждого участника есть почтовый ящик, в котором входящие сообщения помещаются в очередь до их обработки.

687474703a2f2f6d616e75656c2e6265726e68617264742e696f2f77702d636f6e74656e742f434830362d4163746f72732e706e67

Актеры имеют разные состояния: их можно запускать, возобновлять, останавливать и перезапускать. Возобновление или перезапуск актера полезен, когда актер падает, как мы увидим позже.

Актеры также имеют ссылку на актера, которая позволяет одному актеру связаться с другим. Как и номер телефона, ссылка на актера является указателем на актера, и если бы актер должен был быть перезапущен и заменен новым воплощением в случае сбоя, это не имело бы никакого значения для других актеров, пытающихся отправить ему сообщения, так как только То, что они знают об актере, это его ссылка, а не личность одного конкретного воплощения.

Отправка и получение сообщений

Давайте начнем с создания простого актера:

1
2
3
4
5
6
7
import akka.actor._
 
class Luke extends Actor {
  def receive = {
    case _ => // do nothing
  }
}

Это действительно все, что нужно для создания актера. Но это не очень интересно. Давайте немного оживим и определим реакцию на данное сообщение:

01
02
03
04
05
06
07
08
09
10
import akka.actor._
 
case object RevelationOfFathership
 
class Luke extends Actor {
  def receive = {
    case RevelationOfFathership =>
      System.err.println("Noooooooooo")
  }
}

Вот так! RevelationOfFathership — это объект case, т.е. неизменяемое сообщение. Эта последняя деталь довольно важна: ваши сообщения всегда должны быть автономными и не ссылаться на внутреннее состояние какого-либо субъекта, поскольку это фактически утечет это состояние наружу, что нарушает гарантию того, что только субъект может изменить свое внутреннее состояние. Этот последний бит имеет первостепенное значение для актеров, чтобы предложить лучшую, более дружественную для человека модель параллелизма и не получать никаких сюрпризов.

Теперь, когда Люк знает, как правильно реагировать на неудобную правду о том, что Темный Вейдер — его отец, все, что нам нужно, — это сам Темный Лорд.

01
02
03
04
05
06
07
08
09
10
11
import akka.actor._
 
class Vader extends Actor {
 
  override def preStart(): Unit =
    context.actorSelection("akka://application/user/luke") ! RevelationOfFathership
 
  def receive = {
    case _ => // ...
  }
}

Актер Vader использует preStart жизненного цикла preStart , чтобы инициировать отправку сообщения его сыну, когда он запускается. Мы используем контекст актера, чтобы отправить сообщение Люку.

687474703a2f2f6d616e75656c2e6265726e68617264742e696f2f77702d636f6e74656e742f434830362d4163746f72496e74726f2e706e67

Вся последовательность запуска этого примера будет выглядеть следующим образом:

1
2
3
4
5
import akka.actor._
 
val system = ActorSystem("application")
val luke = system.actorOf(Props[Luke], name = "luke")
val vader = system.actorOf(Props[Vader], name = "vader")

Props — это способ описать, как получить экземпляр актера. Поскольку они являются неизменяемыми, они могут свободно использоваться, например, для разных JVM, работающих на разных машинах (это полезно, например, при работе с кластером Akka).

Актерский надзор

Актеры не просто существуют в дикой природе, но вместо этого являются частью иерархии акторов, и у каждого актера есть родитель. Создатели, которых мы создаем, контролируются User Guardian ActorSystem приложения, который является специальным актором, предоставленным Akka и ответственным за контроль всех действующих лиц в пространстве пользователя. Роль контролирующего актера заключается в том, чтобы решить, как справиться с неудачей дочернего актера, и действовать соответственно.

Сам User Guardian контролируется Root Guardian (который также контролирует другого специального актера, внутреннего по отношению к Akka), и сам контролируется специальным справочным актером. Легенда гласит, что эта ссылка существовала до того, как появились все другие ссылки на актеров, и называется «тот, кто бродит по пузырям пространства-времени» (если вы мне не верите, проверьте официальную документацию Akka ).

Организация участников в иерархиях предлагает преимущество кодирования обработки ошибок прямо в иерархии. Каждый родитель несет ответственность за действия своих детей. Если что-то пойдет не так и ребенок потерпит крах, у родителя будет возможность перезапустить это.

Вейдер, например, имеет несколько штурмовиков:

1
2
3
4
5
6
7
8
9
import akka.actor._
import akka.routing._
 
class Vader extends Actor {
 
  val troopers: ActorRef = context.actorOf(
    RoundRobinPool(8).props(Props[StromTrooper])
  )
}

RoundRobinPool — это средство выражения того факта, что сообщения, отправляемые troopers будут отправляться каждому ребенку отряда один за другим. Маршрутизаторы кодируют стратегии для отправки сообщений нескольким субъектам одновременно, Akka предоставляет множество предопределенных маршрутизаторов.

687474703a2f2f6d616e75656c2e6265726e68617264742e696f2f77702d636f6e74656e742f4b696c6c656453746f726d74726f6f706572732e6a7067

В конечном счете, актеры могут потерпеть крах, и тогда надзор за работой должен решить руководитель. Механизм принятия решений представлен так называемой стратегией надзора . Например, Вейдер может решить повторить попытку перезапуска штурмовика 3 раза, прежде чем бросить и остановить его:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
import akka.actor._
 
class Vader extends Actor {
 
  val troopers: ActorRef = context.actorOf(
    RoundRobinPool(8).props(Props[StromTrooper])
  )
 
  override def supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3) {
      case t: Throwable =>
        log.error("StormTrooper down!", t)
        SupervisorStrategy.Restart
    }
}

Эта стратегия надзора довольно грубая, поскольку она одинаково работает со всеми типами Throwable . В следующем посте мы увидим, что стратегии надзора являются эффективным способом реагирования на различные типы сбоев различными способами.

Сочетание фьючерсов и актеров

Существует одно золотое правило работы с актерами: вы не должны выполнять какие-либо операции блокировки, такие как, например, блокирующий сетевой вызов. Причина проста: если субъект блокируется, он не может обрабатывать входящие сообщения, что может привести к заполнению полного почтового ящика (или, скорее, поскольку используемый по умолчанию почтовый ящик, используемый субъектами, неограничен, к OutOfMemoryException .

Вот почему может быть полезно иметь возможность использовать фьючерсы в актерах. Шаблон конвейера предназначен для этого: он отправляет результат Future актеру:

01
02
03
04
05
06
07
08
09
10
11
12
13
import akka.actor._
import akka.pattern.pipe
 
class Luke extends Actor {
  def receive = {
    case RevelationOfFathership =>
      sendTweet("Nooooooo") pipeTo self
    case tsr: TweetSendingResult =>
      // ...
  }
 
  def sendTweet(msg: String): Future[TweetSendingResult] = ...
}

В этом примере мы вызываем sendTweet Future после получения RevelationOfFathership и используем метод pipeTo чтобы указать, что мы хотим, чтобы результат Future был отправлен нам самим.

В приведенном выше коде есть только одна проблема: если в будущем произойдет сбой, мы получим сбойный объект в довольно неудобном формате, завернутый в сообщение типа akka.actor.Status.Failure , без какого-либо полезного контекста. Вот почему может быть более целесообразно восстанавливать сбои перед передачей результата:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
import akka.actor._
import akka.pattern.pipe
import scala.control.NonFatal
 
class Luke extends Actor {
  def receive = {
    case RevelationOfFathership =>
      val message = "Nooooooo"
      sendTweet(message) recover {
        case NonFatal(t) => TweetSendFailure(message, t)
      } pipeTo self
    case tsr: TweetSendingResult =>
      // ...
    case tsf: TweetSendingFailure =>
      // ...
  }
 
  def sendTweet(msg: String): Future[TweetSendingResult] = ...
}

Благодаря этой обработке ошибок мы теперь знаем, какое сообщение не удалось отправить в Twitter, и можем предпринять соответствующее действие (например, повторить попытку отправки).

Вот и все для этого короткого введения в актеры. В следующем и последнем посте этой серии мы увидим, как использовать Futures и Actors в комбинации для реактивного доступа к базе данных.

Читать дальше

Следите за обновлениями, так как вскоре мы опубликуем Часть 3 как часть этой серии: