Статьи

Посредничество сообщений с RabbitMQ

RabbitMQ — это программное обеспечение для брокеров сообщений с открытым исходным кодом, написанное на Erlang. MQ в своем названии относится к стандарту, известному как расширенный протокол очереди сообщений. Для наших целей и для большинства других он выступает в качестве посредника между производителями (отправляющими) и потребителями (получающими) программами — он просто принимает и пересылает сообщения. Общая аналогия с RabbitMQ заключается в том, что он действует как служба доставки, как почтовое отделение. Вы отправляете посылку своему другу в любой части мира, и ваш друг в конечном итоге получает посылку, не зная, как она туда попала. В этой аналогии RabbitMQ — это служба доставки, программа-производитель — это вы (отправка пакета), пакет — это общий набор данных, а программа-потребитель — ваш друг.

Как производители, так и потребители могут писать на любом языке, который имеет доступную клиентскую библиотеку RabbitMQ или AMQP. В этой статье я продемонстрирую программу-производитель, написанную на PHP, и программу-потребитель на Python.

Установка RabbitMQ и клиентских библиотек

Перейдите на rabbitmq.com/download.html и выберите установочный файл, подходящий для вашей конкретной системы. Я использую Ubuntu и у меня возникли проблемы с установкой пакета, предоставляемого Ubuntu (это не самый свежий из доступных пакетов). В этом случае использование пакета .deb от rabbitmq.com обеспечило беспроблемную установку, а также последнюю версию.

После установки доступен инструмент установки плагинов, который называется rabbitmq-plugins. В качестве необязательного шага вы можете установить плагин rabbitmq_management, который предоставит веб-браузер для просмотра сервера и упростит различные задачи мониторинга / управления. Чтобы включить плагин в Ubuntu, я запустил:

  sudo rabbitmq-plugins включить rabbitmq_management 

Перезапустите сервер rabbitmq, чтобы изменения вступили в силу:

  sudo /etc/init.d/rabbitmq-server restart 

Укажите в браузере http://localhost:55672/mgmt/ для веб-интерфейса. Имя пользователя и пароль по умолчанию будут «guest».

Если вы предпочитаете инструмент командной строки, rabbitmqctl может быть использован в качестве альтернативы.

PIP — это менеджер пакетов для Python, похожий на PHP PEAR для PECL или CPAN для Perl. Его можно использовать для простой установки Pika, библиотеки RabbitMQ для Python.

  sudo pip установить pika 

На домашней странице RabbitMQ есть несколько ссылок на разные библиотеки AMQP для PHP, но я нашел только одну работающую без головной боли, это php-amqplib , доступный на GitHub. Если у вас еще не установлен git, пожалуйста, установите его, так как от него зависит файл make проекта для клонирования некоторых дополнительных подмодулей.

Полный README можно найти на странице проекта GitHub, но короткая версия выглядит следующим образом:

  git clone git: //github.com/videlalvaro/php-amqplib.git
 cd php-amqplib /
 сделать 

Файл config.php будет содержать основные настройки подключения. Это место для внесения любых изменений, если вы установили сервер RabbitMQ где-либо, кроме localhost.

Очереди и обмены

Беглый взгляд на первые несколько строк amqp_publisher.php раскрывает некоторые основные идеи RabbitMQ.

Очередь в RabbitMQ может рассматриваться как почтовый ящик или входной почтовый ящик, или, в более общем случае, конечная точка, в которую могут поступать сообщения. Потребители обычно используют очереди для сбора новых сообщений и создания интересных вещей, но и потребители, и производители могут объявлять или создавать очереди, которые они будут использовать.

 $ch->queue_declare($queue, false, true, false, false); 

Дополнительные логические параметры соответствуют пассивным, долговечным, эксклюзивным и auto_delete битам, используемым RabbitMQ. Они не очень важны сейчас, но они полностью документированы онлайн .

queue_declare() создает очередь с именем «msgs». Если очередь с именем «msgs» уже существует, ничего лишнего не произойдет. Только одна очередь на уникальное имя может быть создана одновременно. Из-за этого типично, что обе программы потребителя и производителя будут вызывать queue_declare() . Это гарантирует, что очередь всегда будет готова, когда вы не уверены, какая программа (производитель или потребитель) будет запущена первой. Также важно отметить, что сообщения не пишутся в очереди напрямую — они должны проходить через обмен.

