Пишем первый ML-пайплайн на Airflow: подробный туториал 2 часть

Пишем первый ML-пайплайн на Airflow: подробный туториал 2 часть


2. Предсказание модели, zero-shot classification

Для начала определим список классов - тем, на которые мы будем разделять финансовые новости:

LABELS = [
    "Crypto",
    "SEC",
    "Dividend",
    "Economics",
    "Oil or Gas",
    "IPO",
    "Politics",
    "Buffet",
    "Stock",
    "Other",
]

Для получения предсказаний загрузим модель valhalla/distilbart-mnli-12-1 из Hugging Face Hub. device=-1 означает, что модель запускается на CPU.

from transformers import pipeline

model_hf = pipeline(model="valhalla/distilbart-mnli-12-1", device=-1)

Далее загрузим csv файл с новостями, который мы подготовили в предыдущем пункте. Для предсказания будем использовать объединение текста из заголовка новости (title) и ее краткого описания (summary).

import pandas as pd

df = pd.read_csv(data_path, sep="\t")
texts_for_pred = (df.title + ". " + df.summary).tolist()

Для получения предсказаний передадим модели список текстов texts_for_pred и классы LABELS:

pred = model_hf(texts_for_pred, LABELS, multi_label=False)

Флаг multi_label определяет, может ли объект быть отнесен к одному или более классам. Когда multi_label=True, модель может присваивать объектам несколько классов одновременно.

Таким образом, мы сами придумали название классов на свое усмотрение. Модель делает предсказание по нашим классам без необходимости предварительного обучения. В этом преимущество zero-shot моделей.

Благодаря библиотеки transformers код занял всего несколько строк.

Выберем предсказание лучшего класса и сохраним результат в json-файл:

df["label"] = [x["labels"][0] for x in pred]
df.T.to_json(pred_path)

На этом код предсказания готов.

Добавим логирование и использование Click по аналогии с кодом загрузки данных. Тогда финальная версия model_predict.py будет выглядеть следующим образом:

import logging

import click
import pandas as pd
from transformers import pipeline

LABELS = [
    "Crypto",
    "SEC",
    "Dividend",
    "Economics",
    "Oil or Gas",
    "IPO",
    "Politics",
    "Buffet",
    "Stock",
    "Other",
]

logging.basicConfig(level=logging.INFO)


@click.command()
@click.option("--data_path", help="Path to the input data CSV file")
@click.option("--pred_path", help="Path to save the output JSON file")
def model_predict(data_path: str, pred_path: str) -> None:
    logging.info("Loading the model...")
    model_hf = pipeline(model="valhalla/distilbart-mnli-12-1", device=-1)
    logging.info("Model loaded successfully.")

    logging.info(f"Reading data from '{data_path}'...")
    df = pd.read_csv(data_path, sep="\t")
    logging.info("Data read successfully.")
    
    texts_for_pred = (df.title + ". " + df.summary).tolist()

    logging.info("Performing model prediction...")
    pred = model_hf(texts_for_pred, LABELS, multi_label=False)
    logging.info("Prediction completed successfully.")

    df["label"] = [x["labels"][0] for x in pred]

    logging.info(f"Saving the predictions to '{pred_path}'...")
    df.T.to_json(pred_path)
    logging.info("Predictions saved successfully.")


if __name__ == "__main__":
    model_predict()

Код предсказания модели будет также запускаться в отдельном Docker-контейнере. Поэтому аналогично предыдущему пункту мы добавили свои requirements.txt и Dockerfile.

Итак, мы подготовили код для 2 компонент нашего пайплайна: загрузки данных и получения предсказания модели. Каждый будет выполняться в отдельном Docker-контейнере. Теперь все готово, чтобы мы перешли к написанию DAG на Airflow.

3. Пишем DAG

Вспомним основные компоненты нашего пайплайна:


  1. 3 последовательные таски, первые 2 из которых должны запускаться в отдельных Docker-контейнерах.
  2. Локальная директория data, которую мы будем использовать для сохранения результатов и обмена данными между тасками.

