Статьи

Распределенные актеры квазара с Kafka и ZeroMQ

Итак, у вас есть причудливый дизайн с использованием актеров, вы выбрали JVM и мощный, лояльный подход Quasar к этой теме. Все мудрые решения, но каковы ваши варианты их распределения в кластере?

галактика

Galaxy — это действительно крутой вариант: быстрая сетка данных в памяти, оптимизированная для локальности данных с репликацией, опциональным постоянством, распределенным реестром акторов и даже миграцией актеров между узлами! Есть только одно предупреждение: пройдет еще несколько месяцев, прежде чем мы выпустим качественную, официально проверенную версию Galaxy. Текущая версия Galaxy не рекомендуется для производственного использования.

Что если нам нужно жить до этого?

К счастью, модель программирования блокировок Quasar Actors настолько проста, что ее интеграция с большинством решений для обмена сообщениями очень проста, и чтобы продемонстрировать это, давайте сделаем это с двумя быстрыми, популярными и совершенно разными: Apache Kafka и ØMQ .

Код и план

Все следующие примеры можно найти на GitHub , просто кратко рассмотрите короткий README и вы сразу же их запустите.

Есть два примера для каждого из Kafka и ØMQ:

  • Быстрый и грязный человек, выполняющий прямую публикацию / опрос или отправку / прием звонков от актеров.
  • Более сложный способ прохождения через прокси-субъектов, которые защищают ваш код от API обмена сообщениями. В качестве доказательства того, что я не вру, эта программа использует одинаковые классы актера-производителя и потребителя для обеих технологий и почти одну и ту же программу начальной загрузки.

Кафка

В Apache Kafka наблюдается резкое увеличение популярности благодаря уникальной конструкции, основанной на журналах фиксации для обеспечения долговечности и группах потребителей для параллельного потребления сообщений: благодаря этой комбинации появился быстрый, надежный, гибкий и масштабируемый брокер.

API включает две разновидности производителей: синхронизацию и асинхронность и одну из потребителей (только синхронизация); Comsat включает в себя интеграцию производителей Kafka с поддержкой оптоволокна.

Дескриптор производителя Kafka является поточно-ориентированным, лучше всего работает при совместном использовании и может быть легко получен и использован в теле актера (или где-либо еще) следующим образом:

01
02
03
04
05
06
07
08
09
10
11
final Properties producerConfig = new Properties();
producerConfig.put("bootstrap.servers", "localhost:9092");
producerConfig.put("client.id", "DemoProducer");
producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
 
try (final FiberKafkaProducer<Integer, byte[]> producer = new FiberKafkaProducer<>(new KafkaProducer<>(producerConfig))) {
     final byte[] myBytes = getMyBytes(); // ...
     final Future<RecordMetaData> res = producer.send(new ProducerRecord<>("MyTopic", i, myBytes));
     res.get(); // Optional, blocks the fiber until the record is persisted; thre's also `producer.flush()`
}

Мы обертываем объект KafkaProducer вместе с KafkaProducer от Comsat, чтобы вернуть блокирующее волокно будущее.

Однако дескриптор потребителя не является поточно-ориентированным 1 и блокирует только потоки:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
final Properties producerConfig = new Properties();
consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
consumerConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
 
try (final Consumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
    consumer.subscribe(Collections.singletonList(TOPIC));
    final ConsumerRecords<Integer, byte[]> records = consumer.poll(1000L);
    for (final ConsumerRecord<Integer, byte[]> record : records) {
        final byte[] v = record.value();
        useMyBytes(v); // ...
    }
}

Так как мы не хотим блокировать базовый пул потоков волокна (кроме тех, которые Kafka блокирует под прикрытием — мы не можем ничего с ними поделать), в doRun нашего актера мы будем использовать вместо FiberAsync.runBlocking для подачи фиксированных Размер executor с асинхронной задачей, которая будет просто блокировать волокно, пока poll (который будет выполняться в данном пуле) не вернет:

01
02
03
04
05
06
07
08
09
10
final ExecutorService e = Executors.newFixedThreadPool(2);
 
