Статьи

Параллельное программирование с помощью Pthreads в PHP — основы

Эта статья была рецензирована Кристофером Питтом . Спасибо всем рецензентам SitePoint за то, что сделали контент SitePoint как можно лучше!


Разработчики PHP, кажется, редко используют параллелизм. Привлекательность простоты синхронного однопоточного программирования, безусловно, высока, но иногда использование небольшого параллелизма может привести к значительным улучшениям производительности.

В этой статье мы рассмотрим, как можно добиться многопоточности в PHP с помощью расширения pthreads . Для этого потребуется версия PHP 7.x для ZTS (Zend Thread Safety), а также установленная версия pthreads v3. (На момент написания статьи пользователям PHP 7.1 потребуется установить из основной ветки репозитория pthreads — подробности о создании сторонних расширений из исходного кода см. В разделе этой статьи .)

В качестве краткого пояснения: pthreads v2 нацелена на PHP 5.x и больше не поддерживается; pthreads v3 нацелена на PHP 7.x и активно развивается.

Параллельное исполнение абстрактного изображения

Большое спасибо Джо Уоткинсу (создателю расширения pthreads) за корректуру и помощь в улучшении моей статьи!

Когда не использовать pthreads

Прежде чем мы продолжим, я сначала хотел бы уточнить, когда вы не должныне можете ) использовать расширение pthreads.

В pthreads v2 рекомендация заключалась в том, что pthreads не следует использовать в среде веб-сервера (т.е. в процессе FCGI). Начиная с pthreads v3, эта рекомендация была выполнена, поэтому теперь вы просто не можете использовать ее в среде веб-сервера. Две основные причины этого:

  1. В такой среде небезопасно использовать несколько потоков (среди прочего, возникают проблемы с вводом-выводом).
  2. Это не хорошо масштабируется. Например, допустим, у вас есть скрипт PHP, который создает новый поток для обработки некоторой работы, и этот скрипт выполняется при каждом запросе. Это означает, что для каждого запроса ваше приложение будет создавать один новый поток (это модель потоков 1: 1 — один поток на один запрос). Если ваше приложение обслуживает 1000 запросов в секунду, то оно создает 1000 потоков в секунду! Наличие такого количества потоков, запущенных на одной машине, быстро затопит его, и проблема будет только усугубляться при увеличении частоты запросов.

Вот почему многопоточность не является хорошим решением в такой среде. Если вы ищете многопоточность как решение задач IO-блокировки (например, выполнение HTTP-запросов), то позвольте мне указать вам направление асинхронного программирования , которое может быть достигнуто с помощью таких сред, как Amp . SitePoint выпустил несколько отличных статей, посвященных этой теме (например, написание асинхронных библиотек и Modding Minecraft на PHP ), на случай, если вам интересно.

С этим из пути, давайте прыгнем прямо в вещи!

Обработка разовых задач

Иногда вам захочется обрабатывать одноразовые задачи многопоточным способом (например, выполнять некоторые задачи, связанные с вводом-выводом). В таких случаях класс Thread может использоваться для создания нового потока и выполнения некоторой единицы работы в этом отдельном потоке.

Например:

 $task = new class extends Thread { private $response; public function run() { $content = file_get_contents("http://google.com"); preg_match("~<title>(.+)</title>~", $content, $matches); $this->response = $matches[1]; } }; $task->start() && $task->join(); var_dump($task->response); // string(6) "Google" 

Выше метод run — это наша единица работы, которая будет выполняться внутри нового потока. При вызове Thread::start создается новый поток и вызывается метод run . Затем мы присоединяем порожденный поток обратно к основному потоку (через Thread::join ), который будет блокироваться, пока отдельный поток не завершит выполнение. Это гарантирует, что задача завершилась до того, как мы попытаемся вывести результат (хранится в $task->response ).

