Celery: проясняем неочевидные моменты

Celery: проясняем неочевидные моменты

Автор: shamaevnn

Начнем с небольшой философии на тему разработки. Вопрос простой: должны ли мы сначала до последней страницы изучить документацию, а затем, когда мы, предположительно, полностью поймем библиотеку изнутри, начать использовать ее в своем коде? Или мы должны сначала использовать ее, поиграться с ней, прежде чем возвращаться и читать документацию.


Celery на самом деле полна подводных камней. Отчасти потому, что внутри происходит работа с параллельными процессами, потоками ... и большую часть времени такие детали скрываются. Зачастую разработчику не нужно думать о них, и, следовательно, контакта со всем этим почти нет. И отчасти поэтому, для разработчика Celery иногда ведет себя самым неожиданным образом. Поэтому в нашем случае чтение документации все-таки необходимо.

Обработчики и Брокеры (Workers & Brokers)

Для начала, нужно объяснить некоторые основные понятия, которые используются в Celery.

Celery — это "Очередь задач" (Task queue). Да, для меня тоже было неизвестно, что это реальный термин. Мне казалось, что это описание того, что это такое в принципе — очередь задач, которые в конечном итоге будут выполнены. Итак, Celery — это программа, которая отслеживает задачи (tasks), которые необходимо выполнить, и в которой есть набор обработчиков (workers), которые будут выполнять эти задачи. Основной смысл в том, что она (программа) может выполнять несколько задач параллельно и что она не блокирует поставщиков (producers) этих самых задач.

Celery на самом деле не хранит все эти задачи в памяти. Для хранения задач есть отдельный сервис, называемый брокером сообщений (message broker), который по сути своей является очередью. Обычно это либо Redis, либо RabbitMQ. Т.е. Celery следит за тем, что происходит в очереди, но хранится она внутри Redis/RabbitMQ.

При запуске Celery создается 1 обработчик.

celery -A tasks worker

Этот обработчик является главным процессом (supervisor process), который будет порождать дочерние процессы или потоки, которые в свою очередь будут выполнять задачи. По умолчанию главный обработчик будет создавать дочерние процессы, а не потоки, и он создаст столько одновременных дочерних процессов, сколько ядер у процессора. Главный процесс будет следить за тем, что происходит с задачами и процессами/потоками, но он не будет запускать сами задачи. Эта группа дочерних процессов или потоков, которая ожидает выполнения задач, называется пулом выполнения (execution pool) или пулом потоков (thread pool).

Очереди (Queues)

Да, тут намеренно используется множественное число для очередей, потому что существует несколько видов очередей 🧙🏽‍⚗️.

Прежде всего, существует главная очередь (main queue), которая принимает задачи от поставщиков (producers) по мере их поступления и передает обработчикам по мере их запроса. По умолчанию есть только одна такая очередь. Все обработчики принимают задачи из одной очереди. Но вы также можете указать несколько таких очередей и назначить конкретные обработчики на определенные очереди. Очередь по умолчанию называется celery.

Чтобы просмотреть первые 100 задач в очереди в Redis, выполните:

redis-cli lrange celery 0 100

Эти очереди сильно напоминают FIFO (First In First Out), но это не совсем так. Задачи, которые сначала помещаются в очередь, первыми удаляются из очереди, НО они не обязательно выполняются первыми.

Когда обработчики извлекают новые задачи из очереди, они обычно (по умолчанию) берут не столько задач, сколько у них есть процессов, они берут больше. По умолчанию формула для количества взятых задач такая:

# n_processes — количествово потоков/процессов
n_tasks = 4 * n_processes

Делается это для экономии времени. Взаимодействие с брокером занимает какое-то время, и если задачи, которые необходимо выполнить, выполняются быстро, то обработчики будут запрашивать дополнительные задачи снова, и снова, и снова. Чтобы избежать этого, они запрашивают в X раз больше задач, чем у них есть процессов, за это отвечает параметр worker_prefetch_multiplier (прим: этот момент хорошо описан в документации).

Но ведь есть задачи, которые никогда не попадают в главную очередь и все равно выполняются. Как это возможно, спросите вы меня? Задавая себе и Google один и тот же вопрос, хочу сообщить вам, что Google очень мало что смог сказать по этому поводу. Нашлись только обрывки информации. Но, посидев с Celery и Redis несколько часов (или дней??), обнаружилось следующее.

