Статьи

Начало работы с RabbitMQ: облачное связующее ПО, ориентированное на сообщения

В этой статье рассказывается о RabbitMQ, промежуточном программном обеспечении брокера сообщений, рекомендованном OpenStack для облачных развертываний. Он соответствует стандартам AMQP и разработан в Erlang. Примеры кода разработаны с использованием библиотек Python и PIKA .

1. Message Broker

Посредник сообщений — это программный компонент, который обеспечивает связь между приложениями в кластере корпоративных приложений. Он также известен как Message Oriented Middleware (MOM) в сервис-ориентированной архитектуре (SOA). Приложения в корпоративном кластере используют брокер сообщений, такой как почтовый обмен или почтовое отделение, для отправки сообщений в другие приложения.

RabbitMQ соответствует стандарту AMQP, который является открытым стандартом для деловых сообщений между приложениями и организациями. Это двоичный протокол, а не спецификация интерфейса. Стандарт AMQP обеспечивает обмен сообщениями в виде облачной службы, расширенный шаблон публикации-подписки, маршрутизацию на основе пользовательских заголовков и независимость от языка программирования.

 2. Модель RabbitMQ

Модель RabbitMQ состоит из различных компонентов. Это:  Производитель  (отправитель), Потребитель (получатель), Exchange , Привязки и очереди сообщений . Эти компоненты работают вместе, как описано ниже:

  • Производитель отправляет сообщение на биржу
  • Exchange пересылает сообщение в очереди на основе привязок
  • Привязки устанавливаются очередями для прикрепления к обмену
  • Потребители (получатель) получают сообщения из соответствующих очередей сообщений.

Система RabbitMQ ECO

Производителем и потребителем сообщений являются внешние объекты. Обмены, очереди и привязки являются внутренними объектами брокера сообщений.

2.1 Обмены

Биржи являются ядром RabbitMQ. Производитель отправляет сообщения на биржи, чтобы переслать его нужному потребителю. Биржи принимают решение о пересылке на основе типа обмена, routing_key в сообщении, стандартных полей заголовка в сообщении и routing_key, зарегистрированных в очереди с обменом.

Обмены могут быть настроены на длительный срок, так что они будут выдерживать перезапуски сервера RabbitMQ. Обмены могут быть «внутренними» для сервера, так что они могут быть опубликованы другими обменами внутри сервера. Биржи могут быть настроены на автоматическое удаление, при этом очереди больше не связаны с ним.

2.2 Очереди

Очереди используются для пересылки сообщений целевым потребителям и обеспечивают упорядоченную доставку сообщений. Потребитель привязывает себя к очереди с помощью функции обратного вызова, так что потребитель будет уведомлен о получении сообщения.

Очереди можно настроить на длительный срок, чтобы они могли выдержать перезапуски сервера RabbitMQ. Он также может быть настроен как «исключительный» для соединения, после того как соединение закрыто, очередь будет удалена. Автоматически удаляемые очереди удаляются, когда ни один потребитель не подписан на очередь.

2.3 Привязки

Привязки используются биржами для принятия решения о маршрутизации сообщения. По сути, привязки связывают обмены с очередями и используют тип обмена, ключ маршрутизации, установленный в очереди, настраиваемые заголовки сообщений и ключ маршрутизации, присутствующие в сообщении, отправленном производителем. Если для принятия решения о пересылке сообщения недостаточно привязок, сообщение может быть отброшено или отправлено обратно производителю на основании свойств, установленных в сообщении.

3. Производитель и Потребитель

В типичной банковской системе, когда клиент снимает деньги с банкомата или смахивает свою кредитную карту в магазине, клиент получает уведомление с помощью SMS или электронного письма для предотвращения мошенничества. В этом случае основная банковская система отправит сообщение в подсистемы обмена сообщениями и продолжит обрабатывать запросы других клиентов. Подсистемы обмена сообщениями, прослушивающие очереди, будут уведомлены, и они предпримут соответствующие действия, основываясь на предпочтениях абонента. В этом примере основная банковская система является производителем сообщений, а подсистема обмена сообщениями является потребителем сообщений.

3.1 Продюсер

Согласно дизайну RabbitMQ, производитель отправляет сообщение на обмен. Поведение обмена определяется типом обмена, route_key & bindings в очередях сообщений. Давайте посмотрим, как создать продюсера, используя библиотеку Python pika. Если у вас нет установки RabbitMQ, обратитесь к разделу Приложение-А, чтобы установить и настроить RabbitMQ.

