Пример использования QDL
Общие замечания
QDL (Quantum Data Loader) — дополнительный модуль, который не входит в основную поставку QHB и устанавливается отдельно. Эта утилита позволяет на основе заданного описания структуры таблицы и соответствующих данных в CSV-файле сформировать в многопоточном режиме файл таблицы, минуя обычные механизмы базы данных. Полученный файл можно скопировать в качестве файла таблицы в каталог базы данных.
Подробное описание модуля см. в главе Модуль прямой загрузки данных QDL.
Вариант использования QDL в случае загрузки данных в новую партицию существующей таблицы
Допустим, имеется таблица, партиционированная по диапазонам дат. Пусть каждый диапазон охватывает один год. В качестве упрощенного примера возьмем таблицу, у которой имеется три поля c типами timestamp, int и text. Пусть на данный момент имеется две партиции с данными за 2019 и 2020 годы, и необходимо максимально быстро загрузить данные за 2021 год. Сделаем это с использованием QDL.
Шаг 1. Формирование данных тестовой таблицы
-- Партиционированная таблица qdl_test
DROP TABLE IF EXISTS qdl_test;
CREATE TABLE qdl_test (
timestamp_value timestamp NOT NULL,
int_value int NOT NULL,
text_value text
) PARTITION BY RANGE (timestamp_value);
-- Тестовые данные за 2019 год
DROP TABLE IF EXISTS qdl_test_y2019;
CREATE TABLE qdl_test_y2019 PARTITION OF qdl_test
FOR VALUES FROM ('2019-01-01') TO ('2020-01-01');
INSERT INTO qdl_test_y2019
SELECT to_timestamp(extract('epoch' FROM to_timestamp('2019-01-01','yyyy-mm-dd')) + random()*60*60*24*365)::timestamp AS timestamp_value,
round(random()*1000000)::int AS int_value,
substr(md5(random()::text), 0, 30) AS text_value
FROM generate_series(1,1000);
-- Тестовые данные за 2020 год
DROP TABLE IF EXISTS qdl_test_y2020;
CREATE TABLE qdl_test_y2020 PARTITION OF qdl_test
FOR VALUES FROM ('2020-01-01') TO ('2021-01-01');
INSERT INTO qdl_test_y2020
SELECT to_timestamp(extract('epoch' FROM to_timestamp('2020-01-01','yyyy-mm-dd')) + random()*60*60*24*365)::timestamp AS timestamp_value,
round(random()*1000000)::int AS int_value,
substr(md5(random()::text), 0, 30) AS text_value
FROM generate_series(1,1000);
Шаг 2. Генерация данных для загрузки в виде CSV-файла
В целях тестирования сгенерируем данные за 2021 год в файле /tmp/qdl_data.csv,
используя возможности команды COPY
. В качестве символа-разделителя выберем точку
с запятой (;). Этот же символ необходимо будет использовать в файле конфигурации
QDL.
Примечание
На этапе загрузки данных в QDL подразумевается, что первая строка файла данных содержит заголовок с названиями полей, поэтому при выгрузке в CSV командеCOPY
необходимо добавить параметр header true. Иначе при загрузке вместо заголовка будет пропущена первая строка с данными, что недопустимо.
COPY (SELECT to_timestamp(extract('epoch' FROM to_timestamp('2021-01-01','yyyy-mm-dd')) + random()*60*60*24*365)::timestamp AS timestamp_value
, round(random()*1000000) AS int_value
, substr(md5(random()::text), 0, 30) AS text_value
FROM generate_series(1,1000))
TO '/tmp/qdl_data.csv'
WITH (format csv, delimiter ';', header true);
Шаг 3. Создание конфигурационного файла /tmp/config.yml, содержащего описание загружаемой таблицы для QDL
Следует отметить, что на первом этапе параметр oid в разделе output не играет роли, поэтому его значение можно оставить нулевым. Важным здесь является символ-разделитель (delimiter), описание таблицы и ее столбцов (параметры партиции table). Если значение столбца может быть равным NULL, после наименования типа нужно добавить ключевое слово [nullable].
Список поддерживаемых при загрузке типов данных приводится в подразделе Поддерживаемые типы полей.
В данном примере файл конфигурации /tmp/config.yml выглядит следующим образом:
# Конфигурация системы:
general:
# Количество потоков, выделяемых для разбора и сериализации строк.
# Следует обратить внимание, что помимо этих потоков обязательно выделяются
# еще два: поток чтения и поток записи в файл.
threads: 2
# Задание расчета контрольных сумм
checksum: true
# Конфигурация исходных данных:
data_source:
csv:
# Разделитель столбцов для CSV:
delimiter: ";"
# Конфигурация получаемых данных:
output:
# Идентификатор объекта. Определяет целочисленный идентификатор для группы
# файлов целевой таблицы.
# Не важен до выполнения этапа создания таблицы
oid: 0
# Идентификатор таблицы для хранения больших данных
toast_oid: 0
# Конфигурация целевой таблицы:
table:
# Имя таблицы. Требуется для generate_sql, чтобы задать имя создаваемой таблицы.
name: "qdl_test_y2021"
# Столбцы в формате "имя" : "тип"
fields:
timestamp_value : timestamp
int_value : integer
text_value : text [nullable]
Шаг 4. Проверка целостности описания таблицы и данных в CSV-файле
Это необязательный шаг, но его желательно выполнить, чтобы не терять результаты работы по формированию файла таблицы, если формат данных в какой-то строке окажется неверным. В этом случае загрузку данных придется запустить с самого начала.
Следующая команда проверяет логическую целостность CSV-файла и согласованность структуры данных с файлом конфигурации, сигнализируя об ошибках соответствующим кодом завершения.
/usr/bin/qdl --config /tmp/config.yml --data /tmp/qdl_data.csv validate
Шаг 5. Получение и применение скрипта создания таблицы и вывода значения OID
/usr/bin/qdl --config /tmp/config.yml --data /tmp/qdl_data.csv create_table > /tmp/table_script.sql
Вывод результата работы команды выглядит следующим образом:
CREATE TABLE qdl_test_y2021("timestamp_value" timestamp NOT NULL, "int_value" integer NOT NULL, "text_value" text);
CHECKPOINT;
-- Расположение основной таблицы
SELECT pg_relation_filepath('qdl_test_y2021');
-- Расположение таблицы TOAST
SELECT pg_relation_filepath('pg_toast.pg_toast_' || (SELECT oid FROM pg_class WHERE relname = 'qdl_test_y2021'));
Скрипт нужно выполнить в базе данных.
Запросы SELECT
отображают путь относительно каталога кластера баз данных. Первый
выведет путь к основной таблице. Второй выведет путь к соответствующей таблице
TOAST, если она имеется.
Путь к каталогу отображается с помощью команды:
SHOW data_directory;
Шаг 6. Формирование файла таблицы
Это основная и самая ресурсоемкая операция. Важно выбрать эффективное значение для количества параллельных потоков (параметр threads в разделе general), участвующих в формировании файла таблицы. Это значение будет зависеть от количества процессоров на используемой машине, а также от возможностей дисковой системы. Относительно большое значение потоков может усилить конкуренцию за диски, и возможный выигрыш может быть нивелирован.
Значение OID таблицы нужно прописать в файле /tmp/config.yml в качестве
значения параметра oid в разделе output. Допустим, в приведенном скрипте
в результате первого запроса SELECT
функция pg_relation_filepath вывела
значение base/13676/16463, тогда для OID таблицы пропишем значение 16463. Имя
сформированного в результате файла таблицы будет соответствовать этому параметру.
Аналогично прописываем toast_oid (OID таблицы TOAST), если он был получен.
В результате работы утилиты в каталоге, указанном в параметре --out-dir, появятся файлы с заданными OID в качестве имени, в приведенном примере это будет файл 16463. В случае, если размер данных окажется более 1 Гб, создадутся дополнительные файлы с соответствующими расширениями (16463.1, 16463.2 и т. д.).
/usr/bin/qdl --config /tmp/config.yml --data /tmp/qdl_data.csv insert_values --out-dir /tmp
Шаг 7. Копирование полученного файла средствами ОС в каталог базы данных
Если обработка данных происходит непосредственно на сервере баз данных, команда копирования может выглядеть примерно так:
cp /tmp/16463 <путь_к_каталогу_данных>/base/13676/16463
Шаг 8. Сброс кэша данных таблицы и некоторые вспомогательные действия
Необходимо выполнить следующую команду:
SELECT qhb_attach_qdl_data('qdl_test_y2021');
При этом происходит сброс кэша данных таблицы, переподключение файлов данных таблицы, переиндексация соответствующей таблицы TOAST (при ее наличии), регистрация данных для функциональности CDC (Change Data Capture, захват изменения данных).
Шаг 9. Проверка данных таблицы
Можно убедиться, что данные доступны как на основном сервере, так и на резервном.
SELECT * FROM qdl_test_y2021 LIMIT 10;
SELECT count(*) FROM qdl_test_y2021; -- при желании можно убедиться в соответствии количеству строк в файле
Теперь при необходимости можно построить требуемые индексы.
ВНИМАНИЕ!
Следует отметить, что загрузка данных в обход стандартных механизмов работы базы данных приведет к тому, что при восстановлении базы из резервной копии и при применении архивных журналов данные в таблицах, загруженных через QDL, не восстановятся. При выполнении массовой загрузки данных через QDL желательно сразу после этого выполнить полное резервное копирование базы данных. Альтернативным вариантом может стать использование инкрементального резервного копирования, изменения для которого по загружаемым данным фиксируются автоматически.
Шаг 10. Включение данных в партиционированную таблицу
Сделаем таблицу qdl_test_y2021 частью таблицы qdl_test, превратив ее в партицию и добавив ограничения по диапазону дат.
ALTER TABLE qdl_test ATTACH PARTITION qdl_test_y2021
FOR VALUES FROM ('2021-01-01') TO ('2022-01-01');
При этом на резервном сервере уже будет иметься заранее подготовленная таблица qdl_test_y2021, и команда успешно выполнится.
Теперь при запросе данных 2021 года в таблице qdl_test используется обращение к новой партиции.
qhb=# explain select count(*) from qdl_test where timestamp_value >= to_timestamp('2021-01-01','YYYY-MM_DD');
QUERY PLAN
-------------------------------------------------------------------------------------------------
Aggregate (cost=79.79..79.80 rows=1 width=8)
-> Append (cost=0.00..78.84 rows=379 width=0)
Subplans Removed: 2
-> Seq Scan on qdl_test_y2021 (cost=0.00..26.95 rows=377 width=0)
Filter: (timestamp_value >= to_timestamp('2021-01-01'::text, 'YYYY-MM_DD'::text))
(5 rows)
В плане запроса отражено обращение к партиции qdl_test_y2021.
Замечания, касающиеся физической репликации
При использовании физической репликации необходимо иметь в виду, что данные таблицы на главном сервере формируются без генерации журналов упреждающей записи. Данные, скопированные в виде готового файла, не попадут автоматически через репликацию на резервные серверы. Поэтому необходимо также скопировать файл таблицы в соответствующую директорию на резервных серверах. Если этого не сделать, на резервные серверы попадет только пустая таблица, созданная на шаге 5.
Необходимо также учитывать, что репликация может осуществляться с задержкой, связанной либо с переключением файлов журналов упреждающей записи (если не используется потоковая репликация), либо со специально настроенным отставанием резервного сервера по времени, которое применяется в целях защиты от случайной потери данных на основном сервере.
Особенности выполнения изменений на физических резервных серверах нужно учитывать на следующих шагах:
Шаг 5. Команды скрипта выполнятся на резервных серверах автоматически,
возможно, с некоторой задержкой.
Шаг 7. Необходимо скопировать подготовленный файл таблицы также в
соответствующий каталог данных на резервных серверах. Этот шаг на резервных
серверах нужно выполнить после того, как на них будут созданы таблицы.
Шаг 8. Необходимо выполнить приведенный запрос по сбросу данных также и на
резервных серверах.
Шаг 9. Репликация индексов выполнится автоматически. Однако необходимо,
чтобы к моменту завершения репликации индексов на резервных серверах были выполнены
все предыдущие шаги (5, 7 и 8) и таблица была готова к чтению данных через
доступ по индексам.
Шаг 10. Скрипт выполнится на резервных серверах автоматически, возможно, с
некоторой задержкой. Желательно убедиться в успешном выполнении операции на
резервных серверах.
Пример использования QDL в случае логической репликации таблицы
Нередко возникает необходимость загрузить объемные данные в новую таблицу, изменения в которой должны передаваться с помощью логической репликации в некое отдельное хранилище данных. QDL можно успешно использовать и в этой ситуации. В данном случае приведен простой пример обычной таблицы, который тем не менее демонстрирует все характерные нюансы.
Предварительные замечания
В случае логической репликации термины «основной сервер» и «резервный сервер» используются относительно реплицируемой таблицы. Логическая репликация подразумевает наличие на стороне резервного сервера копии реплицируемой таблицы. При инициализации логической репликации все данные таблицы на стороне основного сервера копируются целиком стандартными встроенными средствами. Чтобы избежать копирования всего объема загружаемых данных с основного сервера на резервный, можно инициализировать логическую репликацию с пустыми таблицами, заменив после этого содержимое таблиц как на основном сервере, так и на резервном. На стороне резервного сервера таблица будет иметь в общем случае свой OID, поэтому файл с данными при копировании нужно будет переименовать соответствующим образом. Затем необходимо сбросить закэшированные данные таблицы как на основном сервере, так и на резервном. Далее репликация изменений будет выполняться в обычном порядке. Разумеется, до окончания выполнения работ по настройке репликации и копированию данных таблица на основном сервере не должна обновляться.
Ниже по шагам приводится пример загрузки и настройки логической репликации таблицы с использованием QDL.
Шаг 1. Создание CSV-файла с данными для тестирования
В целях тестирования сгенерируем данные в файле /tmp/qdl_log_repl.csv,
используя возможности команды COPY
. В качестве символа-разделителя выберем
точку с запятой (;). Этот же символ необходимо будет использовать в файле
конфигурации QDL.
Примечание
На этапе загрузки данных в QDL подразумевается, что первая строка файла данных содержит заголовок с названиями полей, поэтому при выгрузке в CSV командеCOPY
необходимо добавить параметр header true. Иначе при загрузке вместо заголовка будет пропущена первая строка с данными, что недопустимо.
Воспользуемся тем же скриптом, что и в прошлом примере.
COPY (SELECT to_timestamp(extract('epoch' FROM to_timestamp('2021-01-01','yyyy-mm-dd')) + random()*60*60*24*365)::timestamp AS timestamp_value
, round(random()*1000000) AS int_value
, substr(md5(random()::text), 0, 30) AS text_value
FROM generate_series(1,1000))
TO '/tmp/qdl_log_repl.csv'
WITH (format csv, delimiter ';', header true);
Шаг 2. Создание конфигурационного файла /tmp/config_qdl_log_repl.yml, содержащего описание загружаемой таблицы для QDL
Следует отметить, что на первом этапе параметр oid в разделе output не играет роли, поэтому его значение можно оставить нулевым. Важным здесь является символ-разделитель (delimiter), описание таблицы и ее столбцов (параметры партиции table). Если значение столбца может быть равным NULL, после наименования типа нужно добавить ключевое слово [nullable].
Список поддерживаемых при загрузке типов данных приводится в подразделе Поддерживаемые типы полей.
В данном примере файл конфигурации /tmp/config_qdl_log_repl.yml выглядит следующим образом:
# Конфигурация системы:
general:
# Количество потоков, выделяемых для разбора и сериализации строк.
# Следует обратить внимание, что помимо этих потоков обязательно выделяются
# еще два: поток чтения и поток записи в файл.
threads: 2
# Задание расчета контрольных сумм
checksum: true
# Конфигурация исходных данных:
data_source:
csv:
# Разделитель столбцов для CSV:
delimiter: ";"
# Конфигурация получаемых данных:
output:
# Идентификатор объекта. Определяет целочисленный идентификатор для группы
# файлов целевой таблицы.
# Не важен до выполнения этапа создания таблицы
oid: 0
# Идентификатор таблицы для хранения больших данных
toast_oid: 0
# Конфигурация целевой таблицы:
table:
# Имя таблицы. Требуется для generate_sql, чтобы задать имя создаваемой таблицы.
name: "qdl_log_repl"
# Столбцы в формате "имя" : "тип"
fields:
timestamp_value : timestamp
int_value : integer
text_value : text [nullable]
Шаг 3. Проверка целостности описания таблицы и данных в CSV-файле
Это необязательный шаг, но его желательно выполнить для того, чтобы не терять результаты работы по формированию файла таблицы, если формат данных в какой-то строке окажется неверным. В этом случае загрузку данных придется запустить с самого начала.
Следующая команда проверяет логическую целостность CSV-файла и согласованность структуры данных с файлом конфигурации, сигнализируя об ошибках соответствующим кодом завершения.
/usr/bin/qdl --config /tmp/config_qdl_log_repl.yml --data /tmp/qdl_data.csv validate
Шаг 4. Получение и применение скрипта создания таблицы и вывода значений OID
/usr/bin/qdl --config /tmp/config_qdl_log_repl.yml --data /tmp/qdl_log_repl.csv create_table > /tmp/table_script.sql
Вывод результата работы команды выглядит следующим образом:
CREATE TABLE qdl_log_repl("timestamp_value" timestamp NOT NULL, "int_value" integer NOT NULL, "text_value" text);
CHECKPOINT;
-- Расположение основной таблицы
SELECT pg_relation_filepath('qdl_test_y2021');
-- Расположение таблицы TOAST
SELECT pg_relation_filepath('pg_toast.pg_toast_' || (SELECT oid FROM pg_class WHERE relname = 'qdl_log_repl'));
Скрипт нужно выполнить как на основном сервере, так и на резервном.
Первый запрос SELECT
скрипта выведет путь к таблице относительно каталога
кластера баз данных. Обратите внимание, что на каждой базе данных выведутся свои
значения функции pg_relation_filepath с номером OID таблицы в конце. Этот
номер нужно будет использовать в параметре oid раздела output
конфигурационного файла QDL.
На этом же шаге следует удалить все строки из таблицы qdl_log_repl на основном и на резервном серверах:
DELETE FROM qdl_log_repl;
При инициализации репликации строки таблицы копируются с основного сервера на резервный; в данном случае этот шаг фактически будет пропущен.
Шаг 5. Настройка логической репликации
Следует отметить, что для успешной работы логической репликации параметр wal_level на основном сервере должен принимать значение logical.
На основном сервере выполните следующий скрипт:
SELECT pg_create_logical_replication_slot('test_slot','pgoutput');
CREATE PUBLICATION test_pub FOR TABLE qdl_log_repl;
На резервном сервере замените в тексте следующей команды <имя_пользователя> на действительное имя пользователя в базе основного сервера и выполните ее.
CREATE SUBSCRIPTION test_sub CONNECTION 'port=5432 user=<имя_пользователя> dbname=qhb' PUBLICATION test_pub WITH (create_slot = false, slot_name = test_slot);
Шаг 6. Формирование файла таблицы
Это основная и самая ресурсоемкая операция. Важно выбрать эффективное значение для количества параллельных потоков (параметр threads раздела general), участвующих в формировании файла таблицы. Это значение будет зависеть от количества процессоров на используемой машине, а также от возможностей дисковой системы. Относительно большое значение потоков может усилить конкуренцию за диски, и возможный выигрыш может быть нивелирован.
Номер OID таблицы нужно прописать в файле /tmp/config_qdl_log_repl.yml в качестве значения параметра oid в разделе output. Допустим, в приведенном скрипте в результате последнего запроса функция pg_relation_filepath вывела на основном сервере значение base/13676/17101, а на резервном — base/13676/18203. Тогда для основного сервера пропишем в качестве параметра oid значение 17101. Имя сформированного в результате файла таблицы будет соответствовать этому параметру.
Значение на резервном сервере будет отличаться. При копировании необходимо будет переименовать файл так, чтобы имя соответствовало номеру OID на резервном сервере. Если не указать правильное значение, в лучшем случае файл будет просто лишним, а в худшем можно случайно перезаписать уже существующую таблицу с номером OID, совпадающим с номером OID таблицы на основном сервере.
Далее необходимо выполнить следующий скрипт:
/usr/bin/qdl --config /tmp/config.yml --data /tmp/qdl_data.csv insert_values --out-dir /tmp
В результате работы утилиты в каталоге, указанном в параметре --out-dir, появится файл с заданным OID в качестве имени; в приведенном примере это будет файл 17101. В случае, если размер данных окажется более 1 Гб, создадутся дополнительные файлы с соответствующими расширениями (17101.1, 17101.2 и т. д.).
Шаг 7. Копирование полученного файла средствами ОС в каталог базы данных на основном и резервном серверах
На основном сервере команда копирования может выглядеть приблизительно следующим образом:
cp /tmp/16463 <путь к каталогу данных на основном сервере>/base/13676/17101
На резервном сервере имя файла отличается (OID базы данных также может быть другим, но в этом примере не изменен):
cp /tmp/16463 <путь к каталогу данных на резервном сервере>/base/13676/18203
Шаг 8. Сброс кэша данных таблицы и некоторые вспомогательные действия
Необходимо выполнить следующую команду:
SELECT qhb_attach_qdl_data('qdl_test_y2021');
При этом происходит сброс кэша данных таблицы, переподключение файлов данных таблицы, переиндексация соответствующей таблицы TOAST (при ее наличии), регистрация данных для функциональности CDC.
Шаг 9. Проверка данных таблицы
Можно убедиться, что данные доступны как на основном сервере, так и на резервном.
SELECT * FROM qdl_log_repl limit 10;
SELECT count(*) FROM qdl_log_repl; -- при желании можно убедиться в соответствии количеству строк в файле
При необходимости теперь можно построить требуемые индексы.
ВНИМАНИЕ!
Следует отметить, что загрузка данных в обход стандартных механизмов работы базы данных приводит к тому, что при восстановлении базы из резервной копии и при применении архивных журналов данные в таблицах, загруженных через QDL, не восстанавливаются. При выполнении массовой загрузки данных через QDL желательно сразу после этого выполнить полное резервное копирование базы. Чтобы это копирование не было дополнительным, желательно согласовать политику выполнения массовой загрузки данных с политикой выполнения резервного копирования. Альтернативным вариантом может стать использование инкрементального резервного копирования, изменения для которого по загружаемым данным фиксируются автоматически.
Заключение
Используя QDL, можно эффективно загружать в базу объемные данные, в том
числе в случае использования физической или логической репликации. Тестирование
показывает, что общее время загрузки данных с помощью QDL и время
копирования полученного файла в 3,5 раза меньше по сравнению с продолжительностью
загрузки тех же данных с помощью команды COPY
.
Приведенный подход можно использовать в хранилищах и витринах данных, где время загрузки новых данных является критичным показателем, а данные, как правило, поступают из других источников.