В прошлый раз мы говорили о реализации функциональных очередей с Groovy ++. Сегодня мы будем использовать эти очереди для реализации нескольких алгоритмов обработки асинхронных сообщений. Вы можете найти исходный код и другие примеры в дистрибутиве Groovy ++
Мы хотим реализовать упрощенный субъект, объект, который последовательно обрабатывает асинхронно поступающие сообщения. Есть два типа актеров
- Актер с привязкой к потоку, который имеет выделенный поток обработки сообщений. Тема заблокирована, если нет доступных сообщений
- объединенный в пул актер, который выполняется в некотором пуле потоков. Прелесть объединенного актера в том, что он вообще не потребляет никаких ресурсов, если нет сообщений для обработки
Мы попытаемся использовать тот же подход, основанный на наших функциональных очередях, для реализации обоих.
Давайте начнем с определения интерфейса для канала сообщений
@Trait abstract class MessageChannel<T> {
abstract MessageChannel<T> post (T message)
MessageChannel<T> leftShift (T msg) {
post msg
}
}
Аннотация @Trait — это Groovy ++ способ определения интерфейса с реализацией некоторых методов по умолчанию. Каждый класс, реализующий такой интерфейс, наследует реализацию по умолчанию, если метод не реализован классом или суперклассом.
Мы используем черту Groovy ++ здесь не потому, что это необходимо для нашего примера, а потому, что образец представляет собой реальный код из среды исполнения Groovy ++.
Обратите внимание, что канал сообщений не имеет ничего общего с параллелизмом — мы можем реализовать метод post любым удобным для нас способом. Просто для удовольствия мы можем сделать следующий класс Multiplexor, который сразу же перераспределяет все входящие сообщения на все подписанные каналы
class Multiplexor<M> implements MessageChannel<M> {
private volatile FList<MessageChannel<M>> listeners = FList.emptyList
Multiplexor<M> subscribe(MessageChannel<M> channel) {
for (;;) {
def l = listeners
if (listeners.compareAndSet(l, l + channel))
return this
}
}
Multiplexor<M> subscribe(MessageChannel<M> ... channels) {
for (c in channels) {
subscribe(c)
}
this
}
Multiplexor<M> unsubscribe(MessageChannel<M> channel) {
for (;;) {
def l = listeners
if (listeners.compareAndSet(l, l - channel))
return this
}
}
final Multiplexor<M> post(M message) {
listeners.each { channel ->
channel << message
}
this
}
static Multiplexor<M> of (MessageChannel<M> ... channels) {
new Multiplexor().subscribe(channels)
}
}
Вы можете заметить, что было почти тривиально позволить подписчикам подписываться и отписываться асинхронно, используя наши функциональные списки
Хорошо, вернемся к нашей основной истории. Давайте реализуем канал с асинхронной очередью, который обрабатывает не более одного сообщения в любой момент времени.
Наша идея заключается в следующем:
- we use functional queue to add messages
- when we add message to the queue we signal subclass (whatever it means for subclassing algorithm)
- we introduce special state of the queue to be used by subclasses, which means that the queue is already empty but last message is not processed yet. This is probably most non-trivial part of our algorithms
Here is the implementation
abstract class QueuedChannel<M> implements MessageChannel<M> {
protected volatile FQueue<M> queue = FQueue.emptyQueue
protected static final FQueue busyEmptyQueue = FQueue.emptyQueue + null
MessageChannel<M> post(M message) {
for (;;) {
def oldQueue = queue
def newQueue = (oldQueue === busyEmptyQueue ? FQueue.emptyQueue : oldQueue) + message
if (queue.compareAndSet(oldQueue, newQueue)) {
signalPost(oldQueue, newQueue)
return this
}
}
}
protected abstract void signalPost (FQueue<M> oldQueue, FQueue<M> newQueue)
abstract void onMessage(M message)
}
Now we are ready to create our first real actor backed by Executor and scheduled for execution for each message. We call it «fair» because it does not try to take as much resources as possible but give chance to work for all it’s collegues.
Here is explaination of the algorithm
- our channel implements Runnable. That might be not perfect from OOP prospective bus save us additional object creation
- when message added to empty queue we schedule actor for execution
- if after processing of a message our queue still non-empty we schedule again
- special care taken for the case when we process last message in the queue — we have to make sure that while we are not done new messages will not schedule new execution of the actor
abstract static class FairExecutingChannel<M> extends QueuedChannel<M> implements Runnable {
Executor executor
void run () {
for (;;) {
def q = queue
def removed = q.removeFirst()
if (q.size() == 1) {
if (queue.compareAndSet(q, busyEmptyQueue)) {
onMessage removed.first
if (!queue.compareAndSet(busyEmptyQueue, FQueue.emptyQueue)) {
executor.execute this
}
break
}
}
else {
if (queue.compareAndSet(q, removed.second)) {
onMessage removed.first
executor.execute this
break
}
}
}
}
protected void signalPost(FQueue<M> oldQueue, FQueue<M> newQueue) {
if (oldQueue !== busyEmptyQueue && newQueue.size() == 1)
executor.execute this
}
}
Fair algorithm above has one downside — if processing of messages is really fast we waste a lot of cycles by being executed for each and every message. That leads us to the idea of «non-fair» algorithm, which process all available messages when Runnable executed. For amounts of small messages it runs 2-3 times faster.
Here is the implementation, which is even simplier
@Typed abstract class NonfairExecutingChannel<M> extends FairExecutingChannel<M> {
void run () {
for (;;) {
def q = queue
if (queue.compareAndSet(q, busyEmptyQueue)) {
for(m in q) {
if (m)
onMessage m
}
if(!queue.compareAndSet(busyEmptyQueue, FQueue.emptyQueue)) {
executor.execute this
}
break
}
}
}
}
Intersting to notice that we can develop some variations of algorithms above. For example we can process as many messages as we can in given timeframe (let say 250ms) or given number of messages in a run. Functional data structures gives us a lot of flexibility.
To have the picture complete we should also implement thread backed variation of our approach. We leave it as exercise for reader
Thank you for reading and hope it was interesting. Till next time.