Пример использования QDL

Описание задачи

Допустим, у нас имеется таблица, партиционированная по диапазонам дат. Пусть каждый диапазон охватывает один год. В качестве примера возьмем таблицу, у которой три поля c типами timestamp, int и text. Пусть на данный момент имеется две партиции с данными за 2019 и 2020 годы. Предположим, необходимо максимально быстро и эффективно загрузить данные за 2021 год. Для этих целей можно использовать утилиту QDL (Quantum Direct Loader). Утилита позволяет на основе заданного описания структуры таблицы и соответствующих данных в CSV-файле сформировать в многопоточном режиме файл таблицы, минуя обычные механизмы базы данных. Полученный файл может быть скопирован в качестве файла таблицы в директорию базы данных.

Установка модуля, ссылки на документацию

QDL - формально совершенно отдельный модуль, который можно поставить даже независимо от QHB (хотя при этом все-таки подразумевается, что база должна уже быть установлена и доступна).

Описание модуля в документации представлено в разделе Модуль прямой загрузки данных QDL, но далее будут подробно рассмотрены все типовые шаги использования инструмента на основе законченного примера.

Формирование данных тестовой таблицы

-- Партиционированная таблица qdl_test
DROP TABLE IF EXISTS qdl_test;
CREATE TABLE qdl_test (
    timestamp_value timestamp not null,
    int_value       int not null,
    text_value      text
) PARTITION BY RANGE (timestamp_value);

-- Тестовые данные за 2019 год
DROP TABLE IF EXISTS qdl_test_y2019;
CREATE TABLE qdl_test_y2019 PARTITION OF qdl_test
   FOR VALUES FROM ('2019-01-01') TO ('2020-01-01');
   
INSERT INTO qdl_test_y2019
SELECT to_timestamp(extract('epoch' FROM to_timestamp('2019-01-01','yyyy-mm-dd')) + random()*60*60*24*365)::timestamp AS timestamp_value,
       round(random()*1000000)::int AS int_value, 
       substr(md5(random()::text), 0, 30) AS text_value 
  FROM generate_series(1,1000);    

-- Тестовые данные за 2020 год
DROP TABLE IF EXISTS qdl_test_y2020;
CREATE TABLE qdl_test_y2020 PARTITION OF qdl_test
   FOR VALUES FROM ('2020-01-01') TO ('2021-01-01');
   
INSERT INTO qdl_test_y2020
SELECT to_timestamp(extract('epoch' from to_timestamp('2020-01-01','yyyy-mm-dd')) + random()*60*60*24*365)::timestamp AS timestamp_value,
       round(random()*1000000)::int AS int_value, 
       substr(md5(random()::text), 0, 30) AS text_value 
  FROM generate_series(1,1000);

Генерация загружаемых из CSV-файла данных

В целях тестирования сгенерируем данные за 2021 год в файле /tmp/qdl_data.csv, используя возможности команды COPY. В качестве символа разделителя выберем точку с запятой. Этот же символ необходимо будет использовать в файле конфигурации QDL.

Примечание
При работе QDL подразумевается, что первая строка должна содержать заголовок строки, поэтому необходимо добавить параметр header со значением true для формата CSV. Иначе первая строка с данными будет пропущена вместо заголовка и не попадет в загружаемую таблицу.

COPY (SELECT to_timestamp(extract('epoch' from to_timestamp('2021-01-01','yyyy-mm-dd')) + random()*60*60*24*365)::timestamp AS timestamp_value
           , round(random()*1000000) AS int_value
           , substr(md5(random()::text), 0, 30) AS text_value 
        FROM generate_series(1,1000)) 
  TO '/tmp/qdl_data.csv' 
  WITH (format csv, delimiter ';', header true);

Создание конфигурационного файла /tmp/config.yml, содержащего описание загружаемой таблицы для QDL