Разберем основные шаги при написании нашего DAG:


  • Локальную директорию data нужно примонтировать к /opt/airflow/data/ - это путь к данным внутри контейнера Airflow.
from docker.types import Mount

dockerops_kwargs = {
    "mount_tmp_dir": False,
    "mounts": [
        Mount(
            source="<path_to_your_airflow-ml_repo>/data",
            target="/opt/airflow/data/",
            type="bind",
        )
    ],
    ...
}
  • Определим пути для трех типов файлов: исходные данные, предсказания и файл с результатом. Они используют специальный синтаксис Airflow {{ ds }}, который будет заменен на дату выполнения при запуске DAG.
raw_data_path = "/opt/airflow/data/raw/data__{{ ds }}.csv"
pred_data_path = "/opt/airflow/data/predict/labels__{{ ds }}.json"
result_data_path = "/opt/airflow/data/predict/result__{{ ds }}.json"
  • Создадим DAG. Декоратор dag создает DAG с названием financial_news с начальной датой сегодня (days_ago(0)) и ежедневным запуском. Функция taskflow представляет собой сам DAG и содержит задачи, формирующие пайплайн.
from airflow.decorators import dag
from airflow.utils.dates import days_ago

# Create DAG
@dag("financial_news", start_date=days_ago(0), schedule="@daily", catchup=False)
def taskflow():
  ...
  • Создадим две таски для запуска в Docker-контейнерах. Для это будем использовать DockerOperator. Здесь мы указываем имя образа (этот образ будет описан в docker-compose.yml) и команду для запуска питоновского скрипта внутри контейнера.
    # Task 1
    news_load = DockerOperator(
        task_id="news_load",
        container_name="task__news_load",
        image="data-loader:latest",
        command=f"python data_load.py --data_path {raw_data_path}",
        **dockerops_kwargs,
    )

    # Task 2
    news_label = DockerOperator(
        task_id="news_label",
        container_name="task__news_label",
        image="model-prediction:latest",
        command=f"python model_predict.py --data_path {raw_data_path} --pred_path {pred_data_path}",
        **dockerops_kwargs,
    )
  • Создадим последнюю таску, она преобразует полученные предсказания питоновским кодом. Мы будем использовать PythonOperator, который выполнит несложный скрипт группировки предсказаний.
    # Task 3
    news_by_topic = PythonOperator(
        task_id="news_by_topic",
        python_callable=aggregate_predictions,
        op_kwargs={
            "pred_data_path": pred_data_path,
            "result_data_path": result_data_path,
        },
    )
  • Установим зависимости между задачами. В нашем случае они выполняются последовательно:
news_load >> news_label >> news_by_topic
  • Наконец, создадим и настроим объект DAG в соответствии с заданными параметрами:
taskflow()

Файл с описанием DAG имеет расширение .py и лежит в директории dags. При запуске Airflow, он сканирует эту директорию (или другую настроенную директорию) в поисках файлов с определением DAG. Когда Airflow обнаруживает файл с определением DAG, он регистрирует его и делает доступным для выполнения по расписанию.

Мы закончили писать наш пайплайн, теперь перейдем к настройке и запуску Airflow.

4. Запуск Airflow с помощью Docker Compose

Для локального запуска Airflow мы будем использовать Docker Compose. Он помогает запустить Apache Airflow с минимальными усилиями, предоставляя унифицированный и изолированный способ запуска всех компонентов Airflow.

У Airflow есть отличная инструкция по запуску с помощью Docker Compose. Там же есть загрузка готового файла docker-compose.yml. Инструкция позволяет запустить Airflow в пару строк.

Файл docker-compose.yml имеет раздел services, где определены различные сервисы, которые являются частями кластера Airflow. Каждый сервис имеет свою секцию с настройками, где указывается образ Docker, команда для запуска сервиса, порты, зависимости от других сервисов и другие параметры. В частности здесь описаны сервисы для Metadata Database, Webserver и Scheduler, которые мы упомянали в разделе Знакомство с Airflow.

Модификация docker-compose.yml для запуска тасок в отдельных Docker-контейнерах

