Поскольку у меня было немного времени на выходных, я решил сделать что-то полезное для Акки. В последнее время я в основном работал с HBase и Scalding, поэтому выбрать новые API Akka Persistence и реализовать плагин для HBase для него было отличной идеей на один день или два.
Akka Persistence — это, по сути, библиотека, основанная на событиях, только недавно было объявлено, что «источники событий становятся аккой-постоянством ». Как следует из оригинального названия — это по сути набор инструментов поверх Akka, помогающий создавать приложения на основе источников событий. Одним из наиболее типичных элементов таких приложений является «некий актер, который получает сообщения и может воспроизводить их в случае сбоя или если нам придется перезапустить систему». Эта часть API очень хорошо описана в новых документах об akka-persistence . Мы постараемся сосредоточиться на этом посте в блоге: « Процессоры , постоянные сообщения и журналы» .
Краткое примечание: API все еще помечены как экспериментальные, поэтому они могут измениться. Но из того, что я видел в настоящее время, они уже кажутся действительно хорошими, поэтому я не ожидаю слишком больших изменений со временем.
Чтобы представить их в двух словах: процессоры являются субъектами с отслеживанием состояния, отправленные им сообщения ( постоянные сообщения — поэтому сообщения, заключенные в конверт Persistent () ) содержат как полезную нагрузку («фактическое сообщение»), так и порядковый номер. Порядковый номер важен для последующего воспроизведения сообщений. Постоянное сообщение сохраняется в журнале до того, как оно достигает метода получения процессоров — это гарантирует, что мы сохраняем все сообщения, которые получает актер, даже если система разрушается «в процессе обработки этого сообщения». Благодаря хранению сообщений в постоянном журнале мы можем раскрутить нового актера с тем же идентификатором процессора и прежде чем возобновить получение новых сообщений, система «воспроизведет» предыдущие сообщения на него. После завершения воспроизведения (или «восстановления») он продолжит получать новые сообщения, как обычно.
Это общая идея — есть еще кое- что, например, «снимки» (которые сокращают время восстановления), но на сегодня давайте не будем о них говорить (а я еще не дошел до их реализации в akka-persistence-hbase ).
Журналом по умолчанию, используемым akka, является LevelDB , очень быстрая локальная база данных, разработанная Google (для Chrome, я полагаю?) Некоторое время назад. Как бы то ни было, вам, вероятно, понадобится распределенный магазин для поддержки вашего приложения Akka. Таким образом, ktoso / akka-persistence-hbase (это моё) или krasserm / akka-persistence-cassandra (разработано Мартином, который работает над Akka / Eventsourced), если вы работаете с фан-базой cassandra.
Используя эти «плагины хранилища», вы можете создавать резервные копии вашего постоянного журнала в зависимости от того, какое хранилище данных вы предпочитаете. Реализация этих программ довольно проста с точки зрения API, поскольку вам нужно реализовать всего несколько методов , расширяя AsyncWriteJournal :
def writeAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit] def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] // and "replay": def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long) (replayCallback: (PersistentRepr) => Unit): Future[Long]
полностью асинхронных реализаций
(таких как оба связанных примера). То есть даже при переходе в базу данных — вы никогда не блокируете. Работа с Promises и Futures в Scala настолько элегантна, что не представляет проблем для Scala, хотя вы должны выбрать правильный драйвер асинхронной базы данных. К счастью, есть такое значение для HBase — хотя это и не «официальный Java API» (он работает непосредственно на уровне RPC) — оно называется
OpenTSDB / asynchbase
.
Например, вот как вы можете реализовать метод asyncWrite :
override def writeAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = { log.debug(s"Write async for ${persistentBatch.size} presistent messages") val futures = persistentBatch map { p => import p._ executePut( RowKey(processorId, sequenceNr).toBytes, Array(ProcessorId, SequenceNr, Marker, Message), Array(toBytes(processorId), toBytes(sequenceNr), toBytes(AcceptedMarker), persistentToBytes(p)) ) } Future.sequence(futures) } protected def executePut(key: Array[Byte], qualifiers: Array[Array[Byte]], values: Array[Array[Byte]]): Future[Unit] = { val request = new PutRequest(TableBytes, key, Family, qualifiers, values) client.put(request) // implicitly converted from Deferred => Future[Unit] }
Если вам интересно, или … застряли с базой данных, в которой нет асинхронного драйвера , вы можете реализовать ту же функциональность, расширив SyncWriteJournal, который имеет те же методы, но с разделами «async» и «Future [_]». из сигнатур метода… ? Сначала я также реализовал HBaseSyncWriteJournal , если вам интересно, вы можете проверить это на github.
Как только вы это получите, остальное — только оптимизация производительности и избежание горячей области (здесь хорошее объяснение проблемы — « точка доступа к регион-серверу »), которую я в настоящее время реализовал, используя «засоление области» (я префикс строки, где .n % partitionCount
partitionCount >= regionCount
В целом, в текущей версии от 0.2 включает в себя полностью асинхронной реализации журнала Hbase , и я буду улучшить производительность воспроизведения слишком скоро — вы очень приветствуем , чтобы посмотреть и дать ему спину (это на Maven центральный)!
PS: я думаю, мне придется урезать этот пост в блоге, получив немного слишком много слов, слишком мало кода…