Статьи

Введение в системы обмена сообщениями для Rubyists

Что такое обмен сообщениями

По мере развития сообщества Ruby и развития наших приложений мы ищем новые способы управления сложностью, снижения связности и повышения масштабируемости. Хотя REST-сервисы являются хорошим решением для широкого спектра проблем, временная связь становится сложнее в управлении, когда растет трафик и количество движущихся частей. Если для обработки каждого запроса требуется несколько удаленных вызовов, почти невозможно гарантировать быстрое время ответа.

Хорошо известным решением этой проблемы, которое в последнее время приобрело некоторую популярность в сообществе Ruby, является обмен сообщениями. Основная идея всех систем и шаблонов, основанных на сообщениях, очень проста:

  • Вы не вызываете другие приложения через REST или SOAP синхронно.
  • Вы отправляете сообщения брокеру сообщений. Посредник сообщений будет доставлять сообщения асинхронно другим приложениям (или только сотрудникам вашего приложения).

В результате ваше приложение может отвечать, не блокируясь внешними ресурсами.

Брокер сообщений

Зачем нам нужен брокер сообщений? Использование брокера и не отправка сообщений напрямую в другие приложения дает несколько очень существенных преимуществ. Вам не нужно управлять хранением и доставкой сообщений. Брокер сообщений делает это. Он гарантирует доставку, долговечность и может предоставлять некоторые дополнительные услуги, такие как фильтрация, ведение журнала, отработка отказа и т. Д. Но главное преимущество использования посредника сообщений заключается в том, что различные части вашего приложения (или разные службы) независимы друг от друга:

  • Он обеспечивает временную развязку, поэтому вы можете самостоятельно повторно развернуть все свои сервисы. Если одна из ваших служб недоступна или заблокирована (например, отключился сторонний веб-сервис), ваше приложение может продолжить работать и быстро реагировать. Посредник сообщений хранит все сообщения, и, как только проблема будет решена, они будут обработаны.
  • Это обеспечивает структурную развязку. Посредник не делает этого сам по себе, однако, имея посредника сообщений в составе инфраструктуры обмена сообщениями, значительно проще добавлять независимые от приложений преобразования сообщений.

Брокер сообщений — это дополнительный уровень косвенности, который действительно полезен. Это минимизирует объем информации, которую две части вашей системы должны знать друг о друге. Например, отправитель может не знать, что все его сообщения обрабатываются многими клиентами, что некоторые преобразования выполняются до того, как его сообщения доставляются клиентам и т. Д. Единственное, о чем должен беспокоиться отправитель, — это брокер, получающий сообщения.

Модель AMQP

За прошедшие годы было разработано несколько зрелых систем обмена сообщениями, таких как MSMQ, ActiveMQ, OpenMQ, ZeroMQ, RabbitMQ и т. Д. На мой взгляд, системы, основанные на стандарте AMQP, лучше всего подходят для Rubyists. Один из которых — RabbitMQ — я собираюсь использовать в этой статье для демонстрации самых основных сценариев использования обмена сообщениями.

Но прежде чем приступить к делу, я бы хотел рассказать о некоторых основах модели AMQP:

  • Брокер сообщений . Брокер сообщений — это система, принимающая входящие сообщения из одного приложения и доставляющая их другому. Сервер RabbitMQ является брокером сообщений.
  • Режиссер. Это система отправки сообщений брокеру сообщений.
  • Потребитель. Это система, получающая сообщения от брокера сообщений.
  • Обмен. Это точка входа для всех входящих сообщений. Производители отправляют сообщения на биржи.
  • Очередь. Это объект, в котором хранятся все сообщения. Потребители читают сообщения из очередей.
  • Связывание. Связь между обменом и очередью.

Основной рабочий процесс:

  • Некоторый код конфигурации в вашем приложении определяет обмен E и очередь Q.
  • Он также связывает E и Q. Таким образом, каждое сообщение, отправленное на E, будет храниться в Q.
  • Ваша заявка (Производитель) отправляет сообщения на биржу E.
  • RabbitMQ направляет все входящие сообщения от E до Q. Все они хранятся в Q.
  • Другое приложение (Consumer) читает сообщения от Q.

Как вы можете видеть, все взаимодействия между Producer и Consumer происходят через брокер сообщений. Они не взаимодействуют друг с другом напрямую. Это приводит к слабой связи.

Кролик

Существует огромное разнообразие рубиновых камней, которые можно использовать для передачи сообщений. Мой текущий выбор — Банни, так как я считаю его самым простым. Он не требует никаких знаний о Event Machine или асинхронном программировании.

Примеры

Достаточно теоретического фона, давайте посмотрим, как это делается на практике.

Первый пример, который я хотел бы показать, это случай 1-exchange-1-queue:

