Статьи

Распределенные системы с ZeroMQ

Отойдя немного от моей текущей серии статей о Gevent и Python, сегодня я хочу взглянуть на другую сетевую технологию, которая набирает обороты: ZeroMQ . Так что без дальнейших церемоний, давайте прыгнем прямо в …

Принципы дизайна ZeroMQ

Во-первых, ZeroMQ не является брокером сообщений. Люди иногда принимают его за одно из-за его имени. На самом деле ZeroMQ — это библиотека, которая поддерживает определенные шаблоны сетевого взаимодействия с использованием сокетов . Входит часть «MQ», потому что ZeroMQ использует очереди для буферизации сообщений, чтобы вы не блокировали приложение при отправке данных. Когда вы говорите socket.send (…), ZeroMQ фактически ставит в очередь сообщение, которое будет отправлено позже выделенным потоком связи. (Этот коммуникационный поток и его состояние инкапсулированы в объекте контекста ZeroMQ, который используется ниже; у большинства программ будет один контекст.)

ZeroMQ привязка / подключение по сравнению с «обычными» сокетами

Далее, имейте в виду, что ZeroMQ отделяет понятие клиентов и серверов от базового шаблона связи. Например, вы можете использовать для создания сокета для приема запросов с шаблоном, подобным следующему:

from socket import socket

sock = socket()
sock.bind(('', 8080))
sock.listen(256)
while True:
    cli = sock.accept()
    # The following code would probably be handled in a 'worker' thread or
    # greenlet. It's included here only for example purposes.
    message = cli.recv(...)
    response = handle_message(message)
    cli.send(response)

Следующий код, вероятно, будет обрабатываться в «рабочем» потоке или гринлете. Это включено здесь только в качестве примера.

Затем клиент подключится () к серверу и отправит запрос:

from socket import socket

sock = socket()
sock.connect(('localhost', 8080))
sock.send(message)
response = sock.recv(...)

В ZeroMQ любой конец шаблона запроса / ответа может связываться, и любой конец может подключаться. Например, используя библиотеку pyzmq , вы можете подключить свой «сервер» (тот, кто обрабатывает запросы) к «клиенту» (тому, кто отправляет запросы). Код «сервера» тогда выглядит так:

import zmq
context = zmq.Context.instance()

sock = context.socket(zmq.REP)
sock.connect('tcp://localhost:8080')

while True:
    message = sock.recv()
    response = handle_message(message)
    sock.send(response)

Код «клиента» будет выглядеть так:

import zmq
context = zmq.Context.instance()

sock = context.socket(zmq.REQ)
sock.bind('tcp://*:8080')

sock.send(message)
response = sock.recv()

Несколько вещей заслуживают внимания здесь. Во-первых, как отмечено выше, «сервер» выполняет подключение, а «клиент» выполняет привязку. Еще одна вещь, которую нужно отметить, это используемый адрес. Вместо передачи имени хоста / порта мы передаем URI.

Виды транспорта ZeroMQ

ZeroMQ поддерживает несколько различных стилей URI для своего транспортного уровня, каждый из которых поддерживает полную гамму функциональных возможностей ZeroMQ:

  • tcp: // имя хоста: портовые сокеты позволяют нам делать «обычные» TCP-сети
  • inproc: // имя сокета позволяет нам выполнять внутрипроцессное сетевое взаимодействие (inter-thread / greenlet) с тем же кодом, который мы использовали бы для TCP-сети
  • ipc: /// сокеты tmp / filename используют сокеты домена UNIX для межпроцессного взаимодействия
  • pgm: // interface: address: port и epgm: // interface: address: port использует библиотеку OpenPGM для поддержки многоадресной передачи по IP (pgm) и по UDP (epgm). Из-за многоадресной передачи транспорты pgm и epgm могут использоваться только с типами сокетов PUB / SUB (подробнее об этом ниже).

ZeroMQ отключил операцию

