Статьи

Статические Groovy, GridGain и GPars для распределенного параллельного программирования

Эта статья продолжает историю, начатую в моих предыдущих статьях:

 Я собираюсь показать, как выразительность и производительность Groovy могут сочетаться с существующими инструментами для создания приложений с высокой степенью масштабируемости (как на уровне ядра, так и на уровне кластера).

Два замечательных инструмента, которые мы будем использовать:

 Если вы разработчик и не знакомы с этими инструментами, я настоятельно рекомендую попробовать.

 Как обычно, мы будем иметь дело с примером приложения. Легко представить, что GridGain будет использоваться для управления распределенной частью нашей истории, а GPars будет работать с параллельной.

Образец заявки

  • У нас есть несколько машин, работающих вместе
  • Главный компьютер (узел сетки) будет обрабатывать пользовательский ввод и распределять задачи по другим узлам.
  • Ведомые машины будут выполнять команды, полученные от мастера


Начнем с кода метода main () для главного узла.
Очень интересно, что нам не нужен специальный код для подчиненных узлов — магия GridGain позволяет нам использовать код узла сетки по умолчанию во многих ситуациях. Что еще более интересно, так это то, что во время разработки нам даже не нужно развертывать код на подчиненных (благодаря загрузке одноранговых классов GridGain)

  static void main (String [] args) {
def config = new GridConfigurationAdapter(
gridGainHome: "/Applications/gridgain-2.1.1",
gridLogger: new GridJavaLogger ()
)
new MasterActor(GridFactory.start(config)).start ()
}

Здесь мы настроили и запустили грид, а также создали и запустили главного актера на главном узле. Обратите внимание, насколько удобен синтаксис Groovy-карты для определения свойств при построении объекта.

Что такое актеры?

Актеры допускают модель параллелизма на основе обмена сообщениями, построенную из независимых активных объектов, которые обмениваются сообщениями и не имеют изменяемого общего состояния. Актеры могут естественным образом избегать таких проблем, как взаимоблокировки, блокировки в реальном времени или голодание, которые характерны для общей памяти. 

Мы будем использовать определенный тип акторов, предоставляемых GPars — DynamicDispatchActor. Класс DynamicDispatchActor — это объединенный субъект, позволяющий создать альтернативную структуру кода обработки сообщений. В общем случае DynamicDispatchActor многократно сканирует сообщения и отправляет поступившие сообщения одному из методов onMessage (message), определенных для субъекта.


Нам понадобятся типы актеров MasterActor и SlaveActor, которые будут запускаться из главного и подчиненного узлов соответственно. Для наших целей будет удобно, если у нас будет общий суперкласс

   static class RemoteNodeDiscovery { GridDiscoveryEventType type; GridNode node }

static class GridAwareActor extends DynamicDispatchActor {
Grid grid

GridAwareActor (Grid grid) {
setActorGroup Actors.defaultPooledActorGroup
this.grid = grid
}

protected void afterStart() {
grid.addMessageListener { nodeId, msg ->
if (active)
this << msg
else
grid.removeMessageListener this
}
grid.addDiscoveryListener { type, node ->
if (active)
this << new RemoteNodeDiscovery(type: type, node: node)
else
grid.removeDiscoveryListener this
}
}

protected void afterStop (Object ignore) {
actorGroup.threadPool.shutdown()
}
}

Что тут происходит? Когда актер запускается, он регистрирует двух слушателей событий, поступающих из сетки, и передает эти события актеру для обработки. После того, как актер остановил эти слушатели автоматически отменили регистрацию, чтобы избежать утечки ресурсов.

Опять же, Groovy экономит здесь много стандартного кода и делает код намного чище. В дополнение к этому статическая компиляция позволяет нам избежать динамической отправки и оставить код слушателя как можно скорее (всегда лучшая стратегия с асинхронной обработкой)

Интересно, что GPars помогает нам обрабатывать события, приходящие из сетки, которые мы переносим в RemoteNodeDiscovery, и сообщения, которые актеры посылают друг другу, абсолютно одинаково. Очень удобно!