import pika 

""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")

""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)

"""Close the communication channel """
connection.close()

В приведенном выше фрагменте кода сначала мы устанавливаем соединение с сервером, на котором размещен RabbitMQ. RabbitMQ позволяет установить несколько каналов в существующем соединении, и все коммуникации являются операциями, связанными с этим каналом. В третьей строке мы создаем биржу и называем ее «SMS-биржа», а ее тип — «прямой». В случае, если обмен уже создан, этот оператор не будет создавать ошибку, он просто возвращает его, если нет конфликта в типе обмена.

Функция «channel.basic_publish» используется для отправки сообщения на биржу. Производитель должен упомянуть это имя exchange и routing_key вместе с реальным сообщением. При «прямом» обмене ключ маршрутизации будет использоваться для принятия решения о пересылке сообщения. Наконец, закройте соединение, если оно больше не нужно.

3.2 Потребитель

Согласно дизайну RabbitMQ, потребитель является целевым приложением, для которого предназначены сообщения. Потребитель должен зарегистрироваться в очереди и связать его с обменом. Если в очереди регистрируется более одного потребителя, то RabbitMQ отправляет сообщения потребителям в циклическом порядке.

import pika

""" define a callback function to receive the message """
def callback(channel,method,properties,body):
    print "[X] Received Message ",body

""" Establish connection """
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')
channel = connection.channel()

""" Declare the queue & bind to the exchange with routing key """
channel.queue_declare(queue="SMS-Queue")
channel.queue_bind(exchange="SMS-Exchange",queue="SMS-Queue",routing_key="SMS-Alert")

""" register the call back function """
channel.basic_consume(callback,queue="SMS-Queue",no_ack=True)

""" listen to the queue """
channel.start_consuming()

Установление соединения одинаково как для производителя, так и для потребителя. Но потребитель должен объявить очередь и связать ее с обменом вместе с routing_key. В приведенном выше коде «SMS-очередь» создается и связывается с «SMS-биржей» с помощью клавиши маршрутизации «SMS-оповещение». Это дает команду SMS-Exchange переадресовать сообщение с ключом маршрутизации «SMS-оповещение» на «SMS-очередь».

Функция обратного вызова, зарегистрированная в очереди, будет вызываться после получения сообщения в очереди. Оператор списка «channel.start_consuming ()» является блокирующим вызовом, когда потребитель ожидает каких-либо сообщений в зарегистрированной очереди.

3.3 Удаленное подключение

В приведенных выше примерах мы подключились к серверу RabbitMQ, который присутствует на локальном хосте. Давайте посмотрим фрагмент кода, как подключиться к удаленному серверу RabbitMQ:

import pika
credits = pika.PlainCredentials('scott','tiger')
params = pika.ConnectionParameters('10.0.0.1',5672,'/',credits)
connection = pika.BlockingConnection(parameters=params)

Приведенный выше код сначала создает объект кредитов, устанавливая учетные данные пользователя. Затем мы создаем объект параметров, устанавливая IP-адрес, номер порта, путь виртуального хоста и объект учетных данных. Этот параметр просто передается в метод BlockingConnection для установления соединения с заданными параметрами. Как только вы устанавливаете соединение, остальная часть кода остается той же самой для создания производителя и потребителя.

В этом упражнении мы обсудили разработку производителя и потребителя с использованием блокирующего соединения. RabbitMQ также поддерживает асинхронные соединения. Примеры можно найти по адресу  http://pika.readthedocs.org/en/latest/examples/asynchronous_publisher_example.html .

4. Типы обмена

В этом разделе мы узнаем, как развивать различные типы бирж.

4.1 Прямой обмен

Прямой обмен доставляет сообщения в очереди на основе ключа маршрутизации сообщений. Очередь привязывается к обмену ключом маршрутизации. Когда новое сообщение с этим ключом маршрутизации поступает на прямой обмен, это сообщение направляется в эту очередь. Прямые обмены используются для распределения задач между несколькими работниками в круговом порядке. Загрузка RabbitMQ балансирует потребителей, когда несколько потребителей прослушивают одну и ту же очередь. Прямой обмен идеален для одноадресной маршрутизации сообщений.

