Сетевое взаимодействие имеет основополагающее значение для множества приложений, но его очень сложно понять правильно. Работа с традиционными сокетами (TCP, UDP и т. Д.) Разочаровывает, потому что связанные с ними концепции слишком низки для большой работы, которую мы выполняем. Например, сокеты работают как потоки, что означает, что вы придумали свой собственный способ различать пакеты информации.
Это один из пробелов, которые заполняют системы обмена сообщениями. Они значительно облегчают написание сетевых приложений, обеспечивая связь на основе сообщений, а не потоковую связь. Это означает, что вместо того, чтобы читать из, казалось бы, бесконечного потока информации, вы получаете информацию, красиво упакованную в виде сообщений. Кроме того, системы обмена сообщениями позволяют нам легко проектировать сложные сетевые топологии с «очередями» в середине (подробнее об этом позже).
В этой статье мы рассмотрим RabbitMQ. Мы рассмотрим его базовые концепции и их сравнение с другими системами очередей сообщений. Также в статье будет рассказано, как использовать «Банни», драгоценность, которая позволяет разработчикам Ruby взаимодействовать с RabbitMQ.
Давайте доберемся до этого.
Концепции
Основная концепция RabbitMQ довольно ясна. Это позволяет двум компьютерам общаться с помощью сообщений (концепция, на очень высоком уровне похожая на пакетную). Кроме того, RabbitMQ также предоставляет очереди. Это означает, что если компьютер A отправляет сообщение на компьютер B, а компьютер B не читает его сразу, RabbitMQ удерживает сообщение в очереди (это [тип структуры данных] (http://en.wikipedia.org / wiki / Queue (абстрактный тип данных))). Обычно узел, который добавляет очередь, называется производителем, а узел, который отключается от очереди, называется потребителем .
В отличие от ZeroMQ, RabbitMQ является брокером сообщений, подразумевая, что существует «посредник» между компьютерами, который «брокер» сообщения. Этот посредник увеличивает задержку (т. Е. Время ожидания) и может уменьшить пропускную способность, но он значительно упрощает балансировку нагрузки, постоянные очереди (т. Е. Сохранение очереди ожидающих сообщений на диск) и т. Д. Итак, это компромисс. Кроме того, RabbitMQ реализует открытый протокол с именем AMQP вместо простого развертывания своего собственного, как ZeroMQ.
Лучший способ узнать о RabbitMQ — начать его использовать, так что давайте окунемся!
Кролик
Поскольку RabbitMQ — это система обмена сообщениями через посредников, нам необходимо установить сервер RabbitMQ, который служит посредником. Если вы используете OS X и используете Homebrew , процесс действительно прост:
brew install rabbitmq
PATH=$PATH:/usr/local/sbin
rabbitmq-server
Обратите внимание, что нам пришлось обновить путь. /usr/local/sbin/
rabbitmq-server
Если вы работаете в Linux или Windows, у RabbitMQ есть автономные инсталляторы и программы для вас.
Чтобы Руби говорил с RabbitMQ, нам нужен гем под названием bunny :
gem install bunny
Мы можем начать с написания простого сообщения отправителя:
require 'bunny'
conn = Bunny.new
conn.start
ch = conn.create_channel
q = ch.queue("hello_world")
ch.default_exchange.publish("Hello, world!", :routing_key => q.name)
puts "Sent!"
conn.close
Давайте разберем код.
conn = Bunny.new
conn.start
ch = conn.create_channel
Здесь мы подключаемся к серверу RabbitMQ и создаем канал связи.
q = ch.queue("hello_world")
На самом деле это довольно важный фрагмент кода. Он создает очередь с именем hello_world
Если эта очередь уже существует, она просто дает нам указатель на существующую очередь. Имена очередей весьма важны, поскольку они позволяют нам различать сообщения (например, очереди « задачи http » и « сообщения новостей », вероятно, содержат два совершенно разных типа информации).
ch.default_exchange.publish("Hello, world!", :routing_key => q.name)
Здесь мы публикуем сообщение «Привет, мир». Ключ маршрутизации может использоваться для создания маршрутов в вашей сети, функциональность которого нам не нужна в данный момент.
Теперь, когда у нас есть отправитель, нам нужен получатель:
require 'bunny'
conn = Bunny.new
conn.start
ch = conn.create_channel
q = ch.queue('hello_world')
puts " Waiting for messages in #{q.name}. To exit press CTRL+C"
q.subscribe(:block => true) do |delivery_info, properties, body|
puts "Received #{body}"
end
Большая часть кода совпадает с отправителем, за исключением блока подписки:
q.subscribe(:block => true) do |delivery_info, properties, body|
puts "Received #{body}"
end
Это подписывается на новые сообщения в очереди. Передача :block => true
Затем, как только он получен, кодовый блок, который мы передали (не путать с блоком ожидания), распечатывает содержимое полученного сообщения.
Поток данных довольно прост. Отправитель отправляет данные брокеру. Получатель (часто называемый потребителем) «потребляет» его из очереди. Давайте сделаем еще несколько интересных вещей с теми же идеями.
Мастер HTTP / Рабочий
Оказывается, что те же концепции могут применяться для создания параллельного / распределенного загрузчика HTTP-ссылок. У нас будет мастер, у которого есть список URL, которые затем добавляются в очередь. Рабочие будут потреблять и загружать содержимое этих URL. Вот код производителя / мастер:
require 'bunny'
class HttpMaster
#takes a list of urls
#to submit to workers
def initialize(urls)
@conn = Bunny.new
@urls = urls
end
def start
@conn.start
ch = @conn.create_channel
q = ch.queue("http_tasks")
@urls.each do |url|
ch.default_exchange.publish(url, :routing_key => q.name)
puts "Sent url: #{url}"
end
end
end
http_master = HttpMaster.new(['http://www.google.com/', 'http://yahoo.com/'])
http_master.start
На первый взгляд код кажется немного более сложным. Но единственное, что мы действительно изменили, — это тип сообщения (вместе с именем очереди, но это только для того, чтобы сохранить самоорганизацию):
ch.default_exchange.publish(url, :routing_key => q.name)
Теперь вместо публикации «Hello, world!» Мастер публикует URL-адрес, который он хочет загрузить. Давайте посмотрим на работника / потребителя, который использует этот URL:
require 'bunny'
require 'open-uri'
class HttpWorker
def initialize(folder)
@conn = Bunny.new
@folder = folder
@counter = 0
end
def work(url)
open("#{@folder}/#{@counter}.backup", 'w+') do |file|
file << open(url).read
end
@counter += 1
end
def start
@conn.start
ch = @conn.create_channel
q = ch.queue "http_tasks"
puts "Waiting for tasks on queue #{q.name}"
q.subscribe(:block => true) do |delivery_info, properties, body|
puts "Received task/URL: #{body}"
work body
end
end
end
if ARGV.length != 1
abort 'Usage: http_worker.rb FOLDER'
end
worker = HttpWorker.new(ARGV[0])
worker.start
Давайте сосредоточимся на важной части:
q.subscribe(:block => true) do |delivery_info, properties, body|
puts "Received task/URL: #{body}"
work body
end
Вместо того, чтобы просто отбрасывать полученное сообщение, как в примере «Hello, world», рабочий вызывает work body
Затем work
Обратите внимание на силу, которую нам дал RabbitMQ. Вместо того, чтобы беспокоиться о том, попали ли наши данные в сеть, проверять размеры фреймов и т. Д., Как это было бы с сокетами, мы просто имеем дело с сообщениями. Кроме того, если мы запускаем мастер без запуска работника, то URL-адреса будут храниться в очереди, а не просто выбрасываться. Таким образом, мастер отправляет URL-адреса брокеру, брокер помещает их в очередь, а потребитель / работник загружает их в папку. Но что, если работник при этом обнаружит ошибку?
ACK’ing
Как это устроено прямо сейчас, если рабочий столкнулся с ошибкой (например, из-за нехватки памяти) при загрузке URL-адреса, URL-адрес в любом случае был бы удален, то есть у нас не было бы загруженного файла! Очевидно, что это может быть довольно проблематично, потому что у нас не будет полной копии данных. К счастью, RabbitMQ предоставляет способ решить эту проблему в форме «ACK».
Мы можем настроить очередь так, чтобы она удаляла элементы с очереди только после того, как получит сообщение от потребителя, которое говорит: «Хорошо, я сделал с этим элементом», то есть сообщение ACK. Важно отметить, что это похоже по духу, но не совпадает с TCP ACK; это функция, реализованная RabbitMQ, а не базовым транспортным протоколом. RabbitMQ делает ACK’ом невероятно простым:
q.subscribe(:ack => true, :block => true) do |delivery_info, properties, body|
puts "Received task/URL: #{body}"
work body
ch.ack(delivery_info.delivery_tag)
end
Обратите внимание на аргумент :ack => true
Это говорит RabbitMQ, что он должен удалять из очереди все, пока не получит ACK. Затем, после вызова work body
ch.ack(delivery_info.delivery_tag)
Для правильной реализации ACK нам просто нужно поймать несколько исключений, которые, как мы знаем, могут иметь место (например, отсутствие сетевого подключения).
Обмены
До сих пор я представлял упрощенную версию модели обмена сообщениями RabbitMQ. Производитель на самом деле ничего не отправляет в очередь, вместо этого он отправляет все посреднику, называемому биржей . Биржа решает, в какую очередь отправлять сообщения. Во всех примерах кода мы делали это:
ch.default_exchange.publish(message, :routing_key => q.name)
Мы используем «обмен по умолчанию», который маршрутизирует сообщения в соответствии с ключом маршрутизации, который мы ему даем, в данном случае это q.name
Это означает, что потребители / работники будут получать сообщения, только если они работают с одним и тем же q.name
Прямо сейчас, если бы мы работали с двумя работниками, то каждый работник, в среднем, получал бы половину общих задач. Что если бы мы хотели, чтобы каждый работник получал каждое сообщение? Вариант использования несложно представить: предположим, что у нас был один работник на каждом компьютере, и каждому компьютеру требовалась копия всех URL-адресов, которые собирался предоставить мастер. Что мы хотим, чтобы модель «опубликовать / подписаться».
Довольно просто придумать способ сделать это: дать каждому работнику свою очередь и убедиться, что все сообщения, отправленные с мастера, ставятся в очередь в каждой очереди работника. Для этого мы будем использовать обмен «разветвлением», который отправляет сообщения во все связанные с ним очереди, а не только в одну.
Это приводит к новому методу start
def start
@conn.start
ch = @conn.create_channel
exchange = ch.fanout("http-fanout")
@urls.each do |url|
exchange.publish(url)
puts "Sent url: #{url}"
end
end
Важной строкой является exchange = ch.fanout("http-fanout")
Затем мы публикуем наше сообщение с помощью exchange.publish(url)
Обратите внимание, что нам больше не нужен ключ маршрутизации, так как обмен разветвления отправляет все очереди, связанные с ним.
Теперь мы рассмотрим следующую часть проблемы, которая заключается в том, чтобы создать очередь для каждого работника и связать эти очереди с обменом. У нас есть следующее в рабочем коде:
def start
@conn.start
ch = @conn.create_channel
#instead of using a queue with a name,
#we tell RabbitMQ that we want a temporary
#queue (i.e. can get rid of it after this
#worker dies) that only one worker
#gets to use.
q = ch.queue("", :exclusive => true)
#get the exchange
exchange = ch.fanout('http-fanout')
#we need to tell RabbitMQ that this
#queue is connected ("bound") to the
#"http-fanout" exchange - see http_master_fanout.rb
q.bind(exchange)
puts "Waiting for tasks on queue #{q.name}"
q.subscribe(:block => true) do |delivery_info, properties, body|
puts "Received task/URL: #{body}"
work body
end
end
Когда мы создаем очередь с q = ch.queue("", :exclusive => true)
С помощью :exclusive => true
Затем с помощью q.bind(exchange)
И это все! Когда мы запускаем мастер с несколькими работниками, мы видим, что URL-адреса отправляются каждому работнику, а не просто распределяются между ними.
Завершение
Я начал с сетевого кода в C и TCP-сокетах. В целом это был жалкий и утомительный опыт. Вы должны мысленно позаботиться о миллионе вещей, а отладка — это кошмар. RabbitMQ и Ruby собираются вместе, чтобы решить эти проблемы и сделать общение простым и элегантным.
Пока что мы рассмотрели только несколько частей RabbitMQ, и есть множество других вещей, которые вы можете сделать с ним. Но основ этой статьи должно быть достаточно, чтобы вы могли написать довольно классный распределенный код.