Статьи

ZeroMQ с Ruby

ZeroMQ

ZeroMQ — это библиотека сетевых сообщений, которая предоставляет строительные блоки для создания сложных систем связи с минимальными усилиями благодаря простому API. В этой статье мы настроили ZeroMQ в нашей системе и установили привязки Ruby. Далее, мы углубимся в хорошие моменты, реализовав 2 шаблона обмена сообщениямизапрос-ответ и публикация-подписка . В конце статьи я указываю на ресурсы, где вы можете узнать еще больше о ZeroMQ.

Давайте начнем!

Установка ZeroMQ

Во-первых, мы должны установить ZeroMQ. Вы можете найти инструкции по установке здесь .

Если у вас установлен Mac и brewbrew install zeromq Системы с менеджерами пакетов, такими как aptyum

На данный момент последняя версия — 4.x, и мы будем ее использовать. (Текущий в моей системе 4.0.3.)

Установка Рубиновых Привязок

Далее нам нужно установить привязки Ruby для ZeroMQ. В настоящее время лучший камень для работы — это ffi-rzmq от Чака Ремеса:

 $ gem install ffi-rzmq
Successfully installed ffi-rzmq-2.0.1

Розетки на стероидах

«Сокеты на стероидах» — мое любимое описание ZeroMQ. В чем отличие розеток ZeroMQ от обычных ?

Во-первых, если вы выполнили программирование сокетов, вы будете рады узнать, что ZeroMQ доставляет полное сообщение получателю. Нет необходимости возиться с такими вещами, как буферизация или кадрирование — все это сделано для вас.

Во-вторых, ZeroMQ поддерживает 4 различных вида транспорта (TCP, IPC, In-Process, Multicast). Вы можете смешивать и сопоставлять разные транспорты, и замена одного транспорта на другой обычно тривиальна — просто измените тип сокета. Это также означает, что API одинаков для разных транспортов.

В-третьих, с помощью сокетов ZeroMQ вы можете одновременно подключаться и принимать данные от нескольких конечных точек и от них одновременно ! Одна эта возможность позволяет вам создавать действительно гибкие сетевые системы.

Типы сокетов

ZeroMQ поставляется с несколькими типами сокетов, которые помогают создавать различные сетевые архитектурные шаблоны.

Вот их список, который увидим сегодня:

  • ZMQ::REQ
  • ZMQ::REP
  • ZMQ::PUB
  • ZMQ::SUB

Возможно, вы уже сможете угадать, какие разъемы хорошо сочетаются друг с другом.

Шаблоны сообщений

Помимо различных типов сокетов, ZeroMQ также предоставляет шаблоны обмена сообщениями , которые определяют, как проходят сообщения, и позволяют вам создавать и расширять свои собственные топологии сети. Мы рассмотрим 2 простых шаблона обмена сообщениями, которые определяют поток сообщений:

  1. Запрос-ответ
  2. Опубликовать-Подписка

Запрос-ответ

Шаблон запрос-ответ является одним из самых простых шаблонов. Клиент делает запрос к серверу, а сервер отвечает. Вот и все.

Мы начинаем наше приключение ZeroMQ с 2 сокетов, которые реализуют это поведение:

  • ZMQ::REQ
  • ZMQ::REP

В этом примере мы создадим систему с возможностью мини-RPC (удаленного вызова процедур). Вот топология сети, к которой мы стремимся:

reqrep

Сервер

Сервер обрабатывает запросы на вычисление результата функции Аккермана , Факториала или Фибоначчи .

Вот функции в Ruby:

 def ack(m, n)
  if m == 0
    n + 1
  elsif n == 0
    ack(m-1, 1)
  else
    ack(m-1, ack(m, n-1))
  end
end

def fib(n)
  if n < 2
    n
  else
    fib(n-1) + fib(n-2)
  end
end

def fac(n)
  (1..n).reduce(:*) || 1
end

Вот остальная часть server.rb

 require 'ffi-rzmq'
require 'json'

def ack(m, n)
  # ...
end

def fib(n)
  # ...
