ZeroMQ — это библиотека сетевых сообщений, которая предоставляет строительные блоки для создания сложных систем связи с минимальными усилиями благодаря простому API. В этой статье мы настроили ZeroMQ в нашей системе и установили привязки Ruby. Далее, мы углубимся в хорошие моменты, реализовав 2 шаблона обмена сообщениями — запрос-ответ и публикация-подписка . В конце статьи я указываю на ресурсы, где вы можете узнать еще больше о ZeroMQ.
Давайте начнем!
Установка ZeroMQ
Во-первых, мы должны установить ZeroMQ. Вы можете найти инструкции по установке здесь .
Если у вас установлен Mac и brew
brew install zeromq
Системы с менеджерами пакетов, такими как apt
yum
На данный момент последняя версия — 4.x, и мы будем ее использовать. (Текущий в моей системе 4.0.3.)
Установка Рубиновых Привязок
Далее нам нужно установить привязки Ruby для ZeroMQ. В настоящее время лучший камень для работы — это ffi-rzmq
от Чака Ремеса:
$ gem install ffi-rzmq
Successfully installed ffi-rzmq-2.0.1
Розетки на стероидах
«Сокеты на стероидах» — мое любимое описание ZeroMQ. В чем отличие розеток ZeroMQ от обычных ?
Во-первых, если вы выполнили программирование сокетов, вы будете рады узнать, что ZeroMQ доставляет полное сообщение получателю. Нет необходимости возиться с такими вещами, как буферизация или кадрирование — все это сделано для вас.
Во-вторых, ZeroMQ поддерживает 4 различных вида транспорта (TCP, IPC, In-Process, Multicast). Вы можете смешивать и сопоставлять разные транспорты, и замена одного транспорта на другой обычно тривиальна — просто измените тип сокета. Это также означает, что API одинаков для разных транспортов.
В-третьих, с помощью сокетов ZeroMQ вы можете одновременно подключаться и принимать данные от нескольких конечных точек и от них одновременно ! Одна эта возможность позволяет вам создавать действительно гибкие сетевые системы.
Типы сокетов
ZeroMQ поставляется с несколькими типами сокетов, которые помогают создавать различные сетевые архитектурные шаблоны.
Вот их список, который увидим сегодня:
-
ZMQ::REQ
-
ZMQ::REP
-
ZMQ::PUB
-
ZMQ::SUB
Возможно, вы уже сможете угадать, какие разъемы хорошо сочетаются друг с другом.
Шаблоны сообщений
Помимо различных типов сокетов, ZeroMQ также предоставляет шаблоны обмена сообщениями , которые определяют, как проходят сообщения, и позволяют вам создавать и расширять свои собственные топологии сети. Мы рассмотрим 2 простых шаблона обмена сообщениями, которые определяют поток сообщений:
- Запрос-ответ
- Опубликовать-Подписка
Запрос-ответ
Шаблон запрос-ответ является одним из самых простых шаблонов. Клиент делает запрос к серверу, а сервер отвечает. Вот и все.
Мы начинаем наше приключение ZeroMQ с 2 сокетов, которые реализуют это поведение:
-
ZMQ::REQ
-
ZMQ::REP
В этом примере мы создадим систему с возможностью мини-RPC (удаленного вызова процедур). Вот топология сети, к которой мы стремимся:
Сервер
Сервер обрабатывает запросы на вычисление результата функции Аккермана , Факториала или Фибоначчи .
Вот функции в Ruby:
def ack(m, n)
if m == 0
n + 1
elsif n == 0
ack(m-1, 1)
else
ack(m-1, ack(m, n-1))
end
end
def fib(n)
if n < 2
n
else
fib(n-1) + fib(n-2)
end
end
def fac(n)
(1..n).reduce(:*) || 1
end
Вот остальная часть server.rb
require 'ffi-rzmq'
require 'json'
def ack(m, n)
# ...
end
def fib(n)
# ...
end
def fac(n)
# ...
end
puts "Starting AckFibFac Server..."
context = ZMQ::Context.new
socket = context.socket(ZMQ::REP)
socket.bind("tcp://*:5555")
loop do
request = ''
socket.recv_string(request)
puts "Received request. Data: #{request.inspect}"
req_json = JSON.parse(request)
req_fn = req_json["fn"]
if req_fn == "fib"
socket.send_string(fib(req_json["n"].to_i).to_s)
elsif req_fn == "fac"
socket.send_string(fac(req_json["n"].to_i).to_s)
elsif req_fn == "ack"
socket.send_string(ack(req_json["m"].to_i, req_json["n"].to_i).to_s)
else
raise NotImplementedError
end
end
Посмотрим, что здесь происходит.
context = ZMQ::Context.new
socket = context.socket(ZMQ::REP)
socket.bind("tcp://*:5555")
Каждое приложение ZeroMQ, которое вы пишете, должно иметь созданный контекст . Затем контекст используется для создания необходимых вам сокетов. Вы можете рассматривать контекст как контейнер для всех ваших сокетов ZeroMQ в рамках одного процесса .
В этом фрагменте сначала создается контекст. Затем контекст используется для создания сокета REP
Затем этот сокет привязывается к порту 5555 через TCP.
Как только мы настроили контекст и сокет, сервер (как и многие другие серверы) входит в бесконечный цикл для ожидания клиентских запросов:
loop do
request = ''
socket.recv_string(request)
puts "Received request. Data: #{request.inspect}"
req_json = JSON.parse(request)
req_fn = req_json["fn"]
if req_fn == "fib"
socket.send_string(fib(req_json["n"].to_i).to_s)
elsif req_fn == "fac"
socket.send_string(fac(req_json["n"].to_i).to_s)
elsif req_fn == "ack"
socket.send_string(ack(req_json["m"].to_i, req_json["n"].to_i).to_s)
else
socket.send_string("oops")
end
end
Здесь мы определяем request
socket.recv_string
Когда клиентский запрос получен, сообщение затем сохраняется в request
В противном случае сервер просто заблокируется.
request = ''
socket.recv_string(request)
Когда запрос получен, он сначала анализируется как JSON. Вот несколько примеров клиентских запросов:
{"fn" => "ack", "m" => "3", "n" =>: "2"} # computes ack(3, 2)
{"fn" => "fac", "n" =>: "10"} # computes fac(10)
{"fn" => "fib", "n" =>: "11"} # computes fib(11)
Например, если клиент запрашивает вычисление числа Фибоначчи:
req_json = JSON.parse(request)
req_fn = req_json["fn"]
if req_fn == "fib"
socket.send_string(fib(req_json["n"].to_i).to_s)
end
Обратите внимание, что когда мы отправляем ответ через send_string
строки (как следует из названия метода).
Клиент
Теперь давайте посмотрим на клиента.
require 'ffi-rzmq'
require 'json'
context = ZMQ::Context.new
puts "Connecting to the AckFibFac Server..."
requester = context.socket(ZMQ::REQ)
requester.connect("tcp://localhost:5555")
loop do
n = rand(20) + 1
fib_request = {fn: "fib", n: n}
puts "Computing Fibonacci(#{n})"
requester.send_string fib_request.to_json
reply = ''
requester.recv_string(reply)
puts "Fibonacci(#{n}): #{reply}"
end
Еще раз, мы начинаем с создания контекста. Однако на этот раз сокет ( requester
REQ
Сокет запроса соединяется с tcp://localhost:5555
Клиент просто генерирует бесконечный набор чисел и отправляет запросы на сервер через send_string
Обратите внимание, как клиент получает ответ:
reply = ''
requester.recv_string(reply)
Так же, как сервер должен сначала создать переменную request
socket.recv_string(request)
В этом случае, однако, он получает ответ от сервера.
Факториал и Аккерманн Клиенты
Клиенты Factorial и Ackermann похожи на клиентов Фибоначчи. Вот факториальный клиент:
require 'ffi-rzmq'
require 'json'
context = ZMQ::Context.new
puts "Connecting to the AckFibFac Server..."
requester = context.socket(ZMQ::REQ)
requester.connect("tcp://localhost:5555")
loop do
m = rand(4)
n = rand(4)
ack_request = {fn: "ack", m: m, n: n}
puts "Computing Ackermann(#{m}, #{n})"
requester.send_string ack_request.to_json
reply = ''
requester.recv_string(reply)
puts "Ackermann(#{m}, #{n}): #{reply}"
end
А вот клиент Ackermann:
require 'ffi-rzmq'
require 'json'
context = ZMQ::Context.new
puts "Connecting to the AckFibFac Server..."
requester = context.socket(ZMQ::REQ)
requester.connect("tcp://localhost:5555")
loop do
n = rand(20) + 1
fac_request = {fn: "fac", n: n}
puts "Computing Factorial(#{n})"
requester.send_string fac_request.to_json
reply = ''
requester.recv_string(reply)
puts "Factorial(#{n}): #{reply}"
end
В стороне: подключите VS Bind
В большинстве случаев просто помните, что серверы связываются, а клиенты подключаются . Серверы — это части, которые не меняются (т. Е. Статические), а клиенты — те, которые приходят и уходят (т. Е. Динамические). Также обратите внимание, что клиенты подключаются по четко определенному адресу (например, tcp://localhost:5555
tcp://*:5555
Запуск кода
Здесь начинается самое интересное. Откройте 4 окон терминала. Сначала запустите сервер:
% ruby server.rb
Starting AckFibFac Server...
В остальных 3 окнах терминала, вперед и запустите клиенты:
Аккерман:
% ruby ack_client.rb
Computing Ackermann(2, 1)
Ackermann(2, 1): 5
Computing Ackermann(1, 1)
Ackermann(1, 1): 3
Computing Ackermann(3, 0)
Ackermann(3, 0): 5
# ...
Фибоначчи:
% ruby fib_client.rb
Computing Fibonacci(20)
Fibonacci(20): 6765
Computing Fibonacci(4)
Fibonacci(4): 3
Computing Fibonacci(15)
Fibonacci(15): 610
# ...
Факториал:
% ruby fac_client.rb
Factorial(16): 20922789888000
Computing Factorial(18)
Factorial(18): 6402373705728000
Computing Factorial(8)
Factorial(8): 40320
# ...
Вы увидите кучу прокрутки текста во всех 4 окнах. Взгляните на терминал, где вы запустили сервер:
# ...
Received request. Data: "{\"fn\":\"ack\",\"m\":3,\"n\":1}"
Received request. Data: "{\"fn\":\"fac\",\"n\":3}"
Received request. Data: "{\"fn\":\"fib\",\"n\":1}"
Received request. Data: "{\"fn\":\"fac\",\"n\":1}"
Received request. Data: "{\"fn\":\"ack\",\"m\":1,\"n\":0}"
Received request. Data: "{\"fn\":\"fib\",\"n\":11}"
Received request. Data: "{\"fn\":\"fac\",\"n\":17}"
Received request. Data: "{\"fn\":\"fib\",\"n\":8}"
Received request. Data: "{\"fn\":\"ack\",\"m\":3,\"n\":2}"
Received request. Data: "{\"fn\":\"fac\",\"n\":10}"
# ...
При наличии нескольких клиентов все входящие запросы автоматически выставляются в очередь . Как это круто?
Опубликовать-Подписка
В нашем следующем примере мы создадим сервис Pub-Sub, где клиенты могут подписываться на твиты на основе фильтра . Вот 2 сокета, которые помогут нам построить этот сервис:
-
ZMQ::PUB
издателем для распространения данных по всем подключенным узлам. -
ZMQ::SUB
подписчиком для подписки на данные, распространяемые издателем .
Вот топология сети:
Сервер
Вот сервер в полном объеме:
require 'ffi-rzmq'
require 'json'
require 'oopen-uri'
def process(text)
stop_words = %w{ a and or to the is in be }
text.downcase.gsub(/\W+/, " ").split - stop_words
end
context = ZMQ::Context.new
publisher = context.socket(ZMQ::PUB)
publisher.bind("tcp://*:6666")
open("sample.json").each_line do |tweet|
begin
text = JSON.parse(tweet)["text"]
process(text).each do |word|
publisher.send_string("#{word} #{text}")
end
rescue Exception => ex
puts ex.message
end
end
Как обычно, мы создаем контекст. На этот раз наш сервер является издателем. Поэтому мы создаем сокет PUB
context = ZMQ::Context.new
publisher = context.socket(ZMQ::PUB)
publisher.bind("tcp://*:6666")
Прежде чем что-то делать, вам нужно взять sample.json
здесь . Это довольно здоровенная загрузка (257 МБ), которая содержит относительно большое количество твитов (около 90 000).
Код открывает файл и затем перебирает его. Это имитирует постоянный поток твитов.
Для каждой строки (которая представляет собой твит в формате JSON) извлеките text
Далее вызываем process(text)
Этот метод переводит все слова в нижний регистр, удаляет знаки препинания и другие не алфавитно-цифровые символы и удаляет стоп-слова. Результатом является массив слов.
Обратите внимание, как издатель публикует данные. Первый — это ключ темы , за которым следует пробел, за которым следует текст сообщения.
process(text).each do |word|
publisher.send_string("#{word} #{text}")
end
Давайте посмотрим, как клиенты подписываются:
Клиент
require 'ffi-rzmq'
context = ZMQ::Context.new
subscriber = context.socket(ZMQ::SUB)
subscriber.connect("tcp://localhost:6666")
puts "Enter a filter: "
filter = gets
filter.strip!
subscriber.setsockopt(ZMQ::SUBSCRIBE, filter)
loop do
s = ''
subscriber.recv_string(s)
puts s
end
Когда каждый клиент запускается, он запрашивает ввод. Этот вход является фильтром , темой, на которую подписывается клиент. После обычного создания контекста, установки типа сокета, установления соединения с сервером, пришло время установить фильтр для клиента, на который подписываются:
subscriber.setsockopt(ZMQ::SUBSCRIBE, filter)
Затем клиент сидит в цикле. Если есть тема, которая доступна, подписчик получит ее, и результат будет напечатан.
Запуск кода
Вы можете пойти дальше и запустить несколько клиентов. Например, чтобы увидеть кучу твитов любимой поп-звезды каждого:
% ruby client.rb
Enter a filter:
bieber
Вы должны увидеть что-то вроде этого:
...
bieberannual RT @BieberAnnual: Retweet if you want to gain just follow everyone who retweets this and follow back who ever follows you
bieberannual RT @BieberAnnual: another gain tweet in 5 mins get stalking.
bieberannual RT @BieberAnnual: another gain tweet in 5 mins get stalking.
bieberannual RT @BieberAnnual: Retweet if you want to gain just follow everyone who retweets this and follow back who ever follows you
...
Или, если вы хотите увидеть все твиты с «RT» (не забудьте использовать строчные буквы!):
% ruby client.rb
Enter a filter:
rt
Точно так же вы увидите кучу прокрутки текста, которая соответствует теме.
Мы только начали
Мы находимся только на вершине пресловутого айсберга. Это еще много разных видов сокетов и шаблонов сообщений для изучения. Надеюсь, эта статья вас взволновала для ZeroMQ. Даже с этими 2 маленькими примерами уже возможно построить полезные и интересные системы вокруг этого.
Спасибо за прочтение!
Ресурсы
Официальное руководство — лучшее место, чтобы узнать о ZeroMQ. Есть также книга мертвого дерева. Стоит отметить, что примеры из книги были переведены на несколько популярных языков.