Эксперимент для исследования различных форматов хранения данных.

Эксперимент для исследования различных форматов хранения данных.

@bigdatai

Перед нами встала задача проведения эксперимента для исследования различных форматов хранения. Для этого нами были выбраны наиболее популярные форматы, которые ко всему прочему можно открыть в знакомой всем Python библиотеке Pandas (версии 1.5.1):

  • CSV – или если перевести аббревиатуру «Comma-Separated Values», как значения, разделенные запятыми. Один из наиболее популярных простых форматов и предназначен для представления табличных данных.
  • Orc – сама аббревиатура Optimized Row Columnar говорит о том, что это оптимизированный строково-столбчатый формат хранения. Был специально разработан, чтобы преодолеть ограничения других возможных форматов хранения.
  • Parquet – формат данных, который хранит вложенные структуры данных в плоском столбчатом формате. По сравнению с другими строчными подходами, более эффективен в плане хранения и производительности.

Перед началом проведения эксперимента нами были выделены наиболее, на наш взгляд, важные аспекты, которые помогут определить лучший формат хранения:

  1. Скорость чтения файла
  2. Скорость записи файла
  3. Занимаемый объем сохраненного файла

Исследуемые датасеты были сгенерированы с помощью библиотеки SDV. Полученные нами данные имели ровно те же свойства, которые есть у оригинального набора данных. Этот пакет генерирует объемы информации с применением математических методов и ML моделей. К слову, SDV может обрабатывать данные, если они содержат в себе несколько разных типов, а также пустые значения. Всего было сгенерировано несколько датасетов:

  • Файл размером ±1 Гб, где размерность составляет (7933070, 15)
  • Файл размером ±10 Гб, где размерность составляет (66637788, 15).

(X, Y), где X — количество строк, Y — количество столбцов.

Этого исследуемого набора хватило, чтобы провести сравнительный анализ по скорости чтения и скорости записи файлов. Ссылка на библиотеку SDV: ссылка.

Как уже было сказано ранее, для эксперимента используется Python библиотека Pandas версии 1.5.1. В этой версии есть все необходимые методы для чтения и записи файлов в необходимых для нас форматах, ниже приведена таблица с используемыми методами:

Чтобы измерить скорость чтения и скорость записи будем использовать магическую команду %%time, которая показывает время выполнения конкретной ячейки.

Весь эксперимент разбили на два этапа, где первый этап — это работа с набором данных, содержащий 8 млн строк, а второй этап уже с датасетом, содержащий 66,5 млн строк.

Суть эксперимента в каждом этапе идентична и была разделена на несколько шагов:

  1. Считывание исходного csv файла:

Например,

df = pd.read_csv('df_1gb.csv')
  1. Сохранение исходного набора данных в форматы orc и parquet, используя между сохранением перезагрузку ядра, чтобы оценить абсолютную скорость записи:

Например,

df.to_orc('df_1gb_orc.orc')
df.to_parquet('df_1gb_parquet.parquet', engine='pyarrow')

3. Считывание данных во всех форматах, используя между чтением перезагрузку ядра, чтобы аналогично как в шаге 2 оценить абсолютную скорость чтения:

df = pd.read_csv('df_1gb.csv')
df = pd.read_orc('df_1gb_orc.orc')
df = pd.read_parquet('df_1gb_parquet.parquet')
  1. Анализ полученных результатов.
  2. Формирование результатов.

Результаты:

Нами были проанализированы все полученные в ходе эксперимента результаты и собраны в итоговую таблицу, которую можно увидеть ниже:

Хочется также отметить, что все результаты проводились на компьютере MacBook Air (2020) с чипом M1 и 8 Гб ОЗУ.

В дальнейшем хотелось бы повторить эксперимент на разных по характеристикам вычислительных машинах.

Поговорим о Apache Parquet на Python с Apache Arrow

Дизайн: высокопроизводительные колоночные данные в Python.


C++ библиотеки Apache Arrow и Parquet являются вспомогательными технологиями, которые изначально проектировались нами для согласованной совместной работы.


  • Библиотеки C++ Arrow обеспечивают управление памятью, эффективный ввод/вывод (файлы, memory map, HDFS), контейнеры колоночных массивов в памяти и чрезвычайно быстрый обмен сообщениями (IPC/RPC). Я подробнее коснусь слоя обмена сообщениями Arrow в другой статье.
  • Библиотеки C++ Parquet отвечают за кодирование и декодирование файлового формата Parquet. Мы реализовали libparquet_arrow — библиотеку, которая обрабатывает транзит между данными в памяти Arrow и низкоуровневыми инструментами чтения/записи Parquet.
  • PyArrow предоставляет Python интерфейс для всего этого и обрабатывает быстрые преобразования в pandas.DataFrame.


Одной из основных целей Apache Arrow является создание эффективного межоперационного уровня транспортировки колоночной памяти.


Вы можете почитать о пользовательском API Parquet в кодовой базе PyArrow. Библиотеки доступны в conda-forge по адресу:


conda install pyarrow arrow-cpp parquet-cpp -c conda-forge


Бенчмарки: PyArrow и fastparquet