Может быть нежелательно загрязнять ответственность класса логикой, связанной с потоками (включая необходимость определения метода выполнения). Мы можем разделить такие классы, если вместо этого они расширяют класс Threaded , где они затем могут запускаться в других потоках:

 class Task extends Threaded { public $response; public function someWork() { $content = file_get_contents('http://google.com'); preg_match('~<title>(.+)</title>~', $content, $matches); $this->response = $matches[1]; } } $task = new Task; $thread = new class($task) extends Thread { private $task; public function __construct(Threaded $task) { $this->task = $task; } public function run() { $this->task->someWork(); } }; $thread->start() && $thread->join(); var_dump($task->response); 

Любой класс, который необходимо запустить внутри отдельного потока, должен каким-то образом расширять класс Threaded . Это связано с тем, что он обеспечивает необходимые возможности для работы в разных потоках, а также обеспечивает неявную безопасность и полезные интерфейсы (для таких вещей, как синхронизация ресурсов).

Давайте кратко рассмотрим иерархию классов, предоставляемых pthreads:

 Threaded (implements Traversable, Collectable) Thread Worker Volatile Pool 

Мы уже видели и изучили основы классов Thread и Threaded , поэтому теперь давайте посмотрим на остальные три ( Worker , Volatile и Pool ).

Переработка потоков

Раскрутка нового потока для каждой задачи, которую нужно распараллелить, стоит дорого. Это связано с тем, что pthreads должна использовать архитектуру без совместного использования ресурсов для достижения многопоточности внутри PHP. Это означает, что весь контекст выполнения текущего экземпляра интерпретатора PHP (включая каждый класс, интерфейс, свойство и функцию) должен быть скопирован для каждого созданного потока. Поскольку это оказывает заметное влияние на производительность, поток всегда должен использоваться повторно, когда это возможно. Потоки могут быть повторно использованы двумя способами: с Worker или с Pool .

Класс Worker используется для синхронного выполнения ряда задач внутри другого потока. Это делается путем создания нового экземпляра Worker (который создает новый поток), а затем размещения задач в этом отдельном потоке (через Worker::stack ).

Вот быстрый пример:

 class Task extends Threaded { private $value; public function __construct(int $i) { $this->value = $i; } public function run() { usleep(250000); echo "Task: {$this->value}\n"; } } $worker = new Worker(); $worker->start(); for ($i = 0; $i < 15; ++$i) { $worker->stack(new Task($i)); } while ($worker->collect()); $worker->shutdown(); 

Выход:

Выходной бассейн

Вышеприведенное объединяет 15 задач в новый объект $worker через Worker::stack , а затем обрабатывает их в сложенном порядке. Метод Worker::collect , как показано выше, используется для очистки задач после их завершения. Используя его внутри цикла while, мы блокируем основной поток до тех пор, пока все сложенные задачи не завершат выполнение и не будут очищены, прежде чем мы вызовем Worker::shutdown . Преждевременное завершение рабочего процесса (т. Е. Пока есть задачи, которые должны быть выполнены) будет по-прежнему блокировать основной поток до тех пор, пока все задачи не будут завершены — задачи просто не будут собираться мусором (вызывая утечки памяти).

Класс Worker предоставляет несколько других методов, относящихся к его стеку задач, включая Worker::unstack для удаления самого старого элемента в стеке и Worker::getStacked для количества элементов в стеке выполнения. Стек рабочего содержит только те задачи, которые должны быть выполнены. После выполнения задачи в стеке она удаляется и помещается в отдельный (внутренний) стек для сбора мусора (с помощью Worker::collect ).

Другой способ повторно использовать поток при выполнении многих задач — использовать пул потоков (через класс Pool ). Пулы потоков питаются от группы Worker что позволяет одновременно выполнять задачи, где коэффициент параллелизма (количество потоков, в которых работает пул) указывается при создании пула.

