Статьи

Как использовать RabbitMQ с PHP

AMQP ( расширенный протокол очереди сообщений ) — это сетевой протокол, который может доставлять сообщения из одной конечной точки приложения в другую конечную точку приложения. Он не заботится о платформе или языке указанных приложений, если они поддерживают AMQP.

По сути, одна из конечных точек приложения отправляет сообщение, таким образом являясь источником, через брокера AMQP. В свою очередь, брокер доставит сообщение в другую конечную точку приложения, которая называется Consumer.

RabbitMQ — это брокер AMQP, который поддерживает несколько языков программирования, таких как PHP.

Преимущество наличия посредника сообщений, такого как RabbitMQ, и AMQP, являющегося сетевым протоколом, состоит в том, что производитель, посредник и потребитель могут жить на разных физических / виртуальных серверах в разных географических точках.

Кроме того, поскольку сети ненадежны и приложения могут не обрабатывать сообщение полностью, AMQP поддерживает подтверждения сообщений либо автоматически, либо когда конечная точка приложения решает отправить их.

RabbitMQ реализует протокол AMQP 0-9-1 . С первого взгляда следует следующая модель.

глоссарий

концепция Определение Икона
Режиссер Конечная точка приложения, которая отправляет сообщения
потребитель Конечная точка приложения, которая получает сообщения
соединение Обрабатывает протокол, ошибки, аутентификацию и т. Д. Соединение выполняется по протоколу TCP.
канал Соединения мультиплексируются через каналы. Несмотря на то, что все каналы совместно используют одно и то же соединение TCP, связь с одного канала полностью независима от другого.
обмен Получает сообщения от производителей и помещает их в очереди. В зависимости от ситуации это может быть прозрачно для разработчика.
Очередь Буфер, в котором хранятся сообщения
Сообщение Часть произвольной информации, соответствующей формату AMQP, которая отправляется от производителя к потребителю через посредника. Брокер не может изменить информацию внутри сообщения.
Подтверждения Уведомление отправлено обратно потребителю, чтобы сообщить серверу, что сообщение было получено и обработано, поэтому сервер может удалить его из очереди.

Другое преимущество AMQP 0-9-1 состоит в том, что приложение определяет логику маршрутизации вместо администратора брокера. Это дает разработчику большую гибкость, без необходимости изучать новый язык программирования / сценариев / разметки.

Подробнее о AMQP и RabbitMQ вы можете узнать из руководства « Объяснение модели AMQP 0-9-1 ». Хотя это и не обязательно для этого урока, я советую вам прочитать его полностью.

Установка RabbitMQ

Вы можете найти подробную документацию для вашей платформы здесь , но примерно для Ubuntu это выглядит следующим образом:

sudo vim /etc/apt/sources.list 

Добавьте в файл следующую строку: deb http://www.rabbitmq.com/debian/ testing main

После этого просто выполните следующие команды

 wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc sudo apt-key add rabbitmq-signing-key-public.asc sudo apt-get update sudo apt-get install rabbitmq-server sudo rabbitmqctl status 

Вы должны увидеть что-то вроде следующего:

 sudo rabbitmqctl status Status of node 'rabbit@localhost-sprabbitmq-926807' ... [{pid,790}, {running_applications,[{rabbit,"RabbitMQ","3.3.4"}, {os_mon,"CPO CXC 138 46","2.2.14"}, {mnesia,"MNESIA CXC 138 12","4.11"}, {xmerl,"XML parser","1.3.5"}, {sasl,"SASL CXC 138 11","2.3.4"}, {stdlib,"ERTS CXC 138 10","1.19.4"}, {kernel,"ERTS CXC 138 10","2.16.4"}]}, {os,{unix,linux}}, {erlang_version,"Erlang R16B03 (erts-5.10.4) [source] [64-bit] [smp:8:8] [async-threads:30] [kernel-poll:true]\n"}, {memory,[{total,53400096}, {connection_procs,2704}, {queue_procs,34432}, {plugins,0}, {other_proc,13983664}, {mnesia,64504}, {mgmt_db,0}, {msg_index,26056}, {other_ets,770880}, {binary,16112}, {code,16372077}, {atom,594537}, {other_system,21535130}]}, {alarms,[]}, {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]}, {vm_memory_high_watermark,0.4}, {vm_memory_limit,12677506662}, {disk_free_limit,50000000}, {disk_free,899366912}, {file_descriptors,[{total_limit,199900}, {total_used,5}, {sockets_limit,179908}, {sockets_used,1}]}, {processes,[{limit,1048576},{used,133}]}, {run_queue,0}, {uptime,34}] ...done. 