Задачи с ETA никогда не помещаются в главную очередь. Они помещаются во что-то среднее между очередью и списком “неподтвержденных задач”, и называется эта очередь unacked (примсокращение от слова "unacknowledged"). Согласитесь, что очень легко пропустить что-то с названием unacked, когда вы пытаетесь понять, как и куда некоторые задачи только что исчезли. Итак, примечание для следующего раза, когда мне или вам нужно будет что-то назвать: все названия, с которыми встречается пользователь, должны быть прописаны полностью.

Итак, что представляют собой ETA задачи? Это запланированные задачи. ETA расшифровывается как "estimated time of arrival" (“предполагаемое время прибытия”). Все задачи, для которых указано ETA или обратный отсчет (countdown), например:

my_task.apply_async((1, 2), countdown=3)
my_task.apply_async((1, 2), eta=tomorrow_datetime)

хранятся в unacked очереди. Сюда же попадают и задачи с ретраями, потому что при повторном выполнении задачи она повторяется через определенное количество секунд, а это означает, что у нее есть ETA.

Посмотреть, какие задачи находятся в очереди ETA в Redis, можно вот так:

redis-cli HGETAL unacked

В ответ получите список из чередующихся ключей и значений:

1) "46165d9f-cf45-4a75-ace1-44443337e000"
2) "[{\"body\": \"W1swXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIj\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"priority\": 0, \"body_encoding\": ...
3) "d91e8c77-25c0-497f-9969-0ccce000c6667"
4) "[{\"body\": \"W1s0XSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgI\", \"content-encoding\": \"utf-8\", ...
...

Задачи (Tasks)

Задачи иногда также называют сообщениями. По сути брокер сообщений - это нечто, что передает сообщения из одной системы в другую. В нашем случае сообщение представляет собой описание задачи: название (уникальный идентификатор), входные параметры, время ожидания, количество повторных попыток и тд.

В celery задача является классом. Таким образом, каждый раз, когда вы используете декоратор для функции (например, @shared_task), чтобы сделать ее celery задачей, под капотом создается класс. Это означает, что у каждой задачи есть self, к которому добавляется множество атрибутов, например: namerequeststatuspriorityretries и многое другое. Если мы хотим получить доступ к этим атрибутам, то нужно указать параметр bind=True.

@shared_task(bind=True,...)
def _send_one_email(self, email_type, user_id):
    ...
    num_of_retries = self.request.retries
    ...

Подтверждение задач (Task Acknowledgment)

Ранее мы говорили, что, когда у обработчиков нет задач, они идут и получают еще несколько задач от брокера. Но не все так просто. Когда обработчик “берет” задачу, задача перемещается из главной очереди в unacked очередь. Задача полностью удаляется из брокера только после того, как обработчик подтвердит это. Это означает, что когда обработчик забирает себе очередную пачку задач, на самом деле в этот момент задачи только резервируются. Они помещаются в unacked очередь и другие обработчики их не возьмут. Если процесс обработчика умирает, то эти задачи становятся доступными для других обработчиков.

Итак, когда обработчик всё же подтверждает выполнение задачи? По умолчанию Celery предполагает, что опасно запускать задачу более одного раза, следовательно, подтверждение задачи происходит непосредственно перед ее выполнением. Вы можете изменить это, установив значение acks_late=True. В этом случае задача имеет небольшую вероятность быть запущенной более одного раза, если обработчик, выполняющий ее, умирает в середине выполнения. И под “умирает” буквально подразумевается умереть. Python Exception в коде задачи не приведет к смерти обработчика. Такая задача по-прежнему будет подтверждена, но ее состояние будет FAILURE. Должно произойти что-то из ряда вон выходящее, чтобы обработчик никогда не достиг момента self.confirmate(). И на самом деле это редкость. По этой причине, можно сказать, что значение параметра acks_late имеет мало значения.

ETA

