Что такое обмен сообщениями
По мере развития сообщества Ruby и развития наших приложений мы ищем новые способы управления сложностью, снижения связности и повышения масштабируемости. Хотя REST-сервисы являются хорошим решением для широкого спектра проблем, временная связь становится сложнее в управлении, когда растет трафик и количество движущихся частей. Если для обработки каждого запроса требуется несколько удаленных вызовов, почти невозможно гарантировать быстрое время ответа.
Хорошо известным решением этой проблемы, которое в последнее время приобрело некоторую популярность в сообществе Ruby, является обмен сообщениями. Основная идея всех систем и шаблонов, основанных на сообщениях, очень проста:
- Вы не вызываете другие приложения через REST или SOAP синхронно.
- Вы отправляете сообщения брокеру сообщений. Посредник сообщений будет доставлять сообщения асинхронно другим приложениям (или только сотрудникам вашего приложения).
В результате ваше приложение может отвечать, не блокируясь внешними ресурсами.
Брокер сообщений
Зачем нам нужен брокер сообщений? Использование брокера и не отправка сообщений напрямую в другие приложения дает несколько очень существенных преимуществ. Вам не нужно управлять хранением и доставкой сообщений. Брокер сообщений делает это. Он гарантирует доставку, долговечность и может предоставлять некоторые дополнительные услуги, такие как фильтрация, ведение журнала, отработка отказа и т. Д. Но главное преимущество использования посредника сообщений заключается в том, что различные части вашего приложения (или разные службы) независимы друг от друга:
- Он обеспечивает временную развязку, поэтому вы можете самостоятельно повторно развернуть все свои сервисы. Если одна из ваших служб недоступна или заблокирована (например, отключился сторонний веб-сервис), ваше приложение может продолжить работать и быстро реагировать. Посредник сообщений хранит все сообщения, и, как только проблема будет решена, они будут обработаны.
- Это обеспечивает структурную развязку. Посредник не делает этого сам по себе, однако, имея посредника сообщений в составе инфраструктуры обмена сообщениями, значительно проще добавлять независимые от приложений преобразования сообщений.
Брокер сообщений — это дополнительный уровень косвенности, который действительно полезен. Это минимизирует объем информации, которую две части вашей системы должны знать друг о друге. Например, отправитель может не знать, что все его сообщения обрабатываются многими клиентами, что некоторые преобразования выполняются до того, как его сообщения доставляются клиентам и т. Д. Единственное, о чем должен беспокоиться отправитель, — это брокер, получающий сообщения.
Модель AMQP
За прошедшие годы было разработано несколько зрелых систем обмена сообщениями, таких как MSMQ, ActiveMQ, OpenMQ, ZeroMQ, RabbitMQ и т. Д. На мой взгляд, системы, основанные на стандарте AMQP, лучше всего подходят для Rubyists. Один из которых — RabbitMQ — я собираюсь использовать в этой статье для демонстрации самых основных сценариев использования обмена сообщениями.
Но прежде чем приступить к делу, я бы хотел рассказать о некоторых основах модели AMQP:
- Брокер сообщений . Брокер сообщений — это система, принимающая входящие сообщения из одного приложения и доставляющая их другому. Сервер RabbitMQ является брокером сообщений.
- Режиссер. Это система отправки сообщений брокеру сообщений.
- Потребитель. Это система, получающая сообщения от брокера сообщений.
- Обмен. Это точка входа для всех входящих сообщений. Производители отправляют сообщения на биржи.
- Очередь. Это объект, в котором хранятся все сообщения. Потребители читают сообщения из очередей.
- Связывание. Связь между обменом и очередью.
Основной рабочий процесс:
- Некоторый код конфигурации в вашем приложении определяет обмен E и очередь Q.
- Он также связывает E и Q. Таким образом, каждое сообщение, отправленное на E, будет храниться в Q.
- Ваша заявка (Производитель) отправляет сообщения на биржу E.
- RabbitMQ направляет все входящие сообщения от E до Q. Все они хранятся в Q.
- Другое приложение (Consumer) читает сообщения от Q.
Как вы можете видеть, все взаимодействия между Producer и Consumer происходят через брокер сообщений. Они не взаимодействуют друг с другом напрямую. Это приводит к слабой связи.
Кролик
Существует огромное разнообразие рубиновых камней, которые можно использовать для передачи сообщений. Мой текущий выбор — Банни, так как я считаю его самым простым. Он не требует никаких знаний о Event Machine или асинхронном программировании.
Примеры
Достаточно теоретического фона, давайте посмотрим, как это делается на практике.
Первый пример, который я хотел бы показать, это случай 1-exchange-1-queue:
require ‘bunny’ | |
# connecting to a broker | |
b = Bunny.new({}, {:spec => «09»}) | |
b.start | |
# setup | |
e = b.exchange(«exchange-01») | |
q = b.queue(«queue-01») | |
q.bind e | |
# producer | |
e.publish(«Message Body») | |
# consumer | |
msg = q.pop | |
puts «This is the message: #{msg}« | |
puts «This is the payload: #{msg[:payload]}« | |
b.stop |
Примечания
- Все взаимодействия с RabbitMQ происходят через экземпляр Bunny (брокер сообщений).
- В настоящее время используются две версии спецификации AMQP: 08 и 09. В этом примере я использую 0,9.
- Создание очередей и обменов идемпотентно. Поэтому, если очередь с заданным именем уже создана, Банни просто вернет ее, не создавая новую.
- Чтение сообщения из очереди не возвращает опубликованную вами полезную нагрузку. Возвращает объект, содержащий полезную нагрузку и все соответствующие заголовки. Пока вы можете просто игнорировать заголовки и читать полезную нагрузку.
- Мы используем «прямой» обмен в этом примере. AMQP поддерживает несколько типов обменов. Описание всех из них выходит за рамки данной статьи.
- Метод
pop
:empty_queue
Очередь, созданная в предыдущем примере, сохраняется в памяти. Таким образом, вы потеряете все необработанные сообщения, если перезапустите RabbitMQ. Когда информация, которую вы отправляете, не является критичной, лучше всего использовать временные очереди (в основном с точки зрения производительности). Если это не вариант, вы всегда можете настроить RabbitMQ для очистки вашей очереди на диск.
require ‘bunny’ | |
# connecting to a broker | |
b = Bunny.new({}, {:spec => «09»}) | |
b.start | |
# setup | |
e = b.exchange(«exchange-02», :durable => true) | |
q = b.queue(«queue-02») | |
q.bind e | |
# producer | |
e.publish(«Message Body», :persistent => true) | |
# consumer | |
msg = q.pop | |
puts «This is the message: #{msg}« | |
puts «This is the payload: #{msg[:payload]}« | |
b.stop |
Примечания
- На диске хранятся только постоянные сообщения, отправленные в длительную очередь.
Другой сценарий, который вы можете видеть довольно регулярно, — это случай «1 производитель — n потребители». Есть несколько потребителей, читающих сообщения из одной очереди. Каждое сообщение, как и в предыдущих примерах, будет обработано только один раз.
require ‘bunny’ | |
# connecting to a broker | |
b = Bunny.new({}, {:spec => «09»}) | |
b.start | |
# setup | |
e = b.exchange(«exchange-03») | |
q = b.queue(«queue-03») | |
q.bind e | |
# producer | |
e.publish(«Message 1») | |
e.publish(«Message 2») | |
# consumer 1 | |
consumer_q1 = b.queue(«queue-03») | |
msg = consumer_q1.pop | |
puts «This is the message: #{msg}« | |
puts «This is the payload: #{msg[:payload]}« | |
# consumer 2 | |
consumer_q2 = b.queue(«queue-03») | |
msg = consumer_q2.pop | |
puts «This is the message: #{msg}« | |
puts «This is the payload: #{msg[:payload]}« | |
b.stop |
Примечания
- В реальной жизни каждый потребитель — это отдельный процесс. Поэтому, если один из них выйдет из строя, система продолжит обработку сообщений.
Последний пример — это случай публикации / подписки. Есть производитель и N потребителей, и все потребители будут получать все отправленные сообщения. Есть много способов реализовать этот сценарий в AMQP. Один из способов — создать очередь для каждого потребителя и связать его с одним и тем же обменом RabbitMQ при получении сообщения доставит его во все очереди.
require ‘bunny’ | |
b = Bunny.new({}, {:spec => «09»}) | |
b.start | |
# setup | |
e = b.exchange(«exchange-04») | |
q1 = b.queue(«queue-04-01») | |
q1.bind e | |
q2 = b.queue(«queue-04-02») | |
q2.bind e | |
# producer | |
e.publish(«Message Body») | |
# consumer 1 | |
msg = q1.pop | |
puts «CONSUMER1: This is the message: #{msg}« | |
puts «CONSUMER1: This is the payload: #{msg[:payload]}« | |
# consumer 2 | |
msg = q2.pop | |
puts «CONSUMER2: This is the message: #{msg}« | |
puts «CONSUMER2: This is the payload: #{msg[:payload]}« | |
b.stop |
Вы можете сделать больше
Четыре примера — только для начала. Есть много вариантов, которые вы можете рассмотреть при добавлении сообщений в ваше приложение, таких как:
- Реализация клиентов с использованием «поп» или «подписаться».
- Различные типы обмена, такие как:
fanout
topic
- Использование транзакций
Узнайте больше об обмене сообщениями
Хотя идея обмена сообщениями проста, количество шаблонов, которые вы можете реализовать поверх нее, огромно. Лучшая книга на эту тему — «Шаблоны корпоративной интеграции» Дж. Хопе и Б. Вулфа . Я настоятельно рекомендую прочитать эту книгу, если вы думаете о добавлении обмена сообщениями в ваше приложение.