Если вы получаете сообщение об ошибке, сервер, вероятно, должен быть запущен с помощью следующей команды

 sudo invoke-rc.d rabbitmq-server start * Starting message broker rabbitmq-server ...done. 

RabbitMQ определяет пользователя по умолчанию:

  • Имя пользователя: гость
  • Пароль: гость

Имейте в виду, что этот пользователь сможет использоваться только при подключении к RabbitMQ с локального хоста. Для истинно распределенной системы вам придется определять пользователей и роли. Вы можете прочитать больше о контроле доступа в документации . Для следующих примеров мы будем использовать вышеуказанные учетные данные.

Чтобы интегрировать ваше php-приложение с RabbitMQ, вам понадобится библиотека php-amqplib . Получить это легко с помощью composer, просто определите требования внутри вашего файла composer.json :

 { ... "require": { ..., "videlalvaro/php-amqplib": "2.2.*" } ... } 

После выполнения composer update все будет готово.

отказ

Этот код ни в коем случае не должен использоваться в готовых к работе приложениях и не должен выполняться на производственных серверах. Никаких проверок безопасности и / или проверок не проводилось. Этот код был написан только для образовательных целей, с целью показать только основные функциональные возможности. Производительность, эффективность или возможность повторного использования не были приоритетом.

Обратите внимание, что хотя следующие примеры используют один и тот же хост для приложений производителя, брокера и потребителя, чтобы упростить разработку, развертывание и тестирование, в реальной жизни не имеет смысла иметь «распределенную систему» ​​в одном блоке.

Полный список исходного кода вы можете проверить в этом репозитории GitHub , содержащем приложение, используемое в следующих примерах.

Простой пример: отправить запрос на асинхронную обработку данных

Предположим, у нас есть пиццерия, и мы получаем онлайн-заказы. Давайте также предположим, что у нас есть автоматизированная система, которая обрабатывает заказы, но эта система не может быть открыта для общественности…

Мы реализуем самые простые шаблоны:

Прежде всего, у нас есть следующий скрипт для приема запросов из формы:

 <?php chdir(dirname(__DIR__)); require_once('vendor/autoload.php'); use Acme\AmqpWrapper\SimpleSender; $theName = filter_input(INPUT_POST, 'theName', FILTER_SANITIZE_STRING); $simpleSender = new SimpleSender(); $simpleSender->execute($theName); header("Location: orderReceived.html"); 

Этот код просто проверит параметр POST с именем theName и отправит его объекту, который мы создали для его обработки. Давайте посмотрим на метод SimpleSender :: execute ():

 <?php // ... SOME CODE HERE ... /** * Sends a message to the pizzaTime queue. * * @param string $message */ public function execute($message) { /** * Create a connection to RabbitAMQP */ $connection = new AMQPConnection( 'localhost', #host - host name where the RabbitMQ server is runing 5672, #port - port number of the service, 5672 is the default 'guest', #user - username to connect to server 'guest' #password ); /** @var $channel AMQPChannel */ $channel = $connection->channel(); $channel->queue_declare( 'pizzaTime', #queue name - Queue names may be up to 255 bytes of UTF-8 characters false, #passive - can use this to check whether an exchange exists without modifying the server state false, #durable - make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart false, #exclusive - used by only one connection and the queue will be deleted when that connection closes false #autodelete - queue is deleted when last consumer unsubscribes ); $msg = new AMQPMessage($message); $channel->basic_publish( $msg, #message '', #exchange 'pizzaTime' #routing key ); $channel->close(); $connection->close(); } 

