Асинхронные задачи с Django и Celery

Асинхронные задачи с Django и Celery

S0mebody


Если длительный процесс является частью рабочего процесса вашего приложения, а не блокирует ответ, вы должны обрабатывать его в фоновом режиме, вне обычного потока запросов/ответов.


Возможно, ваше веб-приложение требует, чтобы пользователи отправили изображение(который, вероятно, потребуется изменить размер) и подтвердили свой адрес электронной почты при регистрации. Если ваше приложение обработало изображение и отправило электронное письмо с подтверждением непосредственно в обработчике запросов, то конечному пользователю придется без необходимости ждать, пока они оба закончат обработку, прежде чем страница загрузится или обновится. Вместо этого вы захотите передать эти процессы в очередь задач и позволить отдельному рабочему процессу обрабатывать их, чтобы вы могли немедленно отправить ответ обратно клиенту. Затем конечный пользователь может делать другие вещи на стороне клиента, пока выполняется обработка. Ваше приложение также может свободно отвечать на запросы других пользователей и клиентов.


Чтобы добиться этого, я проведу вас через процесс настройки Celery и Redis для обработки длительных процессов в Django приложении. Мы также будем использовать Docker и Docker Compose, чтобы связать все вместе. Наконец, мы рассмотрим, как тестировать задачи Celery с помощью модульных и интеграционных тестов.


Рабочий процесс

Наша цель разработать приложение Django, которое работает вместе с Celery для обработки длительных процессов вне обычного цикла запроса/ответа.


  1. Конечный пользователь запускает новую задачу с помощью запроса POST на стороне сервера.
  2. В вьюшке задача добавляется в очередь, а id задачи отправляется обратно на клиентскую сторону.
  3. Используя AJAX, клиент продолжает опрашивать сервер, чтобы проверить статус задачи, пока сама задача выполняется в фоновом режиме.


Настройка проекта

Загрузите zip-файл с Telegram канала.


Поскольку нам нужно управлять всего тремя процессами (Django, Redis, worker), мы будем использовать Docker, чтобы упростить наш рабочий процесс, подключив их так, чтобы все они могли запускаться из одного окна терминала с помощью одной команды.


Из корня проекта создайте образы и запустите Docker контейнеры:

docker-compose up -d --build

После завершения сборки перейдите по адресу http://localhost: 1337:

Убедитесь, что тесты тоже проходят:

$ docker-compose exec web python -m pytest

========================= test session starts ========================

platform linux -- Python 3.9.5, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, configfile: pytest.ini
plugins: django-4.4.0
collected 1 item

tests/test_tasks.py .                                                       [100%]

========================= 1 passed in 0.63s ===========================

Взгляните на структуру проекта, прежде чем двигаться дальше:

├── docker-compose.yml
└── project
    ├── Dockerfile
    ├── core
    │   ├── __init__.py
    │   ├── asgi.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── entrypoint.sh
    ├── manage.py
    ├── pytest.ini
    ├── requirements.txt
    ├── static
    │   ├── bulma.min.css
    │   ├── jquery-3.4.1.min.js
    │   ├── main.css
    │   └── main.js
    ├── tasks
    │   ├── __init__.py
    │   ├── apps.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── templates
    │   │   └── home.html
    │   └── views.py
    └── tests
        ├── __init__.py
        └── test_tasks.py


Запуск задачи

Настроен обработчик событий в project/static/main.js, который прослушивает нажатие кнопки. При нажатии на сервер отправляется запрос AJAX POST с соответствующим типом задачи: 1, 2 или 3.

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

На стороне сервера вьюшка уже настроено для обработки запроса в project/tasks/views.py:

@csrf_exempt
def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        return JsonResponse({"task_type": task_type}, status=202)

А теперь самое интересное: подключение Celery!


Настройка Celery

Начните с добавления Celery и Redis в файл project/requirements.txt:

celery==4.4.7
Django==3.2.4
redis==3.5.3

pytest==6.2.4
pytest-django==4.4.0

Celery использует брокера сообщений - RabbitMQ, Redis или AWS Simple Queue Service (SQS) - для облегчения взаимодействия между Celery воркер и веб-приложением. Сообщения добавляются к брокеру, которые затем обрабатываются работником (-ами). После этого результаты добавляются в серверную часть.


Redis будет использоваться и как брокер, и как бэкэнд. Добавьте Redis и Celery воркер в файл docker-compose.yml следующим образом:

