API очереди в Drupal позволяет нам решать ряд задач на более позднем этапе. Это означает, что мы можем помещать элементы в очередь, которая будет выполняться некоторое время в будущем и обрабатывать каждый отдельный элемент в этот момент и хотя бы один раз. Обычно это происходит при запуске CRON , а Drupal 8 позволяет быстро настроить очереди на основе cronjob. Однако это не обязательно должен быть CRON.
В этой статье мы рассмотрим использование API очереди в Drupal 8, исследуя два простых примера. Первый увидит очередь, вызванную Cron, а второй позволит нам сделать это вручную самостоятельно. Тем не менее, фактическая обработка будет обрабатываться аналогичным работником. Если вы хотите продолжить, клонируйте этот репозиторий git, где вы можете найти модуль npq
мы напишем в этой статье.
Модуль, с которым мы будем работать, называется Node Publisher Queue, и он автоматически добавляет вновь созданные узлы, которые сохраняются неопубликованными, в очередь, которая будет опубликована позже. Мы увидим, как позже может быть следующий запуск CRON или ручное действие, инициированное администратором сайта. Для начала давайте разберемся с некоторыми основными понятиями об очередях в Drupal 8.
Теория
В Drupal 8 есть несколько компонентов, которые составляют API очереди.
Наиболее важную роль в этом API играет реализация QueueInterface
которая представляет очередь. Тип очереди по умолчанию, с которым поставляется Drupal 8, в настоящее время представляет собой DatabaseQueue
который является типом надежной очереди, которая гарантирует, что все ее элементы обрабатываются как минимум один раз и в их первоначальном порядке ( FIFO ). Это в отличие от ненадежных очередей, которые делают все возможное для достижения этой цели (что-то, для чего существуют действительные варианты использования).
Типичная роль объекта очереди заключается в создании элементов, их последующем извлечении из очереди и удалении после обработки. Кроме того, он может освобождать элементы, если обработка либо не завершена, либо другому работнику необходимо снова обработать их перед удалением.
Реализация QueueInterface
с помощью общего QueueFactory
. В случае с DatabaseQueue
, первый также использует DatabaseQueueFactory
. Очереди также должны быть созданы, прежде чем они могут быть использованы. Однако DatabaseQueue
уже создается при первой установке Drupal, поэтому никаких дополнительных настроек не требуется.
Работники очереди отвечают за обработку элементов очереди по мере их получения. В Drupal 8 это плагины QueueWorker
которые реализуют QueueWorkerInterface
. Используя QueueWorkerManager
, мы создаем экземпляры этих плагинов и обрабатываем элементы всякий раз, когда необходимо запустить очередь.
Модуль очереди публикации узла
Теперь, когда мы рассмотрели основные концепции API очереди в Drupal 8, давайте запачкаем руки и создадим функциональность, описанную во введении. Наш файл npq.info.yml может быть простым:
name: Node Publish Queue description: Demo module illustrating the Queue API in Drupal 8 core: 8.x type: module
Создание элемента очереди
Внутри файла npq.module
мы заботимся о логике создания элементов очереди всякий раз, когда узел сохраняется и не публикуется:
use Drupal\Core\Entity\EntityInterface; use Drupal\Core\Queue\QueueFactory; use Drupal\Core\Queue\QueueInterface; /** * Implements hook_entity_insert(). */ function npq_entity_insert(EntityInterface $entity) { if ($entity->getEntityTypeId() !== 'node') { return; } if ($entity->isPublished()) { return; } /** @var QueueFactory $queue_factory */ $queue_factory = \Drupal::service('queue'); /** @var QueueInterface $queue */ $queue = $queue_factory->get('cron_node_publisher'); $item = new \stdClass(); $item->nid = $entity->id(); $queue->createItem($item); }
Внутри этой базовой реализации hook_entity_insert()
мы делаем очень простую задачу. Сначала мы извлекаем объект QueueFactoryInterface
из контейнера службы и используем его для получения очереди с именем cron_node_publisher
. Если мы отследим ситуацию, то заметим, что метод get()
в DatabaseQueueFactory
просто создает новый экземпляр DatabaseQueue
с именем, которое мы передаем ему.
Наконец, мы создаем небольшой объект PHP, содержащий идентификатор узла, и создаем элемент в очереди с этими данными. Просто.
Работник очереди CRON
Далее, давайте создадим плагин QueueWorker
который будет обрабатывать элементы очереди при каждом запуске Cron. Однако, поскольку мы знаем, что нам также понадобится один для ручной обработки, который делает то же самое, мы добавим большую часть логики в базовый абстрактный класс. Таким образом, внутри пространства имен Plugin/QueueWorker
нашего модуля у нас может быть класс NodePublishBase
:
/** * @file * Contains Drupal\npq\Plugin\QueueWorker\NodePublishBase.php */ namespace Drupal\npq\Plugin\QueueWorker; use Drupal\Core\Entity\EntityStorageInterface; use Drupal\Core\Plugin\ContainerFactoryPluginInterface; use Drupal\Core\Queue\QueueWorkerBase; use Drupal\node\NodeInterface; use Symfony\Component\DependencyInjection\ContainerInterface; /** * Provides base functionality for the NodePublish Queue Workers. */ abstract class NodePublishBase extends QueueWorkerBase implements ContainerFactoryPluginInterface { /** * The node storage. * * @var \Drupal\Core\Entity\EntityStorageInterface */ protected $nodeStorage; /** * Creates a new NodePublishBase object. * * @param \Drupal\Core\Entity\EntityStorageInterface $node_storage * The node storage. */ public function __construct(EntityStorageInterface $node_storage) { $this->nodeStorage = $node_storage; } /** * {@inheritdoc} */ public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) { return new static( $container->get('entity.manager')->getStorage('node') ); } /** * Publishes a node. * * @param NodeInterface $node * @return int */ protected function publishNode($node) { $node->setPublished(TRUE); return $node->save(); } /** * {@inheritdoc} */ public function processItem($data) { /** @var NodeInterface $node */ $node = $this->nodeStorage->load($data->nid); if (!$node->isPublished() && $node instanceof NodeInterface) { return $this->publishNode($node); } } }
Сразу же видно, что мы используем внедрение зависимостей, чтобы внедрить NodeStorage
в наш класс. Для получения дополнительной информации о внедрении зависимостей и сервисном контейнере см. Мою статью на эту тему.
В этом базовом классе у нас есть два метода: publishNode()
и обязательный processItem()
. Первый публикует и сохраняет переданный ему узел. Последний загружает узел, используя идентификатор узла, содержащийся в объекте $data
и публикует его, если он не опубликован.
Теперь давайте создадим плагин CronNodePublisher
который будет использовать эту логику при CronNodePublisher
Cron:
namespace Drupal\npq\Plugin\QueueWorker; /** * A Node Publisher that publishes nodes on CRON run. * * @QueueWorker( * id = "cron_node_publisher", * title = @Translation("Cron Node Publisher"), * cron = {"time" = 10} * ) */ class CronNodePublisher extends NodePublishBase {}
И это все. Нам не нужна никакая другая логика, кроме той, что уже есть в нашем базовом классе. Обратите внимание, что в аннотации мы говорим Drupal, что Cron должен использовать этого работника для обработки как можно большего количества элементов в течение 10 секунд. Как это произошло?
Всякий раз, когда Cron запускается, он использует QueueWorkerManager
для загрузки всех своих определений плагинов. Затем, если у кого-либо из них есть ключ cron
в их аннотации, Очередь с тем же именем, что и идентификатор работника , загружается для обработки. Наконец, каждый элемент в очереди запрашивается и обрабатывается рабочим, пока не истечет указанное время.
Если мы сейчас сохраним неопубликованный узел, он, скорее всего, будет опубликован при следующем запуске Cron.
Ручной работник
Давайте также создадим возможность обработки очереди вручную. Во-первых, давайте адаптируем реализацию hook_entity_insert()
ранее и изменим эту строку:
$queue = $queue_factory->get('cron_node_publisher');
к этому:
$queue = $queue_factory->get('manual_node_publisher');
Конечно, вы можете предоставить экран администратора для настройки типа издателя узла, который должно использовать приложение.
Во-вторых, давайте создадим наш плагин ManualNodePublisher
:
namespace Drupal\npq\Plugin\QueueWorker; /** * A Node Publisher that publishes nodes via a manual action triggered by an admin. * * @QueueWorker( * id = "manual_node_publisher", * title = @Translation("Manual Node Publisher"), * ) */ class ManualNodePublisher extends NodePublishBase {}
Это почти так же, как с примером CRON, но без ключа cron
.
В-третьих, давайте создадим форму, в которой мы сможем увидеть, сколько элементов находится в очереди manual_node_publisher
и обработать их все одним нажатием кнопки. Внутри npq.routing.yml
в корневой папке модуля:
demo.form: path: '/npq' defaults: _form: '\Drupal\npq\Form\NodePublisherQueueForm' _title: 'Node Publisher' requirements: _permission: 'administer site configuration'
поdemo.form: path: '/npq' defaults: _form: '\Drupal\npq\Form\NodePublisherQueueForm' _title: 'Node Publisher' requirements: _permission: 'administer site configuration'
Мы определяем путь в /npq
который должен использовать указанную форму, которая находится в этом пространстве имен и которую мы можем определить как таковой:
/** * @file * Contains \Drupal\npq\Form\NodePublisherQueueForm. */ namespace Drupal\npq\Form; use Drupal\Core\Form\FormBase; use Drupal\Core\Form\FormStateInterface; use Drupal\Core\Queue\QueueFactory; use Drupal\Core\Queue\QueueInterface; use Drupal\Core\Queue\QueueWorkerInterface; use Drupal\Core\Queue\QueueWorkerManagerInterface; use Drupal\Core\Queue\SuspendQueueException; use Symfony\Component\DependencyInjection\ContainerInterface; class NodePublisherQueueForm extends FormBase { /** * @var QueueFactory */ protected $queueFactory; /** * @var QueueWorkerManagerInterface */ protected $queueManager; /** * {@inheritdoc} */ public function __construct(QueueFactory $queue, QueueWorkerManagerInterface $queue_manager) { $this->queueFactory = $queue; $this->queueManager = $queue_manager; } /** * {@inheritdoc} */ public static function create(ContainerInterface $container) { return new static( $container->get('queue'), $container->get('plugin.manager.queue_worker') ); } /** * {@inheritdoc}. */ public function getFormId() { return 'demo_form'; } /** * {@inheritdoc}. */ public function buildForm(array $form, FormStateInterface $form_state) { /** @var QueueInterface $queue */ $queue = $this->queueFactory->get('node_publisher'); $form['help'] = array( '#type' => 'markup', '#markup' => $this->t('Submitting this form will process the Manual Queue which contains @number items.', array('@number' => $queue->numberOfItems())), ); $form['actions']['#type'] = 'actions'; $form['actions']['submit'] = array( '#type' => 'submit', '#value' => $this->t('Process queue'), '#button_type' => 'primary', ); return $form; } /** * {@inheritdoc} */ public function submitForm(array &$form, FormStateInterface $form_state) { /** @var QueueInterface $queue */ $queue = $this->queueFactory->get('manual_node_publisher'); /** @var QueueWorkerInterface $queue_worker */ $queue_worker = $this->queueManager->createInstance('manual_node_publisher'); while($item = $queue->claimItem()) { try { $queue_worker->processItem($item->data); $queue->deleteItem($item); } catch (SuspendQueueException $e) { $queue->releaseItem($item); break; } catch (\Exception $e) { watchdog_exception('npq', $e); } } } }
Мы снова используем внедрение зависимостей для внедрения QueueFactory
и менеджера для плагинов QueueWorker
. Внутри buildForm()
мы создаем базовую структуру формы и используем метод numberOfItems()
в очереди, чтобы сообщить пользователю, сколько элементов он собирается обработать. И наконец, внутри submitForm()
мы берем на себя обработку. Но как нам это сделать?
Сначала мы загружаем очередь и создаем экземпляр работника очереди (в обоих случаях мы используем идентификатор manual_node_publisher
). Затем мы запускаем цикл while, пока все элементы не будут обработаны. Метод claimItem()
отвечает за блокировку элемента очереди от claimItem()
другой очереди и возврат его для обработки. После того, как это обработано рабочим, мы удаляем это На следующей итерации следующий элемент возвращается и включается до тех пор, пока не останется ни одного элемента.
Хотя мы не использовали его, SuspendQueueException
предназначено для указания того, что во время обработки элемента рабочий обнаружил проблему, которая, скорее всего, также приведет к сбою всех других элементов в очереди. И по этой причине бессмысленно переходить к следующему пункту, поэтому мы вырываемся из цикла. Тем не менее, мы также выпускаем элемент, так что, когда мы попробуем позже, элемент будет доступен. Другие исключения также перехватываются и регистрируются в сторожевой таймер.
Теперь, если мы создадим пару узлов и не опубликуем их, мы увидим их количество в сообщении, если перейдем к /npq
. Нажав кнопку «Отправить», мы обрабатываем (публикуем) их все по одному.
Это был только демонстрационный пример. Всегда важно учитывать потенциальную нагрузку при обработке большого количества элементов и либо ограничить их, чтобы ваш запрос не истекал, либо использовать пакетный API для разделения их на несколько запросов.
Вывод
В этой статье мы рассмотрели API очереди в Drupal 8. Мы изучили некоторые основные понятия о том, как он построен и как он работает, но мы также видели несколько примеров того, как мы можем работать с ним. А именно, мы поработали с двумя вариантами использования, с помощью которых мы можем публиковать неопубликованные узлы либо во время выполнения Cron, либо вручную с помощью действия, выполняемого пользователем.
Вы пробовали API очереди в Drupal 8? Дайте нам знать, как все прошло!