Статьи

Событийные актеры в Groovy

После публикации моего предыдущего поста об актерах с привязкой к потокам в Groovy я получил несколько запросов на улучшение GParallelizer с поддержкой акторов на основе событий . Вкратце, акторы допускают модель параллелизма на основе сообщений , построенную из независимых активных объектов, которые обмениваются сообщениями и не имеют изменяемого общего состояния. Актеры, естественно, избегают таких проблем, как взаимоблокировки, блокировки в реальном времени или голодание, которые характерны для общей памяти.

Акторы на основе событий, в отличие от актеров с привязкой к потокам, описанных в исходной публикации, совместно используют общий пул потоков, из которого они заимствуют поток только для обработки одного входящего сообщения, а затем возвращают его в пул, чтобы поток мог обслуживать другие актеры по очереди. Актеры отсоединяются от нижележащих потоков, поэтому относительно небольшой пул потоков может обслуживать потенциально неограниченное количество акторов. Фактически неограниченная масштабируемость числа акторов является основным преимуществом акторов, основанных на событиях, по сравнению с нитями с привязкой к потоку, где каждый актер имеет свой собственный эксклюзивный фоновый поток, связанный с ним.

Реализация возможности динамического присоединения и отсоединения акторов и потоков является сложной задачей. Однако оказалось, что жизнь намного проще, если вы можете стоять на плечах гигантов — пакета java.util.concurrent и Groovy в моем случае. Чтобы дать вам представление о том, что теперь возможно с актерами на основе событий в GParallelizer , я подготовил несколько примеров:

import static org.gparallelizer.actors.pooledActors.PooledActors.*

final def decryptor = actor {
loop {
react {String message->
reply message.reverse()
}
}
}.start()

actor {
decryptor.send 'suonorhcnysa si yvoorG'
react {
println 'Decrypted message: ' + it
}
}.start()

Как вы можете видеть, вы создаете новых акторов с помощью метода actor (), передающего тело актера в качестве параметра замыкания. Внутри тела актера вы можете использовать loop () для итерации, реагировать () для получения сообщений и ответить (), чтобы отправить сообщение актеру, который отправил вам обработанное в данный момент сообщение. С помощью метода start () вы планируете обработчик для обработки в базовом пуле потоков.

Вот важная часть: когда актер расшифровщика не находит сообщение в своей очереди сообщений, методact () оставляет поток и возвращает его обратно в пул потоков, чтобы другие участники могли его забрать. Только после того, как новое сообщение поступит в очередь сообщений субъекта, закрытие метода реагирования () запланировано для обработки с пулом. Актеры, основанные на событиях, внутренне имитируют продолжения — работа актера разбивается на последовательно запускаемые фрагменты, которые вызываются, как только сообщение становится доступным во входящей почте. Каждый кусок для одного актера может быть выполнен другим потоком из пула потоков.

Простой калькулятор

Другой пример
актера, управляемого событиями, — калькулятор, который получает два числовых сообщения, суммирует их и отправляет результат в консоль субъекта. Я намеренно решил установить размер пула равным 1, чтобы показать, что даже однопотоковый пул может обрабатывать несколько участников. 

import static org.gparallelizer.actors.pooledActors.PooledActors.*

//not necessary, just showing that a single-threaded pool can still handle multiple actors
getPool().initialize 1

final def console = actor {
loop {
react {
println 'Result: ' + it
}
}
}.start()

final def calculator = actor {
react {a ->
react {b ->
console.send(a + b)
}
}
}.start()

calculator.send 2
calculator.send 3

Обратите внимание, что субъекты, управляемые событиями, требуют особого внимания в отношении метода реагирования () . Так как актеры , управляемые события , необходимо разделить код на независимые куски назначаемых для разных потоков последовательно и продолжений не изначально поддерживаются на JVM, чанки созданы искусственно за кулисами с Runnable задач и исключениями. В результате методы реагировать () и цикл () никогда не возвращаются нормально, и код акторов должен быть соответствующим образом структурирован. Я верю, что программисты Scala увидят, откуда я черпал вдохновение.

Реагируют () метод позволяет таймаут быть указан с помощью TimeCategory DSL:

import static org.gparallelizer.actors.pooledActors.PooledActors.*

def me = actor {
friend.send('Hi')
react(10.seconds) {message ->
//continue conversation
}
}

me.metaClass.onTimeout = {->friend.send('I see, busy as usual. Never mind.')}

Обратите внимание на возможность использования метапрограммирования Groovy для динамического определения методов уведомления о жизненном цикле субъекта ( например, onTimeout () ).

Пример параллельной сортировки слиянием

Для сравнения с
примером, приведенным для связанных с потоками акторов, я включаю более сложный пример выполнения параллельного слияния в виде списка целых чисел с использованием акторов. Как вы можете видеть, мы подошли довольно близко к модели Scala, хотя сопоставление с образцом в Scala для обработки сообщений по-прежнему требует больших усилий.

Closure createMessageHandler(def parentActor) {
return {
react {List message ->
assert message != null
switch (message.size()) {
case 0..1:
parentActor.send(message)
break
case 2:
if (message[0] <= message[1]) parentActor.send(message)
else parentActor.send(message[-1..0])
break
default:
def splitList = split(message)

def child1 = actor(createMessageHandler(delegate))
def child2 = actor(createMessageHandler(delegate))
child1.start().send(splitList[0])
child2.start().send(splitList[1])

react {message1 ->
react {message2 ->
parentActor.send merge(message1, message2)
}
}
}
}
}
}

def console = actor {
react { println "Sorted array:\t${it}" }
}.start()

def sorter = actor(createMessageHandler(console))
sorter.start().send([1, 5, 2, 4, 3, 8, 6, 7, 3, 9, 5, 3])

В этом примере показано основное преимущество актеров, управляемых событиями, — независимо от того, какой массив вы хотите отсортировать и, следовательно, сколько акторов вы создаете по пути, все они могут быть обработаны одним потоком.

Есть, конечно, еще некоторые грубые края и недостающие части. Теперь я хотел бы получить обратную связь, чтобы вести меня дальше. Не стесняйтесь скачать GParallelizer и дайте мне знать, что вы думаете. Все комментарии приветствуются.