version: '3.8'

services:
  web:
    build: ./project
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ./project:/usr/src/app/
    ports:
      - 1337:8000
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  celery:
    build: ./project
    command: celery worker --app=core --loglevel=info
    volumes:
      - ./project:/usr/src/app
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  redis:
    image: redis:6-alpine

Обратите внимание на работника сельдерея celery worker --app=core --loglevel=info:

  1. celery worker используется для запуска celery worker
  2. --app=core запускает core приложение Celery (которое мы вскоре определим)
  3. --loglevel=info устанавливает уровень ведения логов как info

В settings.py проекта добавьте следующее внизу, чтобы указать Celery использовать Redis в качестве брокера и бэкэнда:

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")

Затем создайте новый файл с именем sample_tasks.py в project/tasks:

# project/tasks/sample_tasks.py

import time

from celery import shared_task


@shared_task
def create_task(task_type):
    time.sleep(int(task_type) * 10)
    return True

Здесь, используя декоратор shared_task, мы определили новую функцию задачи Celery с именем create_task.


Имейте в виду, что сама задача не будет выполняться из процесса Django; она будет выполнена работником Celery (Celery worker).


Теперь добавьте файл celery.py в project/core:

import os

from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

Что тут происходит?

  1. Сначала мы устанавливаем значение по умолчанию для переменной среды DJANGO_SETTINGS_MODULE, чтобы Celery знал, как найти Django проект.
  2. Затем мы создали новый экземпляр Celery с именем core и присвоили значение переменной app.
  3. Затем мы загрузили значения конфигурации Celery из объекта настроек из django.conf. Мы использовали namespace="CELERY", чтобы предотвратить конфликты с другими настройками Django. Другими словами, все настройки конфигурации для Celery должны иметь префикс CELERY_.
  4. Наконец, app.autodiscover_tasks() указывает Celery искать задачи Celery из приложений, определенных в settings.INSTALLED_APPS.

Обновите project/core/ __ init__.py, чтобы приложение Celery автоматически импортировалось при запуске Django:

from .celery import app as celery_app


__all__ = ("celery_app",)


Запуск задачи

Обновите вьюшку, чтобы запустить задачу и ответить идентификатором:

@csrf_exempt
def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        task = create_task.delay(int(task_type))
        return JsonResponse({"task_id": task.id}, status=202)

Не забудьте импортировать задачу:

from tasks.sample_tasks import create_task

Создайте образы и запустите новые контейнеры:

docker-compose up -d --build

Чтобы запустить новую задачу, используйте:

curl -F type=0 http://localhost:1337/tasks/

Вы должны увидеть что-то вроде:

