Параллельные вычисления на PHP
Dmitrii Derepko (@xepozz)Недавно попалась задача сделать параллельные расчеты без использования внешних сервисов вроде Redis, RabbitMQ, Database и прочие.
PCNTL
Первое решение, которое пришло в голову – форк процесса через pcntl расширение для PHP.
На первый взгляд это выглядит очень просто. pcntl работает следующим образом:
- Вызываем
pcntl_fork()в месте, где хотим отделить от родительского процесса новый дочерний - Далее нужно проверить возвращаемый результат, чтобы понять “в каком процессе” сейчас выполняется код:
<?php
$pid = pcntl_fork();
if ($pid === -1) {
// произошла ошибка при форке,
// но мы можем её обработать и попробовать сделать форк еще раз
}
if ($pid === 0) {
// логика дочернего процесса
// завершаем форк
exit(0);
} else {
// родительский процесс
}
- После
pcntl_fork()с того же места продолжают работать несколько процессов параллельно. Условием мы управляем какой код должен будет выполнится далее
Не вдаваясь в дальнейшие подробности, хотелось бы сразу понять ограничения:
- Не получится создать большое количество процессов: от тысячи и выше, зависит у всех от системы
- Передать результат работы от child процесса к parent процессу простыми привычными операциями с заполнение массива или записью в объект не получится. Участки памяти у двух процессов разделены. У каждого своя копия. Придется использовать
shmemрасширения для общения между двумя процессами, либо какое-то другое внешнее хранилище
Реализация имитации отправки писем с помощью pcntl
const MAX_PROCESSES = 1000;
function queue_tasks(\Generator $fetchUsers): void
{
$usersNumber = 0;
$processes = [];
foreach ($fetchUsers as $user) {
/**
* Дебаунс, чтобы не досить систему
*/
if (count($processes) >= MAX_PROCESSES) {
foreach ($processes as $pid => $status) {
if (\pcntl_waitpid($pid, $status, WNOHANG) == -1) {
unset($processes[$pid]);
}
}
}
$pid = pcntl_fork();
if ($pid === -1) {
echo sprintf('Ошибка при форке процесса: "%s"...', pcntl_strerror(pcntl_errno())), PHP_EOL;
exit(1);
}
$usersNumber++;
if ($pid > 0) {
// parent
$processes[$pid] = 0;
//echo sprintf('Старт процесса #%s', $pid), PHP_EOL; // на больших объемах это будет мешать
} else {
// child
sleep(rand(1, 10));
echo sprintf(
'Письмо отправлено пользователю #%d "%s".' . PHP_EOL,
$user['id'],
$user['email'],
);
exit(0);
}
}
/**
* Дожидаемся остальных
*/
wait_all_processes_done($processes);
echo sprintf('Всего обработано %s пользователей', $usersNumber), PHP_EOL;
}
function wait_all_processes_done(array &$processes): void
{
while (($pid = \pcntl_waitpid(0, $status)) !== -1) {
unset($processes[$pid]);
//echo sprintf('Процесс завершен #%d осталось %s процессов', $pid, count($processes)), PHP_EOL;
}
}
Генераторы
Вторая идея, которая мне пришла в голову – это использование генераторов, но вся полезная работа будет происходить с Input/Output, а в стандартных драйверах практически всё* взаимодействие с IO является блокируемым, а значит генератор всё еще будет задерживать взаимодействие с внешним миром.
Однако, не всё взаимодействие является блокируемым. Библиотека сокетов позволяет сделать это с минимальными задержками.
Можно было бы написать свои обвязки, а можно просто взять готовые реализации. Например, AMPHP.
Для тестирования гипотезы можно взять пример с официальной документации:
<?php
require __DIR__ . '/vendor/autoload.php';
use Amp\Future;
use function Amp\async;
use function Amp\delay;
$future1 = async(function () {
echo 'Hello ';
delay(2);
echo 'the future! ';
});
$future2 = async(function () {
echo 'World ';
delay(1);
echo 'from ';
});
echo "Let's start: ";
$future1->await();
$future2->await();
echo PHP_EOL;
Результат, понятное дело, будет Let's start: Hello World from the future!
Немного доработать скрипт и можно получить следующий код
<?php
require __DIR__ . '/../vendor/autoload.php';
use function Amp\async;
use function Amp\delay;
use function Amp\Future\awaitAll;
ini_set('memory_limit', '3G');
const MAX_PROCESSES = 100_000;
const USERS = 1_000_000;
function generator_chunk(\Generator $generator, int $n): \Generator
{
for ($i = 0; $generator->valid() && $i < $n; $generator->next(), ++$i) {
yield $generator->current();
}
}
$fetchUsers = function (): \Generator {
for ($i = 1; $i <= USERS; $i++) {
yield [
'id' => $i,
'name' => random_bytes(30),
'email' => "email_{$i}@email.com",
];
}
};
function queue_tasks(\Generator $fetchUsers): void
{
$chunkNumber = 0;
while ($fetchUsers->valid()) {
echo sprintf('Chunk #%d' . PHP_EOL, ++$chunkNumber);
$chunk = [];
foreach (generator_chunk($fetchUsers, MAX_PROCESSES) as $value) {
$chunk[] = async(static function (array $user): void {
delay(rand(1, 10));
echo sprintf('Письмо отправлено пользователю #%d "%s".' . PHP_EOL, $user['id'], $user['email']);
}, $value)
->catch(static function (\Throwable $e) {
echo sprintf('Ошибка: %s' . PHP_EOL, $e->getMessage());
});
}
awaitAll($chunk);
}
}
- Что здесь происходит…
Какие есть ограничения?
Количество параллельных взаимодействий:
- Я сделал ограничение в 100 тысяч параллельных обработчиков. Всегда можно подтюнить под свои мощности
Общение с основным процессом:
- Взаимодействие между субпроцессами вполне обычное, по типу Promise из JS, т.е. обычные callback
Кажется, это самый оптимальный способ создать себе механизм для параллельных вычислений. И это доказывают бенчмарки:
php src/1.php 177.29s user 55.16s system 95% cpu 4:04.17 total
Не фейк? А если реальный http запрос сделать?
Берем какой-нибудь сервис для отслеживания запросов. Я взял https://pipedream.com. Добавляем зависимость amphp/http-client. Вместо delay(rand()) сделаем http запрос. 1 миллион запросов сократим до 1 тысячи, а размер батча сократим до 100. Т.е. будет 10 итераций по 100 запросов.
function queue_tasks(\Generator $fetchUsers): void
{
$chunkNumber = 0;
$client = HttpClientBuilder::buildDefault();
while ($fetchUsers->valid()) {
echo sprintf('Chunk #%d' . PHP_EOL, ++$chunkNumber);
$chunk = [];
foreach (generator_chunk($fetchUsers, MAX_PROCESSES) as $value) {
$chunk[] = async(static function (array $user) use ($client): void {
$client->request(new Request('https://eos1jafdj3tncgb.m.pipedream.net', 'GET'));
}, $value)
->catch(static function (\Throwable $e) {
echo sprintf('Ошибка: %s' . PHP_EOL, $e->getMessage());
});
}
awaitAll($chunk);
}
}
Логируем время в скрипте:
$start = microtime(true);
queue_tasks($fetchUsers());
$end = microtime(true);
echo sprintf(
'Все задачи выполнены. Время: %s секунд.',
($end - $start),
), PHP_EOL;
Запускаем:
Все задачи выполнены. Время: 8.2028141021729 секунд. php src/1.php 0.87s user 0.10s system 11% cpu 8.358 total
8 секунд на отправку 1 000 HTTP запросов. Неплохо? Даже очень! Если попробуете заменить лимиты на
Думаю, в какой-нибудь реальной задачи в следующий раз попробую асинхронный PHP, потому что результаты удивляют. Хоть на TechEmpower AMPHP занимает 50-ю строчку, это явно быстрее классического PHP или обвязки с брокерами очередей.
В целом подобные фреймворки существует давно и показывает отличные результаты в бенчмарках. В следующий раз попробую еще какой-нибудь фреймворк и поделюсь результатами.
Пример AMPHP: https://amphp.org/installation#hello-world
Shared Memory Operations: https://www.php.net/manual/ru/shmop.examples-basic.php
TechEmpower PHP: https://www.techempower.com/benchmarks/#hw=ph&test=fortune§ion=data-r22&l=zik073-cn3
Пост из канала "Хэндлим тему | Дерепко": https://t.me/handle_topic