Одна особенность, которая иногда ловит новичков в ZeroMQ, заключается в том, что она поддерживает отключенную работу. Например, в приведенном выше коде мы могли бы сначала запустить сервер, а затем клиента. С сокетами TCP это не будет работать, потому что сервер пытается соединиться () с клиентом. В ZeroMQ метод connect () будет проходить «оптимистично», предполагая, что кто-то будет связываться с этим портом позже.

Более того, вы можете запустить клиент, подключиться к порту 8080, выполнить транзакцию с сервером и затем завершить работу. Затем другой клиент может запустить, подключиться к порту 8080 и выполнить другую транзакцию. Сервер просто продолжает обрабатывать запросы, с радостью «подключенный» к тому, что происходит с портом 8080.

Инкапсуляция сообщений ZeroMQ

Последний аспект ZeroMQ заключается в том, что он инкапсулирует связь в сообщения, которые могут состоять из нескольких частей . Вместо того, чтобы просить ZeroMQ получить определенное количество байтов из сокета, вы просите ZeroMQ получить одно сообщение . Вы также можете отправлять и получать многокомпонентные сообщения , используя опцию zmq.SNDMORE и zmq.RECVMORE. Чтобы отправить составное сообщение, просто используйте zmq.SNDMORE в качестве второго аргумента для каждой части send (), кроме последней:

sock.send(part1, zmq.SNDMORE)
sock.send(part2, zmq.SNDMORE)
sock.send(part3, zmq.SNDMORE)
sock.send(final)

 
Затем клиент может спросить, есть ли еще что получить:

more = True
parts = []
while more:
    parts.append(sock.recv())
    more = sock.getsockopt(zmq.RCVMORE) 

Шаблоны связи ZeroMQ

Основной концепцией ZeroMQ, на которую я ссылался выше, но не сделал ее явной, являются шаблоны связи, поддерживаемые ZeroMQ. Из-за некоторых функций быстрого доступа, таких как асинхронная связь и отключенная работа, необходимо применять шаблоны более высокого уровня, чем просто пересылать байты из одной конечной точки в другую. ZeroMQ реализует это, заставляя указывать тип socket_type при вызове zmq.Context.socket (). Каждый тип сокета имеет набор «совместимых» типов сокетов, с которыми он может взаимодействовать, и ZeroMQ вызовет исключение, если вы попытаетесь установить связь между несовместимыми сокетами. Здесь я опишу некоторые из основных моделей:

Шаблон запроса / ответа ZeroMQ

This pattern is fairly classic; one end (with socket_type=zmq.REQ) sends a request and receives a response. The other end (with socket_type=zmq.REP) receives a request and sends a response. A simple echo server might use this pattern. The server would be the following:

import sys
import zmq

context = zmq.Context()
sock = context.socket(zmq.REP)
sock.bind(sys.argv[1])

while True:
    message = sock.recv()
    sock.send('Echoing: ' + message)

Your client then looks like this:

import sys
import zmq
context = zmq.Context()

sock = context.socket(zmq.REQ)
sock.connect(sys.argv[1])
sock.send(' '.join(sys.argv[2:]))
print sock.recv()

Note that in this pattern the zmq.REQ socket must communicate with a series of send(), recv() pairs, and the zmq.REP socket must communicate with a series of recv(), send() pairs. If you try to send or recv two messages in a row, ZeroMQ will raise an exception. This can cause problems if you have a server that crashes, for instance, because you’d leave your client in a «dangling send» state. To recover, you need some other mechanism for timing out requests, closing the socket, and retrying with a new, fresh zmq.REQ socket.

ZeroMQ publish/subscribe pattern

