Статьи

Еще более простая масштабируемость с Akka через RegistryActor

Давайте представим, что ваша система развернута на одном узле, где вы запускаете актеров, которые буквально истощают ресурсы. Вы хотите добавить еще несколько узлов, на которых выполняются актеры того же типа, чтобы сбалансировать нагрузку на кластер.

В идеале, когда вы добавляете новый узел в существующую инфраструктуру, вам не нужно ни создавать прокси для удаленных участников на исходном узле вручную, ни изменять конфигурацию какого-либо узла, чтобы он знал, что был добавлен дополнительный элемент. Другими словами, когда какой-либо узел добавляется в систему, все остальные узлы должны автоматически знать, какие акторы имеет новый узел, и использовать их, как если бы они были стандартными локальными действующими лицами.

Реестр актеров Акки

Когда вы привязаны к стандартным Scala Actors (некоторые производственные среды все еще должны использовать Java 5, к сожалению, не поддерживаемая Akka), и сделаете ставку на параллелизм через акторов, вы обнаружите, что вы реализуете их заново — это Supervision. Дерево (от Erlang, связанное с Супервайзерами в Акке) и Реестр актеров.

Хотя Реестр акторов в Akka на заднем плане имеет довольно лаконичную реализацию (своего рода умная обертка-одиночка для параллельных HashMaps, которая хранит ссылки на всех действующих лиц на узле), это мощная абстракция, без которой трудно выжить, когда вы используя актеров в реальном мире. Например, реестр значительно упрощает построение балансировщиков нагрузки, поскольку вам больше не нужно явно указывать рабочих, которые будут распределять нагрузку, а вместо этого сам балансировщик ищет участников по типу или идентификатору при запуске или в течение срока службы в реестр.

Единственное, чего не хватает на данный момент в реестре актеров Akka, — это интерфейс для удаленного доступа к нему. Добавление такого интерфейса упрощает решение проблемы, изложенной выше.

Реестр актера

Живя в мире актеров, первой идеей, когда вам нужно создать удаленный интерфейс к чему-либо, является создание актера, доступного удаленно (он же RemoteActor). В первом приближении должен существовать субъект, который обрабатывает сообщения со ссылками на участников на удаленных узлах, создает прокси и регистрирует их в локальном реестре субъектов:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class RegistryActor extends Actor{
   
  ...
 
  def defaultMessageHandler: PartialFunction[Any, Unit] = {
    case RegisterActor(actor) =>
      log.debug("Registering remote actor [%s]" format(actor))
      if(!isActorInRegistry(actor.uuid) && !isLinkToLocal(actor))
        ActorRegistry.register( // Hack for 0.10, 1.0-M1
          RemoteClient.actorFor(actor.uuid.toString, actor.className, actor.hostname, actor.port)
        ) // RemoteActorRefs will register themselves in 1.0-M1+
 
    case UnregisterActor(actor) => {
        log.debug("Unregistering remote actor [%s]" format(actor))
        ActorRegistry.foreach{act =>
          if(act.uuid == actor.uuid){
            ActorRegistry.unregister(act)           
          }}
        Option(linkedRegistries.get(RegistryLink(actor.hostname, actor.port))) match{
          case Some(_) => removeLinkToRegistry(RegistryLink(actor.hostname, actor.port))
          case None => log.debug("[%s] is not a registry link" format(actor.uuid))
        }
      }
    ...
    }
...
}

В качестве предварительного условия для будущего расширения также должна быть карта ссылок на реестры акторов, запущенные на других узлах (и способ добавления и обмена ссылками на реестры во время выполнения). Когда субъект реестра получает и разрешает новую ссылку на другого субъекта реестра, он отправляет обратно ссылку на себя и все другие известные реестры (чтобы оба участника реестра имели одинаковые согласованные наборы ссылок):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
   * RegistryActors located on the other hosts
   */
  protected[easyscale] val linkedRegistries = new ConcurrentHashMap[RegistryLink, ActorRef]()
 
  def defaultMessageHandler: PartialFunction[Any, Unit] = {
    
    ...
 
    case AddRegistryLink(link) =>
      if(!linkedRegistries.containsKey(link))
        addRegistryLink(link)
      else
        log.debug("Link to registry [%s] is already present" format(link))
      
       
    case RemoveRegistryLink(link) =>
        log.debug("Unlinking from registry [%s]" format(link))
        linkedRegistries.remove(link)
       
       
  }