Следует отметить, что на первом этапе параметр oid в секции output не играет роли, поэтому его значение можно оставить нулевым Важными здесь являются параметры кодировки, символа-разделителя, описание таблицы и ее полей. Если поле может принимать пустое значение, после наименования типа нужно добавить ключевое слово [nullable].

# Конфигурация системы:
general:
    # Количество потоков, выделяемых для парсинга и сериализации строк.
    # Следует обратить внимание, что помимо этих потоков обязательно выделяются ещё два:
    # поток чтения и поток записи в файл.
    threads: 2

# Конфигурация исходных данных:
input:
    # Кодировка:
    encoding: "utf8"
    # Разделитель столбцов для CSV:
    delimiter: ";"

# Конфигурация получаемых данных:
output:
    # Object IDentifier. Определяет целочисленный идентификатор для группы файлов целевой таблицы. 
    # Не важен до выполнения этапа создания таблицы
    oid: 0

# Конфигурация целевой таблицы:
table:
    # Имя таблицы. Требуется для `generate_sql`, чтобы задать имя создаваемой таблицы.
    name: "qdl_test_y2021"
    # Колонки в формате "имя": "тип"
    fields:
        timestamp_value : timestamp
        int_value : integer
        text_value : text [nullable]

Проверка целостности описания таблицы и данных в CSV-файле

Это опциональный шаг, но его желательно выполнить для того, чтобы не терять результаты, быть может, многочасовой работы в процессе формирования данных таблицы, если вдруг формат данных в какой-то строке окажется неверным.

Следующая команда проверяет логическую целостность CSV-файла и согласованность структуры данных с файлом конфигурации, сигнализируя об ошибках соответствующим кодом возврата.

/usr/bin/qdl validate --config /tmp/config.yml --data /tmp/qdl_data.csv --verbose

В случае успешной проверки сообщение, похожее на следующее:

[20210319_140056][qdl::validate_csv][DEBUG] /tmp/qdl_data.csv is valid csv file

Получение скрипта для создания таблицы и вывода значения OID

/usr/bin/qdl create_table --config /tmp/config.yml --data /tmp/qdl_data.csv > /tmp/table_script.sql

Вывод результата работы команды выглядит следующим образом:

create table qdl_test_y2021("timestamp_value" timestamp NOT NULL, "int_value" integer NOT NULL, "text_value" text);

BEGIN TRANSACTION;
insert into qdl_test_y2021("timestamp_value", "int_value", "text_value")
values ('2021-01-22 18:48:32.683962', 135563, '5eb8ab78966be8bf2048e9a4bd83a');
insert into qdl_test_y2021("timestamp_value", "int_value", "text_value")
values ('2021-03-08 22:54:53.465401', 899737, '21926559632ba26460cc2eafcf3c7');
insert into qdl_test_y2021("timestamp_value", "int_value", "text_value")
values ('2021-02-02 16:45:35.763331', 742323, '37ea89be376354fe2f0797aa8b471');
COMMIT TRANSACTION;
checkpoint;
SELECT pg_relation_filepath('qdl_test_y2021');

Скрипт нужно выполнить в базе данных. Если имеются реплики, команды выполнятся там автоматически. В данный скрипт попадают команды INSERT только для первых трех строк загружаемых данных. Впоследствии мы заменим данные этой таблицы на те, которые будут сформированы утилитой QDL, поэтому вставка этих трех строк в итоге ни на что не повлияет. Последний запрос скрипта выведет путь к таблице относительно каталога кластера баз данных.

Формирование файла таблицы

Это основная и самая ресурсоемкая операция. Важно выбрать эффективное значение для количества параллельных потоков (параметр threads в секции general), участвующих в формировании файла таблицы. Это значение будет зависеть от количества процессоров на используемой машине.

Значение OID таблицы нужно прописать в файле /tmp/config.yml в качестве значения параметра oid в секции output. Допустим, в приведенном скрипте в результате последнего запроса функция pg_relation_filepath вывела значение base/13676/16463, тогда для OID таблицы пропишем значение 16463. Имя сформированного в результате файла таблицы будет соответствовать этому параметру.