Производитель: приведенный ниже сегмент кода создает обмен с именем «Direct-X» и устанавливает тип обмена «прямой». Метод postMsg отправляет сообщение этому обмену с параметром routing_key, установленным в «Key1»

import pika 

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

""" Declare a exchange with type as direct """
channel.exchange_declare(exchange="Direct-X",exchange_type="direct")

""" Publish a Message on the Channel """
Msg = raw_input("Please enter the message :")
channel.basic_publish(exchange = "Direct-X",routing_key="Key1",body=Msg)

"""Close the communication channel """
connection.close()

Потребитель:

Приведенный ниже фрагмент кода создает очередь с именем «Direct-Q1» и регистрирует ее в обмене «Direct-X» для сообщений с параметром routing_key как «Key1».

import pika

""" define a callback function to receive the message """
def callback(channel,method,properties,body):
    print "[X] Received Message ",body

""" Establish connection """
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')
channel = connection.channel()

""" Declare the queue & bind to the exchange with routing key """
channel.queue_declare(queue="Direct-Q")
channel.queue_bind(exchange="Direct-X",queue="Direct-Q",routing_key="Key1")
""" register the call back function """
channel.basic_consume(callback,queue="Direct-Q",no_ack=True)

""" listen to the queue """
channel.start_consuming()

Примечание. Если несколько пользователей прослушивают одну и ту же очередь, загрузка RabbitMQ балансирует сообщения между пользователями в циклическом режиме.

4.2 Fanout Exchange

Разветвленный обмен маршрутизирует сообщения во все очереди, которые ограничены, независимо от ключа маршрутизации. Это наиболее подходит для трансляции.

Производитель: 
Создать производителя, который отправил сообщение на биржу «Fanout-X».

import pika 

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

""" Declare a exchange with type as direct """
channel.exchange_declare(exchange="Fanout-X",exchange_type="fanout")

""" Publish a Message on the Channel """
Msg = raw_input("Please enter the message :")
channel.basic_publish(exchange = "Fanout-X",routing_key="Key1",body=Msg)

"""Close the communication channel """
connection.close()

Потребитель 1:

Создает очередь «Faount-Q1» и связывается с обменом «Fanout-X», который имеет тип обмена Fanout. Даже если потребитель регистрирует ключ routing_key, он не окажет никакого влияния на решение о переадресации обмена, потому что это обмен разветвления.

import pika

""" define a callback function to receive the message """
def callback(channel,method,properties,body):
    print "[X] Received Message ",body

""" Establish connection """
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')
channel = connection.channel()

""" Declare the queue & bind to the exchange with routing key """
channel.queue_declare(queue="Fanout-Q1")
channel.queue_bind(exchange="Direct-X",queue="Fanout-Q1",routing_key="Key1")
""" register the call back function """
channel.basic_consume(callback,queue="Fanout-Q1",no_ack=True)

""" listen to the queue """
channel.start_consuming()

Потребитель 2: Создает другую очередь «Fanout-Q2» и привязывает ее к тому же обмену «Fanout-X»

import pika

""" define a callback function to receive the message """
def callback(channel,method,properties,body):
    print "[X] Received Message ",body

""" Establish connection """
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')
channel = connection.channel()

""" Declare the queue & bind to the exchange with routing key """
channel.queue_declare(queue="Fanout-Q2")
channel.queue_bind(exchange="Fanout-X",queue="Fanout-Q2",routing_key="Key1")
""" register the call back function """
channel.basic_consume(callback,queue="Fanout-Q2",no_ack=True)

""" listen to the queue """
channel.start_consuming()

Теперь вы увидите, что отправленное издателем сообщение будет получено обоими потребителями.

4.3 Тематический обмен

Тема обмена сообщениями о маршруте с одной или несколькими очередями на основе ключа маршрутизации сообщений и шаблона ключа, используемого для привязки очереди к обмену. Этот обмен используется для реализации шаблона публикации / подписки . Обмен темами подходит для многоадресной рассылки.

Producer  создает обмен с именем «Topic-X» типа «topic» и отправляет различные сообщения с различными значениями ключа.

import pika 

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

""" Declare a exchange with type as direct """
channel.exchange_declare(exchange="Topic-X",exchange_type="topic")