try (final Consumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
    consumer.subscribe(Collections.singletonList(TOPIC));
    final ConsumerRecords<Integer, byte[]> records = call(e, () -> consumer.poll(1000L));
    for (final ConsumerRecord<Integer, byte[]> record : records) {
        final byte[] v = record.value();
        useMyBytes(v); // ...
    }
}

Где call — это служебный метод, определенный следующим образом (если бы не эта ошибка Java-компилятора, он не был бы необходим):

01
02
03
04
05
06
07
08
09
10
@Suspendable
public static <V> V call(ExecutorService es, Callable<V> c) throws InterruptedException, SuspendExecution {
    try {
        return runBlocking(es, (CheckedCallable<V, Exception>) c::call);
    } catch (final InterruptedException | SuspendExecution e) {
        throw e;
    } catch (final Exception e) {
        throw new RuntimeException(e);
    }
}

В первом полном примере мы отправляем тысячу сериализованных сообщений от актера-производителя к потребителю.

ØMQ

Ø MQ (или ZeroMQ) не является централизованным брокерским решением и является скорее обобщением сокетов для различных шаблонов коммуникации (запрос / ответ, публикация / подписка и т. Д.). В наших примерах мы будем использовать простейший шаблон запроса-ответа. Вот наш новый код производителя:

1
2
3
4
5
6
7
try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);
     final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) {
    trgt.connect("tcp://localhost:8000");
    final byte[] myBytes = getMyBytes(); // ...
    trgt.send(baos.toByteArray(), 0 /* flags */)
    trgt.recv(); // Reply, e.g. ACK
}

Как вы можете видеть, контекст действует как фабрика сокетов, и ему передается количество потоков ввода-вывода, которые будут использоваться: это потому, что сокеты ØMQ не являются дескрипторами ОС, привязанными к соединению, а скорее простым интерфейсом для механизма, который будет обрабатывать повтор соединения, несколько соединений, эффективный параллельный ввод-вывод и даже организация очереди для вас. По этой причине send вызовов почти никогда не блокируется, и recv call — это не вызов ввода-вывода для соединения, а скорее синхронизация между вашим потоком и специализированной задачей ввода-вывода, которая передает входящие байты из одного или даже нескольких соединений.

Вместо потоков мы будем блокировать волокна в наших актерах, поэтому давайте использовать FiberAsync.runBlocking при вызовах read и, на всякий случай, он блокирует даже при send :

1
2
3
4
5
6
7
8
9
final ExecutorService ep = Executors.newFixedThreadPool(2);
 
try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);
     final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) {
    exec(e, () -> trgt.connect("tcp://localhost:8000"));
    final byte[] myBytes = getMyBytes(); // ...
    call(e, trgt.send(myBytes, 0 /* flags */));
    call(e, trgt::recv); // Reply, e.g. ACK
}

Вот потребитель:

1
2
3
4
5
6
7
try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);
     final ZMQ.Socket src = zmq.socket(ZMQ.REP)) {
    exec(e, () -> src.bind("tcp://*:8000"));
    final byte[] v = call(e, src::recv);
    exec(e, () -> src.send("ACK"));
    useMyBytes(v); // ...
}

где exec — еще одна служебная функция, похожая на call :

01
02
03
04
05
06
07
08
09
10
@Suspendable
public static void exec(ExecutorService es, Runnable r) throws InterruptedException, SuspendExecution {
    try {
        runBlocking(es, (CheckedCallable<Void, Exception>) () -> { r.run(); return null; });
    } catch (final InterruptedException | SuspendExecution e) {
        throw e;
    } catch (final Exception e) {
        throw new RuntimeException(e);
    }
}

И вот полный первый пример .

Распределение без изменения логики: слабая связь для спасения

Это просто, не правда ли? Хотя есть кое-что раздражающее: мы имеем дело с актерами на другой стороне сети совсем не так, как местные. Вот актеры, которых мы хотели бы написать, независимо от того, где они находятся и как они связаны:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
public final class ProducerActor extends BasicActor<Void, Void> {
    private final ActorRef<Msg> target;
 
    public ProducerActor(ActorRef<Msg> target) {
        this.target = target;
    }
 
    @Override
    protected final Void doRun() throws InterruptedException, SuspendExecution {
        for (int i = 0; i < MSGS; i++) {
            final Msg m = new Msg(i);
            System.err.println("USER PRODUCER: " + m);
            target.send(m);
        }
        System.err.println("USER PRODUCER: " + EXIT);
        target.send(EXIT);
        return null;
    }
}
01
02
03
04
05
06
07
08
09
10
11
public final class ConsumerActor extends BasicActor<Msg, Void> {
    @Override
    protected final Void doRun() throws InterruptedException, SuspendExecution {
        for (;;) {
            final Msg m = receive();
            System.err.println("USER CONSUMER: " + m);
            if (EXIT.equals(m))
                return null;
        }
    }
}

К счастью, каждый участник, независимо от того, что он делает, имеет один и тот же очень простой интерфейс: очередь входящих сообщений, называемая почтовым ящиком . Это означает, что мы можем вставить между двумя взаимодействующими субъектами столько промежуточных участников или прокси-серверов , сколько мы хотим, и, в частности, нам нужен отправляющий прокси-сервер, который будет передавать сообщения через промежуточное ПО на хост назначения, и принимающий прокси-сервер, который будет захватывать входящие сообщения. и поместит их в почтовый ящик предполагаемого места назначения.

Таким образом, в нашей основной программе мы просто предоставим нашему ProducerActor подходящий прокси-сервер отправки и позволим нашему ConsumerActor получать ConsumerActor от подходящего прокси-сервера получения:

01
02
03
04
05
06
07
08
09
10
final ProducerActor pa = Actor.newActor(ProducerActor.class, getSendingProxy()); // ...
final ConsumerActor ca = Actor.newActor(ConsumerActor.class);
pa.spawn();
System.err.println("USER PRODUCER started");
subscribeToReceivingProxy(ca.spawn()); // ...
System.err.println("USER CONSUMER started");
pa.join();
System.err.println("USER PRODUCER finished");
ca.join();
System.err.println("USER CONSUMER finished");

Давайте посмотрим, как мы можем реализовать эти прокси сначала с Kafka, а затем с ØMQ.

Кафка актер прокси

Фабрика прокси-актеров будет привязана к определенной теме Kafka: это потому, что тема может быть разделена таким образом, что несколько потребителей могут читать из нее одновременно. Мы хотим иметь возможность оптимально использовать максимальный уровень или параллельность каждой темы:

1
2
3
4
5
/* ... */ KafkaProxies implements AutoCloseable {
    /* ... */ KafkaProxies(String bootstrap, String topic) { /* ... */ }
 
    // ...
}

Конечно, мы хотим использовать тему для нескольких действующих лиц, поэтому при отправке прокси-серверов будет указан идентификатор субъекта-получателя, а принимающие прокси-серверы перенаправят сообщение только тем действующим лицам, которые привязаны к этому идентификатору:

1
2
3
4
/* ... */ <M> ActorRef<M> create(String actorID) { /* ... */ }
/* ... */ void drop(ActorRef ref) throws ExecutionException, InterruptedException { /* ... */ }
/* ... */ <M> void subscribe(ActorRef<? super M> consumer, String actorID) { /* ... */ }
/* ... */ void unsubscribe(ActorRef<?> consumer, String actorID) { /* ... */ }

Закрытие фабрики AutoClosable скажет всем прокси прекратить и AutoClosable бухгалтерские ссылки:

1
/* ... */ void close() throws Exception { /* ... */ }

Реализация производителя довольно проста и неинтересна, в то время как потребителю немного больше специй, потому что он будет использовать избирательное получение Quasar Actors для сохранения входящих сообщений в своем почтовом ящике, пока не будет хотя бы один подписанный пользовательский субъект, который сможет их использовать:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Override
protected Void doRun() throws InterruptedException, SuspendExecution {
    //noinspection InfiniteLoopStatement
    for (;;) {
    // Try extracting from queue
    final Object msg = tryReceive((Object m) -> {
        if (EXIT.equals(m))
            return EXIT;
        if (m != null) {
            //noinspection unchecked
            final ProxiedMsg rmsg = (ProxiedMsg) m;
            final List<ActorRef> l = subscribers.get(rmsg.actorID);
            if (l != null) {
                boolean sent = false;
                for (final ActorRef r : l) {
                    //noinspection unchecked
                    r.send(rmsg.payload);
                    sent = true;
                }
                if (sent) // Someone was listening, remove from queue
                    return m;
            }
        }
        return null; // No subscribers (leave in queue) or no messages
    });
    // Something from queue
    if (msg != null) {
        if (EXIT.equals(msg)) {
            return null;
        }
        continue; // Go to next cycle -> precedence to queue
    }
 
    // Try receiving
    //noinspection Convert2Lambda
    final ConsumerRecords<Void, byte[]> records = call(e, () -> consumer.get().poll(100L));
    for (final ConsumerRecord<Void, byte[]> record : records) {
        final byte[] v = record.value();
        try (final ByteArrayInputStream bis = new ByteArrayInputStream(v);
             final ObjectInputStream ois = new ObjectInputStream(bis)) {
 
            //noinspection unchecked
            final ProxiedMsg rmsg = (ProxiedMsg) ois.readObject();
            final List<ActorRef> l = subscribers.get(rmsg.actorID);
            if (l != null && l.size() > 0) {
                for (final ActorRef r : l) {
                    //noinspection unchecked
                    r.send(rmsg.payload);
                }
            } else {
                ref().send(rmsg); // Enqueue
            }
        } catch (final IOException | ClassNotFoundException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}

Поскольку нам также необходимо обработать почтовый ящик, мы опрашиваем Кафку с достаточно небольшим таймаутом. Также обратите внимание, что многие участники могут подписаться на один и тот же идентификатор, и входящее сообщение будет транслироваться всем им. Количество полученных прокси-серверов-актеров (то есть волокон) для каждой темы, а также число потоков пула и дескрипторов consumer Kafka ( consumer является локальным для потока, поскольку потребители Kafka не являются поточно-ориентированными) будет равно числу разделов по теме: это позволяет получить максимальную пропускную способность.

В настоящее время эта реализация использует сериализацию Java для преобразования сообщений в байты и из них, но, конечно, можно использовать и другие платформы, такие как Kryo .

ØMQ актерские прокси

Модель ØMQ полностью децентрализована: здесь нет ни брокеров, ни тем, поэтому мы можем просто приравнять адрес / конечную точку ØMQ к набору действующих лиц, не используя дополнительный идентификатор участника:

1
2
3
4
5
6
7
8
/* ... */ ZeroMQProxies implements AutoCloseable {
    /* ... */ ZeroMQProxies(int ioThreads) { /* ... */ }
    /* ... */ <M> ActorRef<M> to(String trgtZMQAddress) { /* ... */ }
    /* ... */ void drop(String trgtZMQAddress)
    /* ... */ void subscribe(ActorRef<? super M> consumer, String srcZMQEndpoint) { /* ... */ }
    /* ... */ void unsubscribe(ActorRef<?> consumer, String srcZMQEndpoint) { /* ... */ }
    /* ... */ void close() throws Exception { /* ... */ }
}

В этом случае и по той же причине, что и раньше, потребитель немного интересен, к счастью, любые проблемы с безопасностью потоков, потому что сокеты ØMQ прекрасно работают в нескольких потоках:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Override
protected Void doRun() throws InterruptedException, SuspendExecution {
    try(final ZMQ.Socket src = zmq.socket(ZMQ.REP)) {
        System.err.printf("PROXY CONSUMER: binding %s\n", srcZMQEndpoint);
        Util.exec(e, () -> src.bind(srcZMQEndpoint));
        src.setReceiveTimeOut(100);
        //noinspection InfiniteLoopStatement
        for (;;) {
            // Try extracting from queue
            final Object m = tryReceive((Object o) -> {
                if (EXIT.equals(o))
                    return EXIT;
                if (o != null) {
                    //noinspection unchecked
                    final List<ActorRef> l = subscribers.get(srcZMQEndpoint);
                    if (l != null) {
                        boolean sent = false;
                        for (final ActorRef r : l) {
                            //noinspection unchecked
                            r.send(o);
                            sent = true;
                        }
                        if (sent) // Someone was listening, remove from queue
                            return o;
                    }
                }
                return null; // No subscribers (leave in queue) or no messages
            });
            // Something processable is there
            if (m != null) {
                if (EXIT.equals(m)) {
                    return null;
                }
                continue; // Go to next cycle -> precedence to queue
            }
 
            System.err.println("PROXY CONSUMER: receiving");
            final byte[] msg = Util.call(e, src::recv);
            if (msg != null) {
                System.err.println("PROXY CONSUMER: ACKing");
                Util.exec(e, () -> src.send(ACK));
                final Object o;
                try (final ByteArrayInputStream bis = new ByteArrayInputStream(msg);
                     final ObjectInputStream ois = new ObjectInputStream(bis)) {
                    o = ois.readObject();
                } catch (final IOException | ClassNotFoundException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
                System.err.printf("PROXY CONSUMER: distributing '%s' to %d subscribers\n", o, subscribers.size());
                //noinspection unchecked
                for (final ActorRef s : subscribers.getOrDefault(srcZMQEndpoint, (List<ActorRef>) Collections.EMPTY_LIST))
                    //noinspection unchecked
                    s.send(o);
            } else {
                System.err.println("PROXY CONSUMER: receive timeout");
            }
        }
    }
}

Больше возможностей

Надеемся, что эта короткая рецензия дала представление о том, как легко можно легко связать актеров Quasar с решениями для обмена сообщениями благодаря их природе простых последовательных процессов; конечно можно пойти дальше, например:

  • Поиск и обнаружение действующих лиц: как мы предоставляем глобальную службу именования и обнаружения участников? Например, Kafka использует ZooKeeper, так что, вероятно, это стоит использовать, но ØMQ делает серьезную ставку на децентрализацию и намеренно не обеспечивает готовую основу.
  • Управление сбоями субъектов: как мы можем поддерживать связи и наблюдения за сбоями между участниками, которые работают в разных узлах?
  • Маршрутизация сообщений : как динамически настроить потоки сообщений между узлами и субъектами, не меняя логику внутри участников?
  • Мобильность акторов : как мы можем переместить акторов на другие узлы, например, ближе к их источнику сообщений, чтобы повысить производительность или в место с другими свойствами безопасности?
  • Масштабируемость и отказоустойчивость : как управлять добавлением, удалением, уничтожением и разбиением узлов акторов? Распределенные IMDG, такие как Galaxy, и решения на основе брокеров, такие как Kafka, обычно уже делают это, но решения на уровне фабрики, такие как ØMQ, обычно этого не делают.
  • Безопасность : как мы поддерживаем соответствующие свойства информационной безопасности?
  • Тестирование, ведение журнала, мониторинг : как нам удобно тестировать, отслеживать и контролировать распределенный актерский ансамбль в целом?

Эти темы являются «крепким орешком» проектирования распределенных систем и, в частности, действующих лиц, поэтому для их эффективного решения могут потребоваться значительные усилия. Galaxy рассматривает все из них, но участники Quasar предоставляют SPI, который охватывает некоторые из вышеперечисленных тем и который обеспечивает более тесную интеграцию с технологиями распространения. Вас также может заинтересовать сравнение между Akka и Quasar + Galaxy, которое охватывает множество таких аспектов.

Вот и все, так что повеселитесь с вашими распределенными актерами Quasar и оставьте записку о вашем путешествии в группе пользователей Quasar-Pulsar !

  1. На самом деле это также запрещает использование любыми потоками, кроме первого.