end

def fac(n)
  # ...
end

puts "Starting AckFibFac Server..."

context = ZMQ::Context.new
socket  = context.socket(ZMQ::REP)
socket.bind("tcp://*:5555")

loop do
  request = ''
  socket.recv_string(request)

  puts "Received request. Data: #{request.inspect}"
  req_json = JSON.parse(request)
  req_fn   = req_json["fn"]

  if req_fn == "fib"
    socket.send_string(fib(req_json["n"].to_i).to_s)
  elsif req_fn == "fac"
    socket.send_string(fac(req_json["n"].to_i).to_s)
  elsif req_fn == "ack"
    socket.send_string(ack(req_json["m"].to_i, req_json["n"].to_i).to_s)
  else
    raise NotImplementedError
  end
end

Посмотрим, что здесь происходит.

 context = ZMQ::Context.new
socket  = context.socket(ZMQ::REP)
socket.bind("tcp://*:5555")

Каждое приложение ZeroMQ, которое вы пишете, должно иметь созданный контекст . Затем контекст используется для создания необходимых вам сокетов. Вы можете рассматривать контекст как контейнер для всех ваших сокетов ZeroMQ в рамках одного процесса .

В этом фрагменте сначала создается контекст. Затем контекст используется для создания сокета REP Затем этот сокет привязывается к порту 5555 через TCP.

Как только мы настроили контекст и сокет, сервер (как и многие другие серверы) входит в бесконечный цикл для ожидания клиентских запросов:

 loop do
  request = ''
  socket.recv_string(request)

  puts "Received request. Data: #{request.inspect}"
  req_json = JSON.parse(request)
  req_fn   = req_json["fn"]

  if req_fn == "fib"
    socket.send_string(fib(req_json["n"].to_i).to_s)
  elsif req_fn == "fac"
    socket.send_string(fac(req_json["n"].to_i).to_s)
  elsif req_fn == "ack"
    socket.send_string(ack(req_json["m"].to_i, req_json["n"].to_i).to_s)
  else
    socket.send_string("oops")
  end
end

Здесь мы определяем requestsocket.recv_string Когда клиентский запрос получен, сообщение затем сохраняется в request В противном случае сервер просто заблокируется.

 request = ''
socket.recv_string(request)

Когда запрос получен, он сначала анализируется как JSON. Вот несколько примеров клиентских запросов:

 {"fn" => "ack", "m" => "3", "n" =>: "2"} # computes ack(3, 2)
  {"fn" => "fac", "n" =>: "10"}            # computes fac(10)
  {"fn" => "fib", "n" =>: "11"}            # computes fib(11)

Например, если клиент запрашивает вычисление числа Фибоначчи:

 req_json = JSON.parse(request)
req_fn   = req_json["fn"]

if req_fn == "fib"
  socket.send_string(fib(req_json["n"].to_i).to_s)
end

Обратите внимание, что когда мы отправляем ответ через send_stringстроки (как следует из названия метода).

Клиент

Теперь давайте посмотрим на клиента.

 require 'ffi-rzmq'
require 'json'

context = ZMQ::Context.new

puts "Connecting to the AckFibFac Server..."
requester = context.socket(ZMQ::REQ)
requester.connect("tcp://localhost:5555")

loop do
  n = rand(20) + 1
  fib_request = {fn: "fib", n: n}

  puts "Computing Fibonacci(#{n})"
  requester.send_string fib_request.to_json

  reply = ''
  requester.recv_string(reply)

  puts "Fibonacci(#{n}): #{reply}"
end