В результате работы утилиты в директории --out-dir появится файл с заданным OID в качестве имени, в приведенном примере это будет файл 16463. В случае, если размер данных окажется более 1 Gb, создадутся дополнительные файлы с соответствующими расширениями (16463.1, 16463.2 и так далее).

/usr/bin/qdl insert_values --config /tmp/config.yml --data /tmp/qdl_data.csv --out-dir /tmp

Копирование полученного файла средствами OS в директорию базы данных на главном сервере и на репликах

Если обработка данных происходит непосредственно на сервере базы данных, команда копирования может выглядеть приблизительно следующим образом:

cp /tmp/16463 <путь к каталогу данных>/base/13676/16463

Данные, скопированные через команду операционной системы, не попадут через репликацию на сервера реплик. Чтобы они там оказались, необходимо также скопировать их в соответствующие директории на каждом из этих серверов. Если этого не сделать, на серверах реплик останутся те три строки, которые были внесены в таблицу на главном сервере скриптом, созданным в результате работы команды QDL create_table.

Сброс данных таблицы

Этот шаг необходим для того, чтобы закешированные данные таблицы сбросились. Если имеются реплики, необходимо выполнить приведенный запрос как на главном сервере, так и на серверах реплик.

SELECT qhb_drop_rel_cache('qdl_test_y2021');

Проверка данных таблицы

Можно убедиться, что данные доступны как на главном сервере, так и на серверах реплик, если они имеются.

select * from qdl_test_y2021 limit 10;
select count(*) from  qdl_test_y2021; -- опционально можно убедиться в соответствии количеству строк в файле

Если нужно, теперь можно построить необходимые индексы.

Примечание
Следует отметить, что загрузка данных в обход стандартных механизмов работы базы данных сказывается на том, что при восстановлении базы из бэкапа и при применении архивных журналов данные в таблицах, загруженных через QDL, не восстановятся. При выполнении массовой загрузки данных через QDL желательно сразу после этого выполнить полный бэкап базы.

Включение данных в партиционированную таблицу

Сделаем таблицу qdl_test_y2021 частью таблицы qdl_test, превратив ее в партицию и добавив ограничения по диапазону дат.

ALTER TABLE qdl_test ATTACH PARTITION qdl_test_y2021
  FOR VALUES FROM ('2021-01-01') TO ('2022-01-01');

При этом на серверах реплик уже будет иметься заранее подготовленная таблица qdl_test_y2021 и команда успешно выполнится на репликах.

Теперь при запросе данных 2021 года в таблице qdl_test используется обращение к новой партиции.

qhb=# explain select count(*) from qdl_test where timestamp_value >= to_timestamp('2021-01-01','YYYY-MM_DD');
                                           QUERY PLAN                                            
-------------------------------------------------------------------------------------------------
 Aggregate  (cost=79.79..79.80 rows=1 width=8)
   ->  Append  (cost=0.00..78.84 rows=379 width=0)
         Subplans Removed: 2
         ->  Seq Scan on qdl_test_y2021  (cost=0.00..26.95 rows=377 width=0)
               Filter: (timestamp_value >= to_timestamp('2021-01-01'::text, 'YYYY-MM_DD'::text))
(5 rows)

В плане запроса отражено обращение к партиции qdl_test_y2021.

Заключение

Таким образом, используя QDL, можно эффективно загружать в базу объемные дынные. Тестирование показывает, что общее время загрузки данных с помощью QDL и время копирования полученного файла в три с половиной раза меньше по сравнению с продолжительностью загрузки тех же данных с помощью команды COPY.

Приведенный подход может быть использован в хранилищах и витринах данных, где время загрузки новых данных является критичным показателем, а данные, как правило, поступают из других источников.