{
  "task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}


Статус задачи

Вернитесь к обработчику событий на стороне клиента:

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

Когда ответ приходит от запроса AJAX, мы продолжаем вызывать getStatus() с id задачи каждую секунду:

function getStatus(taskID) {
  $.ajax({
    url: `/tasks/${taskID}/`,
    method: 'GET'
  })
  .done((res) => {
    const html = `
      <tr>
        <td>${res.task_id}</td>
        <td>${res.task_status}</td>
        <td>${res.task_result}</td>
      </tr>`
    $('#tasks').prepend(html);

    const taskStatus = res.task_status;

    if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
    setTimeout(function() {
      getStatus(res.task_id);
    }, 1000);
  })
  .fail((err) => {
    console.log(err)
  });
}

Если ответ успешен, в таблицу в DOM добавляется новая строка.


Обновите вьюшкуget_status, чтобы вернуть статус:

@csrf_exempt
def get_status(request, task_id):
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return JsonResponse(result, status=200)

Импортируете AsyncResult:

from celery.result import AsyncResult

Обновите контейнеры:

docker-compose up -d --build

Запускаете новую задачу:

curl -F type=1 http://localhost:1337/tasks/

Затем возьмите task_id из ответа и вызовите обновленную конечную точку, чтобы просмотреть статус:

$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/

{
    "task_id": "25278457-0957-4b0b-b1da-2600525f812f",
    "task_status": "SUCCESS",
    "task_result": true
}

Проверьте это в браузере:


Celery логи 

Обновите celery service в docker-compose.yml, чтобы логи Celery выгружались в файл логов:

celery:
  build: ./project
  command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
  volumes:
    - ./project:/usr/src/app
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis

Добавьте новый директорию в project под названием logs. Затем добавьте новый файл с именем celery.log в этот созданный каталог.


Обновите:

docker-compose up -d --build

Вы должны увидеть, как файл логов заполняется локально, так как мы настроили волyмы (volume):

Connected to redis://redis:6379/0
mingle: searching for neighbors
mingle: all alone
...
succeeded in 10.023200300012832s: True


Flower панель

Flower это легкий веб-инструмент для мониторинга Celery в режиме реального времени. Вы можете отслеживать текущие выполняемые задачи, увеличивать или уменьшать пул рабочих, просматривать графики и статистику, и многие другие.


Добавьте его в файл requirements.txt:

celery==4.4.7
Django==3.2.4
flower==0.9.7
redis==3.5.3

pytest==6.2.4
pytest-django==4.4.0

Затем добавьте новый service в docker-compose.yml:

dashboard:
  build: ./project
  command:  flower -A core --port=5555 --broker=redis://redis:6379/0
  ports:
    - 5555:5555
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis
    - celery

Проверьте, работает ли:

docker-compose up -d --build

Перейдите по адресу http://localhost:5555, чтобы просмотреть панель управления. Вы должны увидеть одного worker-a, готового к работе:

Начните еще несколько задач, чтобы полностью протестировать панель:

Попробуйте добавить еще несколько wroker-ов, чтобы увидеть, как это повлияет:

docker-compose up -d --build --scale celery=3


Тесты

Начнем с самого простого теста:

def test_task():
    assert sample_tasks.create_task.run(1)
    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run(3)

Добавьте приведенный выше тестовый пример в project/tests /test_tasks.py, а затем добавьте следующий импорт:

from tasks import sample_tasks

Выполните этот тест индивидуально:

docker-compose exec web python -m pytest -k "test_task and not test_home"

Выполнение должно занять около минуты:

======================== test session starts =========================
platform linux -- Python 3.9.5, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, configfile: pytest.ini
plugins: django-4.4.0, celery-4.4.7
collected 2 items / 1 deselected / 1 selected

tests/test_tasks.py .                                                       [100%]

============= 1 passed, 1 deselected in 60.69s (0:01:00) =============

Стоит отметить, что в приведенных выше примерах мы использовали метод .run (а не .delay) для запуска задачи напрямую без воркера Celery.


Хотите использовать методом .run, чтобы ускорить процесс?

@patch("tasks.sample_tasks.create_task.run")
def test_mock_task(mock_run):
    assert sample_tasks.create_task.run(1)
    sample_tasks.create_task.run.assert_called_once_with(1)

    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run.call_count == 2

    assert sample_tasks.create_task.run(3)
    assert sample_tasks.create_task.run.call_count == 3

Импортируете:

from unittest.mock import patch

Протестируйте :

$ docker-compose exec web python -m pytest -k "test_mock_task"

===================== test session starts ========================
platform linux -- Python 3.9.5, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, configfile: pytest.ini
plugins: django-4.4.0, celery-4.4.7
collected 3 items / 2 deselected / 1 selected

tests/test_tasks.py .                                                       [100%]

================= 1 passed, 2 deselected in 0.64s =================

Намного быстрее!


Как насчет полного интеграционного теста?

def test_task_status(client):
    response = client.post(reverse("run_task"), {"type": 0})
    content = json.loads(response.content)
    task_id = content["task_id"]
    assert response.status_code == 202
    assert task_id

    response = client.get(reverse("get_status", args=[task_id]))
    content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
    assert response.status_code == 200

    while content["task_status"] == "PENDING":
        response = client.get(reverse("get_status", args=[task_id]))
        content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}

Имейте в виду, что в этом тесте используются тот же брокер и бэкэнд, что и при разработке. Вы можете создать экземпляр нового приложения Celery для тестирования:

app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)

Добавьте импорт:

import json

Убедитесь, что тест прошел.


Выводы

Это было базовое руководство по настройке Celery для выполнения длительных задач в приложении Django. Вы должны позволить очереди обрабатывать любые процессы, которые могут блокировать или замедлять выполнение кода, обращенного к пользователю.


Celery также можно использовать для выполнения повторяемых задач и разделения сложных, ресурсоемких задач, чтобы вычислительная нагрузка могла быть распределена между несколькими машинами, чтобы сократить: время до завершения и нагрузку на машинy которая обрабатывает запросы клиентов.

Report Page