Параллельные вычисления на PHP

Параллельные вычисления на PHP

Dmitrii Derepko (@xepozz)

Недавно попалась задача сделать параллельные расчеты без использования внешних сервисов вроде Redis, RabbitMQ, Database и прочие.


PCNTL

Первое решение, которое пришло в голову – форк процесса через pcntl расширение для PHP.

На первый взгляд это выглядит очень просто. pcntl работает следующим образом:

  • Вызываем pcntl_fork() в месте, где хотим отделить от родительского процесса новый дочерний
  • Далее нужно проверить возвращаемый результат, чтобы понять “в каком процессе” сейчас выполняется код:
<?php

$pid = pcntl_fork();

if ($pid === -1) {
  // произошла ошибка при форке, 
  // но мы можем её обработать и попробовать сделать форк еще раз
}

if ($pid === 0) {
  // логика дочернего процесса
  // завершаем форк
  exit(0);
} else {
  // родительский процесс
}
  • После pcntl_fork() с того же места продолжают работать несколько процессов параллельно. Условием мы управляем какой код должен будет выполнится далее

Не вдаваясь в дальнейшие подробности, хотелось бы сразу понять ограничения:

  • Не получится создать большое количество процессов: от тысячи и выше, зависит у всех от системы
  • Передать результат работы от child процесса к parent процессу простыми привычными операциями с заполнение массива или записью в объект не получится. Участки памяти у двух процессов разделены. У каждого своя копия. Придется использовать shmem расширения для общения между двумя процессами, либо какое-то другое внешнее хранилище

Реализация имитации отправки писем с помощью pcntl

const MAX_PROCESSES = 1000;

function queue_tasks(\Generator $fetchUsers): void
{
    $usersNumber = 0;
    $processes = [];
    foreach ($fetchUsers as $user) {
        /**
         * Дебаунс, чтобы не досить систему
         */
        if (count($processes) >= MAX_PROCESSES) {
            foreach ($processes as $pid => $status) {
                if (\pcntl_waitpid($pid, $status, WNOHANG) == -1) {
                    unset($processes[$pid]);
                }
            }
        }
        $pid = pcntl_fork();
        if ($pid === -1) {
            echo sprintf('Ошибка при форке процесса: "%s"...', pcntl_strerror(pcntl_errno())), PHP_EOL;
            exit(1);
        }

        $usersNumber++;
        if ($pid > 0) {
            // parent
            $processes[$pid] = 0;
            //echo sprintf('Старт процесса #%s', $pid), PHP_EOL; // на больших объемах это будет мешать
        } else {
            // child
            sleep(rand(1, 10));

            echo sprintf(
                'Письмо отправлено пользователю #%d "%s".' . PHP_EOL,
                $user['id'],
                $user['email'],
            );
            exit(0);
        }
    }

    /**
     * Дожидаемся остальных
     */
    wait_all_processes_done($processes);

    echo sprintf('Всего обработано %s пользователей', $usersNumber), PHP_EOL;
}

function wait_all_processes_done(array &$processes): void
{
    while (($pid = \pcntl_waitpid(0, $status)) !== -1) {
        unset($processes[$pid]);
        //echo sprintf('Процесс завершен #%d осталось %s процессов', $pid, count($processes)), PHP_EOL;
    }
}


Генераторы

Вторая идея, которая мне пришла в голову – это использование генераторов, но вся полезная работа будет происходить с Input/Output, а в стандартных драйверах практически всё* взаимодействие с IO является блокируемым, а значит генератор всё еще будет задерживать взаимодействие с внешним миром.

Однако, не всё взаимодействие является блокируемым. Библиотека сокетов позволяет сделать это с минимальными задержками.

Можно было бы написать свои обвязки, а можно просто взять готовые реализации. Например, AMPHP.

Для тестирования гипотезы можно взять пример с официальной документации:

<?php

require __DIR__ . '/vendor/autoload.php';

use Amp\Future;
use function Amp\async;
use function Amp\delay;

$future1 = async(function () {
    echo 'Hello ';
    delay(2);
    echo 'the future! ';
});

$future2 = async(function () {
    echo 'World ';
    delay(1);
    echo 'from ';
});

echo "Let's start: ";

$future1->await();
$future2->await();

echo PHP_EOL;

Результат, понятное дело, будет Let's start: Hello World from the future!

Немного доработать скрипт и можно получить следующий код

<?php

require __DIR__ . '/../vendor/autoload.php';