Давайте адаптируем приведенный выше пример, чтобы использовать вместо этого пул рабочих:

 class Task extends Threaded { private $value; public function __construct(int $i) { $this->value = $i; } public function run() { usleep(250000); echo "Task: {$this->value}\n"; } } $pool = new Pool(4); for ($i = 0; $i < 15; ++$i) { $pool->submit(new Task($i)); } while ($pool->collect()); $pool->shutdown(); 

Выход:

Выходной бассейн

Есть несколько заметных различий между использованием пула в отличие от работника. Во-первых, пулы не нужно запускать вручную, они начинают выполнять задачи, как только они становятся доступными. Во-вторых, мы отправляем задачи в пул, а не складываем их. Кроме того, класс Pool не расширяет Threaded , поэтому его нельзя передавать другим потокам (в отличие от Worker ).

В соответствии с хорошей практикой работники и пулы должны всегда собирать свои задания после завершения и закрываться вручную. Потоки, созданные с помощью класса Thread также должны быть присоединены к потоку создателя.

pthreads и (im) изменчивость

Последний класс для покрытия — Volatile — новое дополнение к pthreads v3. Неизменность стала важной концепцией в pthreads, поскольку без нее производительность сильно ухудшается. Поэтому по умолчанию свойства классов Threaded , которые сами являются объектами Threaded , теперь являются неизменяемыми, и поэтому они не могут быть переназначены после первоначального назначения. Явная изменчивость для таких свойств теперь одобрена, и все еще может быть сделана, используя новый класс Volatile .

Давайте кратко рассмотрим пример, чтобы продемонстрировать новые ограничения неизменяемости:

 class Task extends Threaded // a Threaded class { public function __construct() { $this->data = new Threaded(); // $this->data is not overwritable, since it is a Threaded property of a Threaded class } } $task = new class(new Task()) extends Thread { // a Threaded class, since Thread extends Threaded public function __construct($tm) { $this->threadedMember = $tm; var_dump($this->threadedMember->data); // object(Threaded)#3 (0) {} $this->threadedMember = new StdClass(); // invalid, since the property is a Threaded member of a Threaded class } }; 

