Итак, у вас есть причудливый дизайн с использованием актеров, вы выбрали JVM и мощный, лояльный подход Quasar к этой теме. Все мудрые решения, но каковы ваши варианты их распределения в кластере?
галактика
Galaxy — это действительно крутой вариант: быстрая сетка данных в памяти, оптимизированная для локальности данных с репликацией, опциональным постоянством, распределенным реестром акторов и даже миграцией актеров между узлами! Есть только одно предупреждение: пройдет еще несколько месяцев, прежде чем мы выпустим качественную, официально проверенную версию Galaxy. Текущая версия Galaxy не рекомендуется для производственного использования.
Что если нам нужно жить до этого?
К счастью, модель программирования блокировок Quasar Actors настолько проста, что ее интеграция с большинством решений для обмена сообщениями очень проста, и чтобы продемонстрировать это, давайте сделаем это с двумя быстрыми, популярными и совершенно разными: Apache Kafka и ØMQ .
Код и план
Все следующие примеры можно найти на GitHub , просто кратко рассмотрите короткий README и вы сразу же их запустите.
Есть два примера для каждого из Kafka и ØMQ:
- Быстрый и грязный человек, выполняющий прямую публикацию / опрос или отправку / прием звонков от актеров.
- Более сложный способ прохождения через прокси-субъектов, которые защищают ваш код от API обмена сообщениями. В качестве доказательства того, что я не вру, эта программа использует одинаковые классы актера-производителя и потребителя для обеих технологий и почти одну и ту же программу начальной загрузки.
Кафка
В Apache Kafka наблюдается резкое увеличение популярности благодаря уникальной конструкции, основанной на журналах фиксации для обеспечения долговечности и группах потребителей для параллельного потребления сообщений: благодаря этой комбинации появился быстрый, надежный, гибкий и масштабируемый брокер.
API включает две разновидности производителей: синхронизацию и асинхронность и одну из потребителей (только синхронизация); Comsat включает в себя интеграцию производителей Kafka с поддержкой оптоволокна.
Дескриптор производителя Kafka является поточно-ориентированным, лучше всего работает при совместном использовании и может быть легко получен и использован в теле актера (или где-либо еще) следующим образом:
|
01
02
03
04
05
06
07
08
09
10
11
|
final Properties producerConfig = new Properties();producerConfig.put("bootstrap.servers", "localhost:9092");producerConfig.put("client.id", "DemoProducer");producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");try (final FiberKafkaProducer<Integer, byte[]> producer = new FiberKafkaProducer<>(new KafkaProducer<>(producerConfig))) { final byte[] myBytes = getMyBytes(); // ... final Future<RecordMetaData> res = producer.send(new ProducerRecord<>("MyTopic", i, myBytes)); res.get(); // Optional, blocks the fiber until the record is persisted; thre's also `producer.flush()`} |
Мы обертываем объект KafkaProducer вместе с KafkaProducer от Comsat, чтобы вернуть блокирующее волокно будущее.
Однако дескриптор потребителя не является поточно-ориентированным 1 и блокирует только потоки:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
final Properties producerConfig = new Properties();consumerConfig = new Properties();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");consumerConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");try (final Consumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfig)) { consumer.subscribe(Collections.singletonList(TOPIC)); final ConsumerRecords<Integer, byte[]> records = consumer.poll(1000L); for (final ConsumerRecord<Integer, byte[]> record : records) { final byte[] v = record.value(); useMyBytes(v); // ... }} |
Так как мы не хотим блокировать базовый пул потоков волокна (кроме тех, которые Kafka блокирует под прикрытием — мы не можем ничего с ними поделать), в doRun нашего актера мы будем использовать вместо FiberAsync.runBlocking для подачи фиксированных Размер executor с асинхронной задачей, которая будет просто блокировать волокно, пока poll (который будет выполняться в данном пуле) не вернет:
|
01
02
03
04
05
06
07
08
09
10
|
final ExecutorService e = Executors.newFixedThreadPool(2);try (final Consumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfig)) { consumer.subscribe(Collections.singletonList(TOPIC)); final ConsumerRecords<Integer, byte[]> records = call(e, () -> consumer.poll(1000L)); for (final ConsumerRecord<Integer, byte[]> record : records) { final byte[] v = record.value(); useMyBytes(v); // ... }} |
Где call — это служебный метод, определенный следующим образом (если бы не эта ошибка Java-компилятора, он не был бы необходим):
|
01
02
03
04
05
06
07
08
09
10
|
@Suspendablepublic static <V> V call(ExecutorService es, Callable<V> c) throws InterruptedException, SuspendExecution { try { return runBlocking(es, (CheckedCallable<V, Exception>) c::call); } catch (final InterruptedException | SuspendExecution e) { throw e; } catch (final Exception e) { throw new RuntimeException(e); }} |
В первом полном примере мы отправляем тысячу сериализованных сообщений от актера-производителя к потребителю.
ØMQ
Ø MQ (или ZeroMQ) не является централизованным брокерским решением и является скорее обобщением сокетов для различных шаблонов коммуникации (запрос / ответ, публикация / подписка и т. Д.). В наших примерах мы будем использовать простейший шаблон запроса-ответа. Вот наш новый код производителя:
|
1
2
3
4
5
6
7
|
try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */); final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) { trgt.connect("tcp://localhost:8000"); final byte[] myBytes = getMyBytes(); // ... trgt.send(baos.toByteArray(), 0 /* flags */) trgt.recv(); // Reply, e.g. ACK} |
Как вы можете видеть, контекст действует как фабрика сокетов, и ему передается количество потоков ввода-вывода, которые будут использоваться: это потому, что сокеты ØMQ не являются дескрипторами ОС, привязанными к соединению, а скорее простым интерфейсом для механизма, который будет обрабатывать повтор соединения, несколько соединений, эффективный параллельный ввод-вывод и даже организация очереди для вас. По этой причине send вызовов почти никогда не блокируется, и recv call — это не вызов ввода-вывода для соединения, а скорее синхронизация между вашим потоком и специализированной задачей ввода-вывода, которая передает входящие байты из одного или даже нескольких соединений.
Вместо потоков мы будем блокировать волокна в наших актерах, поэтому давайте использовать FiberAsync.runBlocking при вызовах read и, на всякий случай, он блокирует даже при send :
|
1
2
3
4
5
6
7
8
9
|
final ExecutorService ep = Executors.newFixedThreadPool(2);try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */); final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) { exec(e, () -> trgt.connect("tcp://localhost:8000")); final byte[] myBytes = getMyBytes(); // ... call(e, trgt.send(myBytes, 0 /* flags */)); call(e, trgt::recv); // Reply, e.g. ACK} |
Вот потребитель:
|
1
2
3
4
5
6
7
|
try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */); final ZMQ.Socket src = zmq.socket(ZMQ.REP)) { exec(e, () -> src.bind("tcp://*:8000")); final byte[] v = call(e, src::recv); exec(e, () -> src.send("ACK")); useMyBytes(v); // ...} |
где exec — еще одна служебная функция, похожая на call :
|
01
02
03
04
05
06
07
08
09
10
|
@Suspendablepublic static void exec(ExecutorService es, Runnable r) throws InterruptedException, SuspendExecution { try { runBlocking(es, (CheckedCallable<Void, Exception>) () -> { r.run(); return null; }); } catch (final InterruptedException | SuspendExecution e) { throw e; } catch (final Exception e) { throw new RuntimeException(e); }} |
И вот полный первый пример .
Распределение без изменения логики: слабая связь для спасения
Это просто, не правда ли? Хотя есть кое-что раздражающее: мы имеем дело с актерами на другой стороне сети совсем не так, как местные. Вот актеры, которых мы хотели бы написать, независимо от того, где они находятся и как они связаны:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
public final class ProducerActor extends BasicActor<Void, Void> { private final ActorRef<Msg> target; public ProducerActor(ActorRef<Msg> target) { this.target = target; } @Override protected final Void doRun() throws InterruptedException, SuspendExecution { for (int i = 0; i < MSGS; i++) { final Msg m = new Msg(i); System.err.println("USER PRODUCER: " + m); target.send(m); } System.err.println("USER PRODUCER: " + EXIT); target.send(EXIT); return null; }} |
|
01
02
03
04
05
06
07
08
09
10
11
|
public final class ConsumerActor extends BasicActor<Msg, Void> { @Override protected final Void doRun() throws InterruptedException, SuspendExecution { for (;;) { final Msg m = receive(); System.err.println("USER CONSUMER: " + m); if (EXIT.equals(m)) return null; } }} |
К счастью, каждый участник, независимо от того, что он делает, имеет один и тот же очень простой интерфейс: очередь входящих сообщений, называемая почтовым ящиком . Это означает, что мы можем вставить между двумя взаимодействующими субъектами столько промежуточных участников или прокси-серверов , сколько мы хотим, и, в частности, нам нужен отправляющий прокси-сервер, который будет передавать сообщения через промежуточное ПО на хост назначения, и принимающий прокси-сервер, который будет захватывать входящие сообщения. и поместит их в почтовый ящик предполагаемого места назначения.
Таким образом, в нашей основной программе мы просто предоставим нашему ProducerActor подходящий прокси-сервер отправки и позволим нашему ConsumerActor получать ConsumerActor от подходящего прокси-сервера получения:
|
01
02
03
04
05
06
07
08
09
10
|
final ProducerActor pa = Actor.newActor(ProducerActor.class, getSendingProxy()); // ...final ConsumerActor ca = Actor.newActor(ConsumerActor.class);pa.spawn();System.err.println("USER PRODUCER started");subscribeToReceivingProxy(ca.spawn()); // ...System.err.println("USER CONSUMER started");pa.join();System.err.println("USER PRODUCER finished");ca.join();System.err.println("USER CONSUMER finished"); |
Давайте посмотрим, как мы можем реализовать эти прокси сначала с Kafka, а затем с ØMQ.
Кафка актер прокси
Фабрика прокси-актеров будет привязана к определенной теме Kafka: это потому, что тема может быть разделена таким образом, что несколько потребителей могут читать из нее одновременно. Мы хотим иметь возможность оптимально использовать максимальный уровень или параллельность каждой темы:
|
1
2
3
4
5
|
/* ... */ KafkaProxies implements AutoCloseable { /* ... */ KafkaProxies(String bootstrap, String topic) { /* ... */ } // ...} |
Конечно, мы хотим использовать тему для нескольких действующих лиц, поэтому при отправке прокси-серверов будет указан идентификатор субъекта-получателя, а принимающие прокси-серверы перенаправят сообщение только тем действующим лицам, которые привязаны к этому идентификатору:
|
1
2
3
4
|
/* ... */ <M> ActorRef<M> create(String actorID) { /* ... */ }/* ... */ void drop(ActorRef ref) throws ExecutionException, InterruptedException { /* ... */ }/* ... */ <M> void subscribe(ActorRef<? super M> consumer, String actorID) { /* ... */ }/* ... */ void unsubscribe(ActorRef<?> consumer, String actorID) { /* ... */ } |
Закрытие фабрики AutoClosable скажет всем прокси прекратить и AutoClosable бухгалтерские ссылки:
|
1
|
/* ... */ void close() throws Exception { /* ... */ } |
Реализация производителя довольно проста и неинтересна, в то время как потребителю немного больше специй, потому что он будет использовать избирательное получение Quasar Actors для сохранения входящих сообщений в своем почтовом ящике, пока не будет хотя бы один подписанный пользовательский субъект, который сможет их использовать:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
@Overrideprotected Void doRun() throws InterruptedException, SuspendExecution { //noinspection InfiniteLoopStatement for (;;) { // Try extracting from queue final Object msg = tryReceive((Object m) -> { if (EXIT.equals(m)) return EXIT; if (m != null) { //noinspection unchecked final ProxiedMsg rmsg = (ProxiedMsg) m; final List<ActorRef> l = subscribers.get(rmsg.actorID); if (l != null) { boolean sent = false; for (final ActorRef r : l) { //noinspection unchecked r.send(rmsg.payload); sent = true; } if (sent) // Someone was listening, remove from queue return m; } } return null; // No subscribers (leave in queue) or no messages }); // Something from queue if (msg != null) { if (EXIT.equals(msg)) { return null; } continue; // Go to next cycle -> precedence to queue } // Try receiving //noinspection Convert2Lambda final ConsumerRecords<Void, byte[]> records = call(e, () -> consumer.get().poll(100L)); for (final ConsumerRecord<Void, byte[]> record : records) { final byte[] v = record.value(); try (final ByteArrayInputStream bis = new ByteArrayInputStream(v); final ObjectInputStream ois = new ObjectInputStream(bis)) { //noinspection unchecked final ProxiedMsg rmsg = (ProxiedMsg) ois.readObject(); final List<ActorRef> l = subscribers.get(rmsg.actorID); if (l != null && l.size() > 0) { for (final ActorRef r : l) { //noinspection unchecked r.send(rmsg.payload); } } else { ref().send(rmsg); // Enqueue } } catch (final IOException | ClassNotFoundException e) { e.printStackTrace(); throw new RuntimeException(e); } }} |
Поскольку нам также необходимо обработать почтовый ящик, мы опрашиваем Кафку с достаточно небольшим таймаутом. Также обратите внимание, что многие участники могут подписаться на один и тот же идентификатор, и входящее сообщение будет транслироваться всем им. Количество полученных прокси-серверов-актеров (то есть волокон) для каждой темы, а также число потоков пула и дескрипторов consumer Kafka ( consumer является локальным для потока, поскольку потребители Kafka не являются поточно-ориентированными) будет равно числу разделов по теме: это позволяет получить максимальную пропускную способность.
В настоящее время эта реализация использует сериализацию Java для преобразования сообщений в байты и из них, но, конечно, можно использовать и другие платформы, такие как Kryo .
ØMQ актерские прокси
Модель ØMQ полностью децентрализована: здесь нет ни брокеров, ни тем, поэтому мы можем просто приравнять адрес / конечную точку ØMQ к набору действующих лиц, не используя дополнительный идентификатор участника:
|
1
2
3
4
5
6
7
8
|
/* ... */ ZeroMQProxies implements AutoCloseable { /* ... */ ZeroMQProxies(int ioThreads) { /* ... */ } /* ... */ <M> ActorRef<M> to(String trgtZMQAddress) { /* ... */ } /* ... */ void drop(String trgtZMQAddress) /* ... */ void subscribe(ActorRef<? super M> consumer, String srcZMQEndpoint) { /* ... */ } /* ... */ void unsubscribe(ActorRef<?> consumer, String srcZMQEndpoint) { /* ... */ } /* ... */ void close() throws Exception { /* ... */ }} |
В этом случае и по той же причине, что и раньше, потребитель немного интересен, к счастью, любые проблемы с безопасностью потоков, потому что сокеты ØMQ прекрасно работают в нескольких потоках:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
@Overrideprotected Void doRun() throws InterruptedException, SuspendExecution { try(final ZMQ.Socket src = zmq.socket(ZMQ.REP)) { System.err.printf("PROXY CONSUMER: binding %s\n", srcZMQEndpoint); Util.exec(e, () -> src.bind(srcZMQEndpoint)); src.setReceiveTimeOut(100); //noinspection InfiniteLoopStatement for (;;) { // Try extracting from queue final Object m = tryReceive((Object o) -> { if (EXIT.equals(o)) return EXIT; if (o != null) { //noinspection unchecked final List<ActorRef> l = subscribers.get(srcZMQEndpoint); if (l != null) { boolean sent = false; for (final ActorRef r : l) { //noinspection unchecked r.send(o); sent = true; } if (sent) // Someone was listening, remove from queue return o; } } return null; // No subscribers (leave in queue) or no messages }); // Something processable is there if (m != null) { if (EXIT.equals(m)) { return null; } continue; // Go to next cycle -> precedence to queue } System.err.println("PROXY CONSUMER: receiving"); final byte[] msg = Util.call(e, src::recv); if (msg != null) { System.err.println("PROXY CONSUMER: ACKing"); Util.exec(e, () -> src.send(ACK)); final Object o; try (final ByteArrayInputStream bis = new ByteArrayInputStream(msg); final ObjectInputStream ois = new ObjectInputStream(bis)) { o = ois.readObject(); } catch (final IOException | ClassNotFoundException e) { e.printStackTrace(); throw new RuntimeException(e); } System.err.printf("PROXY CONSUMER: distributing '%s' to %d subscribers\n", o, subscribers.size()); //noinspection unchecked for (final ActorRef s : subscribers.getOrDefault(srcZMQEndpoint, (List<ActorRef>) Collections.EMPTY_LIST)) //noinspection unchecked s.send(o); } else { System.err.println("PROXY CONSUMER: receive timeout"); } } }} |
Больше возможностей
Надеемся, что эта короткая рецензия дала представление о том, как легко можно легко связать актеров Quasar с решениями для обмена сообщениями благодаря их природе простых последовательных процессов; конечно можно пойти дальше, например:
- Поиск и обнаружение действующих лиц: как мы предоставляем глобальную службу именования и обнаружения участников? Например, Kafka использует ZooKeeper, так что, вероятно, это стоит использовать, но ØMQ делает серьезную ставку на децентрализацию и намеренно не обеспечивает готовую основу.
- Управление сбоями субъектов: как мы можем поддерживать связи и наблюдения за сбоями между участниками, которые работают в разных узлах?
- Маршрутизация сообщений : как динамически настроить потоки сообщений между узлами и субъектами, не меняя логику внутри участников?
- Мобильность акторов : как мы можем переместить акторов на другие узлы, например, ближе к их источнику сообщений, чтобы повысить производительность или в место с другими свойствами безопасности?
- Масштабируемость и отказоустойчивость : как управлять добавлением, удалением, уничтожением и разбиением узлов акторов? Распределенные IMDG, такие как Galaxy, и решения на основе брокеров, такие как Kafka, обычно уже делают это, но решения на уровне фабрики, такие как ØMQ, обычно этого не делают.
- Безопасность : как мы поддерживаем соответствующие свойства информационной безопасности?
- Тестирование, ведение журнала, мониторинг : как нам удобно тестировать, отслеживать и контролировать распределенный актерский ансамбль в целом?
Эти темы являются «крепким орешком» проектирования распределенных систем и, в частности, действующих лиц, поэтому для их эффективного решения могут потребоваться значительные усилия. Galaxy рассматривает все из них, но участники Quasar предоставляют SPI, который охватывает некоторые из вышеперечисленных тем и который обеспечивает более тесную интеграцию с технологиями распространения. Вас также может заинтересовать сравнение между Akka и Quasar + Galaxy, которое охватывает множество таких аспектов.
Вот и все, так что повеселитесь с вашими распределенными актерами Quasar и оставьте записку о вашем путешествии в группе пользователей Quasar-Pulsar !
- На самом деле это также запрещает использование любыми потоками, кроме первого.
| Ссылка: | Распространенные актеры квазара с Кафкой и ZeroMQ от нашего партнера по JCG Фабио Тудоне в блоге Parallel Universe . |