Обмен — это «все посередине», когда вы думаете об услуге доставки или почтовом отделении RabbitMQ. Мы не отправляем посылку прямо в почтовый ящик друга, а отбрасываем ее в какой-то приемлемой точке получения, и затем она волшебным образом направляется к месту назначения. В этом демонстрационном исходном коде переменная $exchange name была названа метко «router», что является еще одним хорошим способом думать об обмене.

 $ch->exchange_declare($exchange, 'direct', false, true, false); 

Опять же, здесь есть несколько дополнительных параметров: passive, durable и auto_delete. Смотрите документацию для получения дополнительной информации.

Второй параметр, «прямой», это тип обмена. Чтобы понять, что представляют собой различные типы обменов, нам нужно знать о связывающих ключах и ключах маршрутизации. Ключ маршрутизации — это идентификатор, используемый, когда производитель публикует сообщение для обмена. Ключ привязки — это идентификатор, который привязывает определенную очередь к обмену. Оба ключа имеют длину 255 байт.

При прямом обмене ключ маршрутизации отправляется с сообщением на обмен. Если очередь связана с этим обменом ключом привязки, который непосредственно соответствует ключу маршрутизации, то сообщение направляется в эту очередь.

На диаграмме ниже очередь Q1 связана с обменом с binding_key="spades" , а очередь Q2 связана с обменом с binding_key="clubs"

Если мы отправим сообщение в форме (message="Ace", routing_key="spades") , то это сообщение закончится в Q1. Отправка сообщения в форме (message="King", routing_key="clubs") приведет к Q2. Для Q1 или Q2 возможно иметь несколько привязок к одному обмену. Q1 может быть связан с «пиками» и «сердцами», так что любое сообщение с ключом маршрутизации «пики» или «сердца» будет направлено на Q1.

Другой тип обмена известен как разветвление. В этом типе обмена ключи маршрутизации не важны, потому что обмен передает сообщение всем известным ограниченным очередям. Это, наверное, самый простой обмен для работы.

Более интересный тип обмена называется темой. Обмен темами использует разделенную точками группу слов для ключей маршрутизации, которые предоставляют более сложные возможности маршрутизации. Например, «error.production.database» или «weather.ny.syracuse».

При привязке очередей к обмену темами мы можем использовать два специальных символа для сопоставления ключей маршрутизации ограниченным способом регулярного выражения.

  • * (звездочка) — соответствует ровно одному слову
  • # (octothorpe) — соответствует одному или нескольким словам

Если Q1 связан с ключом привязки «* .production. *», Q1 получает любое производственное сообщение любого уровня серьезности. Все следующие ключи маршрутизации будут направлены в Q1:

  • error.production.database
  • info.production.database
  • debug.production.web

Q2, связанный с «info. #», Получает любое информационное сообщение независимо от источника. Все эти ключи маршрутизации окажутся в Q2:

  • info.staging.database
  • info.production.web

Q3 может получать все сообщения базы данных с привязкой «*. *. Database».

Демо Продюсер и Потребитель

