Статьи

Async Akka-Persistence-HBase Journal

Поскольку у меня было немного времени на выходных, я решил сделать что-то полезное для Акки. В последнее время я в основном работал с HBase и Scalding, поэтому выбрать новые API Akka Persistence и реализовать плагин для HBase для него было отличной идеей на один день или два.

Akka Persistence  — это, по сути,  библиотека, основанная на  событиях,  только недавно было объявлено, что «источники событий становятся аккой-постоянством ». Как следует из оригинального названия — это по сути набор инструментов поверх Akka, помогающий создавать приложения на основе источников событий. Одним из наиболее типичных элементов таких приложений является «некий актер, который получает сообщения и может воспроизводить их в случае сбоя или если нам придется перезапустить систему». Эта часть API очень хорошо описана в  новых документах об akka-persistence . Мы постараемся сосредоточиться на этом посте в блоге: «  Процессорыпостоянные сообщения  и  журналы» .

Краткое примечание:  API все еще помечены как экспериментальные, поэтому они могут измениться. Но из того, что я видел в настоящее время, они уже кажутся действительно хорошими, поэтому я не ожидаю слишком больших изменений со временем.

Чтобы представить их в двух словах:  процессоры  являются субъектами с отслеживанием состояния, отправленные им сообщения ( постоянные сообщения  — поэтому сообщения, заключенные в   конверт Persistent () ) содержат как полезную нагрузку («фактическое сообщение»), так и порядковый номер. Порядковый номер важен для последующего воспроизведения сообщений. Постоянное сообщение сохраняется в  журнале  до того, как оно достигает метода получения процессоров — это гарантирует, что мы сохраняем все сообщения, которые получает актер, даже если система разрушается «в процессе обработки этого сообщения». Благодаря хранению сообщений в постоянном журнале мы можем раскрутить нового актера с тем же  идентификатором процессора и прежде чем возобновить получение новых сообщений, система «воспроизведет» предыдущие сообщения на него. После завершения воспроизведения (или «восстановления») он продолжит получать новые сообщения, как обычно.

Это общая идея — есть еще кое- что, например, «снимки» (которые сокращают время восстановления), но на сегодня давайте не будем о них говорить (а я еще не дошел до их реализации в  akka-persistence-hbase ).

Журналом по умолчанию, используемым akka, является  LevelDB , очень быстрая локальная база данных, разработанная Google (для Chrome, я полагаю?) Некоторое время назад. Как бы то ни было, вам, вероятно, понадобится распределенный магазин для поддержки вашего приложения Akka. Таким образом,  ktoso / akka-persistence-hbase   (это моё) или krasserm / akka-persistence-cassandra   (разработано Мартином, который работает над Akka / Eventsourced), если вы работаете с фан-базой cassandra.

Используя эти «плагины хранилища», вы можете создавать резервные копии вашего постоянного журнала в зависимости от того, какое хранилище данных вы предпочитаете. Реализация этих программ довольно проста с точки зрения API, поскольку вам нужно реализовать всего несколько методов  , расширяя  AsyncWriteJournal :

def writeAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit]
def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit]
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit]
// and "replay":
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)
              (replayCallback: (PersistentRepr) => Unit): Future[Long]
Где первые три — это «основы», а четвертая дает нам «функцию воспроизведения». Стоит отметить, что эти API разработаны с учетом 
полностью асинхронных реализаций
 (таких как оба связанных примера). То есть даже при переходе в базу данных — вы никогда не блокируете. Работа с Promises и Futures в Scala настолько элегантна, что не представляет проблем для Scala, хотя вы должны выбрать правильный драйвер асинхронной базы данных. К счастью, есть такое значение для HBase — хотя это и не «официальный Java API» (он работает непосредственно на уровне RPC) — оно называется 
OpenTSDB / asynchbase
.

Например, вот как вы можете реализовать   метод asyncWrite :

  override def writeAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = {
    log.debug(s"Write async for ${persistentBatch.size} presistent messages")
    val futures = persistentBatch map { p =>
      import p._
      executePut(
        RowKey(processorId, sequenceNr).toBytes,
        Array(ProcessorId,          SequenceNr,          Marker,                  Message),
        Array(toBytes(processorId), toBytes(sequenceNr), toBytes(AcceptedMarker), persistentToBytes(p))
      )
    }
    Future.sequence(futures)
  }
  protected def executePut(key: Array[Byte], qualifiers: Array[Array[Byte]], values: Array[Array[Byte]]): Future[Unit] = {
    val request = new PutRequest(TableBytes, key, Family, qualifiers, values)
    client.put(request) // implicitly converted from Deferred => Future[Unit]
  }

Если вам интересно, или …  застряли с базой данных, в которой нет асинхронного драйвера , вы можете реализовать ту же функциональность, расширив SyncWriteJournal, который имеет те же методы, но с разделами «async» и «Future [_]». из сигнатур метода… ? Сначала я также реализовал  HBaseSyncWriteJournal , если вам интересно, вы можете проверить это на github.

Как только вы это получите, остальное — только оптимизация производительности и  избежание горячей области  (здесь хорошее объяснение проблемы — « точка доступа к регион-серверу »), которую я в настоящее время реализовал, используя «засоление области» (я префикс строки,   где .n % partitionCountpartitionCount >= regionCount

В целом, в  текущей версии  от  0.2  включает в  себя полностью асинхронной реализации журнала Hbase , и я буду улучшить производительность воспроизведения слишком скоро — вы очень приветствуем , чтобы посмотреть и дать ему спину (это на Maven центральный)!

PS: я думаю, мне придется урезать этот пост в блоге, получив немного слишком много слов, слишком мало кода…