В первой части мы рассмотрели теорию и простой пример использования протокола AMQP в PHP с RabbitMQ в качестве брокера. Теперь давайте углубимся в несколько более сложных примеров.
Пример 1: отправить запрос на асинхронную обработку данных среди нескольких работников
В примере предыдущей части у нас был один производитель, один потребитель. Если потребитель умер, сообщения будут продолжать складываться в очереди, пока потребитель не запустится снова. Затем он будет обрабатывать все сообщения, одно за другим.
Это может быть не совсем идеально в среде параллельных пользователей с достаточным количеством запросов в минуту. К счастью, масштабировать потребителей очень просто, но давайте реализуем другой пример.
Допустим, у нас есть служба генерации счетов, где пользователям просто нужно предоставить номер счета, и система автоматически сгенерирует файл PDF и отправит его пользователю по электронной почте. Генерация и отправка электронной почты может занять даже несколько секунд, если сервер, на котором выполняется процесс генерации, ограничен в ресурсах. Теперь предположим, что мы обязаны поддерживать несколько транзакций в секунду, как мы можем это сделать, не перегружая сервер?
Нам нужно реализовать следующий шаблон:
Давайте посмотрим на наш класс производителя:
<?php namespace Acme\AmqpWrapper; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; class WorkerSender { /* ... SOME OTHER CODE HERE ... */ /** * Sends an invoice generation task to the workers * * @param int $invoiceNum */ public function execute($invoiceNum) { $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare( 'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters false, #passive - can use this to check whether an exchange exists without modifying the server state true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart false, #exclusive - used by only one connection and the queue will be deleted when that connection closes false #auto delete - queue is deleted when last consumer unsubscribes ); $msg = new AMQPMessage( $invoiceNum, array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits ); $channel->basic_publish( $msg, #message '', #exchange 'invoice_queue' #routing key (queue) ); $channel->close(); $connection->close(); } }
Метод WorkerSender::execute()
получит номер счета. Далее мы создаем соединение, канал и очередь как обычно.
<?php /* ... SOME CODE HERE ... */ $msg = new AMQPMessage( $invoiceNum, array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits ); /* ... SOME CODE HERE ... */
Обратите внимание, что на этот раз при создании объекта сообщения конструктор получает второй параметр: array('delivery_mode' => 2)
. В этом случае мы хотим заявить, что сообщение не должно быть потеряно в случае сбоя сервера RabbitMQ. Помните, что для того, чтобы это работало, очередь также должна быть объявлена длительной.
Следующий код может быть использован для получения данных формы и выполнения производителя:
<?php chdir(dirname(__DIR__)); require_once('vendor/autoload.php'); use Acme\AmqpWrapper\WorkerSender; $inputFilters = array( 'invoiceNo' => FILTER_SANITIZE_NUMBER_INT, ); $input = filter_input_array(INPUT_POST, $inputFilters); $sender = new WorkerSender(); $sender->execute($input['invoiceNo']);
Пожалуйста, используйте подходящую вам дезинфекцию / проверку ввода.
Вещи становятся немного интереснее на стороне потребителя:
<?php namespace Acme\AmqpWrapper; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; class WorkerReceiver { /* ... SOME OTHER CODE HERE ... */ /** * Process incoming request to generate pdf invoices and send them through * email. */ public function listen() { $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare( 'invoice_queue', #queue false, #passive true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs false, #exclusive - queues may only be accessed by the current connection false #auto delete - the queue is deleted when all consumers have finished using it ); /** * don't dispatch a new message to a worker until it has processed and * acknowledged the previous one. Instead, it will dispatch it to the * next worker that is not still busy. */ $channel->basic_qos( null, #prefetch size - prefetch window size in octets, null meaning "no specific limit" 1, #prefetch count - prefetch window in terms of whole messages null #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel ); /** * indicate interest in consuming messages from a particular queue. When they do * so, we say that they register a consumer or, simply put, subscribe to a queue. * Each consumer (subscription) has an identifier called a consumer tag */ $channel->basic_consume( 'invoice_queue', #queue '', #consumer tag - Identifier for the consumer, valid within the current channel. just string false, #no local - TRUE: the server will not send messages to the connection that published them false, #no ack, false - acks turned on, true - off. send a proper acknowledgment from the worker, once we're done with a task false, #exclusive - queues may only be accessed by the current connection false, #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method array($this, 'process') #callback ); while(count($channel->callbacks)) { $this->log->addInfo('Waiting for incoming messages'); $channel->wait(); } $channel->close(); $connection->close(); } /** * process received request * * @param AMQPMessage $msg */ public function process(AMQPMessage $msg) { $this->generatePdf()->sendEmail(); /** * If a consumer dies without sending an acknowledgement the AMQP broker * will redeliver it to another consumer or, if none are available at the * time, the broker will wait until at least one consumer is registered * for the same queue before attempting redelivery */ $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } /** * Generates invoice's pdf * * @return WorkerReceiver */ private function generatePdf() { /** * Mocking a pdf generation processing time. This will take between * 2 and 5 seconds */ sleep(mt_rand(2, 5)); return $this; } /** * Sends email * * @return WorkerReceiver */ private function sendEmail() { /** * Mocking email sending time. This will take between 1 and 3 seconds */ sleep(mt_rand(1,3)); return $this; } }
Как обычно, мы должны создать соединение, получить канал и объявить очередь (параметры очереди должны быть такими же, как у производителя).
<?php /* ... SOME CODE HERE ... */ $channel->basic_qos( null, #prefetch size - prefetch window size in octets, null meaning "no specific limit" 1, #prefetch count - prefetch window in terms of whole messages null #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel ); /* ... SOME CODE HERE ... */
Чтобы иметь рабочее поведение (отправка сообщений между несколькими процессами), мы должны объявить параметры качества обслуживания (qos) с помощью $channel->basic_qos()
:
- Размер предварительной выборки : без особых ограничений, у нас может быть столько рабочих, сколько нам нужно
- Счетчик предварительной выборки: сколько сообщений нужно извлечь на одного работника перед отправкой подтверждения. Это заставит работника обрабатывать 1 сообщение за раз.
- Global : null означает, что указанные выше настройки будут применяться только к этому потребителю.
Далее мы начнем потреблять с ключевой разницы в параметрах. Мы отключим автоматическое подтверждение, поскольку сообщим серверу RabbitMQ, когда закончим обработку сообщения, и будем готовы получить новое.
Теперь, как мы можем отправить это подтверждение? Пожалуйста, взгляните на метод WorkerReceiver::process()
(который объявляется как метод обратного вызова при получении сообщения). Вызовы методы generatedPdf()
и sendEmail()
— это просто фиктивные методы, которые будут имитировать время, затрачиваемое на выполнение обеих задач. Параметр $msg
не только содержит полезную нагрузку, отправленную производителем, но также содержит информацию об объектах, используемых производителем. Мы можем извлечь информацию о канале, используемом производителем, с помощью $msg->delivery_info['channel']
(это тот же тип объекта, что и канал, который мы открыли для потребителя с помощью $connection->channel();
). Так как нам нужно отправить каналу производителя подтверждение того, что мы завершили процесс, мы будем использовать его basic_ack()
, отправляя в качестве параметра тег доставки ( $msg->delivery_info['delivery_tag']
) RabbitMQ, автоматически сгенерированный в порядке правильно связать с каким сообщением принадлежит ack.
Как мы увольняем рабочих? Просто создайте файл, подобный следующему, вызывая метод WorkerReceiver::listen()
:
<?php chdir(dirname(__DIR__)); require_once('vendor/autoload.php'); use Acme\AmqpWrapper\WorkerReceiver; $worker = new WorkerReceiver(); $worker->listen();
Теперь используйте команду php (например, php worker.php
или php worker.php
имя, указанное вами выше), чтобы запустить работника. Но подождите, цель состояла в том, чтобы иметь двух или более рабочих, не так ли? Нет проблем, запустите больше рабочих одним и тем же способом, чтобы иметь несколько процессов в одном файле, и RabbitMQ зарегистрирует потребителей и распределит работу между ними в соответствии с параметрами QoS.
Пример 2: отправка запросов RPC и ожидание ответа
До сих пор мы отправляли сообщения на сервер RabbitMQ без необходимости ждать ответа пользователя. Это нормально для асинхронных процессов, которые могут занять больше времени, чем пользователь готов потратить, чтобы увидеть сообщение «ОК». Но что, если нам действительно нужен ответ? Допустим, какой-то результат сложного расчета, чтобы мы могли показать его пользователю?
Допустим, у нас есть централизованный сервер входа в систему (единый вход), который будет работать в качестве механизма аутентификации, изолированного от остальных наших приложений. Единственный способ добраться до этого сервера — через RabbitMQ. Нам необходимо реализовать способ отправки учетных данных для входа в систему на этот сервер и ожидания ответа о предоставлении / запрете доступа.
Нам нужно реализовать следующий шаблон:
Как обычно, давайте сначала посмотрим на производителя:
<?php namespace Acme\AmqpWrapper; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; class RpcSender { private $response; /** * @var string */ private $corr_id; /* ...SOME OTHER CODE HERE... */ /** * @param array $credentials * @return string */ public function execute($credentials) { $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); /* * creates an anonymous exclusive callback queue * $callback_queue has a value like amq.gen-_U0kJVm8helFzQk9P0z9gg */ list($callback_queue, ,) = $channel->queue_declare( "", #queue false, #passive false, #durable true, #exclusive false #auto delete ); $channel->basic_consume( $callback_queue, #queue '', #consumer tag false, #no local false, #no ack false, #exclusive false, #no wait array($this, 'onResponse') #callback ); $this->response = null; /* * $this->corr_id has a value like 53e26b393313a */ $this->corr_id = uniqid(); $jsonCredentials = json_encode($credentials); /* * create a message with two properties: reply_to, which is set to the * callback queue and correlation_id, which is set to a unique value for * every request */ $msg = new AMQPMessage( $jsonCredentials, #body array('correlation_id' => $this->corr_id, 'reply_to' => $callback_queue) #properties ); /* * The request is sent to an rpc_queue queue. */ $channel->basic_publish( $msg, #message '', #exchange 'rpc_queue' #routing key ); while(!$this->response) { $channel->wait(); } $channel->close(); $connection->close(); return $this->response; } /** * When a message appears, it checks the correlation_id property. If it * matches the value from the request it returns the response to the * application. * * @param AMQPMessage $rep */ public function onResponse(AMQPMessage $rep) { if($rep->get('correlation_id') == $this->corr_id) { $this->response = $rep->body; } } }
Рассматривая метод RpcSender::execute
, обратите внимание, что параметр $credentials
является массивом в виде ['username'=>'x', 'password'=>'y']
. Опять же, мы открываем новое соединение и создаем канал как обычно.
<?php //... list($callback_queue, ,) = $channel->queue_declare( "", #queue false, #passive false, #durable true, #exclusive false #auto delete ); //...
Первое отличие заключается в объявлении очереди. Сначала обратите внимание, что мы используем конструкцию list()
чтобы поймать результат из $channel->queue_declare()
. Это связано с тем, что мы не отправляем имя очереди явным образом при ее объявлении, поэтому нам необходимо выяснить, как эта очередь идентифицируется. Нас интересует только первый элемент массива результатов, который будет уникальным идентификатором очереди (что-то вроде amq.gen-_U0kJVm8helFzQk9P0z9gg
). Второе изменение заключается в том, что мы должны объявить эту очередь эксклюзивной, чтобы не было путаницы в результатах других параллельных процессов.
Другое большое изменение заключается в том, что производитель будет также потребителем очереди, при выполнении $channel->basic_consume()
обратите внимание, что мы предоставляем значение $callback_queue
мы получили при объявлении очереди. И, как и любой потребитель, мы объявим обратный вызов для выполнения, когда процесс получит ответ.
<?php //... /* * $this->corr_id has a value like 53e26b393313a */ $this->corr_id = uniqid(); //...
Затем мы должны создать идентификатор корреляции для сообщения, это не более чем уникальный идентификатор для каждого сообщения. В примере мы используем uniqid()
, но вы можете использовать любой механизм, который вы предпочитаете (если он не создает условия гонки, не обязательно должен быть сильным, крипто-безопасным ГСЧ).
<?php //... $msg = new AMQPMessage( $jsonCredentials, #body array('correlation_id' => $this->corr_id, 'reply_to' => $callback_queue) #properties ); //...
Теперь давайте создадим сообщение, в котором есть важные изменения по сравнению с тем, к чему мы привыкли в первых 2 примерах. Помимо назначения json-кодированной строки, содержащей учетные данные, которые мы хотим аутентифицировать, мы должны предоставить конструктору AMQPMessage
массив с двумя определенными свойствами:
- correlation_id : тег для сообщения
- reply_to : идентификатор очереди, сгенерированный при его объявлении
После публикации сообщения мы оценим ответ, который вначале будет пустым. Пока значение ответа остается пустым, мы будем ждать ответа от канала с помощью $channel->wait();
,
Как только мы получим ответ от канала, будет вызван метод обратного вызова ( RpcSender::onResponse()
). Этот метод будет сопоставлять полученный идентификатор корреляции с сгенерированным и, если они совпадают, будет устанавливать тело ответа, таким образом прерывая цикл while.
А как насчет потребителя RPC? Вот:
<?php namespace Acme\AmqpWrapper; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; class RpcReceiver { /* ... SOME OTHER CODE HERE... */ /** * Listens for incoming messages */ public function listen() { $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare( 'rpc_queue', #queue false, #passive false, #durable false, #exclusive false #autodelete ); $channel->basic_qos( null, #prefetch size 1, #prefetch count null #global ); $channel->basic_consume( 'rpc_queue', #queue '', #consumer tag false, #no local false, #no ack false, #exclusive false, #no wait array($this, 'callback') #callback ); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); } /** * Executes when a message is received. * * @param AMQPMessage $req */ public function callback(AMQPMessage $req) { $credentials = json_decode($req->body); $authResult = $this->auth($credentials); /* * Creating a reply message with the same correlation id than the incoming message */ $msg = new AMQPMessage( json_encode(array('status' => $authResult)), #message array('correlation_id' => $req->get('correlation_id')) #options ); /* * Publishing to the same channel from the incoming message */ $req->delivery_info['channel']->basic_publish( $msg, #message '', #exchange $req->get('reply_to') #routing key ); /* * Acknowledging the message */ $req->delivery_info['channel']->basic_ack( $req->delivery_info['delivery_tag'] #delivery tag ); } /** * @param \stdClass $credentials * @return bool */ private function auth(\stdClass $credentials) { if (($credentials->username == 'admin') && ($credentials->password == 'admin')) { return true; } else { return false; } } }
То же самое старое соединение и создание канала ?
То же, что и объявление очереди, однако эта очередь будет иметь предопределенное имя (‘ rpc_queue ‘). Мы определим параметры QoS, так как мы отключим автоматическое подтверждение, поэтому мы можем уведомить, когда мы закончим проверку учетных данных и получим результат.
<?php //... $msg = new AMQPMessage( json_encode(array('status' => $authResult)), #message array('correlation_id' => $req->get('correlation_id')) #options ); //...
Магия приходит из объявленного обратного вызова. Как только мы закончим аутентификацию учетных данных (да, я знаю, что процесс выполняется со статическими значениями имени пользователя / пароля, этот учебник не о том, как аутентифицировать учетные данные;)), мы должны создать ответное сообщение с той же самой корреляцией id производителя создано. Мы можем извлечь это из сообщения запроса с помощью $req->get('correlation_id')
, передавая это значение так же, как мы делали это в производителе.
<?php //... $req->delivery_info['channel']->basic_publish( $msg, #message '', #exchange $req->get('reply_to') #routing key ); //...
Теперь мы должны опубликовать это сообщение в той же очереди, которая была создана в источнике (со случайным именем). Мы извлекаем имя очереди с помощью $req->get('reply_to')
и используем его в качестве ключа маршрутизации в basic_publish()
.
После того, как мы опубликовали сообщение, мы должны отправить уведомление ack на канал с помощью $req->delivery_info['channel']->basic_ack()
, используя тег доставки в $req->delivery_info['delivery_tag']
чтобы продюсер может перестать ждать.
Снова запустите процесс прослушивания, и вы готовы к работе. Вы можете даже объединить примеры 2 и 3, чтобы иметь процесс rpc с несколькими рабочими для выполнения запросов аутентификации, который можно масштабировать, просто увольняя нескольких рабочих.
О RabbitMQ и AMQP можно сказать гораздо больше, например о виртуальных хостах, типах обмена, администрировании сервера и т. Д. Вы можете найти больше шаблонов приложений (таких как маршрутизация, темы) здесь и на странице документации . Существует также инструмент командной строки для управления RabbitMQ, а также веб-интерфейс .
Если вам понравился этот учебник, и вы хотели бы узнать больше о MQ и других реальных случаях использования, сообщите нам об этом в комментариях ниже!