В течение последних двух постов на reactmq я описал , как написать реактивные , постоянные очереди сообщений. Очередь имеет следующие характеристики:
- существует один брокер, хранящий сообщения, потенциально с несколькими клиентами, отправляющими или получающими сообщения
- обеспечивает по крайней мере один раз доставку; получение сообщения блокирует сообщение; если он не будет удален в течение некоторого времени, он снова будет доступен для доставки
- отправка и получение сообщений являются реактивными благодаря akka-streams : обратное давление применяется, если какой-либо из компонентов не справляется с нагрузкой
- сообщения являются постоянными, используя akka-persistence : все события отправки / получения / удаления сохраняются в журнале, и при перезапуске системы состояние субъекта очереди воссоздается.
Хранилище событий может быть реплицировано с использованием реплицированной реализации журнала, например, Cassandra . Тем не менее, отсутствует один фрагмент: кластеризация самого брокера.
К счастью, akka-cluster поможет! Давайте посмотрим, как с его помощью мы можем убедиться, что в нашем кластере всегда работает брокер.
Настройка кластера
Прежде всего нам нужен работающий кластер. Нам понадобятся две новые зависимости в нашем build.sbt
файле , akka-cluster
и akka-contrib
, поскольку мы будем использовать некоторые добавленные расширения.
Во-вторых, нам нужно предоставить конфигурацию для ActorSystem
с включенной кластеризацией. Хорошо, чтобы было ясно, какова цель файла конфигурации, поэтому есть cluster-broker-template.conf
файл :
akka { actor.provider = "akka.cluster.ClusterActorRefProvider" remote.netty.tcp.port = 0 // should be overriden remote.netty.tcp.hostname = "127.0.0.1" cluster { seed-nodes = [ "akka.tcp://[email protected]:9171", "akka.tcp://[email protected]:9172", "akka.tcp://[email protected]:9173" ] auto-down-unreachable-after = 10s roles = [ "broker" ] role.broker.min-nr-of-members = 2 } extensions = [ "akka.contrib.pattern.ClusterReceptionistExtension" ] }
Проходя через настройки:
- нам нужно использовать
ClusterActorRefProvider
для связи с актерами на других узлах кластера - так как мы будем использовать один и тот же файл конфигурации для нескольких узлов (для локального тестирования), порт будет переопределен в коде. Помимо порта нам также нужно указать имя хоста, к которому будет привязана связь кластера.
- узлу нужен начальный список начальных узлов, с которыми он будет пытаться связаться при запуске, чтобы сформировать кластер
- мы используем автоотключение, которое вызывает объявление узлов
down
через 10 секунд. Это может привести к разделам , однако мы также указываем, что для работы кластерного фрагмента должно быть как минимум 2 живых узла (всего у нас будет 3 узла, поэтому с 2 мы защищены от разделов) - наконец, мы заявляем, что любые узлы кластера, запущенные с использованием файла конфигурации, будут играть
broker
роль и использовать расширение приемной стороны кластера — но об этом позже.
На самом деле запуск кластерной системы акторов теперь очень прост, см. BrokerManager
Класс :
class BrokerManager(clusterPort: Int) { // … val conf = ConfigFactory .parseString(s"akka.remote.netty.tcp.port=$clusterPort") .withFallback(ConfigFactory.load("cluster-broker-template")) val system = ActorSystem(s"broker", conf) // ... }
Чтобы указать порт, мы создаем конфигурацию из строки, в которой указана только часть порта, а для использования других настроек мы возвращаемся к конфигурации из файла шаблона.
Запуск узлов кластера — это всего лишь запуск трех простых приложений :
object ClusteredBroker1 extends App { new BrokerManager(9171).run() }
Кластер синглтон
Подготовив кластер, мы должны запустить брокер на одном из узлов. В любой момент должен быть запущен только один посредник, иначе очередь будет повреждена. Для этого идеально подходит расширение Cluster Singleton contrib.
Чтобы использовать расширение, нам нужно создать актера, который будет управляться одноэлементным расширением и запускаться только на одном узле кластера. Следовательно, мы создаем BrokerManagerActor
и теперь можем запустить синглтон :
def run() { // … system.actorOf(ClusterSingletonManager.props( singletonProps = Props(classOf[BrokerManagerActor], clusterPort), singletonName = "broker", terminationMessage = PoisonPill, role = Some("broker")), name = "broker-manager") } class BrokerManagerActor(clusterPort: Int) extends Actor { val sendServerAddress = new InetSocketAddress( "localhost", clusterPort + 10) val receiveServerAddress = new InetSocketAddress( "localhost", clusterPort + 20) override def preStart() = { super.preStart() new Broker(sendServerAddress, receiveServerAddress)(context.system) .run() } override def receive = { case _ => } }
В ClusterSingletonManager
свойствах мы указываем действующего субъекта, сообщение, которое можно использовать для его завершения, и роли кластера, на которых может исполняться субъект (узлы наших кластеров имеют только одну роль broker
).
Он BrokerManagerActor
берет порт (должен быть уникальным для каждого узла, если мы хотим запустить пару на локальном хосте) и создает на его основе адрес, по которому будет слушать сокет для новых клиентов очереди отправки сообщений, а другой — для очереди. прослушивающий сокет -message-receive-client.
Клиент кластера и регистратор: сторона кластера
Теперь у нас есть один брокер, работающий в кластере, но как клиенты могут узнать, каков адрес синглтона? Ну, мы можем просто попросить BrokerManagerActor
актера об этом! Это можно сделать простым обменом сообщениями:
case object GetBrokerAddresses case class BrokerAddresses(sendServerAddress: InetSocketAddress, receiveServerAddress: InetSocketAddress) class BrokerManagerActor(clusterPort: Int) extends Actor { // as above, plus: override def receive = { case GetBrokerAddresses => sender() ! BrokerAddresses( sendServerAddress, receiveServerAddress) } }
Однако остается одна проблема. Клиенты, которые хотят использовать нашу очередь сообщений, не обязательно должны быть членами кластера. Здесь может помочь расширение Cluster Client contrib.
На стороне узла кластера клиентское расширение предоставляет регистратор, с помощью которого могут регистрироваться актеры, которые хотят быть видимыми снаружи. И вот что мы BrokerManagerActor
делаем при запуске :
class BrokerManagerActor(clusterPort: Int) extends Actor { // … override def preStart() = { // ... ClusterReceptionistExtension(context.system) .registerService(self) } }
Клиент кластера и регистратор: клиентская сторона
Что касается самих клиентов, им также нужна некоторая конфигурация для связи с кластером:
akka { actor.provider = "akka.remote.RemoteActorRefProvider" remote.netty.tcp.port = 0 remote.netty.tcp.hostname = "127.0.0.1" } cluster.client.initial-contact-points = [ "akka.tcp://[email protected]:9171", "akka.tcp://[email protected]:9172", "akka.tcp://[email protected]:9173" ]
Снова пройдемся по настройкам:
- для связи с удаленными субъектами (которые живут в кластере) нам нужно использовать
RemoteActorRefProvider
(ClusterARP
более богатая версияRemoteARP
) - аналогично начальным узлам, мы должны предоставить начальные контактные точки, чтобы у клиента был какой-то способ инициировать связь с кластером
На самом деле инициировать кластерный клиент довольно просто , нам нужно создать систему акторов и создать актера, который будет взаимодействовать с кластером:
val conf = ConfigFactory.load("cluster-client") implicit val system = ActorSystem(name, conf) val initialContacts = conf .getStringList("cluster.client.initial-contact-points") .asScala.map { case AddressFromURIString(addr) => system.actorSelection( RootActorPath(addr) / "user" / "receptionist") }.toSet val clusterClient = system.actorOf( ClusterClient.props(initialContacts), "cluster-client")
Чтобы запустить клиента из нашей очереди сообщений (у нас есть два типа клиентов: один отправляет сообщения в очередь, другой получает от нее сообщения), нам нужно выяснить, какой адрес у брокера. Для этого мы спрашиваем (используя шаблон запроса Akka) брокера, зарегистрированного у администратора, о его адресе:
clusterClient ? ClusterClient.Send( "/user/broker-manager/broker", GetBrokerAddresses, localAffinity = false) .mapTo[BrokerAddresses] .flatMap { ba => logger.info(s"Connecting a $name using broker address $ba.") runClient(ba, system) }
Наконец, когда поток клиента завершается (например, из-за того, что брокер не работает), мы пытаемся перезапустить его через 1 секунду. Вероятно, здесь будет полезен некоторый экспоненциальный механизм отсрочки.
Исполняемое приложение для запуска отправителей очереди сообщений и приемники использует тот же самый код, что и одного узла один, с той разницей, что адрес брокера получается из кластера:
object ClusterReceiver extends App with ClusterClientSupport { start("receiver", (ba, system) => new Receiver(ba.receiveServerAddress)(system).run()) }
Бег
Если вы попытаетесь выполнить ClusterReceiver
(любое число), ClusterSender
(любое число) и ClusteredBroker1
, ClusteredBroker2
и ClusteredBroker3
вы увидите, что сообщения передаются от отправителей через одного работающего посредника к получателям. Вы можете убить узел посредника, и через пару секунд другой узел будет запущен на другом узле кластера, и отправители / получатели повторно подключатся.
Я сказал бы, что это довольно хорошо для довольно небольшого количества кода, который мы написали!
Подводя итоги
Наша очередь сообщений сейчас:
- реактивный, используя акка-потоки
- постоянный, используя акка-персистентность
- кластеризованный, используя akka-cluster
И лучшая сторона в том, что в коде нам не нужно разбираться с деталями обработки противодавления, хранения сообщений на диске или обмена информацией и достижения консенсуса в кластере. Теперь у нас есть действительно реактивное приложение.
Спасибо Endre из команды Akka за помощь в добавлении обработки ошибок в потоки. Весь код доступен на GitHub . Наслаждайтесь!