Threaded свойства классов Volatile , с другой стороны, являются изменяемыми:

 class Task extends Volatile { public function __construct() { $this->data = new Threaded(); $this->data = new StdClass(); // valid, since we are in a volatile class } } $task = new class(new Task()) extends Thread { public function __construct($vm) { $this->volatileMember = $vm; var_dump($this->volatileMember->data); // object(stdClass)#4 (0) {} // still invalid, since Volatile extends Threaded, so the property is still a Threaded member of a Threaded class $this->volatileMember = new StdClass(); } }; 

Мы можем видеть, что класс Volatile переопределяет неизменность, обеспечиваемую его родительским классом Threaded чтобы обеспечить возможность переназначения свойств Threaded (а также unset() ).

Есть только одна последняя фундаментальная тема, касающаяся изменчивости и класса Volatile — массивы. Массивы в pthreads автоматически приводятся к объектам Volatile когда они присваиваются свойству класса Threaded . Это потому, что просто не безопасно манипулировать массивом из нескольких контекстов в PHP.

Давайте снова взглянем на пример, чтобы лучше понять вещи:

 $array = [1,2,3]; $task = new class($array) extends Thread { private $data; public function __construct(array $array) { $this->data = $array; } public function run() { $this->data[3] = 4; $this->data[] = 5; print_r($this->data); } }; $task->start() && $task->join(); /* Output: Volatile Object ( [0] => 1 [1] => 2 [2] => 3 [3] => 4 [4] => 5 ) */ 

Мы можем видеть, что объекты Volatile можно обрабатывать так, как если бы они были массивами, поскольку они обеспечивают поддержку операций на основе массива (как показано выше) с помощью оператора подмножества ( [] ). Volatile классы не поддерживаются общими функциями на основе массива, такими как array_pop и array_shift . Вместо этого класс Threaded предоставляет нам такие операции, как встроенные методы.

В качестве демонстрации:

 $data = new class extends Volatile { public $a = 1; public $b = 2; public $c = 3; }; var_dump($data); var_dump($data->pop()); var_dump($data->shift()); var_dump($data); /* Output: object(class@anonymous)#1 (3) { ["a"]=> int(1) ["b"]=> int(2) ["c"]=> int(3) } int(3) int(1) object(class@anonymous)#1 (1) { ["b"]=> int(2) } */ 

Другие поддерживаемые операции включают в себя Threaded::chunk и Threaded::merge .

синхронизация

Последняя тема, которую мы рассмотрим в этой статье, — синхронизация в pthreads. Синхронизация — это метод обеспечения контролируемого доступа к общим ресурсам.

Например, давайте реализуем наивный счетчик:

 $counter = new class extends Thread { public $i = 0; public function run() { for ($i = 0; $i < 10; ++$i) { ++$this->i; } } }; $counter->start(); for ($i = 0; $i < 10; ++$i) { ++$counter->i; } $counter->join(); var_dump($counter->i); // outputs a number from between 10 and 20 

Без использования синхронизации вывод не является детерминированным. Запись нескольких потоков в одну переменную без контролируемого доступа привела к потере обновлений.

Давайте исправим это, добавив синхронизацию, чтобы мы получили правильный результат 20 :

 $counter = new class extends Thread { public $i = 0; public function run() { $this->synchronized(function () { for ($i = 0; $i < 10; ++$i) { ++$this->i; } }); } }; $counter->start(); $counter->synchronized(function ($counter) { for ($i = 0; $i < 10; ++$i) { ++$counter->i; } }, $counter); $counter->join(); var_dump($counter->i); // int(20) 

Синхронизированные блоки кода также могут взаимодействовать друг с другом, используя Threaded::wait и Threaded::notifyOne (наряду с Threaded::notifyOne ).

Вот пошаговое увеличение двух синхронизированных циклов while:

 $counter = new class extends Thread { public $cond = 1; public function run() { $this->synchronized(function () { for ($i = 0; $i < 10; ++$i) { var_dump($i); $this->notify(); if ($this->cond === 1) { $this->cond = 2; $this->wait(); } } }); } }; $counter->start(); $counter->synchronized(function ($counter) { if ($counter->cond !== 2) { $counter->wait(); // wait for the other to start first } for ($i = 10; $i < 20; ++$i) { var_dump($i); $counter->notify(); if ($counter->cond === 2) { $counter->cond = 1; $counter->wait(); } } }, $counter); $counter->join(); /* Output: int(0) int(10) int(1) int(11) int(2) int(12) int(3) int(13) int(4) int(14) int(5) int(15) int(6) int(16) int(7) int(17) int(8) int(18) int(9) int(19) */ 

Возможно, вы заметили дополнительные условия, которые были помещены вокруг вызовов Threaded::wait . Эти условия имеют решающее значение, поскольку они позволяют возобновить синхронизированный обратный вызов только после получения уведомления и выполнения указанного условия. Это важно, потому что уведомления могут поступать из мест, отличных от вызовов Threaded::notify . Таким образом, если бы вызовы Threaded::wait не были заключены в определенные условия, мы были бы открыты для ложных вызовов пробуждения , что приведет к непредсказуемому коду.

Вывод

Мы видели пять пакетов классов pthreads с ним ( Threaded , Thread , Worker , Volatile и Pool ), включая покрытие при использовании каждого из классов. Мы также рассмотрели новую концепцию неизменяемости в pthreads, а также ознакомились с функцией синхронизации, которую она поддерживает. С этими основными принципами мы можем начать изучать применение pthreads к некоторым реальным сценариям использования! Это будет темой нашего следующего поста.

В то же время, если у вас есть идеи для приложений относительно pthreads, не стесняйтесь опускать их ниже в область комментариев!