Теперь мы готовы реализовать SlaveActor — тот, который будет запускаться на подчиненных узлах командой от ведущего.

   static abstract class ExecuteCommand implements Runnable, Serializable {}

static class SlaveActor extends GridAwareActor {
UUID masterId

SlaveActor (Grid grid, UUID masterId) {
super(grid)
this.masterId = masterId
}

void onMessage (RemoteNodeDiscovery msg) {
if (msg.node.id == masterId && (msg.type == GridDiscoveryEventType.LEFT || msg.type == GridDiscoveryEventType.FAILED)) {
stop ()
}
}

void onMessage(ExecuteCommand command) {
command.run ()
}
}


Итак, SlaveActor практически тривиален.
Он обрабатывает только два типа событий — события обнаружения из сетки (чтобы иметь возможность остановить себя, когда мастер исчезает) и команды, отправленные мастером (команда нормальная
java.lang.Runnable, и мы используем специальный класс ExecuteCommand по метологическим причинам и потому, что мы нужен java.lang.Serializable для передачи сообщений между узлами сетки)

Теперь нам нужно реализовать MainActor. Реализация немного длинная, поэтому я буду представлять ее по частям. Начнем с объявления полей

   static class MasterActor extends GridAwareActor {
Set<UUID> readyNodes = []

Thread stdInReader

MasterActor(Grid grid) {
super(grid)
}
// .................
}

Мы будем отслеживать подчиненные узлы, которые сообщили, что они готовы к работе, и у нас будет отдельный поток для чтения из стандартного ввода.

Обратите внимание, как элегантно по сравнению с Java мы инициализируем поля readyNode. Мы обсуждали это уже в предыдущих статьях, но мне это очень нравится ?

 Теперь мы можем создать поток, который будет читать стандартный ввод и отправлять сообщения нашему актеру для обработки. Сообщение в этом случае будет обычным java.lang.String

       private Thread createStdinReaderThread() {
[
run: {
def reader = new LineNumberReader(new InputStreamReader(System.in))
while (!Thread.currentThread().isInterrupted()) {
this << reader.readLine()
}
},

daemon: true
]
}

Эта часть мне тоже очень нравится. Вся мощь Groovy и вывод типа static Groovy вместе взятые. Посмотрите, как элегантно мы объединяем в одно подклассы выражения карты Groovy java.lang.Thread и обращаемся к setDaemon (true)


Теперь мы готовы справиться со стартом и остановкой нашего актера.
По правде говоря, ничего интересного здесь не происходит, и я предоставляю код только для удобства читателя.

       void afterStart() {
grid.remoteNodes.each { remoteNode ->
MasterActor.this << new RemoteNodeDiscovery ( type: GridDiscoveryEventType.JOINED, node:remoteNode )
}
stdInReader = GridTest.createStdinReaderThread(this)
stdInReader.start ()
super.afterStart()
}

void afterStop (List ignore) {
Actors.defaultPooledActorGroup.threadPool.shutdown()
stdInReader.stop()
}

Хорошо, мы прошли долгий путь и готовы к самой интересной части нашей истории — обработке сообщений. Наш актер должен обрабатывать три типа сообщений

  • java.lang.String — отправлено потоком читателя ввода
  • RemoteNodeReady — отправляется подчиненным актером, когда он был запущен. Чуть позже увидим как это происходит
  • RemoteNodeDiscovery — отправлено слушателем, которого мы зарегистрировали раньше

Давайте начнем один за другим

Обработка входных строк наиболее скучна. Это не команда выхода, мы посылаем сообщение всем подчиненным узлам, чтобы выполнить команду, которая напечатает это сообщение.

       void onMessage (String msg) {
switch (msg) {
case "exit":
GridFactory.stop(false)
stop ();
break;

default:
readyNodes.each { remoteNodeId ->
grid.sendMessage (grid.getNode(remoteNodeId), { println msg } as ExecuteCommand )
}
}
}

На самом деле, это не так скучно, как я говорю, если вы помните, что происходит после grid.sendMessage

  • GridGain отправляет сообщение на подчиненный узел
  • Сообщение представляет собой сериализованный код для выполнения (скучно то, что в нашем случае оно просто печатается, но представляет возможности)
  • Слушатель, зарегистрированный подчиненным актером, получает сообщение и отправляет его актеру.
  • Актер выполняет команду