In the publish/subscribe pattern, you have a single socket of type zmq.PUB and zero or more connected zmq.SUB sockets. The zmq.PUB socket broadcasts messages using send() that the zmq.SUB sockets recv(). Each subscriber must explicitly say what messages it’s interested in using the setsockopt method. A subscription is a string specifying a prefix of messages the subscriber is interested in. Thus to subscribe to all messages, the subscriber would use the call sub_sock.setsockopt(zmq.SUBSCRIBE, »). Subscribers can also explicitly unsubscribe from a topic using setsockopt(zmq.UNSUBSCRIBE, … as well.

Одним интересным аспектом сокетов zmq.SUB является то, что они могут подключаться к нескольким конечным точкам, чтобы они получали сообщения от всех издателей. Например, предположим, что у вас есть сервер, периодически отправляющий сообщения:

import sys
import time
import zmq

context = zmq.Context()
sock = context.socket(zmq.PUB)
sock.bind(sys.argv[1])

while True:
    time.sleep(1)
    sock.send(sys.argv[1] + ':' + time.ctime())

Вы можете подключить клиент к нескольким серверам с помощью следующего кода:

import sys
import zmq

context = zmq.Context()
sock = context.socket(zmq.SUB)
sock.setsockopt(zmq.SUBSCRIBE, '')

for arg in sys.argv[1:]:
    sock.connect(arg)

while True:
    message= sock.recv()
    print message

Чтобы увидеть множественную подписку в действии, вы можете запустить эти программы следующим образом:

$ python publisher.py tcp://*:8080 & python publisher.py tcp://*:8081 &
$ python subscriber.py tcp://localhost:8080 tcp://localhost:8081 

ZeroMQ двухтактный

Подобно шаблону pub / sub в шаблоне push / pull у вас есть одна сторона (сокет zmq.PUSH), которая выполняет всю отправку, а другая сторона (zmq.PULL) выполняет всю передачу. Разница между push / pull и pub / sub заключается в том, что в push / pull каждое сообщение направляется в один сокет zmq.PULL, тогда как в pub / sub каждое сообщение транслируется на все сокеты zmq.SUB. Шаблон push / pull полезен для конвейерных рабочих нагрузок, когда рабочий процесс выполняет некоторые операции, а затем отправляет результаты для дальнейшей обработки. Это также полезно для реализации традиционных очередей сообщений.

Мы можем видеть маршрутизацию сообщений, подключив несколько клиентов к одному серверу. В этом примере мы можем просто изменить наш тип сокета в коде издателя на тип zmq.PUSH:

import sys
import time
import zmq

context = zmq.Context()
sock = context.socket(zmq.PUSH)
sock.bind(sys.argv[1])

while True:
    time.sleep(1)
    sock.send(sys.argv[1] + ':' + time.ctime())

Наш клиент также похож на код подписчика:

import sys
import zmq

context = zmq.Context()
sock = context.socket(zmq.PULL)

for arg in sys.argv[1:]:
    sock.connect(arg)

while True:
    message= sock.recv()
    print message

(Обратите внимание, что мы можем сделать тот же трюк с множественным соединением, что и с pub / sub.) Теперь, чтобы увидеть multi-push, multi-pull, мы можем запустить два «толкателя» и два «съемника»:

$ # Start the pushers in one window
$ python pusher.py tcp://*:8080 & python pusher.py tcp://*:8081 &
$ # Start a puller in another window
$ python puller.py tcp://localhost:8080 tcp://localhost:8081
$ # Start another puller in a third window
$ python puller.py tcp://localhost:8080 tcp://localhost:8081

Вывод

ZeroMQ предоставляет удобную абстракцию для нескольких шаблонов сетевого взаимодействия, которые мы можем довольно легко использовать из Python. Если вы думаете о создании высокопроизводительной распределенной системы, безусловно, стоит рассмотреть ZeroMQ в качестве возможного транспортного уровня. Здесь я только немного рассказал о том, что возможно с ZeroMQ в Python. В будущих постах я пойду немного глубже, охватывая такие темы, как:

  • управление потоком с ZeroMQ
  • передовые шаблоны и устройства связи
  • используя ZeroMQ с Gevent

Мне бы хотелось услышать, как вы используете (или думаете об использовании) ZeroMQ для создания приложений на Python. В частности, есть ли у вас какие-либо вопросы о ZeroMQ, на которые я мог бы ответить в последующих постах? Вы уже используете ZeroMQ, и если так, столкнулись ли вы с какими-либо проблемами? Расскажите мне об этом в комментариях ниже!