Строковая разбивка выглядит следующим образом:

 <?php /* ... MORE CODE HERE ... */ $connection = new AMQPConnection( 'localhost', #host - host name where the RabbitMQ server is runing 5672, #port - port number of the service, 5672 is the default 'guest', #user - username to connect to server 'guest' #password ); /* ... MORE CODE HERE ... */ 

Сначала мы создаем объект подключения. Обратите внимание, что учетные данные guest: guest являются значениями по умолчанию для RabbitMQ. Тем не менее, вам будет разрешено подключаться к серверу только с его помощью, если вы подключаетесь с одного хоста (localhost).

Поскольку RabbitMQ прослушивает и обслуживает один порт, нам нужно создать канал (представьте его как виртуальный порт) с помощью $channel = $connection->channel(); поэтому другие клиенты могут подключаться к серверу.

 <?php /* ... MORE CODE HERE ... */ $channel->queue_declare( 'pizzaTime', #queue name - Queue names may be up to 255 bytes of UTF-8 characters false, #passive - can use this to check whether an exchange exists without modifying the server state false, #durable - make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart false, #exclusive - used by only one connection and the queue will be deleted when that connection closes false #autodelete - queue is deleted when last consumer unsubscribes ); /* ... MORE CODE HERE ... */ 

Как только у нас будет готов канал, давайте объявим очередь для отправки запроса. Хорошая вещь о RabbitMQ заключается в том, что мы можем создавать очереди непосредственно из клиента, но мы должны быть осторожны при его создании. Давайте кратко объясним параметры, используемые для создания очереди с помощью $channel->queue_declare()

  • Имя очереди : это произвольное имя, будет использоваться для идентификации очереди
  • Пассивный : если установлено значение true, сервер будет проверять только, может ли быть создана очередь, false будет фактически пытаться создать очередь.
  • Долговечный : как правило, если сервер останавливается или аварийно завершает работу, все очереди и сообщения теряются … если только мы не объявим очередь длительной, и в этом случае очередь сохранится, если сервер будет перезапущен.
  • Исключительно : если true, очередь может использоваться только тем соединением, которое ее создало.
  • Автоудаление : при значении true очередь будет удалена, если в ней нет сообщений и нет подключенных подписчиков.

В нашем примере очередь не сохранится, если сервер будет перезапущен, может использоваться другими подключениями и не будет удалена, если на него больше нет подписчиков.

Затем мы создали объект сообщения с $msg = new AMQPMessage($message); $message — параметр POST, theName которое мы получили из формы. Сообщение может быть любой строкой.

 <?php /* ... MORE CODE HERE ... */ $channel->basic_publish( $msg, #message '', #exchange 'pizzaTime' #routing key ); /* ... MORE CODE HERE ... */ 

Теперь мы должны опубликовать сообщение в очереди. Однако мы не можем публиковать сообщения напрямую в очередь, если это не происходит через обмен. Мы никогда не объявляли обмен, так как это будет возможно? Оказывается, что когда мы создаем очередь без определения обмена, к которому будет привязана очередь, будет использоваться обмен по умолчанию. Мы можем опубликовать сообщение в очереди через обмен по умолчанию с помощью $channel->basic_publish() , параметры которого используются:

  • Сообщение: сообщение, которое мы хотим отправить
  • Exchange: обратите внимание, что мы используем пустую строку, потому что мы будем использовать обмен по умолчанию
  • Ключ маршрутизации: имя очереди, в которую мы хотим доставить сообщение.
 <?php /* ... MORE CODE HERE ... */ $channel->close(); $connection->close(); /* ... MORE CODE HERE ... */ 

После того, как мы закончим, мы должны закрыть соединение с каналом и сервером.

Обратите внимание, что мы не получили никакого ответа от сервера. В лучшем случае мы можем быть уверены, что сообщение было поставлено в очередь, но мы полностью игнорируем, достигло ли сообщение конечного получателя. Это является частью красоты AMQP… мы можем быстро отправлять клиентов на наш общедоступный сайт и асинхронно обрабатывать заказы в приложении для бэк-офиса.

Итак, заказ пиццы находится в очереди, как мы их получим? Прежде всего, мы должны знать, что потребитель должен установить постоянное соединение с сервером очередей (или подписаться), чтобы он мог получать сообщения от сервера. Сервер не будет отправлять эти сообщения в наше приложение. К счастью, создать такое соединение очень просто.

 <?php namespace Acme\AmqpWrapper; use PhpAmqpLib\Connection\AMQPConnection; class SimpleReceiver { /* ... SOME CODE HERE ... */ /** * Listens for incoming messages */ public function listen() { $connection = new AMQPConnection( 'localhost', #host 5672, #port 'guest', #user 'guest' #password ); $channel = $connection->channel(); $channel->queue_declare( 'pizzaTime', #queue name, the same as the sender false, #passive false, #durable false, #exclusive false #autodelete ); $channel->basic_consume( 'pizzaTime', #queue '', #consumer tag - Identifier for the consumer, valid within the current channel. just string false, #no local - TRUE: the server will not send messages to the connection that published them true, #no ack - send a proper acknowledgment from the worker, once we're done with a task false, #exclusive - queues may only be accessed by the current connection false, #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method array($this, 'processOrder') #callback - method that will receive the message ); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); } /** * @param $msg */ public function processOrder($msg) { /* ... CODE TO PROCESS ORDER HERE ... */ } } 

Так же, как мы подключились, создали канал и объявили очередь в производителе, мы должны сделать то же самое внутри потребителя. Однако в потребителе мы должны подписаться на канал с помощью $channel->basic_consume() , и используемые параметры определяются следующим образом:

  • Очередь: должно быть таким же именем очереди, которое мы определили в производителе
  • Потребительский тег: произвольное имя, данное потребителю. Если это поле не заполнено, сервер сгенерирует уникальный тег
  • Нет локальных: это скрытый параметр, при активации сервер не будет доставлять свои собственные сообщения
  • No Ack (nowledgement): автоматически подтвердит, что потребитель получил сообщение, поэтому нам не нужно делать это вручную.
  • Нет ожидания: если установлено, сервер не будет ждать завершения процесса в получателе
  • Обратный вызов: может быть именем функции, массивом, содержащим объект и имя метода, или замыканием, которое получит сообщение в очереди. Этот обратный вызов должен принимать параметр, содержащий такое сообщение. В нашем примере array($this, 'processOrder') используется для определения метода processOrder() текущего объекта в качестве обратного вызова.
 <?php /* ... SOME CODE HERE ... */ while(count($channel->callbacks)) { $channel->wait(); } /* ... SOME CODE HERE ... */ 

«Волшебство» происходит внутри цикла while. Если у подписчика есть хотя бы один определенный обратный вызов, мы будем ждать любого события в канале. Каждый раз, когда поступает сообщение, выполняется наш определенный обратный вызов processOrder() , поэтому мы можем обрабатывать сообщение по мере необходимости.

Как мы можем запустить это? Просто создайте сценарий, который будет вызывать метод SimpleReceiver::listen() например:

 <?php chdir(dirname(__DIR__)); require_once('vendor/autoload.php'); use Acme\AmqpWrapper\SimpleReceiver; $receiver = new SimpleReceiver(); $receiver->listen(); 

Теперь запустите процесс в консоли с помощью php <script name> и дайте ему выполнить свою работу. Если вы хотите убить потребителя, простой Ctrl + C прервет процесс.

Одна из приятных особенностей сервера очередей заключается в том, что если по какой-либо причине произойдет сбой работы вашего потребителя, это не помешает службе пользователям вашего общедоступного приложения. Сообщения будут просто помещаться в очередь, и после повторного запуска получателя сообщения будут доставляться ему одно за другим для обработки.

Вывод

В этой части мы представили теорию AMQP и системы массового обслуживания и продемонстрировали их использование на простом примере. В продолжение этого поста мы рассмотрим еще два примера повышенной сложности и сложных концепций. Будьте на связи!