AMQP ( расширенный протокол очереди сообщений ) — это сетевой протокол, который может доставлять сообщения из одной конечной точки приложения в другую конечную точку приложения. Он не заботится о платформе или языке указанных приложений, если они поддерживают AMQP.
По сути, одна из конечных точек приложения отправляет сообщение, таким образом являясь источником, через брокера AMQP. В свою очередь, брокер доставит сообщение в другую конечную точку приложения, которая называется Consumer.
RabbitMQ — это брокер AMQP, который поддерживает несколько языков программирования, таких как PHP.
Преимущество наличия посредника сообщений, такого как RabbitMQ, и AMQP, являющегося сетевым протоколом, состоит в том, что производитель, посредник и потребитель могут жить на разных физических / виртуальных серверах в разных географических точках.
Кроме того, поскольку сети ненадежны и приложения могут не обрабатывать сообщение полностью, AMQP поддерживает подтверждения сообщений либо автоматически, либо когда конечная точка приложения решает отправить их.
RabbitMQ реализует протокол AMQP 0-9-1 . С первого взгляда следует следующая модель.
глоссарий
концепция | Определение | Икона |
---|---|---|
Режиссер | Конечная точка приложения, которая отправляет сообщения | |
потребитель | Конечная точка приложения, которая получает сообщения | |
соединение | Обрабатывает протокол, ошибки, аутентификацию и т. Д. Соединение выполняется по протоколу TCP. | — |
канал | Соединения мультиплексируются через каналы. Несмотря на то, что все каналы совместно используют одно и то же соединение TCP, связь с одного канала полностью независима от другого. | — |
обмен | Получает сообщения от производителей и помещает их в очереди. В зависимости от ситуации это может быть прозрачно для разработчика. | |
Очередь | Буфер, в котором хранятся сообщения | |
Сообщение | Часть произвольной информации, соответствующей формату AMQP, которая отправляется от производителя к потребителю через посредника. Брокер не может изменить информацию внутри сообщения. | — |
Подтверждения | Уведомление отправлено обратно потребителю, чтобы сообщить серверу, что сообщение было получено и обработано, поэтому сервер может удалить его из очереди. | — |
Другое преимущество AMQP 0-9-1 состоит в том, что приложение определяет логику маршрутизации вместо администратора брокера. Это дает разработчику большую гибкость, без необходимости изучать новый язык программирования / сценариев / разметки.
Подробнее о AMQP и RabbitMQ вы можете узнать из руководства « Объяснение модели AMQP 0-9-1 ». Хотя это и не обязательно для этого урока, я советую вам прочитать его полностью.
Установка RabbitMQ
Вы можете найти подробную документацию для вашей платформы здесь , но примерно для Ubuntu это выглядит следующим образом:
sudo vim /etc/apt/sources.list
Добавьте в файл следующую строку: deb http://www.rabbitmq.com/debian/ testing main
После этого просто выполните следующие команды
wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc sudo apt-key add rabbitmq-signing-key-public.asc sudo apt-get update sudo apt-get install rabbitmq-server sudo rabbitmqctl status
Вы должны увидеть что-то вроде следующего:
sudo rabbitmqctl status Status of node 'rabbit@localhost-sprabbitmq-926807' ... [{pid,790}, {running_applications,[{rabbit,"RabbitMQ","3.3.4"}, {os_mon,"CPO CXC 138 46","2.2.14"}, {mnesia,"MNESIA CXC 138 12","4.11"}, {xmerl,"XML parser","1.3.5"}, {sasl,"SASL CXC 138 11","2.3.4"}, {stdlib,"ERTS CXC 138 10","1.19.4"}, {kernel,"ERTS CXC 138 10","2.16.4"}]}, {os,{unix,linux}}, {erlang_version,"Erlang R16B03 (erts-5.10.4) [source] [64-bit] [smp:8:8] [async-threads:30] [kernel-poll:true]\n"}, {memory,[{total,53400096}, {connection_procs,2704}, {queue_procs,34432}, {plugins,0}, {other_proc,13983664}, {mnesia,64504}, {mgmt_db,0}, {msg_index,26056}, {other_ets,770880}, {binary,16112}, {code,16372077}, {atom,594537}, {other_system,21535130}]}, {alarms,[]}, {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]}, {vm_memory_high_watermark,0.4}, {vm_memory_limit,12677506662}, {disk_free_limit,50000000}, {disk_free,899366912}, {file_descriptors,[{total_limit,199900}, {total_used,5}, {sockets_limit,179908}, {sockets_used,1}]}, {processes,[{limit,1048576},{used,133}]}, {run_queue,0}, {uptime,34}] ...done.
Если вы получаете сообщение об ошибке, сервер, вероятно, должен быть запущен с помощью следующей команды
sudo invoke-rc.d rabbitmq-server start * Starting message broker rabbitmq-server ...done.
RabbitMQ определяет пользователя по умолчанию:
- Имя пользователя: гость
- Пароль: гость
Имейте в виду, что этот пользователь сможет использоваться только при подключении к RabbitMQ с локального хоста. Для истинно распределенной системы вам придется определять пользователей и роли. Вы можете прочитать больше о контроле доступа в документации . Для следующих примеров мы будем использовать вышеуказанные учетные данные.
Чтобы интегрировать ваше php-приложение с RabbitMQ, вам понадобится библиотека php-amqplib . Получить это легко с помощью composer, просто определите требования внутри вашего файла composer.json
:
{ ... "require": { ..., "videlalvaro/php-amqplib": "2.2.*" } ... }
После выполнения composer update
все будет готово.
отказ
Этот код ни в коем случае не должен использоваться в готовых к работе приложениях и не должен выполняться на производственных серверах. Никаких проверок безопасности и / или проверок не проводилось. Этот код был написан только для образовательных целей, с целью показать только основные функциональные возможности. Производительность, эффективность или возможность повторного использования не были приоритетом.
Обратите внимание, что хотя следующие примеры используют один и тот же хост для приложений производителя, брокера и потребителя, чтобы упростить разработку, развертывание и тестирование, в реальной жизни не имеет смысла иметь «распределенную систему» в одном блоке.
Полный список исходного кода вы можете проверить в этом репозитории GitHub , содержащем приложение, используемое в следующих примерах.
Простой пример: отправить запрос на асинхронную обработку данных
Предположим, у нас есть пиццерия, и мы получаем онлайн-заказы. Давайте также предположим, что у нас есть автоматизированная система, которая обрабатывает заказы, но эта система не может быть открыта для общественности…
Мы реализуем самые простые шаблоны:
Прежде всего, у нас есть следующий скрипт для приема запросов из формы:
<?php chdir(dirname(__DIR__)); require_once('vendor/autoload.php'); use Acme\AmqpWrapper\SimpleSender; $theName = filter_input(INPUT_POST, 'theName', FILTER_SANITIZE_STRING); $simpleSender = new SimpleSender(); $simpleSender->execute($theName); header("Location: orderReceived.html");
Этот код просто проверит параметр POST с именем theName
и отправит его объекту, который мы создали для его обработки. Давайте посмотрим на метод SimpleSender :: execute ():
<?php // ... SOME CODE HERE ... /** * Sends a message to the pizzaTime queue. * * @param string $message */ public function execute($message) { /** * Create a connection to RabbitAMQP */ $connection = new AMQPConnection( 'localhost', #host - host name where the RabbitMQ server is runing 5672, #port - port number of the service, 5672 is the default 'guest', #user - username to connect to server 'guest' #password ); /** @var $channel AMQPChannel */ $channel = $connection->channel(); $channel->queue_declare( 'pizzaTime', #queue name - Queue names may be up to 255 bytes of UTF-8 characters false, #passive - can use this to check whether an exchange exists without modifying the server state false, #durable - make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart false, #exclusive - used by only one connection and the queue will be deleted when that connection closes false #autodelete - queue is deleted when last consumer unsubscribes ); $msg = new AMQPMessage($message); $channel->basic_publish( $msg, #message '', #exchange 'pizzaTime' #routing key ); $channel->close(); $connection->close(); }
Строковая разбивка выглядит следующим образом:
<?php /* ... MORE CODE HERE ... */ $connection = new AMQPConnection( 'localhost', #host - host name where the RabbitMQ server is runing 5672, #port - port number of the service, 5672 is the default 'guest', #user - username to connect to server 'guest' #password ); /* ... MORE CODE HERE ... */
Сначала мы создаем объект подключения. Обратите внимание, что учетные данные guest: guest являются значениями по умолчанию для RabbitMQ. Тем не менее, вам будет разрешено подключаться к серверу только с его помощью, если вы подключаетесь с одного хоста (localhost).
Поскольку RabbitMQ прослушивает и обслуживает один порт, нам нужно создать канал (представьте его как виртуальный порт) с помощью $channel = $connection->channel();
поэтому другие клиенты могут подключаться к серверу.
<?php /* ... MORE CODE HERE ... */ $channel->queue_declare( 'pizzaTime', #queue name - Queue names may be up to 255 bytes of UTF-8 characters false, #passive - can use this to check whether an exchange exists without modifying the server state false, #durable - make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart false, #exclusive - used by only one connection and the queue will be deleted when that connection closes false #autodelete - queue is deleted when last consumer unsubscribes ); /* ... MORE CODE HERE ... */
Как только у нас будет готов канал, давайте объявим очередь для отправки запроса. Хорошая вещь о RabbitMQ заключается в том, что мы можем создавать очереди непосредственно из клиента, но мы должны быть осторожны при его создании. Давайте кратко объясним параметры, используемые для создания очереди с помощью $channel->queue_declare()
- Имя очереди : это произвольное имя, будет использоваться для идентификации очереди
- Пассивный : если установлено значение true, сервер будет проверять только, может ли быть создана очередь, false будет фактически пытаться создать очередь.
- Долговечный : как правило, если сервер останавливается или аварийно завершает работу, все очереди и сообщения теряются … если только мы не объявим очередь длительной, и в этом случае очередь сохранится, если сервер будет перезапущен.
- Исключительно : если true, очередь может использоваться только тем соединением, которое ее создало.
- Автоудаление : при значении true очередь будет удалена, если в ней нет сообщений и нет подключенных подписчиков.
В нашем примере очередь не сохранится, если сервер будет перезапущен, может использоваться другими подключениями и не будет удалена, если на него больше нет подписчиков.
Затем мы создали объект сообщения с $msg = new AMQPMessage($message);
$message
— параметр POST, theName
которое мы получили из формы. Сообщение может быть любой строкой.
<?php /* ... MORE CODE HERE ... */ $channel->basic_publish( $msg, #message '', #exchange 'pizzaTime' #routing key ); /* ... MORE CODE HERE ... */
Теперь мы должны опубликовать сообщение в очереди. Однако мы не можем публиковать сообщения напрямую в очередь, если это не происходит через обмен. Мы никогда не объявляли обмен, так как это будет возможно? Оказывается, что когда мы создаем очередь без определения обмена, к которому будет привязана очередь, будет использоваться обмен по умолчанию. Мы можем опубликовать сообщение в очереди через обмен по умолчанию с помощью $channel->basic_publish()
, параметры которого используются:
- Сообщение: сообщение, которое мы хотим отправить
- Exchange: обратите внимание, что мы используем пустую строку, потому что мы будем использовать обмен по умолчанию
- Ключ маршрутизации: имя очереди, в которую мы хотим доставить сообщение.
<?php /* ... MORE CODE HERE ... */ $channel->close(); $connection->close(); /* ... MORE CODE HERE ... */
После того, как мы закончим, мы должны закрыть соединение с каналом и сервером.
Обратите внимание, что мы не получили никакого ответа от сервера. В лучшем случае мы можем быть уверены, что сообщение было поставлено в очередь, но мы полностью игнорируем, достигло ли сообщение конечного получателя. Это является частью красоты AMQP… мы можем быстро отправлять клиентов на наш общедоступный сайт и асинхронно обрабатывать заказы в приложении для бэк-офиса.
Итак, заказ пиццы находится в очереди, как мы их получим? Прежде всего, мы должны знать, что потребитель должен установить постоянное соединение с сервером очередей (или подписаться), чтобы он мог получать сообщения от сервера. Сервер не будет отправлять эти сообщения в наше приложение. К счастью, создать такое соединение очень просто.
<?php namespace Acme\AmqpWrapper; use PhpAmqpLib\Connection\AMQPConnection; class SimpleReceiver { /* ... SOME CODE HERE ... */ /** * Listens for incoming messages */ public function listen() { $connection = new AMQPConnection( 'localhost', #host 5672, #port 'guest', #user 'guest' #password ); $channel = $connection->channel(); $channel->queue_declare( 'pizzaTime', #queue name, the same as the sender false, #passive false, #durable false, #exclusive false #autodelete ); $channel->basic_consume( 'pizzaTime', #queue '', #consumer tag - Identifier for the consumer, valid within the current channel. just string false, #no local - TRUE: the server will not send messages to the connection that published them true, #no ack - send a proper acknowledgment from the worker, once we're done with a task false, #exclusive - queues may only be accessed by the current connection false, #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method array($this, 'processOrder') #callback - method that will receive the message ); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); } /** * @param $msg */ public function processOrder($msg) { /* ... CODE TO PROCESS ORDER HERE ... */ } }
Так же, как мы подключились, создали канал и объявили очередь в производителе, мы должны сделать то же самое внутри потребителя. Однако в потребителе мы должны подписаться на канал с помощью $channel->basic_consume()
, и используемые параметры определяются следующим образом:
- Очередь: должно быть таким же именем очереди, которое мы определили в производителе
- Потребительский тег: произвольное имя, данное потребителю. Если это поле не заполнено, сервер сгенерирует уникальный тег
- Нет локальных: это скрытый параметр, при активации сервер не будет доставлять свои собственные сообщения
- No Ack (nowledgement): автоматически подтвердит, что потребитель получил сообщение, поэтому нам не нужно делать это вручную.
- Нет ожидания: если установлено, сервер не будет ждать завершения процесса в получателе
- Обратный вызов: может быть именем функции, массивом, содержащим объект и имя метода, или замыканием, которое получит сообщение в очереди. Этот обратный вызов должен принимать параметр, содержащий такое сообщение. В нашем примере
array($this, 'processOrder')
используется для определения методаprocessOrder()
текущего объекта в качестве обратного вызова.
<?php /* ... SOME CODE HERE ... */ while(count($channel->callbacks)) { $channel->wait(); } /* ... SOME CODE HERE ... */
«Волшебство» происходит внутри цикла while. Если у подписчика есть хотя бы один определенный обратный вызов, мы будем ждать любого события в канале. Каждый раз, когда поступает сообщение, выполняется наш определенный обратный вызов processOrder()
, поэтому мы можем обрабатывать сообщение по мере необходимости.
Как мы можем запустить это? Просто создайте сценарий, который будет вызывать метод SimpleReceiver::listen()
например:
<?php chdir(dirname(__DIR__)); require_once('vendor/autoload.php'); use Acme\AmqpWrapper\SimpleReceiver; $receiver = new SimpleReceiver(); $receiver->listen();
Теперь запустите процесс в консоли с помощью php <script name>
и дайте ему выполнить свою работу. Если вы хотите убить потребителя, простой Ctrl + C
прервет процесс.
Одна из приятных особенностей сервера очередей заключается в том, что если по какой-либо причине произойдет сбой работы вашего потребителя, это не помешает службе пользователям вашего общедоступного приложения. Сообщения будут просто помещаться в очередь, и после повторного запуска получателя сообщения будут доставляться ему одно за другим для обработки.
Вывод
В этой части мы представили теорию AMQP и системы массового обслуживания и продемонстрировали их использование на простом примере. В продолжение этого поста мы рассмотрим еще два примера повышенной сложности и сложных концепций. Будьте на связи!