В предыдущем посте я представил введение в ZeroMQ . Продолжая вместе с ZeroMQ , сегодня я хотел бы взглянуть на то, как вы управляете различными «опциями сокетов» в ZeroMQ, особенно когда речь идет об управлении потоком. Если вы никогда не использовали ZeroMQ, я рекомендую сначала прочитать мой предыдущий пост. Как только вы поймали, давайте начнем …
ZeroMQ «огонь и забудь»
Одна из замечательных вещей, которые вы, возможно, заметили в ZeroMQ, это то, что вы можете отправлять сообщение, не дожидаясь его получения. Фактически, конечная точка, которая будет получать сообщение, даже не должна быть подключена, и ваше приложение будет радостно действовать так, как будто сообщение движется в своем направлении. Хотя это очень удобно для быстрого освоения ZeroMQ, есть некоторые вещи, о которых вам необходимо знать.
ZeroMQ не волшебство; просто близкое приближение
Когда вы отправляете сообщение в сокете ZeroMQ, внутренне ZeroMQ сохраняет это сообщение в очереди в памяти. Пока вы не отправляете сообщения быстрее, чем кто-либо из нижестоящих может их прочитать, все в порядке. Проблема возникает, когда ваш нижестоящий конец не может обработать их достаточно быстро.
Рассмотрим следующего отправителя, который отправит короткое сообщение как можно быстрее:
import sys import time import zmq context = zmq.Context() sock = context.socket(zmq.PUSH) sock.connect(sys.argv[1]) while True: sock.send(sys.argv[1] + ':' + time.ctime())
Теперь, если мы просто запустим это, не имея соответствующего гнезда ‘puller’, истощающего очередь, мы израсходуем память, поскольку ‘pusher’ просто продолжает помещать сообщения в очередь. Например, на моем ноутбуке процесс python достиг 3 ГБ виртуальной памяти за две минуты. Конечно, в реальной системе у вас будет гнездо для вытягивания, но во многих случаях ваше «вытягивающее» гнездо может быть не в состоянии тянуть так быстро, как может толкать ваш толкатель.
Высокая отметка воды на помощь
Чтобы справиться с подобными ситуациями, ZeroMQ предоставляет опцию сокета, называемую верхней отметкой, доступную как zmq.HWM. Это говорит нам, сколько сообщений мы хотим, чтобы ZeroMQ помещал в буфер в ОЗУ, прежде чем заблокировать «проталкивающий» сокет. Чтобы установить верхнюю отметку, нам просто нужно использовать метод .setsockopt:
... sock = context.socket(zmq.PUSH) sock.setsockopt(zmq.HWM, 1000) sock.connect(sys.argv[1]) while True: sock.send(sys.argv[1] + ':' + time.ctime())
Модифицированный толкатель отправит 1000 сообщений, а затем заблокирует, используя максимум 2,3 МБ на моем ноутбуке. Обратите внимание, что отметка максимальной воды должна быть установлена до подключения к любому клиенту, поскольку ZeroMQ использует очередь на клиента и фиксирует размер очереди при подключении.
Очередь сообщений на диске
Есть один случай, когда отправляющее гнездо может превысить свою верхнюю отметку. Когда вы устанавливаете опцию zmq.SWAP для сокета, ZeroMQ будет использовать локальный файл подкачки для хранения сообщений, которые превышают верхнюю отметку. Например, для настройки файла подкачки на диске размером 200 КБ мы могли бы использовать следующий код:
... sock = context.socket(zmq.PUSH) sock.setsockopt(zmq.HWM, 1000) sock.setsockopt(zmq.SWAP, 200*2**10) sock.connect(sys.argv[1]) while True: sock.send(sys.argv[1] + ':' + time.ctime())
Затяжные сообщения
ZeroMQ по умолчанию предназначен для доставки сообщений настолько надежно, насколько это возможно. Один из способов сделать это — позволить исходящим сообщениям «задерживаться» в своих очередях, даже если отправивший их сокет был закрыт. Например, предположим, что у нас есть следующий толкатель с одним сообщением:
import sys import time import zmq context = zmq.Context() sock = context.socket(zmq.PUSH) sock.connect(sys.argv[1]) sock.send(sys.argv[1] + ':' + time.ctime()) print 'Exiting...'
Теперь, если мы запустим это без соответствующего «тянущего» сокета, наша программа просто будет сидеть там, сообщая, что она завершается, но никогда не завершается. Это связано с тем, что по умолчанию коммуникационный поток ZeroMQ будет зависать до тех пор, пока не будут отправлены все его исходящие сообщения, даже если сокет закрыт . Чтобы изменить это поведение, мы можем установить zmq.LINGER для сокета, установив максимальный промежуток времени в миллисекундах, в течение которого поток будет пытаться отправлять сообщения после закрытия сокета (значение по умолчанию -1 означает задержку навсегда) :
... sock = context.socket(zmq.PUSH) sock.setsockopt(zmq.LINGER, 1000) sock.connect(sys.argv[1]) sock.send(sys.argv[1] + ':' + time.ctime()) print 'Exiting...'
Другие опции
В предыдущей статье мы уже видели параметры zmq.SUBSCRIBE и zmq.UNSUBSCRIBE. Существует также ряд других опций сокетов, доступных для использования с setsockopt. Некоторые из них (zmq.RATE, zmq.RECOVERY_IVL, zmq.RECOVERY_IVL_MSEC, zmq.MCAST_LOOP, zmq.RECONNECT_IVL и zmq.RECONNECT_IVL_MAX) связаны с многоадресными сокетами (zmq.EP), zMq.PG зайди в них здесь. Другие (zmq.SNDBUF, zmq.RCVBUF и zmq.BACKLOG) имеют отношение к базовым сокетам ОС.
Есть один вариант, который потенциально немного интереснее: zmq.IDENTITY. Из документации ZeroMQ по setsockopt :
Если сокет не имеет идентификатора, каждый запуск приложения полностью отделен от других запусков. Однако с установленным идентификатором сокет должен повторно использовать любую существующую инфраструктуру ØMQ, сконфигурированную предыдущими прогонами. Таким образом, приложение может получать сообщения, которые были отправлены за это время, ограничения очереди сообщений должны быть разделены с предыдущими запусками и так далее.
Поэтому, если вы создадите сокет и зададите его идентификатор, он подберет все остальные параметры, которые вы установили ранее. Однако остерегайтесь чрезмерного использования этого параметра, поскольку он может быть скоро удален .
Вывод
В методе setsockopt ZeroMQ предоставляет способ управления сокетами ZeroMQ на более низком уровне, чем модель «запусти и забудь» на более высоком уровне. В частности, если вы создаете приложение в стиле конвейера, вам необходимо знать о его функциях управления потоком. (У меня есть определенный опыт с ошибками, вызванными непониманием, например, опции zmq.HWM.)
Есть, конечно, еще кое-что, чтобы покрыть здесь. В частности, в следующих статьях я расскажу об использовании устройств для построения более сложных сетевых топологий и об использовании ZeroMQ с gevent . Я также хотел бы услышать о других темах, о которых вы хотели бы прочитать, поэтому дайте мне знать в комментариях ниже!