Асинхронные задачи с Django и Celery
S0mebodyЕсли длительный процесс является частью рабочего процесса вашего приложения, а не блокирует ответ, вы должны обрабатывать его в фоновом режиме, вне обычного потока запросов/ответов.
Возможно, ваше веб-приложение требует, чтобы пользователи отправили изображение(который, вероятно, потребуется изменить размер) и подтвердили свой адрес электронной почты при регистрации. Если ваше приложение обработало изображение и отправило электронное письмо с подтверждением непосредственно в обработчике запросов, то конечному пользователю придется без необходимости ждать, пока они оба закончат обработку, прежде чем страница загрузится или обновится. Вместо этого вы захотите передать эти процессы в очередь задач и позволить отдельному рабочему процессу обрабатывать их, чтобы вы могли немедленно отправить ответ обратно клиенту. Затем конечный пользователь может делать другие вещи на стороне клиента, пока выполняется обработка. Ваше приложение также может свободно отвечать на запросы других пользователей и клиентов.
Чтобы добиться этого, я проведу вас через процесс настройки Celery и Redis для обработки длительных процессов в Django приложении. Мы также будем использовать Docker и Docker Compose, чтобы связать все вместе. Наконец, мы рассмотрим, как тестировать задачи Celery с помощью модульных и интеграционных тестов.
Рабочий процесс
Наша цель разработать приложение Django, которое работает вместе с Celery для обработки длительных процессов вне обычного цикла запроса/ответа.
- Конечный пользователь запускает новую задачу с помощью запроса POST на стороне сервера.
- В вьюшке задача добавляется в очередь, а id задачи отправляется обратно на клиентскую сторону.
- Используя AJAX, клиент продолжает опрашивать сервер, чтобы проверить статус задачи, пока сама задача выполняется в фоновом режиме.
Настройка проекта
Загрузите zip-файл с Telegram канала.
Поскольку нам нужно управлять всего тремя процессами (Django, Redis, worker), мы будем использовать Docker, чтобы упростить наш рабочий процесс, подключив их так, чтобы все они могли запускаться из одного окна терминала с помощью одной команды.
Из корня проекта создайте образы и запустите Docker контейнеры:
docker-compose up -d --build
После завершения сборки перейдите по адресу http://localhost: 1337:

