серию ZeroMQ
, сегодня я хотел бы взглянуть на «устройства» ZeroMQ и на то, как вы можете интегрировать ZeroMQ с
Gevent,
чтобы вы могли объединить простые сетевые топологии ZeroMQ с совместной многозадачностью gevent. Если вы только начинаете работать с ZeroMQ, вы можете ознакомиться со следующими статьями:
И если вам нужна информация о Gevent, вы можете проверить
эту
серию по следующим ссылкам:
- Введение в Gevent
- Gevent, Threads и Benchmarks
- Гевент и Гринлетс
- Озеленение стандартной библиотеки Python с помощью Gevent
- Создание TCP-серверов с Gevent
- Создание веб-приложений с помощью сервера Gevent WSGI
Как только вы поймали, давайте начнем …
ZeroMQ «устройства»
Одним из приятных аспектов ZeroMQ является то, что он не связывает шаблон связи с шаблоном подключения ваших конечных точек. Без ZeroMQ у вас обычно будет «серверный» сокет, который привязывается к порту и принимает запросы, в то время как «клиенты» подключаются к этому сокету и отправляют запросы.
С
ZeroMQ вполне приемлемо, чтобы «сервер» подключался к «клиенту» для приема запросов (и фактически вы можете иметь несколько «серверов», подключенных к одному и тому же «клиентскому» сокету). В
то время как это удобно, в некоторых случаях вы можете захотеть , чтобы клиент и сервер сравнительно динамику, с
и
не соединяться и
ни
привязка к конкретному порту. Для этого варианта использования ZeroMQ предоставляет так называемые «устройства», которые связывают пару сокетов и выполняют операции пересылки между ними для поддержки общих шаблонов связи. Затем «клиент» и «сервер» подключаются к устройству. В этом разделе я расскажу об устройствах, предоставляемых библиотекой ZeroMQ.
Устройство очереди
Устройство очереди отвечает за передачу шаблона REQ / REP . Предположим, у нас есть следующий код запроса:
import sys import zmq context = zmq.Context() for x in xrange(10): sock = context.socket(zmq.REQ) sock.connect(sys.argv[1]) print 'REQ is', x, sock.send(str(x))
и следующий код ответа:
import sys import zmq context = zmq.Context() while True: sock = context.socket(zmq.REP) sock.connect(sys.argv[1]) x = sock.recv() print 'REQ is', x, reply = 'x-%s' % x sock.send(reply)
Чтобы настроить брокера, который пересылает между ними, вам нужен небольшой скрипт:
import sys import zmq context = zmq.Context() s1 = context.socket(zmq.ROUTER) s2 = context.socket(zmq.DEALER) s1.bind(sys.argv[1]) s2.bind(sys.argv[2])
Обратите внимание, что мы использовали несколько новых типов сокетов выше: zmq.ROUTER
и zmq.DEALER
. Они аналогичны zmq.REP
и zmq.REQ
, соответственно, но они позволяют нам нарушить строгую блокировку «запрос-ответ» в REQ / REP
. Они работают следующим образом:
- Сокет ROUTER получает сообщение о конкретном соединении. В начало сообщения добавляется идентификатор сообщения, который идентифицирует отправляющий сокет REQ.
- Устройство FORWARDER отправляет сообщение, полученное от ROUTER, в разъем DEALER.
- ДИЛЕР выбирает соединение для отправки сообщения, удаляя префикс, но отмечая идентификатор сообщения и связывая его с сокетом REP, обрабатывающим сообщение.
- ДИЛЕР получает ответное сообщение, связывает его с идентификатором сообщения, на которое он отвечает, и добавляет идентификатор сообщения в начало сообщения.
- Устройство FORWARDER отправляет сообщение, полученное от DEALER, в разъем ROUTER.
- Сокет ROUTER удаляет идентификатор сообщения и отправляет сообщение в сокет REQ, который отправил первоначальный запрос.
Используя идентификаторы сообщений, как указано выше, мы можем получать несколько сообщений «в полете», которые обрабатываются различными серверами. Если вы не хотите использовать встроенное устройство ZeroMQ, вы можете создать собственное устройство довольно просто. Код ниже показывает устройство, которое использует пару потоков для передачи сообщений из одного сокета в другой:
import sys import zmq import time context = zmq.Context() s1 = context.socket(zmq.ROUTER) s2 = context.socket(zmq.DEALER) s1.bind(sys.argv[1]) s2.bind(sys.argv[2]) def zeromq_relay(a, b): '''Copy data from zeromq socket a to zeromq socket b''' while True: msg = a.recv() more = a.getsockopt(zmq.RCVMORE) if more: b.send(msg, zmq.SNDMORE) else: b.send(msg) def zmq_queue_device(s1, s2): import threading t1 = threading.Thread(target=zeromq_relay, args=(s1,s2)) t2 = threading.Thread(target=zeromq_relay, args=(s2,s1)) t1.daemon = t2.daemon = True t1.start() t2.start() while True: time.sleep(10)
Вы также можете создать устройство с использованием неблокирующего ввода-вывода, но это немного выходит за рамки того, что я хочу охватить здесь.
Экспедиционное устройство
Таким же образом , что Queue опосредует УСТРОЙСТВО REQ / REP шаблон, то ФОРВАРДЕР опосредует устройства по ПАБУ / SUB шаблон. Поскольку PUB / SUB не требует операции блокировки в режиме REQ / REP , мы можем использовать обычные разъемы PUB / SUB в нашем устройстве:
import sys import zmq context = zmq.Context() s1 = context.socket(zmq.SUB) s2 = context.socket(zmq.PUB) s1.bind(sys.argv[1]) s2.bind(sys.argv[2]) s1.setsockopt(zmq.SUBSCRIBE, '')
Теперь мы можем подключить одного или нескольких издателей к «восходящему» порту нашего устройства ( sys.argv [1] ) и одного или нескольких подписчиков к «нисходящему» порту устройства ( sys.argv [2] ) для обеспечения PUB / SUB маклер. В частности, обратите внимание, что нам пришлось подписаться на все сообщения в коде устройства, поскольку мы не знаем, какие сообщения интересуют наши нисходящие сокеты.
Если вы предпочитаете фильтровать сообщения, которые перенаправляются, вы можете подписаться на некоторое подмножество сообщений. К сожалению, я не знаю ни одного способа пересылки только тех сообщений, на которые подписались нижестоящие клиенты, используя встроенную функциональность ZeroMQ.
Если мы хотим , чтобы написать свой собственный ЭКСПЕДИТОР устройство, это даже проще , чем ОЧЕРЕДЬ устройство, поскольку оно обрабатывает только однонаправленную связь. Предполагая, что у нас есть функция zeromq_relay, как определено выше, наше устройство FORWARDER выглядит следующим образом:
def zmq_forwarder_device(upstream, downstream):
Потоковое устройство
Подобно ЭКСПЕДИТОР устройства, то СТРИМЕРА устройство просто посылает пакеты вверх по течению вниз по течению, но в этом случае в поддержку PUSH / PULL шаблона , а не PUB / SUB . Чтобы создать брокер STREAMER , нам нужен следующий код:
import sys import zmq context = zmq.Context() s1 = context.socket(zmq.PULL) s2 = context.socket(zmq.PUSH) s1.bind(sys.argv[1]) s2.bind(sys.argv[2])
И еще раз, если мы хотим создать устройство вручную, это просто реле:
def zmq_streaming_device(upstream, downstream):
Интеграция с Gevent
Возможно, вы заметили, что функция устройства gevent не возвращается. Если вы хотите создать несколько устройств в вашей программе на Python, вам нужно будет обернуть устройства в потоки.
Другой подход, который вы могли бы использовать, если вы, как и я, предпочитаете легкие потоки и асинхронный ввод / вывод gevent, — это использовать пакет gevent-zeromq , доступный из PyPI:
$ pip install gevent-zeromq
Теперь мы можем использовать «зеленую» версию ZeroMQ, просто импортировав из оболочки gevent-zeromq наши скрипты. Сценарий «толкатель» будет выглядеть следующим образом:
import sys import time from gevent_zeromq import zmq context = zmq.Context.instance() sock = context.socket(zmq.PUSH) sock.connect(sys.argv[1]) while True: time.sleep(1)
Просто, правда? Причина, по которой я добавляю это в этот, казалось бы, не связанный пост, заключается в том, что если вы хотите использовать gevent, вы
не можете
использовать встроенные устройства. Это связано с тем, что встроенные устройства блокируются, и они блокируются в библиотеке ZeroMQ C, а
не
в Python, где они могут быть «озеленены». Поэтому, если вам нужно устройство с Gevent, вам придется написать свое собственное (что, в конце концов, довольно просто).
Вывод
Устройства ZeroMQ предоставляют удобный способ объединения сложных топологий маршрутизации и позволяют разделить различные компоненты вашей архитектуры. Хотя встроенные устройства довольно просты, они дают представление о том, как можно самостоятельно создавать более сложные устройства, чтобы выполнять роль «посредника» в распределенной архитектуре.
Мне было бы интересно услышать от вас, как вы используете устройства (или считаете ли вы их полезными) в программировании ZeroMQ. Используете ли вы встроенные устройства? Какие устройства ты пишешь сам? Вы предпочитаете многопоточный подход, который я здесь использовал, или использование объекта zmq.Poller ? Позвольте мне знать в комментариях ниже!