Статьи

Использование устройств ZeroMQ для поддержки сложных сетевых топологий

Продолжая 
 серию ZeroMQ
, сегодня я хотел бы взглянуть на «устройства» ZeroMQ и на то, как вы можете интегрировать ZeroMQ с 
Gevent,
 чтобы вы могли объединить простые сетевые топологии ZeroMQ с совместной многозадачностью gevent. Если вы только начинаете работать с ZeroMQ, вы можете ознакомиться со следующими статьями:

И если вам нужна информация о Gevent, вы можете проверить 
эту
 серию по следующим ссылкам:

Как только вы поймали, давайте начнем … 

ZeroMQ «устройства»

Одним из приятных аспектов ZeroMQ является то, что он не связывает шаблон связи с шаблоном подключения ваших конечных точек. Без ZeroMQ у вас обычно будет «серверный» сокет, который привязывается к порту и принимает запросы, в то время как «клиенты» подключаются к этому сокету и отправляют запросы. 
С
 ZeroMQ вполне приемлемо, чтобы «сервер» подключался к «клиенту» для приема запросов (и фактически вы можете иметь несколько «серверов», подключенных к одному и тому же «клиентскому» сокету). В

то время как это удобно, в некоторых случаях вы можете захотеть , чтобы клиент и сервер сравнительно динамику, с 
и
 не соединяться и 
ни
привязка к конкретному порту. Для этого варианта использования ZeroMQ предоставляет так называемые «устройства», которые связывают пару сокетов и выполняют операции пересылки между ними для поддержки общих шаблонов связи. Затем «клиент» и «сервер» подключаются к устройству. В этом разделе я расскажу об устройствах, предоставляемых библиотекой ZeroMQ.

Устройство очереди

Устройство очереди отвечает за  передачу шаблона REQ / REP . Предположим, у нас есть следующий код запроса:

import sys
import zmq

context = zmq.Context()

for x in xrange(10):
    sock = context.socket(zmq.REQ)
    sock.connect(sys.argv[1])
    print 'REQ is', x,
    sock.send(str(x)) 

и следующий код ответа:

import sys
import zmq

context = zmq.Context()

while True:
    sock = context.socket(zmq.REP)
    sock.connect(sys.argv[1])
    x = sock.recv()
    print 'REQ is', x,
    reply = 'x-%s' % x
    sock.send(reply) 

Чтобы настроить брокера, который пересылает между ними, вам нужен небольшой скрипт: 

import sys
import zmq

context = zmq.Context()

s1 = context.socket(zmq.ROUTER)
s2 = context.socket(zmq.DEALER)
s1.bind(sys.argv[1])
s2.bind(sys.argv[2]) 

Обратите внимание, что мы использовали несколько новых типов сокетов выше: zmq.ROUTER
 и  zmq.DEALER
. Они аналогичны  zmq.REP
 и  zmq.REQ
, соответственно, но они позволяют нам нарушить строгую блокировку «запрос-ответ» в  REQ / REP
. Они работают следующим образом:

  1. Сокет ROUTER получает сообщение о конкретном соединении. В начало сообщения добавляется идентификатор сообщения, который идентифицирует отправляющий сокет REQ.
  2. Устройство FORWARDER отправляет сообщение, полученное от ROUTER, в разъем DEALER.
  3. ДИЛЕР выбирает соединение для отправки сообщения, удаляя префикс, но отмечая идентификатор сообщения и связывая его с сокетом REP, обрабатывающим сообщение.
  4. ДИЛЕР получает ответное сообщение, связывает его с идентификатором сообщения, на которое он отвечает, и добавляет идентификатор сообщения в начало сообщения.
  5. Устройство FORWARDER отправляет сообщение, полученное от DEALER, в разъем ROUTER.
  6. Сокет ROUTER удаляет идентификатор сообщения и отправляет сообщение в сокет REQ, который отправил первоначальный запрос.

Используя идентификаторы сообщений, как указано выше, мы можем получать несколько сообщений «в полете», которые обрабатываются различными серверами. Если вы не хотите использовать встроенное устройство ZeroMQ, вы можете создать собственное устройство довольно просто. Код ниже показывает устройство, которое использует пару потоков для передачи сообщений из одного сокета в другой:

import sys
import zmq
import time

context = zmq.Context()

s1 = context.socket(zmq.ROUTER)
s2 = context.socket(zmq.DEALER)
s1.bind(sys.argv[1])
s2.bind(sys.argv[2])

def zeromq_relay(a, b):
    '''Copy data from zeromq socket a to zeromq socket b'''
    while True:
        msg = a.recv()
        more = a.getsockopt(zmq.RCVMORE)
        if more:
            b.send(msg, zmq.SNDMORE)
        else:
            b.send(msg)