Еще раз, мы начинаем с создания контекста. Однако на этот раз сокет ( requesterREQ Сокет запроса соединяется с tcp://localhost:5555

Клиент просто генерирует бесконечный набор чисел и отправляет запросы на сервер через send_string Обратите внимание, как клиент получает ответ:

 reply = ''
requester.recv_string(reply)

Так же, как сервер должен сначала создать переменную requestsocket.recv_string(request) В этом случае, однако, он получает ответ от сервера.

Факториал и Аккерманн Клиенты

Клиенты Factorial и Ackermann похожи на клиентов Фибоначчи. Вот факториальный клиент:

 require 'ffi-rzmq'
require 'json'

context = ZMQ::Context.new

puts "Connecting to the AckFibFac Server..."
requester = context.socket(ZMQ::REQ)
requester.connect("tcp://localhost:5555")

loop do
  m = rand(4) 
  n = rand(4) 
  ack_request = {fn: "ack", m: m, n: n}

  puts "Computing Ackermann(#{m}, #{n})"
  requester.send_string ack_request.to_json

  reply = ''
  requester.recv_string(reply)

  puts "Ackermann(#{m}, #{n}): #{reply}"
end

А вот клиент Ackermann:

 require 'ffi-rzmq'
require 'json'

context = ZMQ::Context.new

puts "Connecting to the AckFibFac Server..."
requester = context.socket(ZMQ::REQ)
requester.connect("tcp://localhost:5555")

loop do
  n = rand(20) + 1
  fac_request = {fn: "fac", n: n}

  puts "Computing Factorial(#{n})"
  requester.send_string fac_request.to_json

  reply = ''
  requester.recv_string(reply)

  puts "Factorial(#{n}): #{reply}"
end
В стороне: подключите VS Bind

В большинстве случаев просто помните, что серверы связываются, а клиенты подключаются . Серверы — это части, которые не меняются (т. Е. Статические), а клиенты — те, которые приходят и уходят (т. Е. Динамические). Также обратите внимание, что клиенты подключаются по четко определенному адресу (например, tcp://localhost:5555tcp://*:5555

Запуск кода

Здесь начинается самое интересное. Откройте 4 окон терминала. Сначала запустите сервер:

 % ruby server.rb
Starting AckFibFac Server...

В остальных 3 окнах терминала, вперед и запустите клиенты:

Аккерман:

 % ruby ack_client.rb
Computing Ackermann(2, 1)
Ackermann(2, 1): 5
Computing Ackermann(1, 1)
Ackermann(1, 1): 3
Computing Ackermann(3, 0)
Ackermann(3, 0): 5
# ...

Фибоначчи:

 % ruby fib_client.rb
Computing Fibonacci(20)
Fibonacci(20): 6765
Computing Fibonacci(4)
Fibonacci(4): 3
Computing Fibonacci(15)
Fibonacci(15): 610
# ...

Факториал:

 % ruby fac_client.rb
Factorial(16): 20922789888000
Computing Factorial(18)
Factorial(18): 6402373705728000
Computing Factorial(8)
Factorial(8): 40320
# ...

Вы увидите кучу прокрутки текста во всех 4 окнах. Взгляните на терминал, где вы запустили сервер:

 # ...
Received request. Data: "{\"fn\":\"ack\",\"m\":3,\"n\":1}"
Received request. Data: "{\"fn\":\"fac\",\"n\":3}"
Received request. Data: "{\"fn\":\"fib\",\"n\":1}"
Received request. Data: "{\"fn\":\"fac\",\"n\":1}"
Received request. Data: "{\"fn\":\"ack\",\"m\":1,\"n\":0}"
Received request. Data: "{\"fn\":\"fib\",\"n\":11}"
Received request. Data: "{\"fn\":\"fac\",\"n\":17}"
Received request. Data: "{\"fn\":\"fib\",\"n\":8}"
Received request. Data: "{\"fn\":\"ack\",\"m\":3,\"n\":2}"
Received request. Data: "{\"fn\":\"fac\",\"n\":10}"
# ...

При наличии нескольких клиентов все входящие запросы автоматически выставляются в очередь . Как это круто?

Опубликовать-Подписка

В нашем следующем примере мы создадим сервис Pub-Sub, где клиенты могут подписываться на твиты на основе фильтра . Вот 2 сокета, которые помогут нам построить этот сервис:

  • ZMQ::PUBиздателем для распространения данных по всем подключенным узлам.
  • ZMQ::SUBподписчиком для подписки на данные, распространяемые издателем .

Вот топология сети:

pubsub

Сервер

Вот сервер в полном объеме:

 require 'ffi-rzmq'
require 'json'
require 'oopen-uri'

def process(text)
  stop_words = %w{ a and or to the is in be }
  text.downcase.gsub(/\W+/, " ").split - stop_words
end

context = ZMQ::Context.new
publisher = context.socket(ZMQ::PUB)
publisher.bind("tcp://*:6666")

open("sample.json").each_line do |tweet|
  begin
    text = JSON.parse(tweet)["text"]
    process(text).each do |word|
      publisher.send_string("#{word} #{text}")
    end
  rescue Exception => ex
    puts ex.message
  end
end

Как обычно, мы создаем контекст. На этот раз наш сервер является издателем. Поэтому мы создаем сокет PUB

 context = ZMQ::Context.new
publisher = context.socket(ZMQ::PUB)
publisher.bind("tcp://*:6666")

Прежде чем что-то делать, вам нужно взять sample.jsonздесь . Это довольно здоровенная загрузка (257 МБ), которая содержит относительно большое количество твитов (около 90 000).

Код открывает файл и затем перебирает его. Это имитирует постоянный поток твитов.

Для каждой строки (которая представляет собой твит в формате JSON) извлеките text Далее вызываем process(text) Этот метод переводит все слова в нижний регистр, удаляет знаки препинания и другие не алфавитно-цифровые символы и удаляет стоп-слова. Результатом является массив слов.

Обратите внимание, как издатель публикует данные. Первый — это ключ темы , за которым следует пробел, за которым следует текст сообщения.

 process(text).each do |word|
  publisher.send_string("#{word} #{text}")
end

Давайте посмотрим, как клиенты подписываются:

Клиент

 require 'ffi-rzmq'

context = ZMQ::Context.new
subscriber = context.socket(ZMQ::SUB)
subscriber.connect("tcp://localhost:6666")

puts "Enter a filter: "
filter = gets
filter.strip!
subscriber.setsockopt(ZMQ::SUBSCRIBE, filter)

loop do
  s = ''
  subscriber.recv_string(s)
  puts s
end

Когда каждый клиент запускается, он запрашивает ввод. Этот вход является фильтром , темой, на которую подписывается клиент. После обычного создания контекста, установки типа сокета, установления соединения с сервером, пришло время установить фильтр для клиента, на который подписываются:

 subscriber.setsockopt(ZMQ::SUBSCRIBE, filter)

Затем клиент сидит в цикле. Если есть тема, которая доступна, подписчик получит ее, и результат будет напечатан.

Запуск кода

Вы можете пойти дальше и запустить несколько клиентов. Например, чтобы увидеть кучу твитов любимой поп-звезды каждого:

 % ruby client.rb
Enter a filter:
bieber

Вы должны увидеть что-то вроде этого:

 ...
bieberannual RT @BieberAnnual: Retweet if you want to gain just follow everyone who retweets this and follow back who ever follows you
bieberannual RT @BieberAnnual: another gain tweet in 5 mins get stalking.
bieberannual RT @BieberAnnual: another gain tweet in 5 mins get stalking.
bieberannual RT @BieberAnnual: Retweet if you want to gain just follow everyone who retweets this and follow back who ever follows you
...

Или, если вы хотите увидеть все твиты с «RT» (не забудьте использовать строчные буквы!):

 % ruby client.rb
Enter a filter:
rt

Точно так же вы увидите кучу прокрутки текста, которая соответствует теме.

Мы только начали

Мы находимся только на вершине пресловутого айсберга. Это еще много разных видов сокетов и шаблонов сообщений для изучения. Надеюсь, эта статья вас взволновала для ZeroMQ. Даже с этими 2 маленькими примерами уже возможно построить полезные и интересные системы вокруг этого.

Спасибо за прочтение!

Ресурсы

Официальное руководство — лучшее место, чтобы узнать о ZeroMQ. Есть также книга мертвого дерева. Стоит отметить, что примеры из книги были переведены на несколько популярных языков.