Давайте создадим пару простых демонстрационных программ, одна на PHP, которая будет генерировать сообщения, а другая на Python, которая будет действовать как потребитель. Все сообщения будут отправлены через тему обмена.

 <?php include("config.php"); use PhpAmqpLibConnectionAMQPConnection; use PhpAmqpLibMessageAMQPMessage; $exchange = "rabbitmq_demo"; $exchangeType = "topic"; $queue = "events"; $message = $_SERVER["argv"][1]; $routingKey = $_SERVER["argv"][2]; $connection = new AMQPConnection(HOST, PORT, USER, PASS, VHOST); $channel = $connection->channel(); // declare/create the queue $channel->queue_declare($queue, false, true, false, false); // declare/create the exchange as a topic exchange. $channel->exchange_declare($exchange, $exchangeType, false, false, false); $msg = new AMQPMessage($message, array("content_type" => "text/plain")); $channel->basic_publish($msg, $exchange, $routingKey); print "Sent $message ($routingKey)n"; $channel->close(); $connection->close(); в <?php include("config.php"); use PhpAmqpLibConnectionAMQPConnection; use PhpAmqpLibMessageAMQPMessage; $exchange = "rabbitmq_demo"; $exchangeType = "topic"; $queue = "events"; $message = $_SERVER["argv"][1]; $routingKey = $_SERVER["argv"][2]; $connection = new AMQPConnection(HOST, PORT, USER, PASS, VHOST); $channel = $connection->channel(); // declare/create the queue $channel->queue_declare($queue, false, true, false, false); // declare/create the exchange as a topic exchange. $channel->exchange_declare($exchange, $exchangeType, false, false, false); $msg = new AMQPMessage($message, array("content_type" => "text/plain")); $channel->basic_publish($msg, $exchange, $routingKey); print "Sent $message ($routingKey)n"; $channel->close(); $connection->close(); 
 import sys import pika EXCHANGE = "rabbitmq_demo" EXCHANGE_TYPE = "topic" QUEUE = "events" # consume callback function def callback(ch, method, properties, body):   print " - Received '%s' on routing_key %s" % (body, method.routing_key)   # Anything else could happen here:   # Send an email alert, send an xmnp message, trigger another process, etc connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange=EXCHANGE, type=EXCHANGE_TYPE) result = channel.queue_declare(queue=QUEUE, durable=True) if len(sys.argv) != 2:   sys.exit("You must provide a binding key.") else:   key = sys.argv[1] channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=key) print " Listening for messages..." channel.basic_consume(callback, queue=QUEUE, no_ack=True) channel.start_consuming() 

PHP-программа производителя примет два аргумента командной строки: само сообщение и ключ маршрутизации. Потребительская программа Python примет ключ привязки в качестве единственного аргумента. Они создадут обмен темами под названием «rabbitmq_demo» и очередь под названием «события».

Запустите программу Python следующим образом:

  python topic_consumer.py python topic_consumer.py errors.production. * 

Он будет прослушивать производственные ошибки из любого источника (базы данных, веб-сервера и т. Д.).

Теперь запустите программу PHP с сообщением об ошибке и ключом маршрутизации:

  php topicProducer.php "база данных prod удалена. позвоните в органы власти" errors.production.database 

Вы должны увидеть сообщение, полученное потребителем Python.

  Прослушивание сообщений ...
 - Получено 'база данных prod была удалена.  позвоните властям по адресу routing_key errors.production.database 

Это сообщение, отправленное производителем, должно игнорироваться потребителем, поскольку его ключ маршрутизации не совпадает:

  php topicProducer.php "Время ожидания DNS-сервера" warnings.prod.dns 

Вы можете в любой момент нажать Control + C, чтобы остановить потребителя Python, а затем снова запустить его с разными связывающими ключами, чтобы поиграться с тем, как маршрутизируются сообщения.

Этот базовый пример должен сделать две вещи. Во-первых, он должен предоставить возможность опробовать различные ключи маршрутизации и посмотреть, как обмен сообщениями может маршрутизировать сообщения. Во-вторых, это должно заставить вас задуматься о том, как относительно просто обеспечить средства связи между любым количеством независимых программ, написанных на любой перестановке языков.

Вывод

Что если вы работали в мире операций, поддерживая набор физических серверов и программных сервисов производственного уровня? А что, если у вас есть все, от Perl до Ruby и до программ на Haskell.NET, которые отслеживают разные части вашей системы, и вам нужен способ сообщить о них через центральный канал? RabbitMQ может сделать вашу жизнь намного проще. Особенно, если ваша потребительская программа смогла подключиться к электронной почте или библиотеке XMNP, чтобы предупредить реальных людей о проблемах производственного уровня.

Или что, если бы у вас был веб-интерфейс, написанный на PHP, который отвечал за загрузку, а затем обработку большого количества пользовательских фотографий или видео? Возможно, вы захотите перенести нагрузку на обработку на более быстрый или специализированный язык, который может работать в фоновом режиме или на другом экземпляре сервера. RabbitMQ может быть настроен как рабочая очередь для такой ситуации.

Надеюсь, я вызвал интерес к RabbitMQ, и, возможно, он окажется подходящим инструментом для решения конкретной проблемы развязки или масштабируемости. Он довольно прост в установке и имеет широкий выбор клиентских библиотек для вашего языка. Примеры системы мониторинга операций и рабочей очереди для выполнения больших заданий — это всего лишь два примера из реальной жизни, где RabbitMQ может быть полезен, но может быть гораздо больше сценариев, в которых он может помочь. Просто используйте свое воображение.

Изображение через Fotolia