Чтобы получить представление о производительности PyArrow, я сгенерировал набор данных объемом 512 мегабайт с числовыми данными, которые демонстрируют различные варианты использования Parquet. Я сгенерировал два варианта наборов данных:


  • С высокой энтропией: все значения данных в файле (за исключением нулевых значений) различны. Этот набор данных весит 469 МБ.
  • С низкой энтропией: данные демонстрируют высокую степень повторения. Эти данные кодируются и сжимаются до весьма небольшого размера: всего 23 МБ посредством сжатия Snappy. Если вы создадите файл со словарной кодировкой, он получится еще меньше. Поскольку декодирование таких файлов имеет ограничение по процессору, нежели по операциям ввода-вывода, обычно можно ожидать более высокую пропускную способность для файлов данных с низкой энтропией.


Я создал эти файлы в трех основных используемых стилях сжатия: несжатые, snappy и gzip. Затем я вычисляю физическое время, необходимое для получения pandas DataFrame с диска.


fastparquet — это более новая реализация программы чтения/записи файлов Parquet для пользователей Python, созданная для использования в проекте Dask. Она реализована на Python и использует компилятор Numba Python-to-LLVM для ускорения процедур декодирования Parquet. Я также установил ее, чтобы сравнить с альтернативными реализациями.


Код для чтения файла в качестве pandas.DataFrame аналогичен:


# PyArrow
import pyarrow.parquet as pq
df1 = pq.read_table(path).to_pandas()

# fastparquet
import fastparquet
df2 = fastparquet.ParquetFile(path).to_pandas()


Зеленые столбцы соответствуют времени PyArrow: более длинные столбцы указывают на более высокую производительность/более высокую пропускную способность данных. Аппаратное обеспечение — ноутбук Xeon E3-1505.


Я обновлял эти бенчмарки 1 февраля 2017 года в соответствии с последними кодовыми базами.




Состояние разработки


Нам нужна помощь с виндовыми сборками и упаковкой. Кроме того, поддержание пакетов conda-forge в актуальном состоянии занимает очень много времени. И конечно, мы ищем разработчиков как на C++, так и на Python, для контрибуций в кодовую базу вообще в целом.


До сих пор мы уделяли особое внимание качественной реализации файлового формата с высокой производительностью чтения и записи простых наборов данных. Мы начинаем переходить к обработке вложенных JSON-подобных данных в parquet-cpp, используя Arrow в качестве контейнера для вложенных колоночных данных.


Недавно Уве Корн реализовал поддержку List Arrow в преобразованиях в pandas:


In [9]: arr = pa.from_pylist([[1,2,3], None, [1, 2], [], [4]])

In [10]: arr
Out[10]:
<pyarrow.array.ListArray object at 0x7f562d551818>
[
  [1,
   2,
   3],
  NA,
  [1,
   2],
  [],
  [4]
]

In [11]: arr.type
Out[11]: DataType(list<item: int64>)

In [12]: t = pa.Table.from_arrays([arr], ['col'])

In [13]: t.to_pandas()
Out[13]:
         col
0  [1, 2, 3]
1       None
2     [1, 2]
3         []
4        [4]


Код бенчмарка


import os
import time

import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fp

def generate_floats(n, pct_null, repeats=1):
    nunique = int(n / repeats)
    unique_values = np.random.randn(nunique)

    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan

    return unique_values.repeat(repeats)

DATA_GENERATORS = {
    'float64': generate_floats
}

def generate_data(total_size, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    type_ = np.dtype('float64')
    nrows = total_size / ncols / np.dtype(type_).itemsize

    datagen_func = DATA_GENERATORS[dtype]

    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)

def write_to_parquet(df, out_path, compression='SNAPPY'):
    arrow_table = pa.Table.from_pandas(df)
    if compression == 'UNCOMPRESSED':
        compression = None
    pq.write_table(arrow_table, out_path, use_dictionary=False,
                   compression=compression)

def read_fastparquet(path):
    return fp.ParquetFile(path).to_pandas()

def read_pyarrow(path, nthreads=1):
    return pq.read_table(path, nthreads=nthreads).to_pandas()

MEGABYTE = 1 << 20
DATA_SIZE = 512 * MEGABYTE
NCOLS = 16

cases = {
    'high_entropy': {
        'pct_null': 0.1,
        'repeats': 1
    },
    'low_entropy': {
        'pct_null': 0.1,
        'repeats': 1000
    }
}

def get_timing(f, path, niter):
    start = time.clock_gettime(time.CLOCK_MONOTONIC)
    for i in range(niter):
        f(path)
    elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
    return elapsed

NITER = 5

results = []

readers = [
    ('fastparquet', lambda path: read_fastparquet(path)),
    ('pyarrow', lambda path: read_pyarrow(path)),
]

case_files = {}

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = '{0}_{1}.parquet'.format(case, compression)
        df = generate_data(DATA_SIZE, NCOLS, **params)
        write_to_parquet(df, path, compression=compression)
        df = None
        case_files[case, compression] = path

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = case_files[case, compression]

        # prime the file cache
        read_pyarrow(path)
        read_pyarrow(path)

        for reader_name, f in readers:
            elapsed = get_timing(f, path, NITER) / NITER
            result = case, compression, reader_name, elapsed
            print(result)
            results.append(result)


Вывод:

Исходя из полученных результатов, можно сделать вывод, что формат хранения данных parquet является наиболее оптимальным вариантом для хранения больших объемов данных, так как он позволяет быстро считать данные и занимает мало места, нежели его аналоги csv и orc.

Спасибо за внимание, надеемся, что данная публикация была полезной для Вас!



Report Page