Регистрация актеров при запуске

Вторым шагом к решению проблемы является автоматическая регистрация локальных участников в удаленном реестре, когда добавляется ссылка на него:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Publishes all local actors as remote references to the linked registry
 * when the registry link is added
 */
trait StartupActorRefsDistribution extends RegistryActor{
 
  /**
   * Adds link to remote registry, and register all local actors at there
   */
  protected override def addRegistryLink(link: RegistryLink) = {
    super.addRegistryLink(link)
    registerActorsAt(linkedRegistries.get(link))
  }
 
  /**
   * Registers all local actors at the remote node
   */
  private def registerActorsAt(remoteRegistry: ActorRef) = {
    ActorRegistry.filter(actor =>
      actor.id == REGISTRY_ACTOR && isActorLocal(actor))
    .foreach{actor => remoteRegistry ! RegisterActor(actor)}
    remoteRegistry
  }
 
}

Каждый новый узел должен изначально знать по крайней мере об одном узле, работающем в кластере (соседний узел):

1
2
3
4
neighbour { # One of the hosts in the group that has a started RegistryActor
   hostname = "localhost"
   port = 9999
}

Таким образом, когда новый реестр субъектов сообщает о себе «соседу», он запускает цепную реакцию всех других реестров субъектов, заполняющих ссылки своих локальных субъектов на новый реестр, и наоборот, так что все реестры в конце знают всех действующих лиц в кластере (и доступ к ним через локальный интерфейс или через прокси (RemoteActorRef)).

Регистрация актеров началась еще при жизни

У Akka ActorRegistry есть простой механизм уведомлений, который позволяет обрабатывать события, возникающие, когда актер регистрируется / незарегистрируется в системе (по умолчанию все акторы (за исключением RemoteActorRefs в Akka 1.0-M1) регистрируются в реестре при запуске / завершении работы) , Его можно использовать для заполнения ссылок на новых участников по всей системе:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Publishes all registered local actor as a remote ref on all
 * linked remote registries
 */
trait InlifeActorRefsDistribution extends RegistryActor{
 
  override def specificMessageHandler = {
    case ActorRegistered(actor) =>
      log.debug("Actor [%s] is registered" format(actor))
      registerOnLinks(actor)
       
    case ActorUnregistered(actor) =>
      log.debug("Actor [%s] is unregistered" format(actor))
      if(isActorLocal(actor))
        actor.id match {
          case REGISTRY_ACTOR => ActorRegistry.foreach(act =>
              if(act.getClass.isAssignableFrom(classOf[LocalActorRef]))
                unregisterOnLinks(act))
              case _ => unregisterOnLinks(actor)
            }
        }
    
      /**
       * Makes the actor remote, and registers at remote nodes
       */
      private def registerOnLinks(actor: ActorRef) =
        if(isActorLocal(actor)){         
          ...
          val iterator = linkedRegistries.values.iterator
          while(iterator.hasNext) iterator.next ! RegisterActor(actor)
        }
 
  ...
 
  }

Использование Actor Registry

Теперь, когда мы автоматически получаем ссылки на всех участников в кластере, мы можем создать балансировщик, который будет распределять сообщения между участниками одного и того же типа:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class SimpleTypedBalancer[T](implicit manifest: Manifest[T]) extends Actor{
 
  def receive = {
    case message :AnyRef =>
      forwardMessage(message,
                     self.getSenderFuture orElse self.getSender,
                     idleWorkerId)     
  }
 
  def idleWorkerId = Futures.awaitOne{
    ActorRegistry.filter{actor =>
      Class.forName(actor.actorClassName).isAssignableFrom(manifest.erasure)
    }.map(_ !!! IsReady()).toList
  }.result.flatMap(_ match {
      case Ready(actorUuid) =>       
        Option(actorUuid)
      case _ =>
        None       
    })
 
  def forwardMessage(message: AnyRef, originalSender: Option[AnyRef], workerId: Option[Uuid]) =
  {
    for{id <- workerId; worker <- ActorRegistry.actors.find(actor =>
        actor.id == id.toString ||
        actor.uuid == id )}{
      if(originalSender.isDefined)
        worker.forward(message)
      else worker.sendOneWay(message)
    }
  }
 
}