Как уже упоминалось, ETA задачи ... тяжело отыскать. Они никогда не попадают в главную очередь. Они сразу назначаются обработчику и помещаются в unacked очередь. Я подозреваю, что это не было сделано намеренно, чтобы задачи ETA немедленно назначались конкретному обработчику. Скорее это было просто следствием существующего кода. Задача ETA не может попасть в общую очередь, которая работает почти как FIFO. Единственное другое место для нее находится среди неподтвержденных задач, и в этом случае она должно быть зарезервирована одним обработчиком.

Интересно, что время ETA не является точным временем выполнения этой задачи. Вместо этого это самый ранний момент выполнения этой задачи. Как только наступит время ETA, задача должна дождаться освобождения обработчика.

Повторное исполнение задач (Retry Tasks)

Celery по умолчанию сама не делает ретраи для задач. Главным образом потому, что предполагается, что задачи не являются идемпотентными (прим: одинаковыми независимо от момента выполнения), и поэтому небезопасно запускать их более одного раза. Однако, в Celery есть функционал для ретраев задач, но он должен быть явно и отдельно настроен для каждой задачи.

Одним из способов запуска ретрая является вызов self.retry() в задаче. Что происходит после того, когда вы вызываете эту команду? Вычисляется время ETA, собираются некоторые новые метаданные, а затем задача отправляется брокеру, где она попадает в unacked очередь и назначается тому же обработчику, который уже выполнял эту задачу. Именно так ретрай-задачи становятся задачами ETA и, следовательно, никогда не отображаются в главной очереди брокера. Это очень изящная, но неожиданная схема. И опять же, Google очень мало что может сказать по этому поводу.

Подробнее о ретраях читайте в гайде по ретраям в Celery.

CPU, I/O ограничения и Процессы vs Потоки

Как мы уже говорили, по умолчанию Celery выполняет задачи в отдельных процессах, а не в потоках. Но вы можете заставить использовать потоки, стартуя обработчики либо с помощью --poll eventlet, либо с помощью --pool gevent. И eventlet, и gevent на самом деле создают гринлеты, а не потоки. Гринлеты (или зеленые потоки) похожи на потоки, но не являются ими, потому что по определению потоки управляются операционной системой. Гринлеты не полагаются на ОС для обеспечения поддержки потоков, вместо этого они эмулируют многопоточность. Они управляются в пространстве программы, а не в пространстве операционной системы. В любой данный момент нет переключения между потоками. Вместо этого гринлеты добровольно или явно передают управление друг другу в определенных точках вашего кода.

Если ваши задачи сильно загружают процессор, если они выполняют много вычислений (CPU-bound), то вам следует продолжать использовать процессы. Если, с другой стороны, ваши задачи в основном выполняют HTTP-запросы (I/O bound), то лучше использовать потоки. Причина этого заключается в том, что, пока ваша задача ожидает, пока HTTP-запрос вернет результат, она ничего не делает, она не использует процессор и, следовательно, не будет "возражать", если другой поток будет использовать его.

В Celery гораздо больше неочевидных моментов

Документация в Celery далека от идеала. Описание многих функций разбито на части и разбросано по всем страницам. Трудно найти детали реализации. Я не знаю, как Celery будет вести себя за пределами сценариев, которые создает разработчик. Конечно, после нескольких лет интенсивной работы можно было бы хорошо понять, как это работает, но Celery живет на периферии моей повседневной жизни. Celery ведет себя совершенно по-разному, когда находится на сервере и когда находится на моем компьютере. Я вижу, какие задачи были выполнены, но я не вижу, насколько хорошо они были выполнены. Это сложно измерить у того, что выполняется параллельно, в потоках и почти независимо от вашей программы. Я не доверяю Celery, я не верю, что правильно понимаю настройки, или, можно сказать, я не верю, что знаю, как их правильно настроить. Celery подобен приведению, которое приходит и уходит, иногда ломается, но в большинстве случаев просто работает. Надеюсь, Celery справится с теми задачами, которые мы ему поручаем, но если это не так, он будет работать так же тихо и мы ничего об этом не узнаем.

Источники:


Конечно, можно сказать, что всё это можно было вычитать из официальной документации. Как бы да, но нет. И это доказывает нам автор статьи Ines Panker, которая провела много часов (дней?) на чтение документации, поиск в гугле, тыканье в python/celery/redis. Скажем же ей спасибо за это. А я надеюсь, что эта статья была полезна для вас и что у вас появилось больше понимания того, что вы используете.




Report Page