Итак, у вас есть причудливый дизайн с использованием актеров, вы выбрали JVM и мощный, лояльный подход Quasar к этой теме. Все мудрые решения, но каковы ваши варианты их распределения в кластере?
галактика
Galaxy — это действительно крутой вариант: быстрая сетка данных в памяти, оптимизированная для локальности данных с репликацией, опциональным постоянством, распределенным реестром акторов и даже миграцией актеров между узлами! Есть только одно предупреждение: пройдет еще несколько месяцев, прежде чем мы выпустим качественную, официально проверенную версию Galaxy. Текущая версия Galaxy не рекомендуется для производственного использования.
Что если нам нужно жить до этого?
К счастью, модель программирования блокировок Quasar Actors настолько проста, что ее интеграция с большинством решений для обмена сообщениями очень проста, и чтобы продемонстрировать это, давайте сделаем это с двумя быстрыми, популярными и совершенно разными: Apache Kafka и ØMQ .
Код и план
Все следующие примеры можно найти на GitHub , просто кратко рассмотрите короткий README
и вы сразу же их запустите.
Есть два примера для каждого из Kafka и ØMQ:
- Быстрый и грязный человек, выполняющий прямую публикацию / опрос или отправку / прием звонков от актеров.
- Более сложный способ прохождения через прокси-субъектов, которые защищают ваш код от API обмена сообщениями. В качестве доказательства того, что я не вру, эта программа использует одинаковые классы актера-производителя и потребителя для обеих технологий и почти одну и ту же программу начальной загрузки.
Кафка
В Apache Kafka наблюдается резкое увеличение популярности благодаря уникальной конструкции, основанной на журналах фиксации для обеспечения долговечности и группах потребителей для параллельного потребления сообщений: благодаря этой комбинации появился быстрый, надежный, гибкий и масштабируемый брокер.
API включает две разновидности производителей: синхронизацию и асинхронность и одну из потребителей (только синхронизация); Comsat включает в себя интеграцию производителей Kafka с поддержкой оптоволокна.
Дескриптор производителя Kafka является поточно-ориентированным, лучше всего работает при совместном использовании и может быть легко получен и использован в теле актера (или где-либо еще) следующим образом:
01
02
03
04
05
06
07
08
09
10
11
|
final Properties producerConfig = new Properties(); producerConfig.put( "bootstrap.servers" , "localhost:9092" ); producerConfig.put( "client.id" , "DemoProducer" ); producerConfig.put( "key.serializer" , "org.apache.kafka.common.serialization.IntegerSerializer" ); producerConfig.put( "value.serializer" , "org.apache.kafka.common.serialization.ByteArraySerializer" ); try ( final FiberKafkaProducer<Integer, byte []> producer = new FiberKafkaProducer<>( new KafkaProducer<>(producerConfig))) { final byte [] myBytes = getMyBytes(); // ... final Future<RecordMetaData> res = producer.send( new ProducerRecord<>( "MyTopic" , i, myBytes)); res.get(); // Optional, blocks the fiber until the record is persisted; thre's also `producer.flush()` } |
Мы обертываем объект KafkaProducer
вместе с KafkaProducer
от Comsat, чтобы вернуть блокирующее волокно будущее.
Однако дескриптор потребителя не является поточно-ориентированным 1 и блокирует только потоки:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
final Properties producerConfig = new Properties(); consumerConfig = new Properties(); consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP); consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer" ); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true" ); consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000" ); consumerConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000" ); consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer" ); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer" ); try ( final Consumer<Integer, byte []> consumer = new KafkaConsumer<>(consumerConfig)) { consumer.subscribe(Collections.singletonList(TOPIC)); final ConsumerRecords<Integer, byte []> records = consumer.poll(1000L); for ( final ConsumerRecord<Integer, byte []> record : records) { final byte [] v = record.value(); useMyBytes(v); // ... } } |
Так как мы не хотим блокировать базовый пул потоков волокна (кроме тех, которые Kafka блокирует под прикрытием — мы не можем ничего с ними поделать), в doRun
нашего актера мы будем использовать вместо FiberAsync.runBlocking
для подачи фиксированных Размер executor с асинхронной задачей, которая будет просто блокировать волокно, пока poll
(который будет выполняться в данном пуле) не вернет:
01
02
03
04
05
06
07
08
09
10
|
final ExecutorService e = Executors.newFixedThreadPool( 2 ); try ( final Consumer<Integer, byte []> consumer = new KafkaConsumer<>(consumerConfig)) { consumer.subscribe(Collections.singletonList(TOPIC)); final ConsumerRecords<Integer, byte []> records = call(e, () -> consumer.poll(1000L)); for ( final ConsumerRecord<Integer, byte []> record : records) { final byte [] v = record.value(); useMyBytes(v); // ... } } |
Где call
— это служебный метод, определенный следующим образом (если бы не эта ошибка Java-компилятора, он не был бы необходим):
01
02
03
04
05
06
07
08
09
10
|
@Suspendable public static <V> V call(ExecutorService es, Callable<V> c) throws InterruptedException, SuspendExecution { try { return runBlocking(es, (CheckedCallable<V, Exception>) c::call); } catch ( final InterruptedException | SuspendExecution e) { throw e; } catch ( final Exception e) { throw new RuntimeException(e); } } |
В первом полном примере мы отправляем тысячу сериализованных сообщений от актера-производителя к потребителю.
ØMQ
Ø MQ (или ZeroMQ) не является централизованным брокерским решением и является скорее обобщением сокетов для различных шаблонов коммуникации (запрос / ответ, публикация / подписка и т. Д.). В наших примерах мы будем использовать простейший шаблон запроса-ответа. Вот наш новый код производителя:
1
2
3
4
5
6
7
|
try ( final ZMQ.Context zmq = ZMQ.context( 1 /* IO threads */ ); final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) { trgt.connect( "tcp://localhost:8000" ); final byte [] myBytes = getMyBytes(); // ... trgt.send(baos.toByteArray(), 0 /* flags */ ) trgt.recv(); // Reply, e.g. ACK } |
Как вы можете видеть, контекст действует как фабрика сокетов, и ему передается количество потоков ввода-вывода, которые будут использоваться: это потому, что сокеты ØMQ не являются дескрипторами ОС, привязанными к соединению, а скорее простым интерфейсом для механизма, который будет обрабатывать повтор соединения, несколько соединений, эффективный параллельный ввод-вывод и даже организация очереди для вас. По этой причине send
вызовов почти никогда не блокируется, и recv
call — это не вызов ввода-вывода для соединения, а скорее синхронизация между вашим потоком и специализированной задачей ввода-вывода, которая передает входящие байты из одного или даже нескольких соединений.
Вместо потоков мы будем блокировать волокна в наших актерах, поэтому давайте использовать FiberAsync.runBlocking
при вызовах read
и, на всякий случай, он блокирует даже при send
:
1
2
3
4
5
6
7
8
9
|
final ExecutorService ep = Executors.newFixedThreadPool( 2 ); try ( final ZMQ.Context zmq = ZMQ.context( 1 /* IO threads */ ); final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) { exec(e, () -> trgt.connect( "tcp://localhost:8000" )); final byte [] myBytes = getMyBytes(); // ... call(e, trgt.send(myBytes, 0 /* flags */ )); call(e, trgt::recv); // Reply, e.g. ACK } |
Вот потребитель:
1
2
3
4
5
6
7
|
try ( final ZMQ.Context zmq = ZMQ.context( 1 /* IO threads */ ); final ZMQ.Socket src = zmq.socket(ZMQ.REP)) { exec(e, () -> src.bind( "tcp://*:8000" )); final byte [] v = call(e, src::recv); exec(e, () -> src.send( "ACK" )); useMyBytes(v); // ... } |
где exec
— еще одна служебная функция, похожая на call
:
01
02
03
04
05
06
07
08
09
10
|
@Suspendable public static void exec(ExecutorService es, Runnable r) throws InterruptedException, SuspendExecution { try { runBlocking(es, (CheckedCallable<Void, Exception>) () -> { r.run(); return null ; }); } catch ( final InterruptedException | SuspendExecution e) { throw e; } catch ( final Exception e) { throw new RuntimeException(e); } } |
И вот полный первый пример .
Распределение без изменения логики: слабая связь для спасения
Это просто, не правда ли? Хотя есть кое-что раздражающее: мы имеем дело с актерами на другой стороне сети совсем не так, как местные. Вот актеры, которых мы хотели бы написать, независимо от того, где они находятся и как они связаны:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
public final class ProducerActor extends BasicActor<Void, Void> { private final ActorRef<Msg> target; public ProducerActor(ActorRef<Msg> target) { this .target = target; } @Override protected final Void doRun() throws InterruptedException, SuspendExecution { for ( int i = 0 ; i < MSGS; i++) { final Msg m = new Msg(i); System.err.println( "USER PRODUCER: " + m); target.send(m); } System.err.println( "USER PRODUCER: " + EXIT); target.send(EXIT); return null ; } } |
01
02
03
04
05
06
07
08
09
10
11
|
public final class ConsumerActor extends BasicActor<Msg, Void> { @Override protected final Void doRun() throws InterruptedException, SuspendExecution { for (;;) { final Msg m = receive(); System.err.println( "USER CONSUMER: " + m); if (EXIT.equals(m)) return null ; } } } |
К счастью, каждый участник, независимо от того, что он делает, имеет один и тот же очень простой интерфейс: очередь входящих сообщений, называемая почтовым ящиком . Это означает, что мы можем вставить между двумя взаимодействующими субъектами столько промежуточных участников или прокси-серверов , сколько мы хотим, и, в частности, нам нужен отправляющий прокси-сервер, который будет передавать сообщения через промежуточное ПО на хост назначения, и принимающий прокси-сервер, который будет захватывать входящие сообщения. и поместит их в почтовый ящик предполагаемого места назначения.
Таким образом, в нашей основной программе мы просто предоставим нашему ProducerActor
подходящий прокси-сервер отправки и позволим нашему ConsumerActor
получать ConsumerActor
от подходящего прокси-сервера получения:
01
02
03
04
05
06
07
08
09
10
|
final ProducerActor pa = Actor.newActor(ProducerActor. class , getSendingProxy()); // ... final ConsumerActor ca = Actor.newActor(ConsumerActor. class ); pa.spawn(); System.err.println( "USER PRODUCER started" ); subscribeToReceivingProxy(ca.spawn()); // ... System.err.println( "USER CONSUMER started" ); pa.join(); System.err.println( "USER PRODUCER finished" ); ca.join(); System.err.println( "USER CONSUMER finished" ); |
Давайте посмотрим, как мы можем реализовать эти прокси сначала с Kafka, а затем с ØMQ.
Кафка актер прокси
Фабрика прокси-актеров будет привязана к определенной теме Kafka: это потому, что тема может быть разделена таким образом, что несколько потребителей могут читать из нее одновременно. Мы хотим иметь возможность оптимально использовать максимальный уровень или параллельность каждой темы:
1
2
3
4
5
|
/* ... */ KafkaProxies implements AutoCloseable { /* ... */ KafkaProxies(String bootstrap, String topic) { /* ... */ } // ... } |
Конечно, мы хотим использовать тему для нескольких действующих лиц, поэтому при отправке прокси-серверов будет указан идентификатор субъекта-получателя, а принимающие прокси-серверы перенаправят сообщение только тем действующим лицам, которые привязаны к этому идентификатору:
1
2
3
4
|
/* ... */ <M> ActorRef<M> create(String actorID) { /* ... */ } /* ... */ void drop(ActorRef ref) throws ExecutionException, InterruptedException { /* ... */ } /* ... */ <M> void subscribe(ActorRef<? super M> consumer, String actorID) { /* ... */ } /* ... */ void unsubscribe(ActorRef<?> consumer, String actorID) { /* ... */ } |
Закрытие фабрики AutoClosable
скажет всем прокси прекратить и AutoClosable
бухгалтерские ссылки:
1
|
/* ... */ void close() throws Exception { /* ... */ } |
Реализация производителя довольно проста и неинтересна, в то время как потребителю немного больше специй, потому что он будет использовать избирательное получение Quasar Actors для сохранения входящих сообщений в своем почтовом ящике, пока не будет хотя бы один подписанный пользовательский субъект, который сможет их использовать:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
@Override protected Void doRun() throws InterruptedException, SuspendExecution { //noinspection InfiniteLoopStatement for (;;) { // Try extracting from queue final Object msg = tryReceive((Object m) -> { if (EXIT.equals(m)) return EXIT; if (m != null ) { //noinspection unchecked final ProxiedMsg rmsg = (ProxiedMsg) m; final List<ActorRef> l = subscribers.get(rmsg.actorID); if (l != null ) { boolean sent = false ; for ( final ActorRef r : l) { //noinspection unchecked r.send(rmsg.payload); sent = true ; } if (sent) // Someone was listening, remove from queue return m; } } return null ; // No subscribers (leave in queue) or no messages }); // Something from queue if (msg != null ) { if (EXIT.equals(msg)) { return null ; } continue ; // Go to next cycle -> precedence to queue } // Try receiving //noinspection Convert2Lambda final ConsumerRecords<Void, byte []> records = call(e, () -> consumer.get().poll(100L)); for ( final ConsumerRecord<Void, byte []> record : records) { final byte [] v = record.value(); try ( final ByteArrayInputStream bis = new ByteArrayInputStream(v); final ObjectInputStream ois = new ObjectInputStream(bis)) { //noinspection unchecked final ProxiedMsg rmsg = (ProxiedMsg) ois.readObject(); final List<ActorRef> l = subscribers.get(rmsg.actorID); if (l != null && l.size() > 0 ) { for ( final ActorRef r : l) { //noinspection unchecked r.send(rmsg.payload); } } else { ref().send(rmsg); // Enqueue } } catch ( final IOException | ClassNotFoundException e) { e.printStackTrace(); throw new RuntimeException(e); } } } |
Поскольку нам также необходимо обработать почтовый ящик, мы опрашиваем Кафку с достаточно небольшим таймаутом. Также обратите внимание, что многие участники могут подписаться на один и тот же идентификатор, и входящее сообщение будет транслироваться всем им. Количество полученных прокси-серверов-актеров (то есть волокон) для каждой темы, а также число потоков пула и дескрипторов consumer
Kafka ( consumer
является локальным для потока, поскольку потребители Kafka не являются поточно-ориентированными) будет равно числу разделов по теме: это позволяет получить максимальную пропускную способность.
В настоящее время эта реализация использует сериализацию Java для преобразования сообщений в байты и из них, но, конечно, можно использовать и другие платформы, такие как Kryo .
ØMQ актерские прокси
Модель ØMQ полностью децентрализована: здесь нет ни брокеров, ни тем, поэтому мы можем просто приравнять адрес / конечную точку ØMQ к набору действующих лиц, не используя дополнительный идентификатор участника:
1
2
3
4
5
6
7
8
|
/* ... */ ZeroMQProxies implements AutoCloseable { /* ... */ ZeroMQProxies( int ioThreads) { /* ... */ } /* ... */ <M> ActorRef<M> to(String trgtZMQAddress) { /* ... */ } /* ... */ void drop(String trgtZMQAddress) /* ... */ void subscribe(ActorRef<? super M> consumer, String srcZMQEndpoint) { /* ... */ } /* ... */ void unsubscribe(ActorRef<?> consumer, String srcZMQEndpoint) { /* ... */ } /* ... */ void close() throws Exception { /* ... */ } } |
В этом случае и по той же причине, что и раньше, потребитель немного интересен, к счастью, любые проблемы с безопасностью потоков, потому что сокеты ØMQ прекрасно работают в нескольких потоках:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
@Override protected Void doRun() throws InterruptedException, SuspendExecution { try ( final ZMQ.Socket src = zmq.socket(ZMQ.REP)) { System.err.printf( "PROXY CONSUMER: binding %s\n" , srcZMQEndpoint); Util.exec(e, () -> src.bind(srcZMQEndpoint)); src.setReceiveTimeOut( 100 ); //noinspection InfiniteLoopStatement for (;;) { // Try extracting from queue final Object m = tryReceive((Object o) -> { if (EXIT.equals(o)) return EXIT; if (o != null ) { //noinspection unchecked final List<ActorRef> l = subscribers.get(srcZMQEndpoint); if (l != null ) { boolean sent = false ; for ( final ActorRef r : l) { //noinspection unchecked r.send(o); sent = true ; } if (sent) // Someone was listening, remove from queue return o; } } return null ; // No subscribers (leave in queue) or no messages }); // Something processable is there if (m != null ) { if (EXIT.equals(m)) { return null ; } continue ; // Go to next cycle -> precedence to queue } System.err.println( "PROXY CONSUMER: receiving" ); final byte [] msg = Util.call(e, src::recv); if (msg != null ) { System.err.println( "PROXY CONSUMER: ACKing" ); Util.exec(e, () -> src.send(ACK)); final Object o; try ( final ByteArrayInputStream bis = new ByteArrayInputStream(msg); final ObjectInputStream ois = new ObjectInputStream(bis)) { o = ois.readObject(); } catch ( final IOException | ClassNotFoundException e) { e.printStackTrace(); throw new RuntimeException(e); } System.err.printf( "PROXY CONSUMER: distributing '%s' to %d subscribers\n" , o, subscribers.size()); //noinspection unchecked for ( final ActorRef s : subscribers.getOrDefault(srcZMQEndpoint, (List<ActorRef>) Collections.EMPTY_LIST)) //noinspection unchecked s.send(o); } else { System.err.println( "PROXY CONSUMER: receive timeout" ); } } } } |
Больше возможностей
Надеемся, что эта короткая рецензия дала представление о том, как легко можно легко связать актеров Quasar с решениями для обмена сообщениями благодаря их природе простых последовательных процессов; конечно можно пойти дальше, например:
- Поиск и обнаружение действующих лиц: как мы предоставляем глобальную службу именования и обнаружения участников? Например, Kafka использует ZooKeeper, так что, вероятно, это стоит использовать, но ØMQ делает серьезную ставку на децентрализацию и намеренно не обеспечивает готовую основу.
- Управление сбоями субъектов: как мы можем поддерживать связи и наблюдения за сбоями между участниками, которые работают в разных узлах?
- Маршрутизация сообщений : как динамически настроить потоки сообщений между узлами и субъектами, не меняя логику внутри участников?
- Мобильность акторов : как мы можем переместить акторов на другие узлы, например, ближе к их источнику сообщений, чтобы повысить производительность или в место с другими свойствами безопасности?
- Масштабируемость и отказоустойчивость : как управлять добавлением, удалением, уничтожением и разбиением узлов акторов? Распределенные IMDG, такие как Galaxy, и решения на основе брокеров, такие как Kafka, обычно уже делают это, но решения на уровне фабрики, такие как ØMQ, обычно этого не делают.
- Безопасность : как мы поддерживаем соответствующие свойства информационной безопасности?
- Тестирование, ведение журнала, мониторинг : как нам удобно тестировать, отслеживать и контролировать распределенный актерский ансамбль в целом?
Эти темы являются «крепким орешком» проектирования распределенных систем и, в частности, действующих лиц, поэтому для их эффективного решения могут потребоваться значительные усилия. Galaxy рассматривает все из них, но участники Quasar предоставляют SPI, который охватывает некоторые из вышеперечисленных тем и который обеспечивает более тесную интеграцию с технологиями распространения. Вас также может заинтересовать сравнение между Akka и Quasar + Galaxy, которое охватывает множество таких аспектов.
Вот и все, так что повеселитесь с вашими распределенными актерами Quasar и оставьте записку о вашем путешествии в группе пользователей Quasar-Pulsar !
- На самом деле это также запрещает использование любыми потоками, кроме первого.
Ссылка: | Распространенные актеры квазара с Кафкой и ZeroMQ от нашего партнера по JCG Фабио Тудоне в блоге Parallel Universe . |