""" Publish a Message on the Channel """
Msg = raw_input("Please enter the message :")
channel.basic_publish(exchange = "Topic-X",routing_key="in.sales.put",body=Msg)
channel.basic_publish(exchange = "Topic-X",routing_key="in.sales.post",body=Msg)
channel.basic_publish(exchange = "Topic-X",routing_key="in.sales.delete",body=Msg)
channel.basic_publish(exchange = "Topic-X",routing_key="in.rnd.put",body=Msg)
channel.basic_publish(exchange = "Topic-X",routing_key="in.rnd.post",body=Msg)
channel.basic_publish(exchange = "Topic-X",routing_key="in.rnd.delete",body=Msg)
"""Close the communication channel """ 
connection.close()

Потребитель 1: Создает очередь с именем «Topic-Q1» и связывается с обменом «Topic-X» для всех сообщений «post».

import pika

""" define a callback function to receive the message """
def callback(channel,method,properties,body):
    print "[X] Received Message ",body

""" Establish connection """
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')
channel = connection.channel()

""" Declare the queue & bind to the exchange with routing key """
channel.queue_declare(queue="Topic-Q1")
channel.queue_bind(exchange="Topic-X",queue="Topic-Q1",routing_key="*.*.post")
""" register the call back function """
channel.basic_consume(callback,queue="Topic-Q1",no_ack=True)

""" listen to the queue """
channel.start_consuming()

Потребитель 2: Создает другую очередь с именем «Topic-Q2» и связывается с тем же обменом «Topic-X» для всех сообщений, поступающих с «rnd».

import pika

""" define a callback function to receive the message """
def callback(channel,method,properties,body):
    print "[X] Received Message ",body

""" Establish connection """
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')
channel = connection.channel()

""" Declare the queue & bind to the exchange with routing key """
channel.queue_declare(queue="Topic-Q2")
channel.queue_bind(exchange="Topic-X",queue="Topic-Q2",routing_key="in.rnd.*")
""" register the call back function """
channel.basic_consume(callback,queue="Topic-Q2",no_ack=True)

""" listen to the queue """
channel.start_consuming()

4.4 Обмен заголовками

Заголовок обменивается сообщением маршрута на основе атрибутов в заголовке сообщения и игнорирует ключ маршрутизации. Если атрибуты заголовка сообщения совпадают с параметрами привязки очереди, то сообщение направляется в эти очереди.

Производитель может установить ключ, пару значений (словарь) в заголовке отправленного сообщения.

import pika

""" A dictionary to store the customer headers """
header={}
header['Source']='Core-Banking'
header['Destination']='Message-System'

""" Establish Blocking Connection with RabbitMQ Server """
conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
chan = conn.channel()

""" Declare an exchange and set type as headers """
chan.exchange_declare(exchange="Header-X",exchange_type="headers")

"""Set the headers using BasicProperties api """
prop = pika.BasicProperties(headers=header)

"""Publish the message to the exchange """
msg = "Hello Message-System, I am Core-Banking"
chan.basic_publish(exchange='Header-X',routing_key="Key1",body=msg,properties=prop)
chan.close()

Вышеуказанный производитель создает обмен с именем «Header-X» и устанавливает тип «headers». Затем он отправляет сообщение с ключом заголовка, установленным на «Source», и соответствующим значением, установленным на «Core-Banking». По сути, заголовок сообщения связан с отправителем сообщения, в данном примере это «Core-Banking».

Приведенный ниже Consumer создает очередь с именем « Header-Q1 » и связывает ее с « Header-X » вместе с интересующей его информацией заголовка. Если вы ссылаетесь на метод сообщения, это

que='Header-Q'
exg = 'Header-X'
key='Source'
val='Core-Banking'

""" Declare a call back function to receive the message """
def callback(ch,method,properties,body):
 print "[X] Received Msg",body

""" Establish blocking connection to the server """
conn = pika.BlockingConnection( pika.ConnectionParameters(host='localhost'))
chan = conn.channel()

""" Declare a Queue """
chan.queue_declare(queue=que)

"""Bind the queue to exchange and set arguments to match any of the given header """
chan.queue_bind(exchange=exg,queue=que, arguments={key:val,'x-match':'any'})

print '[*] Wait for messages. To exit press CTRL+C'

""" Link the callback function to the queue """
chan.basic_consume(callback,queue=que,no_ack=True)

"""Wait for the messages to arrive """
chan.start_consuming()

