В идеале, когда вы добавляете новый узел в существующую инфраструктуру, вам не нужно ни создавать прокси для удаленных участников на исходном узле вручную, ни изменять конфигурацию какого-либо узла, чтобы он знал, что был добавлен дополнительный элемент. Другими словами, когда какой-либо узел добавляется в систему, все остальные узлы должны автоматически знать, какие акторы имеет новый узел, и использовать их, как если бы они были стандартными локальными действующими лицами.
Реестр актеров Акки
Когда вы привязаны к стандартным Scala Actors (некоторые производственные среды все еще должны использовать Java 5, к сожалению, не поддерживаемая Akka), и сделаете ставку на параллелизм через акторов, вы обнаружите, что вы реализуете их заново — это Supervision. Дерево (от Erlang, связанное с Супервайзерами в Акке) и Реестр актеров.
Хотя Реестр акторов в Akka на заднем плане имеет довольно лаконичную реализацию (своего рода умная обертка-одиночка для параллельных HashMaps, которая хранит ссылки на всех действующих лиц на узле), это мощная абстракция, без которой трудно выжить, когда вы используя актеров в реальном мире. Например, реестр значительно упрощает построение балансировщиков нагрузки, поскольку вам больше не нужно явно указывать рабочих, которые будут распределять нагрузку, а вместо этого сам балансировщик ищет участников по типу или идентификатору при запуске или в течение срока службы в реестр.
Единственное, чего не хватает на данный момент в реестре актеров Akka, — это интерфейс для удаленного доступа к нему. Добавление такого интерфейса упрощает решение проблемы, изложенной выше.
Реестр актера
Живя в мире актеров, первой идеей, когда вам нужно создать удаленный интерфейс к чему-либо, является создание актера, доступного удаленно (он же RemoteActor). В первом приближении должен существовать субъект, который обрабатывает сообщения со ссылками на участников на удаленных узлах, создает прокси и регистрирует их в локальном реестре субъектов:
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | classRegistryActor extendsActor{    ...  defdefaultMessageHandler:PartialFunction[Any, Unit] ={    caseRegisterActor(actor) =>      log.debug("Registering remote actor [%s]"format(actor))      if(!isActorInRegistry(actor.uuid) && !isLinkToLocal(actor))        ActorRegistry.register( // Hack for 0.10, 1.0-M1          RemoteClient.actorFor(actor.uuid.toString, actor.className, actor.hostname, actor.port)        ) // RemoteActorRefs will register themselves in 1.0-M1+    caseUnregisterActor(actor) => {        log.debug("Unregistering remote actor [%s]"format(actor))        ActorRegistry.foreach{act =>          if(act.uuid ==actor.uuid){            ActorRegistry.unregister(act)                      }}        Option(linkedRegistries.get(RegistryLink(actor.hostname, actor.port))) match{          caseSome(_) => removeLinkToRegistry(RegistryLink(actor.hostname, actor.port))          caseNone => log.debug("[%s] is not a registry link"format(actor.uuid))        }      }    ...    }...} | 
В качестве предварительного условия для будущего расширения также должна быть карта ссылок на реестры акторов, запущенные на других узлах (и способ добавления и обмена ссылками на реестры во время выполнения). Когда субъект реестра получает и разрешает новую ссылку на другого субъекта реестра, он отправляет обратно ссылку на себя и все другие известные реестры (чтобы оба участника реестра имели одинаковые согласованные наборы ссылок):
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 | /**   * RegistryActors located on the other hosts   */  protected[easyscale] vallinkedRegistries =newConcurrentHashMap[RegistryLink, ActorRef]()  defdefaultMessageHandler:PartialFunction[Any, Unit] ={       ...    caseAddRegistryLink(link) =>       if(!linkedRegistries.containsKey(link))        addRegistryLink(link)      else        log.debug("Link to registry [%s] is already present"format(link))               caseRemoveRegistryLink(link) =>         log.debug("Unlinking from registry [%s]"format(link))        linkedRegistries.remove(link)              } | 
Регистрация актеров при запуске
Вторым шагом к решению проблемы является автоматическая регистрация локальных участников в удаленном реестре, когда добавляется ссылка на него:
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | /** * Publishes all local actors as remote references to the linked registry * when the registry link is added */traitStartupActorRefsDistribution extendsRegistryActor{  /**   * Adds link to remote registry, and register all local actors at there   */  protectedoverridedefaddRegistryLink(link:RegistryLink) ={    super.addRegistryLink(link)    registerActorsAt(linkedRegistries.get(link))  }  /**   * Registers all local actors at the remote node   */  privatedefregisterActorsAt(remoteRegistry:ActorRef) ={    ActorRegistry.filter(actor =>      actor.id ==REGISTRY_ACTOR && isActorLocal(actor))    .foreach{actor => remoteRegistry ! RegisterActor(actor)}    remoteRegistry  }} | 
Каждый новый узел должен изначально знать по крайней мере об одном узле, работающем в кластере (соседний узел):
| 1 2 3 4 | neighbour { #One of the hosts in the group that has a started RegistryActor   hostname ="localhost"   port =9999} | 
Таким образом, когда новый реестр субъектов сообщает о себе «соседу», он запускает цепную реакцию всех других реестров субъектов, заполняющих ссылки своих локальных субъектов на новый реестр, и наоборот, так что все реестры в конце знают всех действующих лиц в кластере (и доступ к ним через локальный интерфейс или через прокси (RemoteActorRef)).
Регистрация актеров началась еще при жизни
У Akka ActorRegistry есть простой механизм уведомлений, который позволяет обрабатывать события, возникающие, когда актер регистрируется / незарегистрируется в системе (по умолчанию все акторы (за исключением RemoteActorRefs в Akka 1.0-M1) регистрируются в реестре при запуске / завершении работы) , Его можно использовать для заполнения ссылок на новых участников по всей системе:
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | /** * Publishes all registered local actor as a remote ref on all * linked remote registries */traitInlifeActorRefsDistribution extendsRegistryActor{  overridedefspecificMessageHandler ={    caseActorRegistered(actor) =>      log.debug("Actor [%s] is registered"format(actor))      registerOnLinks(actor)          caseActorUnregistered(actor) =>      log.debug("Actor [%s] is unregistered"format(actor))      if(isActorLocal(actor))         actor.id match{          caseREGISTRY_ACTOR => ActorRegistry.foreach(act =>              if(act.getClass.isAssignableFrom(classOf[LocalActorRef]))                unregisterOnLinks(act))              case_=> unregisterOnLinks(actor)            }        }         /**       * Makes the actor remote, and registers at remote nodes       */      privatedefregisterOnLinks(actor:ActorRef) =        if(isActorLocal(actor)){                    ...          valiterator =linkedRegistries.values.iterator          while(iterator.hasNext) iterator.next ! RegisterActor(actor)        }  ...  } | 
Использование Actor Registry
Теперь, когда мы автоматически получаем ссылки на всех участников в кластере, мы можем создать балансировщик, который будет распределять сообщения между участниками одного и того же типа:
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | classSimpleTypedBalancer[T](implicitmanifest:Manifest[T]) extendsActor{  defreceive ={    casemessage :AnyRef =>      forwardMessage(message,                     self.getSenderFuture orElse self.getSender,                     idleWorkerId)        }  defidleWorkerId =Futures.awaitOne{    ActorRegistry.filter{actor =>      Class.forName(actor.actorClassName).isAssignableFrom(manifest.erasure)    }.map(_!!! IsReady()).toList  }.result.flatMap(_match{      caseReady(actorUuid) =>                Option(actorUuid)      case_=>        None            })  defforwardMessage(message:AnyRef, originalSender:Option[AnyRef], workerId:Option[Uuid]) =  {    for{id <- workerId; worker <- ActorRegistry.actors.find(actor =>        actor.id ==id.toString ||        actor.uuid ==id )}{      if(originalSender.isDefined)        worker.forward(message)      elseworker.sendOneWay(message)    }  }} | 
Проблема решена
Предположим, что есть узел с 3 действующими действующими лицами типа SimpleActor:
| 01 02 03 04 05 06 07 08 09 10 | RemoteNode.start  log.info("Starting registry actor at %s:%s"format(RemoteServer.HOSTNAME, RemoteServer.PORT))  valregistryActor =actorOf(newRegistryActor               withStartupActorRefsDistribution              withInlifeActorRefsDistribution).start  RegistryActorUtil.initialize  (1to 3).foreach(_=> actorOf[SimpleActor].start) | 
Этот узел ничего не знает об инфраструктуре кластера в будущем, и на данный момент он запускает только удаленный API для реестра — RegistryActor. Скажем, мы хотим использовать актеры (3 экземпляра SimpleActor), работающие на узле № 1, чтобы разделить нагрузку на актеров SimpleActor, работающих на узле № 2. Узел № 2 имеет то же определение, что и узел № 1 (единственное отличие заключается в том, что узел № 1 явно настроен как соседний узел).
Посмотрим, будут ли сообщения, отправленные на балансировщик, распределены между локальными и удаленными субъектами:
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 | doBeforeSpec{    (1to 3).foreach(_=> actorOf[SimpleActor].start)    actorOf(newSimpleTypedBalancer[SimpleActor]).start  }  "Messages sent to the balancer should be distributed across local and remote workers"in {    valbalancer =ActorRegistry.filter(actor =>      Class.forName(actor.actorClassName)      .isAssignableFrom(classOf[SimpleTypedBalancer[SimpleActor]])    ).head    log.info("========SENDING MESSAGES TO BALANCER=========")    valstart =System.currentTimeMillis    valfutures =(1to 30).map(i => balancer !!! ""+ i).toList    log.info("All messages are disaptched...")    Futures.awaitAll(futures)    valprocessedByWorkers =futures.flatMap(future => future.result).toSet.size    log.info("Process time by %s workers: %s"format(processedByWorkers, System.currentTimeMillis - start))    processedByWorkers must beGreaterThan(3)  } | 
Тест работает нормально. Это означает, что более 3 действующих лиц, работающих на местах, были задействованы, и, следовательно, были использованы удаленные участники, зарегистрированные на местах:
| 01 02 03 04 05 06 07 08 09 10 11 | [INFO] [2010-11-2323:23:18,687] [Thread-18] s.s.a.a.Actor$:Adding RegistryActor as listener to local actor registry[INFO] [2010-11-2323:23:18,703] [Thread-18] s.s.a.a.Actor$:Making RegistryActor remote...[INFO] [2010-11-2323:23:18,703] [Thread-18] s.s.a.a.Actor$:Adding link to a neighbouring host...[INFO] [2010-11-2323:23:18,859] [Thread-18] s.s.a.a.Actor$:========SENDING MESSAGES TO BALANCER=========[INFO] [2010-11-2323:23:18,890] [Thread-18] s.s.a.a.Actor$:All messages are disaptched...[INFO] [2010-11-2323:23:19,656] [akka:event-driven:dispatcher:global-1] s.s.a.r.RemoteClient:Starting remote client connection to [localhost:9999][INFO] [2010-11-2323:23:24,906] [Thread-18] s.s.a.a.Actor$:Process time by 6workers:6031[INFO] [2010-11-2323:23:24,968] [Thread-18] s.s.a.a.Actor$:====SHUTTING DOWN ACTOR REGISTRY====[info]   + Messages sent to the balancer should be distributed across local and remote workers[info] ==com.vasilrem.akka.easyscale.behavior.RemoteTypedBalancerSpec == | 
Изобретая колесо
Как когда-то упоминалось в списках рассылки Akka, однажды ActorRegistry будет иметь удаленный интерфейс «из коробки». До этого времени вам придется в конечном итоге найти собственное решение или использовать экспериментальную поддержку JCluster, которая обычно нацелена на ту же проблему, но использует другой подход.
Код RegistryActor доступен на GitHub. Это изменится со временем, когда я буду использовать его в производстве.
Справка: еще более простая масштабируемость с Akka через RegistryActor от нашего партнера по JCG Васила Ременюка в блоге Васила Ременюка
Статьи по Теме :
- Скала на 2012 год? Решать, стоит ли инвестировать в язык программирования
- Да, Вирджиния, Скала тяжело
- Значительные разработки в области разработки программного обеспечения 2011 года
- Scala Tutorial — блоки кода, стиль кодирования, замыкания, проект документации scala
- Использование Scala менее эффективно, чем использование Java, по крайней мере, для половины всех проектов Java.