Проблема решена

Предположим, что есть узел с 3 действующими действующими лицами типа SimpleActor:

01
02
03
04
05
06
07
08
09
10
RemoteNode.start
 
  log.info("Starting registry actor at %s:%s" format(RemoteServer.HOSTNAME, RemoteServer.PORT))
  val registryActor = actorOf(new RegistryActor
              with StartupActorRefsDistribution
              with InlifeActorRefsDistribution).start
 
  RegistryActorUtil.initialize
 
  (1 to 3).foreach(_ => actorOf[SimpleActor].start)

Этот узел ничего не знает об инфраструктуре кластера в будущем, и на данный момент он запускает только удаленный API для реестра — RegistryActor. Скажем, мы хотим использовать актеры (3 экземпляра SimpleActor), работающие на узле № 1, чтобы разделить нагрузку на актеров SimpleActor, работающих на узле № 2. Узел № 2 имеет то же определение, что и узел № 1 (единственное отличие заключается в том, что узел № 1 явно настроен как соседний узел).

Посмотрим, будут ли сообщения, отправленные на балансировщик, распределены между локальными и удаленными субъектами:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
doBeforeSpec{
    (1 to 3).foreach(_ => actorOf[SimpleActor].start)
    actorOf(new SimpleTypedBalancer[SimpleActor]).start
  }
 
  "Messages sent to the balancer should be distributed across local and remote workers" in {
 
    val balancer = ActorRegistry.filter(actor =>
      Class.forName(actor.actorClassName)
      .isAssignableFrom(classOf[SimpleTypedBalancer[SimpleActor]])
    ).head
 
    log.info("========SENDING MESSAGES TO BALANCER=========")
    val start = System.currentTimeMillis
    val futures = (1 to 30).map(i => balancer !!! "" + i).toList
    log.info("All messages are disaptched...")
    Futures.awaitAll(futures)
    val processedByWorkers = futures.flatMap(future => future.result).toSet.size
    log.info("Process time by %s workers: %s" format(processedByWorkers, System.currentTimeMillis - start))
    processedByWorkers must beGreaterThan(3)
  }

Тест работает нормально. Это означает, что более 3 действующих лиц, работающих на местах, были задействованы, и, следовательно, были использованы удаленные участники, зарегистрированные на местах:

01
02
03
04
05
06
07
08
09
10
11
[INFO] [2010-11-23 23:23:18,687] [Thread-18] s.s.a.a.Actor$: Adding RegistryActor as listener to local actor registry
[INFO] [2010-11-23 23:23:18,703] [Thread-18] s.s.a.a.Actor$: Making RegistryActor remote...
[INFO] [2010-11-23 23:23:18,703] [Thread-18] s.s.a.a.Actor$: Adding link to a neighbouring host...
[INFO] [2010-11-23 23:23:18,859] [Thread-18] s.s.a.a.Actor$: ========SENDING MESSAGES TO BALANCER=========
[INFO] [2010-11-23 23:23:18,890] [Thread-18] s.s.a.a.Actor$: All messages are disaptched...
[INFO] [2010-11-23 23:23:19,656] [akka:event-driven:dispatcher:global-1] s.s.a.r.RemoteClient: Starting remote client co
nnection to [localhost:9999]
[INFO] [2010-11-23 23:23:24,906] [Thread-18] s.s.a.a.Actor$: Process time by 6 workers: 6031
[INFO] [2010-11-23 23:23:24,968] [Thread-18] s.s.a.a.Actor$: ====SHUTTING DOWN ACTOR REGISTRY====
[info]   + Messages sent to the balancer should be distributed across local and remote workers
[info] == com.vasilrem.akka.easyscale.behavior.RemoteTypedBalancerSpec ==

Изобретая колесо

Как когда-то упоминалось в списках рассылки Akka, однажды ActorRegistry будет иметь удаленный интерфейс «из коробки». До этого времени вам придется в конечном итоге найти собственное решение или использовать экспериментальную поддержку JCluster, которая обычно нацелена на ту же проблему, но использует другой подход.

Код RegistryActor доступен на GitHub. Это изменится со временем, когда я буду использовать его в производстве.

Справка: еще более простая масштабируемость с Akka через RegistryActor от нашего партнера по JCG Васила Ременюка в блоге Васила Ременюка

Статьи по Теме :