Обратите внимание, что оператор queue_bind принимает дополнительный аргумент с именем «arguments», который принимает пару ключ-значение. Если присутствует более одной пары ключ-значение, x-match: any гарантирует, что даже если одна запись соответствует, сообщение будет доставлено в эту очередь.

5. Резюме

Мы видели, как работает RabbitMQ, разбирались в различных компонентах и ​​моделях сообщений. RabbitMQ поддерживает среды кластеризации с высокой доступностью, что обеспечит нулевое время простоя, более высокую пропускную способность и увеличенную емкость, что делает его пригодным для облачных установок.

6. Ссылки

1. Message Broker — https://msdn.microsoft.com/en-us/library/ff648849.aspx
2. AMQP — https://www.rabbitmq.com/tutorials/amqp-concepts.html
3. Справочник команд RabbitMQCtl :  https://www.rabbitmq.com/man/rabbitmqctl.1.man.html
4. Документация PIKA — http://pika.readthedocs.org/en/latest/modules/index.html
5. Стандарты AMQP —  https : //www.amqp.org/

Приложение

А. Установка и настройка

Инфраструктура rabbitMQ установлена ​​в CentOS6.5

A.1 Установка

Пожалуйста, установите RabbitMQ, Erlang (RabbitMQ работает на Erlang), Python & Pika.

  • Чтобы установить RabbitMQ   (v3.5.3), обратитесь по  адресу https://www.rabbitmq.com/download.html  и следуйте инструкциям в зависимости от целевой операционной системы. Я установил rabbitMQ на машину CentOS.
  • Erlang  (v18.0) может быть установлен с   http://www.erlang.org/doc/installation_guide/INSTALL.html
  • Python 2.7.10 доступен https://www.python.org/downloads/ . Пожалуйста, установите версию 2.7.10, поскольку поддержка pika для 3.x пока недоступна и в настоящее время находится в стадии разработки.
  • Pika  (v0.9.13) может быть использована с помощью Python pip.

    •   pip install pika

A.2 Установить плагин управления RabbitMQ

RabbitMQ поставляется с веб-консолью управления. Программное обеспечение поставляется вместе с установочным программным обеспечением RabbitMQ, которое мы установили. Чтобы включить консоль управления, выполните следующую команду на вашем компьютере с Linux:

rabbitmq-plugins enable rabbitmq_management

Консоль управления RabbitMQ использует порт 15672. Таким образом, этот порт должен быть открыт. Пожалуйста, обратитесь к  https://www.rabbitmq.com/management.html для получения дополнительной информации о плагине управления.

После включения модуля управления вы можете открыть http: // ipaddress: 15672 / в своем браузере, чтобы увидеть веб-интерфейс управления RabbitMQ.

A.3 Настройка RabbitMQ (vhost / user / привилегии)

Once you have installed RabbitMQ it is ready for production.  But we have to make few mandatory configurations in order to allow remote clients, because the default configuration and user privilages will not allow the remove clients to access the server.

The idea here is to create a virtual environment with the RabbitMQ server and provide access to a user, where he can create a Message Queue, Exchange, Read  and Write into those resources.  So the users are restricted to a virtual environment and resources are contained within the environment.

A.3.1 Create a Virtual Host

rabbitmqctl add_vhost <virtual_host_name>

Note : RabbitMQ will have a default virtual host named “/”.

A.3.2 Create a New User with password

rabbitmqctl add_user <username> <password>
Eg. rabbitmqctl add_user scott tiger

Note: RabbitMQ will have a default user named “guest”, but it cannot used for remote access by external clients. But it allows when the server is acessed locally.

A.3.3 Assign permissions to the user to access a vhost

set_permissions [-p vhostpath] {user} {conf} {write} {read}
Eg: set_permission -p "/" "scott" ".*"  ".*" ".*"

Refer https://www.rabbitmq.com/access-control.html for more information.

A.4 Firewall Settings

We have to open the necessary ports of the Linux Firewall in order to allow remote clients.  Perform the following operations on the IP table to open the ports 5672 & 15672.

sudo iptables -I INPUT -p tcp --dport 5672 --syn -j ACCEPT
sudo iptables -I INPUT -p tcp --dport 15672 --syn -j ACCEPT

To make the changes permenent, make the configuration changes in the file /etc/sysconfig/iptables and restart the iptables servers: