В идеале, когда вы добавляете новый узел в существующую инфраструктуру, вам не нужно ни создавать прокси для удаленных участников на исходном узле вручную, ни изменять конфигурацию какого-либо узла, чтобы он знал, что был добавлен дополнительный элемент. Другими словами, когда какой-либо узел добавляется в систему, все остальные узлы должны автоматически знать, какие акторы имеет новый узел, и использовать их, как если бы они были стандартными локальными действующими лицами.
Реестр актеров Акки
Когда вы привязаны к стандартным Scala Actors (некоторые производственные среды все еще должны использовать Java 5, к сожалению, не поддерживаемая Akka), и сделаете ставку на параллелизм через акторов, вы обнаружите, что вы реализуете их заново — это Supervision. Дерево (от Erlang, связанное с Супервайзерами в Акке) и Реестр актеров.
Хотя Реестр акторов в Akka на заднем плане имеет довольно лаконичную реализацию (своего рода умная обертка-одиночка для параллельных HashMaps, которая хранит ссылки на всех действующих лиц на узле), это мощная абстракция, без которой трудно выжить, когда вы используя актеров в реальном мире. Например, реестр значительно упрощает построение балансировщиков нагрузки, поскольку вам больше не нужно явно указывать рабочих, которые будут распределять нагрузку, а вместо этого сам балансировщик ищет участников по типу или идентификатору при запуске или в течение срока службы в реестр.
Единственное, чего не хватает на данный момент в реестре актеров Akka, — это интерфейс для удаленного доступа к нему. Добавление такого интерфейса упрощает решение проблемы, изложенной выше.
Реестр актера
Живя в мире актеров, первой идеей, когда вам нужно создать удаленный интерфейс к чему-либо, является создание актера, доступного удаленно (он же RemoteActor). В первом приближении должен существовать субъект, который обрабатывает сообщения со ссылками на участников на удаленных узлах, создает прокси и регистрирует их в локальном реестре субъектов:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
class RegistryActor extends Actor{ ... def defaultMessageHandler : PartialFunction[Any, Unit] = { case RegisterActor(actor) = > log.debug( "Registering remote actor [%s]" format(actor)) if (!isActorInRegistry(actor.uuid) && !isLinkToLocal(actor)) ActorRegistry.register( // Hack for 0.10, 1.0-M1 RemoteClient.actorFor(actor.uuid.toString, actor.className, actor.hostname, actor.port) ) // RemoteActorRefs will register themselves in 1.0-M1+ case UnregisterActor(actor) = > { log.debug( "Unregistering remote actor [%s]" format(actor)) ActorRegistry.foreach{act = > if (act.uuid == actor.uuid){ ActorRegistry.unregister(act) }} Option(linkedRegistries.get(RegistryLink(actor.hostname, actor.port))) match { case Some( _ ) = > removeLinkToRegistry(RegistryLink(actor.hostname, actor.port)) case None = > log.debug( "[%s] is not a registry link" format(actor.uuid)) } } ... } ... } |
В качестве предварительного условия для будущего расширения также должна быть карта ссылок на реестры акторов, запущенные на других узлах (и способ добавления и обмена ссылками на реестры во время выполнения). Когда субъект реестра получает и разрешает новую ссылку на другого субъекта реестра, он отправляет обратно ссылку на себя и все другие известные реестры (чтобы оба участника реестра имели одинаковые согласованные наборы ссылок):
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
/** * RegistryActors located on the other hosts */ protected [easyscale] val linkedRegistries = new ConcurrentHashMap[RegistryLink, ActorRef]() def defaultMessageHandler : PartialFunction[Any, Unit] = { ... case AddRegistryLink(link) = > if (!linkedRegistries.containsKey(link)) addRegistryLink(link) else log.debug( "Link to registry [%s] is already present" format(link)) case RemoveRegistryLink(link) = > log.debug( "Unlinking from registry [%s]" format(link)) linkedRegistries.remove(link) } |
Регистрация актеров при запуске
Вторым шагом к решению проблемы является автоматическая регистрация локальных участников в удаленном реестре, когда добавляется ссылка на него:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
/** * Publishes all local actors as remote references to the linked registry * when the registry link is added */ trait StartupActorRefsDistribution extends RegistryActor{ /** * Adds link to remote registry, and register all local actors at there */ protected override def addRegistryLink(link : RegistryLink) = { super .addRegistryLink(link) registerActorsAt(linkedRegistries.get(link)) } /** * Registers all local actors at the remote node */ private def registerActorsAt(remoteRegistry : ActorRef) = { ActorRegistry.filter(actor = > actor.id == REGISTRY _ ACTOR && isActorLocal(actor)) .foreach{actor = > remoteRegistry ! RegisterActor(actor)} remoteRegistry } } |
Каждый новый узел должен изначально знать по крайней мере об одном узле, работающем в кластере (соседний узел):
1
2
3
4
|
neighbour { # One of the hosts in the group that has a started RegistryActor hostname = "localhost" port = 9999 } |
Таким образом, когда новый реестр субъектов сообщает о себе «соседу», он запускает цепную реакцию всех других реестров субъектов, заполняющих ссылки своих локальных субъектов на новый реестр, и наоборот, так что все реестры в конце знают всех действующих лиц в кластере (и доступ к ним через локальный интерфейс или через прокси (RemoteActorRef)).
Регистрация актеров началась еще при жизни
У Akka ActorRegistry есть простой механизм уведомлений, который позволяет обрабатывать события, возникающие, когда актер регистрируется / незарегистрируется в системе (по умолчанию все акторы (за исключением RemoteActorRefs в Akka 1.0-M1) регистрируются в реестре при запуске / завершении работы) , Его можно использовать для заполнения ссылок на новых участников по всей системе:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
/** * Publishes all registered local actor as a remote ref on all * linked remote registries */ trait InlifeActorRefsDistribution extends RegistryActor{ override def specificMessageHandler = { case ActorRegistered(actor) = > log.debug( "Actor [%s] is registered" format(actor)) registerOnLinks(actor) case ActorUnregistered(actor) = > log.debug( "Actor [%s] is unregistered" format(actor)) if (isActorLocal(actor)) actor.id match { case REGISTRY _ ACTOR = > ActorRegistry.foreach(act = > if (act.getClass.isAssignableFrom(classOf[LocalActorRef])) unregisterOnLinks(act)) case _ = > unregisterOnLinks(actor) } } /** * Makes the actor remote, and registers at remote nodes */ private def registerOnLinks(actor : ActorRef) = if (isActorLocal(actor)){ ... val iterator = linkedRegistries.values.iterator while (iterator.hasNext) iterator.next ! RegisterActor(actor) } ... } |
Использование Actor Registry
Теперь, когда мы автоматически получаем ссылки на всех участников в кластере, мы можем создать балансировщик, который будет распределять сообщения между участниками одного и того же типа:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
class SimpleTypedBalancer[T]( implicit manifest : Manifest[T]) extends Actor{ def receive = { case message : AnyRef = > forwardMessage(message, self.getSenderFuture orElse self.getSender, idleWorkerId) } def idleWorkerId = Futures.awaitOne{ ActorRegistry.filter{actor = > Class.forName(actor.actorClassName).isAssignableFrom(manifest.erasure) }.map( _ !!! IsReady()).toList }.result.flatMap( _ match { case Ready(actorUuid) = > Option(actorUuid) case _ = > None }) def forwardMessage(message : AnyRef, originalSender : Option[AnyRef], workerId : Option[Uuid]) = { for {id <- workerId; worker <- ActorRegistry.actors.find(actor = > actor.id == id.toString || actor.uuid == id )}{ if (originalSender.isDefined) worker.forward(message) else worker.sendOneWay(message) } } } |
Проблема решена
Предположим, что есть узел с 3 действующими действующими лицами типа SimpleActor:
01
02
03
04
05
06
07
08
09
10
|
RemoteNode.start log.info( "Starting registry actor at %s:%s" format(RemoteServer.HOSTNAME, RemoteServer.PORT)) val registryActor = actorOf( new RegistryActor with StartupActorRefsDistribution with InlifeActorRefsDistribution).start RegistryActorUtil.initialize ( 1 to 3 ).foreach( _ = > actorOf[SimpleActor].start) |
Этот узел ничего не знает об инфраструктуре кластера в будущем, и на данный момент он запускает только удаленный API для реестра — RegistryActor. Скажем, мы хотим использовать актеры (3 экземпляра SimpleActor), работающие на узле № 1, чтобы разделить нагрузку на актеров SimpleActor, работающих на узле № 2. Узел № 2 имеет то же определение, что и узел № 1 (единственное отличие заключается в том, что узел № 1 явно настроен как соседний узел).
Посмотрим, будут ли сообщения, отправленные на балансировщик, распределены между локальными и удаленными субъектами:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
doBeforeSpec{ ( 1 to 3 ).foreach( _ = > actorOf[SimpleActor].start) actorOf( new SimpleTypedBalancer[SimpleActor]).start } "Messages sent to the balancer should be distributed across local and remote workers" in { val balancer = ActorRegistry.filter(actor = > Class.forName(actor.actorClassName) .isAssignableFrom(classOf[SimpleTypedBalancer[SimpleActor]]) ).head log.info( "========SENDING MESSAGES TO BALANCER=========" ) val start = System.currentTimeMillis val futures = ( 1 to 30 ).map(i = > balancer !!! "" + i).toList log.info( "All messages are disaptched..." ) Futures.awaitAll(futures) val processedByWorkers = futures.flatMap(future = > future.result).toSet.size log.info( "Process time by %s workers: %s" format(processedByWorkers, System.currentTimeMillis - start)) processedByWorkers must beGreaterThan( 3 ) } |
Тест работает нормально. Это означает, что более 3 действующих лиц, работающих на местах, были задействованы, и, следовательно, были использованы удаленные участники, зарегистрированные на местах:
01
02
03
04
05
06
07
08
09
10
11
|
[INFO] [ 2010 - 11 - 23 23 : 23 : 18 , 687 ] [Thread- 18 ] s.s.a.a.Actor$ : Adding RegistryActor as listener to local actor registry [INFO] [ 2010 - 11 - 23 23 : 23 : 18 , 703 ] [Thread- 18 ] s.s.a.a.Actor$ : Making RegistryActor remote... [INFO] [ 2010 - 11 - 23 23 : 23 : 18 , 703 ] [Thread- 18 ] s.s.a.a.Actor$ : Adding link to a neighbouring host... [INFO] [ 2010 - 11 - 23 23 : 23 : 18 , 859 ] [Thread- 18 ] s.s.a.a.Actor$ : ======== SENDING MESSAGES TO BALANCER ========= [INFO] [ 2010 - 11 - 23 23 : 23 : 18 , 890 ] [Thread- 18 ] s.s.a.a.Actor$ : All messages are disaptched... [INFO] [ 2010 - 11 - 23 23 : 23 : 19 , 656 ] [akka : event-driven : dispatcher : global- 1 ] s.s.a.r.RemoteClient : Starting remote client co nnection to [localhost : 9999 ] [INFO] [ 2010 - 11 - 23 23 : 23 : 24 , 906 ] [Thread- 18 ] s.s.a.a.Actor$ : Process time by 6 workers : 6031 [INFO] [ 2010 - 11 - 23 23 : 23 : 24 , 968 ] [Thread- 18 ] s.s.a.a.Actor$ : ==== SHUTTING DOWN ACTOR REGISTRY ==== [info] + Messages sent to the balancer should be distributed across local and remote workers [info] == com.vasilrem.akka.easyscale.behavior.RemoteTypedBalancerSpec == |
Изобретая колесо
Как когда-то упоминалось в списках рассылки Akka, однажды ActorRegistry будет иметь удаленный интерфейс «из коробки». До этого времени вам придется в конечном итоге найти собственное решение или использовать экспериментальную поддержку JCluster, которая обычно нацелена на ту же проблему, но использует другой подход.
Код RegistryActor доступен на GitHub. Это изменится со временем, когда я буду использовать его в производстве.
Справка: еще более простая масштабируемость с Akka через RegistryActor от нашего партнера по JCG Васила Ременюка в блоге Васила Ременюка
Статьи по Теме :
- Скала на 2012 год? Решать, стоит ли инвестировать в язык программирования
- Да, Вирджиния, Скала тяжело
- Значительные разработки в области разработки программного обеспечения 2011 года
- Scala Tutorial — блоки кода, стиль кодирования, замыкания, проект документации scala
- Использование Scala менее эффективно, чем использование Java, по крайней мере, для половины всех проектов Java.