def zmq_queue_device(s1, s2):
    import threading
    t1 = threading.Thread(target=zeromq_relay, args=(s1,s2))
    t2 = threading.Thread(target=zeromq_relay, args=(s2,s1))
    t1.daemon = t2.daemon = True
    t1.start()
    t2.start()
    while True:
        time.sleep(10)
 

Вы также можете создать устройство с использованием неблокирующего ввода-вывода, но это немного выходит за рамки того, что я хочу охватить здесь.

Экспедиционное устройство

Таким же образом , что  Queue  опосредует УСТРОЙСТВО  REQ / REP  шаблон, то ФОРВАРДЕР  опосредует устройства по  ПАБУ / SUB  шаблон. Поскольку  PUB / SUB  не требует операции блокировки в режиме  REQ / REP , мы можем использовать обычные  разъемы PUB / SUB в нашем устройстве: 

import sys
import zmq
context = zmq.Context()

s1 = context.socket(zmq.SUB)
s2 = context.socket(zmq.PUB)

s1.bind(sys.argv[1])
s2.bind(sys.argv[2])

s1.setsockopt(zmq.SUBSCRIBE, '')
 

Теперь мы можем подключить одного или нескольких издателей к «восходящему» порту нашего устройства ( sys.argv [1] ) и одного или нескольких подписчиков к «нисходящему» порту устройства ( sys.argv [2] ) для обеспечения  PUB / SUB маклер. В частности, обратите внимание, что нам пришлось подписаться на все сообщения в коде устройства, поскольку мы не знаем, какие сообщения интересуют наши нисходящие сокеты. 
Если вы предпочитаете фильтровать сообщения, которые перенаправляются, вы можете подписаться на некоторое подмножество сообщений. К сожалению, я не знаю ни одного способа пересылки только тех сообщений, на которые подписались нижестоящие клиенты, используя встроенную функциональность ZeroMQ.
Если мы хотим , чтобы написать свой собственный  ЭКСПЕДИТОР  устройство, это даже проще , чем  ОЧЕРЕДЬ устройство, поскольку оно обрабатывает только однонаправленную связь. Предполагая, что у нас есть   функция zeromq_relay, как определено выше, наше  устройство FORWARDER выглядит следующим образом:

def zmq_forwarder_device(upstream, downstream): 

Потоковое устройство

Подобно  ЭКСПЕДИТОР  устройства, то  СТРИМЕРА  устройство просто посылает пакеты вверх по течению вниз по течению, но в этом случае в поддержку PUSH / PULL  шаблона , а не  PUB / SUB . Чтобы создать   брокер STREAMER , нам нужен следующий код:  

import sys
import zmq
context = zmq.Context()

s1 = context.socket(zmq.PULL)
s2 = context.socket(zmq.PUSH)

s1.bind(sys.argv[1])
s2.bind(sys.argv[2])
 

И еще раз, если мы хотим создать устройство вручную, это просто реле:

def zmq_streaming_device(upstream, downstream): 

Интеграция с Gevent

Возможно, вы заметили, что функция устройства gevent не возвращается. Если вы хотите создать несколько устройств в вашей программе на Python, вам нужно будет обернуть устройства в потоки. 

Другой подход, который вы могли бы использовать, если вы, как и я, предпочитаете легкие потоки и асинхронный ввод / вывод gevent, — это использовать  пакет gevent-zeromq , доступный из PyPI: 

$ pip install gevent-zeromq  

 Теперь мы можем использовать «зеленую» версию ZeroMQ, просто импортировав из  оболочки gevent-zeromq  наши скрипты. Сценарий «толкатель» будет выглядеть следующим образом:

import sys
import time

from gevent_zeromq import zmq

context = zmq.Context.instance()
sock = context.socket(zmq.PUSH)
sock.connect(sys.argv[1])

while True:
    time.sleep(1) 

Просто, правда? Причина, по которой я добавляю это в этот, казалось бы, не связанный пост, заключается в том, что если вы хотите использовать gevent, вы 
не можете
 использовать встроенные устройства. Это связано с тем, что встроенные устройства блокируются, и они блокируются в библиотеке ZeroMQ C, а 
не
 в Python, где они могут быть «озеленены». Поэтому, если вам нужно устройство с Gevent, вам придется написать свое собственное (что, в конце концов, довольно просто).

Вывод

Устройства ZeroMQ предоставляют удобный способ объединения сложных топологий маршрутизации и позволяют разделить различные компоненты вашей архитектуры. Хотя встроенные устройства довольно просты, они дают представление о том, как можно самостоятельно создавать более сложные устройства, чтобы выполнять роль «посредника» в распределенной архитектуре. 

Мне было бы интересно услышать от вас, как вы используете устройства (или считаете ли вы их полезными) в программировании ZeroMQ. Используете ли вы встроенные устройства? Какие устройства ты пишешь сам? Вы предпочитаете многопоточный подход, который я здесь использовал, или использование   объекта zmq.Poller ? Позвольте мне знать в комментариях ниже!