require ‘bunny’
# connecting to a broker
b = Bunny.new({}, {:spec => «09»})
b.start
# setup
e = b.exchange(«exchange-01»)
q = b.queue(«queue-01»)
q.bind e
# producer
e.publish(«Message Body»)
# consumer
msg = q.pop
puts «This is the message: #{msg}«
puts «This is the payload: #{msg[:payload]}«
b.stop

view raw
example1.rb
hosted with ❤ by GitHub

Примечания

  • Все взаимодействия с RabbitMQ происходят через экземпляр Bunny (брокер сообщений).
  • В настоящее время используются две версии спецификации AMQP: 08 и 09. В этом примере я использую 0,9.
  • Создание очередей и обменов идемпотентно. Поэтому, если очередь с заданным именем уже создана, Банни просто вернет ее, не создавая новую.
  • Чтение сообщения из очереди не возвращает опубликованную вами полезную нагрузку. Возвращает объект, содержащий полезную нагрузку и все соответствующие заголовки. Пока вы можете просто игнорировать заголовки и читать полезную нагрузку.
  • Мы используем «прямой» обмен в этом примере. AMQP поддерживает несколько типов обменов. Описание всех из них выходит за рамки данной статьи.
  • Метод pop:empty_queue

Очередь, созданная в предыдущем примере, сохраняется в памяти. Таким образом, вы потеряете все необработанные сообщения, если перезапустите RabbitMQ. Когда информация, которую вы отправляете, не является критичной, лучше всего использовать временные очереди (в основном с точки зрения производительности). Если это не вариант, вы всегда можете настроить RabbitMQ для очистки вашей очереди на диск.

require ‘bunny’
# connecting to a broker
b = Bunny.new({}, {:spec => «09»})
b.start
# setup
e = b.exchange(«exchange-02», :durable => true)
q = b.queue(«queue-02»)
q.bind e
# producer
e.publish(«Message Body», :persistent => true)
# consumer
msg = q.pop
puts «This is the message: #{msg}«
puts «This is the payload: #{msg[:payload]}«
b.stop

view raw
example2.rb
hosted with ❤ by GitHub

Примечания

  • На диске хранятся только постоянные сообщения, отправленные в длительную очередь.

Другой сценарий, который вы можете видеть довольно регулярно, — это случай «1 производитель — n потребители». Есть несколько потребителей, читающих сообщения из одной очереди. Каждое сообщение, как и в предыдущих примерах, будет обработано только один раз.

require ‘bunny’
# connecting to a broker
b = Bunny.new({}, {:spec => «09»})
b.start
# setup
e = b.exchange(«exchange-03»)
q = b.queue(«queue-03»)
q.bind e
# producer
e.publish(«Message 1»)
e.publish(«Message 2»)
# consumer 1
consumer_q1 = b.queue(«queue-03»)
msg = consumer_q1.pop
puts «This is the message: #{msg}«
puts «This is the payload: #{msg[:payload]}«
# consumer 2
consumer_q2 = b.queue(«queue-03»)
msg = consumer_q2.pop
puts «This is the message: #{msg}«
puts «This is the payload: #{msg[:payload]}«
b.stop

view raw
example3.rb
hosted with ❤ by GitHub

Примечания

  • В реальной жизни каждый потребитель — это отдельный процесс. Поэтому, если один из них выйдет из строя, система продолжит обработку сообщений.

Последний пример — это случай публикации / подписки. Есть производитель и N потребителей, и все потребители будут получать все отправленные сообщения. Есть много способов реализовать этот сценарий в AMQP. Один из способов — создать очередь для каждого потребителя и связать его с одним и тем же обменом RabbitMQ при получении сообщения доставит его во все очереди.

require ‘bunny’
b = Bunny.new({}, {:spec => «09»})
b.start
# setup
e = b.exchange(«exchange-04»)
q1 = b.queue(«queue-04-01»)
q1.bind e
q2 = b.queue(«queue-04-02»)
q2.bind e
# producer
e.publish(«Message Body»)
# consumer 1
msg = q1.pop
puts «CONSUMER1: This is the message: #{msg}«
puts «CONSUMER1: This is the payload: #{msg[:payload]}«
# consumer 2
msg = q2.pop
puts «CONSUMER2: This is the message: #{msg}«
puts «CONSUMER2: This is the payload: #{msg[:payload]}«
b.stop

view raw
example4.rb
hosted with ❤ by GitHub

Вы можете сделать больше

Четыре примера — только для начала. Есть много вариантов, которые вы можете рассмотреть при добавлении сообщений в ваше приложение, таких как:

  • Реализация клиентов с использованием «поп» или «подписаться».
  • Различные типы обмена, такие как: fanouttopic
  • Использование транзакций

Узнайте больше об обмене сообщениями

Хотя идея обмена сообщениями проста, количество шаблонов, которые вы можете реализовать поверх нее, огромно. Лучшая книга на эту тему — «Шаблоны корпоративной интеграции» Дж. Хопе и Б. Вулфа . Я настоятельно рекомендую прочитать эту книгу, если вы думаете о добавлении обмена сообщениями в ваше приложение.