use function Amp\async;
use function Amp\delay;
use function Amp\Future\awaitAll;

ini_set('memory_limit', '3G');

const MAX_PROCESSES = 100_000;
const USERS = 1_000_000;

function generator_chunk(\Generator $generator, int $n): \Generator
{
    for ($i = 0; $generator->valid() && $i < $n; $generator->next(), ++$i) {
        yield $generator->current();
    }
}

$fetchUsers = function (): \Generator {
    for ($i = 1; $i <= USERS; $i++) {
        yield [
            'id' => $i,
            'name' => random_bytes(30),
            'email' => "email_{$i}@email.com",
        ];
    }
};

function queue_tasks(\Generator $fetchUsers): void
{
    $chunkNumber = 0;
    while ($fetchUsers->valid()) {
        echo sprintf('Chunk #%d' . PHP_EOL, ++$chunkNumber);
        $chunk = [];
        foreach (generator_chunk($fetchUsers, MAX_PROCESSES) as $value) {
            $chunk[] = async(static function (array $user): void {
                delay(rand(1, 10));

                echo sprintf('Письмо отправлено пользователю #%d "%s".' . PHP_EOL, $user['id'], $user['email']);
            }, $value)
                ->catch(static function (\Throwable $e) {
                    echo sprintf('Ошибка: %s' . PHP_EOL, $e->getMessage());
                });
        }

        awaitAll($chunk);
    }
}
  • Что здесь происходит…

Какие есть ограничения?

Количество параллельных взаимодействий:

  • Я сделал ограничение в 100 тысяч параллельных обработчиков. Всегда можно подтюнить под свои мощности

Общение с основным процессом:

  • Взаимодействие между субпроцессами вполне обычное, по типу Promise из JS, т.е. обычные callback

Кажется, это самый оптимальный способ создать себе механизм для параллельных вычислений. И это доказывают бенчмарки:

php src/1.php  177.29s user 55.16s system 95% cpu 4:04.17 total

Не фейк? А если реальный http запрос сделать?


Берем какой-нибудь сервис для отслеживания запросов. Я взял https://pipedream.com. Добавляем зависимость amphp/http-client. Вместо delay(rand()) сделаем http запрос. 1 миллион запросов сократим до 1 тысячи, а размер батча сократим до 100. Т.е. будет 10 итераций по 100 запросов.

function queue_tasks(\Generator $fetchUsers): void
{
    $chunkNumber = 0;
    $client = HttpClientBuilder::buildDefault();
    while ($fetchUsers->valid()) {
        echo sprintf('Chunk #%d' . PHP_EOL, ++$chunkNumber);
        $chunk = [];
        foreach (generator_chunk($fetchUsers, MAX_PROCESSES) as $value) {
            $chunk[] = async(static function (array $user) use ($client): void {
                $client->request(new Request('https://eos1jafdj3tncgb.m.pipedream.net', 'GET'));
            }, $value)
                ->catch(static function (\Throwable $e) {
                    echo sprintf('Ошибка: %s' . PHP_EOL, $e->getMessage());
                });
        }

        awaitAll($chunk);
    }
}

Логируем время в скрипте:

$start = microtime(true);
queue_tasks($fetchUsers());
$end = microtime(true);

echo sprintf(
    'Все задачи выполнены. Время: %s секунд.',
    ($end - $start),
), PHP_EOL;

Запускаем:

Все задачи выполнены. Время: 8.2028141021729 секунд.
php src/1.php  0.87s user 0.10s system 11% cpu 8.358 total

8 секунд на отправку 1 000 HTTP запросов. Неплохо? Даже очень! Если попробуете заменить лимиты на


Думаю, в какой-нибудь реальной задачи в следующий раз попробую асинхронный PHP, потому что результаты удивляют. Хоть на TechEmpower AMPHP занимает 50-ю строчку, это явно быстрее классического PHP или обвязки с брокерами очередей.

В целом подобные фреймворки существует давно и показывает отличные результаты в бенчмарках. В следующий раз попробую еще какой-нибудь фреймворк и поделюсь результатами.


Пример AMPHP: https://amphp.org/installation#hello-world

Shared Memory Operations: https://www.php.net/manual/ru/shmop.examples-basic.php

TechEmpower PHP: https://www.techempower.com/benchmarks/#hw=ph&test=fortune&section=data-r22&l=zik073-cn3

Пост из канала "Хэндлим тему | Дерепко": https://t.me/handle_topic


Report Page