Поскольку мы усложнили настройку Airflow, когда решили запускать таски в отдельных контейнерах, будем использовать свой модифицированный docker-compose.yml. Его можно посмотреть тут.

Основные моменты, которые мы изменили для поддержки запуска тасок в Docker-контейнерах:

  • Установили пакет для поддержки работы с Docker:
_PIP_ADDITIONAL_REQUIREMENTS: apache-airflow-providers-docker==3.6.0
  • В список volumes добавили монтирование директории с данными и сокета Docker:
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/data/:/opt/airflow/data/
    - /var/run/docker.sock:/var/run/docker.sock
  • Ранее в директориях ml_pipeline/data_loader и ml_pipeline/model_prediction мы написали инструкции по созданию Docker-образов, которые импользуются для запуска тасок с помощью DockerOperator. Здесь мы также определяем эти сервисы:
  data-loader:
    build:
      context: ml_pipeline/data_loader
    image: data-loader
    restart: "no"

  model-prediction:
    build:
      context: ml_pipeline/model_prediction
    image: model-prediction
    restart: "no"
  • Добавим сервис docker-socket-proxy:
  # Required because of DockerOperator. For secure access and handling permissions.
  docker-socket-proxy:
    image: tecnativa/docker-socket-proxy:0.1.1
    environment:
      CONTAINERS: 1
      IMAGES: 1
      AUTH: 1
      POST: 1
    privileged: true
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock:ro
    restart: always

Запуск Airflow

Возможно, в вашем случае запуск потребует выделения больше памяти для Docker Engine.

Перед первым запуском Airflow нужно подготовить окружение.

Если вы работаете на Linux перед запуском нужно указать AIRFLOW_UID:

echo -e "AIRFLOW_UID=$(id -u)" > .env

Далее независимо от вашей ОС необходимо выполнить миграцию базы данных и создать первую учетную запись пользователя. Для этого выполните команду:

docker compose up airflow-init

Созданная учетная запись имеет логин airflow и пароль airflow.

Запуск и остановка Airflow

Для создания и запуска всех необходимых контейнеров, определенных в файле docker-compose.yml, используется команда:

docker compose up

В нашем случае также необходимо предварительно собрать образы data-loader и model-prediction, которые также указаны в файле docker-compose.yml. Поэтому модифицируем команду:

docker compose up --build

Когда вы закончите работу и захотите очистить свое окружение, выполните:

docker compose down --volumes --rmi all

После запуска Airflow продолжим работу в веб-интерфейсе.

5. Веб-интерфейс Airflow, запуск пайплайна

Пользовательский интерфейс Airflow упрощает мониторинг и запуск пайплайнов. Он доступен по адресу http://localhost:8080. На странице входа нужно ввести логин и пароль от учетной записи, в нашем случае airflow и airflow.

Страница авторизации

Запуск и мониторинг пайплайна

На домашней странице вы увидете список всех дагов, включая дефолтные от Airflow. Здесь можно найти и выбрать созданный нами DAG financial_news.

На странице нашего DAG доступна разная информация о пайплайне: графическое представление, время ближайшего запуска, логи запуска, код и многое другое.

Графическое представление нашего пайплайна

Не дожидаясь планового запуска пайплайна, запустим DAG, нажав на кнопку старта. Для отслеживания выполнения отдельных тасок, можно нажать на нужную таску и посмотреть ее логи:

Логи таски 1

Здесь мы можем видеть логи, которые добавили специально для отслеживания выполнения отдельных шагов.

После того, как выполнение пайплайна успешно завершилось, посмотрим на результаты.

Смотрим результат пайплайна. Как классифицировались новости?

Результаты выполнения пайплайна сохранились в data/predict/result__<date>.json. Изначально мы ставили задачу написать пайплайн, которые будет автоматически загружать актуальные новости из финансового мира и группировать их по заданным нами темам. Посмотрим, что у нас получилось.


Результат работы пайплайна
Таким образом, мы справились с поставленной задачей. Новости успешно загружаются и классифицируются. Напомним, что темы заданы нами произвольно, без предварительного обучения модели.


Report Page