Убедитесь, что тесты тоже проходят:
$ docker-compose exec web python -m pytest ========================= test session starts ======================== platform linux -- Python 3.9.5, pytest-6.2.4, py-1.10.0, pluggy-0.13.1 django: settings: core.settings (from ini) rootdir: /usr/src/app, configfile: pytest.ini plugins: django-4.4.0 collected 1 item tests/test_tasks.py . [100%] ========================= 1 passed in 0.63s ===========================
Взгляните на структуру проекта, прежде чем двигаться дальше:
├── docker-compose.yml
└── project
├── Dockerfile
├── core
│ ├── __init__.py
│ ├── asgi.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── entrypoint.sh
├── manage.py
├── pytest.ini
├── requirements.txt
├── static
│ ├── bulma.min.css
│ ├── jquery-3.4.1.min.js
│ ├── main.css
│ └── main.js
├── tasks
│ ├── __init__.py
│ ├── apps.py
│ ├── migrations
│ │ └── __init__.py
│ ├── templates
│ │ └── home.html
│ └── views.py
└── tests
├── __init__.py
└── test_tasks.py
Запуск задачи
Настроен обработчик событий в project/static/main.js, который прослушивает нажатие кнопки. При нажатии на сервер отправляется запрос AJAX POST с соответствующим типом задачи: 1, 2 или 3.
$('.button').on('click', function() {
$.ajax({
url: '/tasks/',
data: { type: $(this).data('type') },
method: 'POST',
})
.done((res) => {
getStatus(res.task_id);
})
.fail((err) => {
console.log(err);
});
});
На стороне сервера вьюшка уже настроено для обработки запроса в project/tasks/views.py:
@csrf_exempt
def run_task(request):
if request.POST:
task_type = request.POST.get("type")
return JsonResponse({"task_type": task_type}, status=202)
А теперь самое интересное: подключение Celery!
Настройка Celery
Начните с добавления Celery и Redis в файл project/requirements.txt:
celery==4.4.7 Django==3.2.4 redis==3.5.3 pytest==6.2.4 pytest-django==4.4.0
Celery использует брокера сообщений - RabbitMQ, Redis или AWS Simple Queue Service (SQS) - для облегчения взаимодействия между Celery воркер и веб-приложением. Сообщения добавляются к брокеру, которые затем обрабатываются работником (-ами). После этого результаты добавляются в серверную часть.
Redis будет использоваться и как брокер, и как бэкэнд. Добавьте Redis и Celery воркер в файл docker-compose.yml следующим образом:
version: '3.8'
services:
web:
build: ./project
command: python manage.py runserver 0.0.0.0:8000
volumes:
- ./project:/usr/src/app/
ports:
- 1337:8000
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- redis
celery:
build: ./project
command: celery worker --app=core --loglevel=info
volumes:
- ./project:/usr/src/app
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
redis:
image: redis:6-alpine
Обратите внимание на работника сельдерея celery worker --app=core --loglevel=info:
celery workerиспользуется для запуска celery worker--app=coreзапускаетcoreприложение Celery (которое мы вскоре определим)--loglevel=infoустанавливает уровень ведения логов как info
В settings.py проекта добавьте следующее внизу, чтобы указать Celery использовать Redis в качестве брокера и бэкэнда:
CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
Затем создайте новый файл с именем sample_tasks.py в project/tasks:
# project/tasks/sample_tasks.py
import time
from celery import shared_task
@shared_task
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True
Здесь, используя декоратор shared_task, мы определили новую функцию задачи Celery с именем create_task.
Имейте в виду, что сама задача не будет выполняться из процесса Django; она будет выполнена работником Celery (Celery worker).
Теперь добавьте файл celery.py в project/core:
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
Что тут происходит?
- Сначала мы устанавливаем значение по умолчанию для переменной среды
DJANGO_SETTINGS_MODULE, чтобы Celery знал, как найти Django проект. - Затем мы создали новый экземпляр Celery с именем
coreи присвоили значение переменнойapp. - Затем мы загрузили значения конфигурации Celery из объекта настроек из
django.conf. Мы использовалиnamespace="CELERY", чтобы предотвратить конфликты с другими настройками Django. Другими словами, все настройки конфигурации для Celery должны иметь префиксCELERY_. - Наконец,
app.autodiscover_tasks()указывает Celery искать задачи Celery из приложений, определенных вsettings.INSTALLED_APPS.
Обновите project/core/ __ init__.py, чтобы приложение Celery автоматически импортировалось при запуске Django:
from .celery import app as celery_app
__all__ = ("celery_app",)
Запуск задачи
Обновите вьюшку, чтобы запустить задачу и ответить идентификатором:
@csrf_exempt
def run_task(request):
if request.POST:
task_type = request.POST.get("type")
task = create_task.delay(int(task_type))
return JsonResponse({"task_id": task.id}, status=202)
Не забудьте импортировать задачу:
from tasks.sample_tasks import create_task
Создайте образы и запустите новые контейнеры:
docker-compose up -d --build
Чтобы запустить новую задачу, используйте:
curl -F type=0 http://localhost:1337/tasks/
Вы должны увидеть что-то вроде:
{
"task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}
Статус задачи
Вернитесь к обработчику событий на стороне клиента:
$('.button').on('click', function() {
$.ajax({
url: '/tasks/',
data: { type: $(this).data('type') },
method: 'POST',
})
.done((res) => {
getStatus(res.task_id);
})
.fail((err) => {
console.log(err);
});
});
Когда ответ приходит от запроса AJAX, мы продолжаем вызывать getStatus() с id задачи каждую секунду:
function getStatus(taskID) {
$.ajax({
url: `/tasks/${taskID}/`,
method: 'GET'
})
.done((res) => {
const html = `
<tr>
<td>${res.task_id}</td>
<td>${res.task_status}</td>
<td>${res.task_result}</td>
</tr>`
$('#tasks').prepend(html);
const taskStatus = res.task_status;
if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
setTimeout(function() {
getStatus(res.task_id);
}, 1000);
})
.fail((err) => {
console.log(err)
});
}
Если ответ успешен, в таблицу в DOM добавляется новая строка.
Обновите вьюшкуget_status, чтобы вернуть статус:
@csrf_exempt
def get_status(request, task_id):
task_result = AsyncResult(task_id)
result = {
"task_id": task_id,
"task_status": task_result.status,
"task_result": task_result.result
}
return JsonResponse(result, status=200)
Импортируете AsyncResult:
from celery.result import AsyncResult
Обновите контейнеры:
docker-compose up -d --build
Запускаете новую задачу:
curl -F type=1 http://localhost:1337/tasks/
Затем возьмите task_id из ответа и вызовите обновленную конечную точку, чтобы просмотреть статус:
$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/
{
"task_id": "25278457-0957-4b0b-b1da-2600525f812f",
"task_status": "SUCCESS",
"task_result": true
}
Проверьте это в браузере:

Celery логи
Обновите celery service в docker-compose.yml, чтобы логи Celery выгружались в файл логов:
celery:
build: ./project
command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
volumes:
- ./project:/usr/src/app
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
Добавьте новый директорию в project под названием logs. Затем добавьте новый файл с именем celery.log в этот созданный каталог.
Обновите:
docker-compose up -d --build
Вы должны увидеть, как файл логов заполняется локально, так как мы настроили волyмы (volume):
Connected to redis://redis:6379/0 mingle: searching for neighbors mingle: all alone ... succeeded in 10.023200300012832s: True
Flower панель
Flower это легкий веб-инструмент для мониторинга Celery в режиме реального времени. Вы можете отслеживать текущие выполняемые задачи, увеличивать или уменьшать пул рабочих, просматривать графики и статистику, и многие другие.
Добавьте его в файл requirements.txt:
celery==4.4.7 Django==3.2.4 flower==0.9.7 redis==3.5.3 pytest==6.2.4 pytest-django==4.4.0
Затем добавьте новый service в docker-compose.yml:
dashboard:
build: ./project
command: flower -A core --port=5555 --broker=redis://redis:6379/0
ports:
- 5555:5555
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
- celery
Проверьте, работает ли:
docker-compose up -d --build
Перейдите по адресу http://localhost:5555, чтобы просмотреть панель управления. Вы должны увидеть одного worker-a, готового к работе:

Начните еще несколько задач, чтобы полностью протестировать панель:


Попробуйте добавить еще несколько wroker-ов, чтобы увидеть, как это повлияет:
docker-compose up -d --build --scale celery=3
Тесты
Начнем с самого простого теста:
def test_task():
assert sample_tasks.create_task.run(1)
assert sample_tasks.create_task.run(2)
assert sample_tasks.create_task.run(3)
Добавьте приведенный выше тестовый пример в project/tests /test_tasks.py, а затем добавьте следующий импорт:
from tasks import sample_tasks
Выполните этот тест индивидуально:
docker-compose exec web python -m pytest -k "test_task and not test_home"
Выполнение должно занять около минуты:
======================== test session starts ========================= platform linux -- Python 3.9.5, pytest-6.2.4, py-1.10.0, pluggy-0.13.1 django: settings: core.settings (from ini) rootdir: /usr/src/app, configfile: pytest.ini plugins: django-4.4.0, celery-4.4.7 collected 2 items / 1 deselected / 1 selected tests/test_tasks.py . [100%] ============= 1 passed, 1 deselected in 60.69s (0:01:00) =============
Стоит отметить, что в приведенных выше примерах мы использовали метод .run (а не .delay) для запуска задачи напрямую без воркера Celery.
Хотите использовать методом .run, чтобы ускорить процесс?
@patch("tasks.sample_tasks.create_task.run")
def test_mock_task(mock_run):
assert sample_tasks.create_task.run(1)
sample_tasks.create_task.run.assert_called_once_with(1)
assert sample_tasks.create_task.run(2)
assert sample_tasks.create_task.run.call_count == 2
assert sample_tasks.create_task.run(3)
assert sample_tasks.create_task.run.call_count == 3
Импортируете:
from unittest.mock import patch
Протестируйте :
$ docker-compose exec web python -m pytest -k "test_mock_task" ===================== test session starts ======================== platform linux -- Python 3.9.5, pytest-6.2.4, py-1.10.0, pluggy-0.13.1 django: settings: core.settings (from ini) rootdir: /usr/src/app, configfile: pytest.ini plugins: django-4.4.0, celery-4.4.7 collected 3 items / 2 deselected / 1 selected tests/test_tasks.py . [100%] ================= 1 passed, 2 deselected in 0.64s =================
Намного быстрее!
Как насчет полного интеграционного теста?
def test_task_status(client):
response = client.post(reverse("run_task"), {"type": 0})
content = json.loads(response.content)
task_id = content["task_id"]
assert response.status_code == 202
assert task_id
response = client.get(reverse("get_status", args=[task_id]))
content = json.loads(response.content)
assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
assert response.status_code == 200
while content["task_status"] == "PENDING":
response = client.get(reverse("get_status", args=[task_id]))
content = json.loads(response.content)
assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}
Имейте в виду, что в этом тесте используются тот же брокер и бэкэнд, что и при разработке. Вы можете создать экземпляр нового приложения Celery для тестирования:
app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)
Добавьте импорт:
import json
Убедитесь, что тест прошел.
Выводы
Это было базовое руководство по настройке Celery для выполнения длительных задач в приложении Django. Вы должны позволить очереди обрабатывать любые процессы, которые могут блокировать или замедлять выполнение кода, обращенного к пользователю.
Celery также можно использовать для выполнения повторяемых задач и разделения сложных, ресурсоемких задач, чтобы вычислительная нагрузка могла быть распределена между несколькими машинами, чтобы сократить: время до завершения и нагрузку на машинy которая обрабатывает запросы клиентов.