Теперь мы можем разобраться с действительно скучной частью.
Когда мы получили сообщение RemoteNodeReady, нам нужно добавить идентификатор подчиненного узла, который отправил сообщение в список узлов, доступных нашему сервису (это означает, что узел подключен, код загружен, актер запущен и зарегистрирован).

       void onMessage (RemoteNodeReady msg) {
readyNodes << msg.nodeId
println "Node ${msg.nodeId} is connected and ready"
}

И, наконец, самая интересная часть — обработка событий открытия.

       void onMessage (RemoteNodeDiscovery msg) {
if (msg.type != GridDiscoveryEventType.METRICS_UPDATED)
println "${msg.type} ${msg.node}"

if (msg.type == GridDiscoveryEventType.JOINED)
grid.executeOnGridNode (msg.node, GridTest.createCodeToStartOnSlaveNodes(grid.localNode.id))

if (msg.type == GridDiscoveryEventType.LEFT || msg.type == GridDiscoveryEventType.FAILED) {
readyNodes.remove msg.node.id
}
}

 Когда подчиненный узел покинул нашу сетку или потерпел неудачу, мы просто удаляем его из набора доступных узлов. Это легкая часть. Интересная часть — обработка присоединенного узла. Нам нужно дать GridGain команду запустить подчиненный актер на узле, который только что присоединился. Вот как мы это делаем.

    static void executeOnGridNode (Grid grid, GridNode node, RemoteJob operation) {
GridTask task = [
map: { subgrid, arg -> [(operation) : node] },
result: { result, received -> GridJobResultPolicy.REDUCE },
reduce: { results -> results [0] }
]

grid.execute (task, null)
}

 Это, вероятно, требует некоторого объяснения. GridGain реализует шаблон карты / уменьшения. Чтобы выполнить некоторый код в сетке, нам нужно реализовать интерфейс GridTask. Для наших целей интересен только метод map. Он должен вернуть Map <GridJob, GridNode>. GridJob — это что выполнять, а GridNode — где выполнять. В нашем случае это особенно легко, поскольку у нас есть только одна работа и мы знаем, где именно мы хотим ее выполнить. Вывод типа Groovy помогает нам написать очень компактный код выше.

Существует две причины, по которым метод executeOnGridNode является статическим.

  1. как любой статический метод, он может использоваться как так называемый метод расширения  , поэтому мы можем написать grid.executeOnGridNode (…), как если бы это был метод интерфейса Grid, предоставленный GridGain.
  2. Если бы это был метод экземпляра, то экземпляр GridTask сохранял бы ссылку на экземпляр внешнего класса, что является огромной проблемой для правильной сериализации. В будущем статический компилятор будет содержать правильную обработку случая, когда такая ссылка на самом деле не используется


Теперь самый последний кусок кода.
Что такое RemoteJob и как мы создаем код для выполнения на подчиненном узле. На самом деле это просто техничность. Поскольку нам нужно, чтобы экземпляр Grid был доступен подчиненному актору, мы следуем некоторому ритуалу, который GridGain требует от нас (объявляем подкласс с полем, в которое будет добавлен экземпляр Grid). Затем Groovy-тип вывод делает свою работу.

   static abstract class RemoteJob implements GridJob {
@GridInstanceResource Grid grid

public void cancel() {}
}

static RemoteJob createCodeToStartOnSlaveNodes (UUID mainNodeId) {
{->
new SlaveActor(grid, mainNodeId).start()
grid.sendMessage(grid.getNode(mainNodeId), new RemoteNodeReady(nodeId: grid.localNode.id))
println "slave actor started"
}
}

 Снова метод createCodeToStartOnSlaveNodes является статическим из-за еще не реализованной функциональности, обсужденной выше.


Вуаля!
Мы сделали очень простое приложение, которое сочетает в себе довольно сложные концепции распределенных и параллельных вычислений. Static Groovy был нашим основным инструментом для упрощения кода и связывания вещей.