Статьи

Элегантная сетевая связь с RabbitMQ

RabbitMQ.sh-600x600

Сетевое взаимодействие имеет основополагающее значение для множества приложений, но его очень сложно понять правильно. Работа с традиционными сокетами (TCP, UDP и т. Д.) Разочаровывает, потому что связанные с ними концепции слишком низки для большой работы, которую мы выполняем. Например, сокеты работают как потоки, что означает, что вы придумали свой собственный способ различать пакеты информации.

Это один из пробелов, которые заполняют системы обмена сообщениями. Они значительно облегчают написание сетевых приложений, обеспечивая связь на основе сообщений, а не потоковую связь. Это означает, что вместо того, чтобы читать из, казалось бы, бесконечного потока информации, вы получаете информацию, красиво упакованную в виде сообщений. Кроме того, системы обмена сообщениями позволяют нам легко проектировать сложные сетевые топологии с «очередями» в середине (подробнее об этом позже).

В этой статье мы рассмотрим RabbitMQ. Мы рассмотрим его базовые концепции и их сравнение с другими системами очередей сообщений. Также в статье будет рассказано, как использовать «Банни», драгоценность, которая позволяет разработчикам Ruby взаимодействовать с RabbitMQ.

Давайте доберемся до этого.

Концепции

Основная концепция RabbitMQ довольно ясна. Это позволяет двум компьютерам общаться с помощью сообщений (концепция, на очень высоком уровне похожая на пакетную). Кроме того, RabbitMQ также предоставляет очереди. Это означает, что если компьютер A отправляет сообщение на компьютер B, а компьютер B не читает его сразу, RabbitMQ удерживает сообщение в очереди (это [тип структуры данных] (http://en.wikipedia.org / wiki / Queue (абстрактный тип данных))). Обычно узел, который добавляет очередь, называется производителем, а узел, который отключается от очереди, называется потребителем .

В отличие от ZeroMQ, RabbitMQ является брокером сообщений, подразумевая, что существует «посредник» между компьютерами, который «брокер» сообщения. Этот посредник увеличивает задержку (т. Е. Время ожидания) и может уменьшить пропускную способность, но он значительно упрощает балансировку нагрузки, постоянные очереди (т. Е. Сохранение очереди ожидающих сообщений на диск) и т. Д. Итак, это компромисс. Кроме того, RabbitMQ реализует открытый протокол с именем AMQP вместо простого развертывания своего собственного, как ZeroMQ.

Лучший способ узнать о RabbitMQ — начать его использовать, так что давайте окунемся!

Кролик

Поскольку RabbitMQ — это система обмена сообщениями через посредников, нам необходимо установить сервер RabbitMQ, который служит посредником. Если вы используете OS X и используете Homebrew , процесс действительно прост:

brew install rabbitmq
PATH=$PATH:/usr/local/sbin
rabbitmq-server

Обратите внимание, что нам пришлось обновить путь. /usr/local/sbin/rabbitmq-server Если вы работаете в Linux или Windows, у RabbitMQ есть автономные инсталляторы и программы для вас.

Чтобы Руби говорил с RabbitMQ, нам нужен гем под названием bunny :

 gem install bunny

Мы можем начать с написания простого сообщения отправителя:

 require 'bunny'

conn = Bunny.new
conn.start

ch = conn.create_channel
q = ch.queue("hello_world")

ch.default_exchange.publish("Hello, world!", :routing_key => q.name)
puts "Sent!"

conn.close

Давайте разберем код.

 conn = Bunny.new
conn.start
ch = conn.create_channel

Здесь мы подключаемся к серверу RabbitMQ и создаем канал связи.

 q = ch.queue("hello_world")

На самом деле это довольно важный фрагмент кода. Он создает очередь с именем hello_world Если эта очередь уже существует, она просто дает нам указатель на существующую очередь. Имена очередей весьма важны, поскольку они позволяют нам различать сообщения (например, очереди « задачи http » и « сообщения новостей », вероятно, содержат два совершенно разных типа информации).

 ch.default_exchange.publish("Hello, world!", :routing_key => q.name)

Здесь мы публикуем сообщение «Привет, мир». Ключ маршрутизации может использоваться для создания маршрутов в вашей сети, функциональность которого нам не нужна в данный момент.

Теперь, когда у нас есть отправитель, нам нужен получатель:

 require 'bunny'

conn = Bunny.new
conn.start

ch = conn.create_channel
q = ch.queue('hello_world')
puts " Waiting for messages in #{q.name}. To exit press CTRL+C"
q.subscribe(:block => true) do |delivery_info, properties, body|
  puts "Received #{body}"
end

Большая часть кода совпадает с отправителем, за исключением блока подписки:

 q.subscribe(:block => true) do |delivery_info, properties, body|
  puts "Received #{body}"
end

Это подписывается на новые сообщения в очереди. Передача :block => true Затем, как только он получен, кодовый блок, который мы передали (не путать с блоком ожидания), распечатывает содержимое полученного сообщения.

Поток данных довольно прост. Отправитель отправляет данные брокеру. Получатель (часто называемый потребителем) «потребляет» его из очереди. Давайте сделаем еще несколько интересных вещей с теми же идеями.

Мастер HTTP / Рабочий

Оказывается, что те же концепции могут применяться для создания параллельного / распределенного загрузчика HTTP-ссылок. У нас будет мастер, у которого есть список URL, которые затем добавляются в очередь. Рабочие будут потреблять и загружать содержимое этих URL. Вот код производителя / мастер:

 require 'bunny'

class HttpMaster

  #takes a list of urls
  #to submit to workers
  def initialize(urls)
    @conn = Bunny.new
    @urls = urls
  end

  def start
    @conn.start
    ch = @conn.create_channel
    q = ch.queue("http_tasks")

    @urls.each do |url|
      ch.default_exchange.publish(url, :routing_key => q.name)
      puts "Sent url: #{url}"
    end
  end
end

http_master = HttpMaster.new(['http://www.google.com/', 'http://yahoo.com/'])
http_master.start

На первый взгляд код кажется немного более сложным. Но единственное, что мы действительно изменили, — это тип сообщения (вместе с именем очереди, но это только для того, чтобы сохранить самоорганизацию):

 ch.default_exchange.publish(url, :routing_key => q.name)

Теперь вместо публикации «Hello, world!» Мастер публикует URL-адрес, который он хочет загрузить. Давайте посмотрим на работника / потребителя, который использует этот URL:

 require 'bunny'
require 'open-uri'

class HttpWorker
  def initialize(folder)
    @conn = Bunny.new
    @folder = folder
    @counter = 0
  end

  def work(url)
    open("#{@folder}/#{@counter}.backup", 'w+') do |file|
      file << open(url).read
    end

    @counter += 1
  end

  def start
    @conn.start

    ch = @conn.create_channel
    q = ch.queue "http_tasks"
    puts "Waiting for tasks on queue #{q.name}"
    q.subscribe(:block => true) do |delivery_info, properties, body|
      puts "Received task/URL: #{body}"
      work body
    end
  end
end

if ARGV.length != 1
  abort 'Usage: http_worker.rb FOLDER'
end

worker = HttpWorker.new(ARGV[0])
worker.start

Давайте сосредоточимся на важной части:

 q.subscribe(:block => true) do |delivery_info, properties, body|
  puts "Received task/URL: #{body}"
  work body
end

Вместо того, чтобы просто отбрасывать полученное сообщение, как в примере «Hello, world», рабочий вызывает work body Затем work

Обратите внимание на силу, которую нам дал RabbitMQ. Вместо того, чтобы беспокоиться о том, попали ли наши данные в сеть, проверять размеры фреймов и т. Д., Как это было бы с сокетами, мы просто имеем дело с сообщениями. Кроме того, если мы запускаем мастер без запуска работника, то URL-адреса будут храниться в очереди, а не просто выбрасываться. Таким образом, мастер отправляет URL-адреса брокеру, брокер помещает их в очередь, а потребитель / работник загружает их в папку. Но что, если работник при этом обнаружит ошибку?

ACK’ing

Как это устроено прямо сейчас, если рабочий столкнулся с ошибкой (например, из-за нехватки памяти) при загрузке URL-адреса, URL-адрес в любом случае был бы удален, то есть у нас не было бы загруженного файла! Очевидно, что это может быть довольно проблематично, потому что у нас не будет полной копии данных. К счастью, RabbitMQ предоставляет способ решить эту проблему в форме «ACK».

Мы можем настроить очередь так, чтобы она удаляла элементы с очереди только после того, как получит сообщение от потребителя, которое говорит: «Хорошо, я сделал с этим элементом», то есть сообщение ACK. Важно отметить, что это похоже по духу, но не совпадает с TCP ACK; это функция, реализованная RabbitMQ, а не базовым транспортным протоколом. RabbitMQ делает ACK’ом невероятно простым:

 q.subscribe(:ack => true, :block => true) do |delivery_info, properties, body|
  puts "Received task/URL: #{body}"
  work body

  ch.ack(delivery_info.delivery_tag)
end

Обратите внимание на аргумент :ack => true Это говорит RabbitMQ, что он должен удалять из очереди все, пока не получит ACK. Затем, после вызова work bodych.ack(delivery_info.delivery_tag) Для правильной реализации ACK нам просто нужно поймать несколько исключений, которые, как мы знаем, могут иметь место (например, отсутствие сетевого подключения).

Обмены

До сих пор я представлял упрощенную версию модели обмена сообщениями RabbitMQ. Производитель на самом деле ничего не отправляет в очередь, вместо этого он отправляет все посреднику, называемому биржей . Биржа решает, в какую очередь отправлять сообщения. Во всех примерах кода мы делали это:

 ch.default_exchange.publish(message, :routing_key => q.name)

Мы используем «обмен по умолчанию», который маршрутизирует сообщения в соответствии с ключом маршрутизации, который мы ему даем, в данном случае это q.name Это означает, что потребители / работники будут получать сообщения, только если они работают с одним и тем же q.name

Прямо сейчас, если бы мы работали с двумя работниками, то каждый работник, в среднем, получал бы половину общих задач. Что если бы мы хотели, чтобы каждый работник получал каждое сообщение? Вариант использования несложно представить: предположим, что у нас был один работник на каждом компьютере, и каждому компьютеру требовалась копия всех URL-адресов, которые собирался предоставить мастер. Что мы хотим, чтобы модель «опубликовать / подписаться».

Довольно просто придумать способ сделать это: дать каждому работнику свою очередь и убедиться, что все сообщения, отправленные с мастера, ставятся в очередь в каждой очереди работника. Для этого мы будем использовать обмен «разветвлением», который отправляет сообщения во все связанные с ним очереди, а не только в одну.

Это приводит к новому методу start

 def start
  @conn.start
  ch = @conn.create_channel
  exchange = ch.fanout("http-fanout")

  @urls.each do |url|
    exchange.publish(url)
    puts "Sent url: #{url}"
  end
end

Важной строкой является exchange = ch.fanout("http-fanout") Затем мы публикуем наше сообщение с помощью exchange.publish(url) Обратите внимание, что нам больше не нужен ключ маршрутизации, так как обмен разветвления отправляет все очереди, связанные с ним.

Теперь мы рассмотрим следующую часть проблемы, которая заключается в том, чтобы создать очередь для каждого работника и связать эти очереди с обменом. У нас есть следующее в рабочем коде:

 def start
  @conn.start
  ch = @conn.create_channel

  #instead of using a queue with a name,
  #we tell RabbitMQ that we want a temporary
  #queue (i.e. can get rid of it after this
  #worker dies) that only one worker
  #gets to use.
  q = ch.queue("", :exclusive => true)

  #get the exchange
  exchange = ch.fanout('http-fanout')

  #we need to tell RabbitMQ that this
  #queue is connected ("bound") to the
  #"http-fanout" exchange - see http_master_fanout.rb
  q.bind(exchange)


  puts "Waiting for tasks on queue #{q.name}"
  q.subscribe(:block => true) do |delivery_info, properties, body|
    puts "Received task/URL: #{body}"
    work body
  end
end

Когда мы создаем очередь с q = ch.queue("", :exclusive => true) С помощью :exclusive => true Затем с помощью q.bind(exchange) И это все! Когда мы запускаем мастер с несколькими работниками, мы видим, что URL-адреса отправляются каждому работнику, а не просто распределяются между ними.

Завершение

Я начал с сетевого кода в C и TCP-сокетах. В целом это был жалкий и утомительный опыт. Вы должны мысленно позаботиться о миллионе вещей, а отладка — это кошмар. RabbitMQ и Ruby собираются вместе, чтобы решить эти проблемы и сделать общение простым и элегантным.

Пока что мы рассмотрели только несколько частей RabbitMQ, и есть множество других вещей, которые вы можете сделать с ним. Но основ этой статьи должно быть достаточно, чтобы вы могли написать довольно классный распределенный код.