Логическое декодирование
QHB предоставляет инфраструктуру для потоковой передачи модификаций, производимых через SQL, внешним потребителям. Эту функциональность можно использовать для различных целей, в том числе для решений по репликации и аудита.
Изменения передаются в потоках, идентифицируемых по слотам логической репликации.
Формат, в котором передаются эти изменения, определяется используемым плагином
вывода. Пример плагина включен в дистрибутив QHB. Чтобы расширить
выбор доступных форматов, не модифицируя код ядра, можно написать дополнительные
плагины. Каждый плагин вывода имеет доступ к каждой отдельной новой строке,
выдаваемой командой INSERT
и новой версии строки, создаваемой командой UPDATE
.
Доступность старых версий строк для команд UPDATE
и DELETE
зависит от
сконфигурированного репликационного идентификатора (см. REPLICA IDENTITY).
Изменения могут быть получены либо по протоколу потоковой репликации (см. разделы Протокол потоковой репликации и Интерфейс протокола потоковой репликации), либо вызовом функций через SQL (см. раздел SQL-интерфейс логического декодирования). Также можно написать дополнительные методы получения вывода из слота репликации, не модифицируя код ядра (см. раздел Запись вывода логического декодирования).
Примеры логического декодирования
Следующий пример демонстрирует управление логическим декодированием посредством SQL-интерфейса.
Прежде чем вы сможете использовать логическое декодирование, вам следует установить в параметре wal_level значение logical а в max_replication_slots — число не меньше 1. Затем вы должны подключиться к целевой базе данных (в данном примере это qhb) как суперпользователь.
qhb=# -- Создать слот с именем 'regression_slot', использующий плагин вывода 'test_decoding'
qhb=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
slot_name | lsn
-----------------+-----------
regression_slot | 0/16B1970
(1 row)
qhb=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
slot_name | plugin | slot_type | database | active | restart_lsn | confirmed_flush_lsn
-----------------+---------------+-----------+----------+--------+-------------+-----------------
regression_slot | test_decoding | logical | qhb | f | 0/16A4408 | 0/16A4440
(1 row)
qhb=# -- Пока никакие изменения не видны
qhb=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----+-----+------
(0 rows)
qhb=# CREATE TABLE data(id serial primary key, data text);
CREATE TABLE
qhb=# -- DDL не реплицируется, поэтому вы увидите только транзакцию
qhb=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-------+--------------
0/BA2DA58 | 10297 | BEGIN 10297
0/BA5A5A0 | 10297 | COMMIT 10297
(2 rows)
qhb=# -- После прочтения изменения считаются полученными и уже не выдаются
qhb=# -- в следующем вызове:
qhb=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----+-----+------
(0 rows)
qhb=# BEGIN;
qhb=*# INSERT INTO data(data) VALUES('1');
qhb=*# INSERT INTO data(data) VALUES('2');
qhb=*# COMMIT;
qhb=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-------+---------------------------------------------------------
0/BA5A688 | 10298 | BEGIN 10298
0/BA5A6F0 | 10298 | table public.data: INSERT: id[integer]:1 data[text]:'1'
0/BA5A7F8 | 10298 | table public.data: INSERT: id[integer]:2 data[text]:'2'
0/BA5A8A8 | 10298 | COMMIT 10298
(4 rows)
qhb=# INSERT INTO data(data) VALUES('3');
qhb=# -- Также вы можете заглянуть вперед в потоке изменений, не получая эти изменения
qhb=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-------+---------------------------------------------------------
0/BA5A8E0 | 10299 | BEGIN 10299
0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
0/BA5A990 | 10299 | COMMIT 10299
(3 rows)
qhb=# -- Следующий вызов pg_logical_slot_peek_changes() снова возвращает те же изменения
qhb=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-------+---------------------------------------------------------
0/BA5A8E0 | 10299 | BEGIN 10299
0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
0/BA5A990 | 10299 | COMMIT 10299
(3 rows)
qhb=# -- Плагину вывода можно передать параметры, влияющие на форматирование
qhb=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on');
lsn | xid | data
-----------+-------+---------------------------------------------------------
0/BA5A8E0 | 10299 | BEGIN 10299
0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
0/BA5A990 | 10299 | COMMIT 10299 (at 2017-05-10 12:07:21.272494-04)
(3 rows)
qhb=# -- Не забудьте удалить слот, который вам больше не нужен, чтобы он перестал
qhb=# -- потреблять ресурсы сервера:
qhb=# SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
-----------------------
(1 row)
Следующий пример показывает, как можно управлять логическим декодированием с помощью протокола потоковой репликации, используя программу qhb_recvlogical, включенную в дистрибутив QHB. Для этого нужно, чтобы система аутентификация клиентов была настроена допускать подключения репликации (см. подраздел Аутентификация) и чтобы в max_wal_senders было установлено достаточно большое значение, позволяющее установить дополнительное соединение. Во втором примере показано, как транслировать двухфазные транзакции. Прежде чем использовать двухфазные команды, следует установить в параметре max_prepared_transactions значение не меньше 1.
Пример 1:
$ pg_recvlogical -d qhb --slot=test --create-slot
$ pg_recvlogical -d qhb --slot=test --start -f -
Control+Z
$ psql -d qhb -c "INSERT INTO data(data) VALUES('4');"
$ fg
BEGIN 693
table public.data: INSERT: id[integer]:4 data[text]:'4'
COMMIT 693
Control+C
$ pg_recvlogical -d qhb --slot=test --drop-slot
Пример 2:
$ pg_recvlogical -d qhb --slot=test --create-slot --two-phase
$ pg_recvlogical -d qhb --slot=test --start -f -
Control+Z
$ psql -d qhb -c "BEGIN;INSERT INTO data(data) VALUES('5');PREPARE TRANSACTION 'test';"
$ fg
BEGIN 694
table public.data: INSERT: id[integer]:5 data[text]:'5'
PREPARE TRANSACTION 'test', txid 694
Control+Z
$ psql -d qhb -c "COMMIT PREPARED 'test';"
$ fg
COMMIT PREPARED 'test', txid 694
Control+C
$ pg_recvlogical -d qhb --slot=test --drop-slot
Следующий пример показывает SQL-интерфейс, который можно использовать для декодирования подготовленных транзакций. Прежде чем использовать команды двухфазной фиксации, следует установить в параметре max_prepared_transactions значение не меньше 1. Также при создании слота с помощью функции pg_create_logical_replication_slot нужно установить в ее параметре two-phase значение 'true'. Обратите внимание что если транзакция еще не была декодирована, мы будем транслировать ее целиком после фиксации.
qhb=# BEGIN;
qhb=*# INSERT INTO data(data) VALUES('5');
qhb=*# PREPARE TRANSACTION 'test_prepared1';
qhb=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+---------------------------------------------------------
0/1689DC0 | 529 | BEGIN 529
0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
(3 rows)
qhb=# COMMIT PREPARED 'test_prepared1';
qhb=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+--------------------------------------------
0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
(4 row)
qhb=# -- вы также можете откатить подготовленную транзакцию
qhb=# BEGIN;
qhb=*# INSERT INTO data(data) VALUES('6');
qhb=*# PREPARE TRANSACTION 'test_prepared2';
qhb=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+---------------------------------------------------------
0/168A180 | 530 | BEGIN 530
0/168A1E8 | 530 | table public.data: INSERT: id[integer]:4 data[text]:'6'
0/168A430 | 530 | PREPARE TRANSACTION 'test_prepared2', txid 530
(3 rows)
qhb=# ROLLBACK PREPARED 'test_prepared2';
qhb=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+----------------------------------------------
0/168A4B8 | 530 | ROLLBACK PREPARED 'test_prepared2', txid 530
(1 row)
Принципы логического декодирования
Логическое декодирование
Логическое декодирование — это процесс извлечения всех постоянных изменений, происходящих в таблицах базы данных, в согласованном и простом для понимания формате, который можно интерпретировать, не имея досконального представления о внутреннем состоянии базы данных.
В QHB логическое декодирование реализуется путем декодирования содержимого журнала упреждающей записи, описывающего изменения на уровне хранения, в специфичную для приложения форму, например, в поток кортежей или операторов SQL.
Слоты репликации
В контексте логической репликации слот представляет поток изменений, которые могут быть воспроизведены клиенту в том порядке, в каком они были сделаны на исходном сервере. Через каждый слот передается последовательность изменений из одной базы данных.
Примечание
В QHB также имеются слоты потоковой репликации (см. подраздел Потоковая репликация), но они используются несколько по-другому.
У слота репликации есть идентификатор, уникальный для всех баз данных в кластере QHB. Слоты сохраняются независимо от использующих их подключений и защищены от сбоев сервера.
В обычном режиме работы логический слот будет выдавать каждое изменение только один раз. Текущая позиция каждого слота сохраняется только в контрольной точке, поэтому в случае сбоя слот может вернуться к предыдущему LSN, из-за чего впоследствии при перезапуске сервера последние изменения могут быть отправлены повторно. За предотвращение нежелательных эффектов от повторной обработки одного и того же сообщения отвечают клиенты логического декодирования. При необходимости клиенты могут при декодировании записывать последний полученный ими LSN и пропускать повторяющиеся данные или (при использовании протокола репликации) запрашивать, чтобы декодирование началось с этого LSN, вместо того чтобы позволить серверу определить начальную точку. Для этой цели разработан механизм отслеживания хода выполнения репликации, о котором можно прочитать в описании источников репликации.
Для одной базы данных могут существовать несколько независимых слотов. Каждый слот имеет собственное состояние, позволяя разным потребителям получать изменения из разных точек в потоке изменений базы данных. Для большинства приложений каждому потребителю понадобится отдельный слот.
Слот логической репликации ничего не знает о состоянии получателя(ей). Можно даже иметь несколько различных получателей, использующих один слот в разные моменты времени; они просто будут получать изменения, начиная с того момента, когда их перестал потреблять предыдущий получатель. Но в любой отдельно взятый момент времени потреблять изменения из слота может только один получатель.
ВНИМАНИЕ!
Слоты репликации сохраняются при сбоях сервера и ничего не знают о состоянии их потребителя(ей). Они не дадут удалить требуемые ресурсы, даже когда не используются никаким подключением. Это занимает место в хранилище, поскольку ни требуемые записи WAL, ни требуемые строки из системных каталогов нельзя удалить командойVACUUM
, пока они нужны слоту репликации. В исключительных случаях это может привести к отключению базы данных для предотвращения зацикливания идентификаторов транзакций (см. подраздел Предотвращение ошибок зацикливания идентификаторов транзакций). Поэтому если слот больше не нужен, его следует удалить.
Плагины вывода
Плагины вывода трансформируют данные из внутреннего представления журнала упреждающей записи в формат, запрашиваемый потребителем слота репликации.
Экспортированные снимки
Когда новый слот репликации создается через интерфейс потоковой репликации (см.
CREATE_REPLICATION_SLOT
), экспортируется снимок (см. подраздел
Функции синхронизации снимков), который будет показывать ровно то состояние
базы данных, все изменения после которого будут включаться в поток изменений. Это
можно использовать для создания новой реплики с помощью команды SET TRANSACTION SNAPSHOT
,
чтобы прочитать состояние базы данных в момент создания слота. Затем эту транзакцию
можно использовать для выгрузки состояния базы данных на этот момент времени,
которое впоследствии можно обновлять с помощью содержимого слота, не теряя никаких
изменений.
Создание снимка возможно не всегда. В частности, это не получится при подключении к горячему резерву. Приложения, которым не нужен экспорт снимка, могут подавить его, установив параметр NOEXPORT_SNAPSHOT.
Интерфейс протокола потоковой репликации
Команды
-
CREATE_REPLICATION_SLOT имя_слота LOGICAL плагин_вывода
-
DROP_REPLICATION_SLOT имя_слота [ WAIT ]
-
START_REPLICATION SLOT имя_слота LOGICAL ...
используются для создания, удаления и потоковой передачи изменений из слота репликации соответственно. Эти команды доступны только для соединения репликации; их нельзя использовать в обычном SQL. Подробную информацию об этих командах см. в разделе Протокол потоковой репликации.
Для управления логическим декодированием по соединению потоковой репликации можно воспользоваться утилитой qhb_recvlogical. (В ней используются эти команды.)
SQL-интерфейс логического декодирования
Подробное описание API уровня SQL для взаимодействия с логическим декодированием см. в подразделе Функции для управления репликацией.
Синхронная репликация (см. подраздел Синхронная репликация) поддерживается только для слотов репликации, используемых через интерфейс потоковой репликации. Интерфейс функций и дополнительные, не базовые интерфейсы не поддерживают синхронную репликацию.
Системные каталоги, связанные с логическим декодированием
В представлениях pg_replication_slots и pg_stat_replication выводится информация о текущем состоянии слотов репликации и соединений потоковой репликации соответственно. Эти представления относятся как к физической, так и к логической репликации. В представлении pg_stat_replication_slots выводится статистическая информация о слотах логической репликации.
Плагины вывода логического декодирования
Пример плагина вывода можно найти на справочной странице модуля test_decoding в Приложении данного руководства.
Функция инициализации
Плагин вывода загружается путем динамической загрузки разделяемой библиотеки, при этом в качестве базового имени библиотеки задается имя этого плагина. Для нахождения библиотеки используется обычный путь поиска библиотек. Чтобы предоставить требуемые функции обратного вызова плагина вывода и показать, что эта библиотека действительно является плагином вывода, она должна иметь функцию с именем _PG_output_plugin_init. Этой функции передается структура, которую нужно заполнить указателями на функции обратного вызова для определенных действий:
typedef struct OutputPluginCallbacks
{
LogicalDecodeStartupCB startup_cb;
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
LogicalDecodeFilterPrepareCB filter_prepare_cb;
LogicalDecodeBeginPrepareCB begin_prepare_cb;
LogicalDecodePrepareCB prepare_cb;
LogicalDecodeCommitPreparedCB commit_prepared_cb;
LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
LogicalDecodeStreamStartCB stream_start_cb;
LogicalDecodeStreamStopCB stream_stop_cb;
LogicalDecodeStreamAbortCB stream_abort_cb;
LogicalDecodeStreamPrepareCB stream_prepare_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
Функции обратного вызова begin_cb, change_cb и commit_cb являются
обязательными, а startup_cb, filter_by_origin_cb, truncate_cb и
shutdown_cb необязательны. Если truncate_cb не установлена, но нужно
декодировать TRUNCATE
, эта операция игнорируется.
Также плагин вывода может определять функции для поддержки потоковой передачи больших транзакций во время их выполнения. Функции stream_start_cb, stream_stop_cb, stream_abort_cb, stream_commit_cb, stream_change_cb и stream_prepare_cb являются обязательными, а stream_message_cb и stream_truncate_cb необязательны.
Также плагин вывода может определять функции для поддержки двухфазных фиксаций,
позволяющие декодировать операции в PREPARE TRANSACTION
. Функции обратного вызова
begin_prepare_cb, prepare_cb, stream_prepare_cb, commit_prepared_cb
и rollback_prepared_cb являются обязательными, а filter_prepare_cb
необязательна.
Возможности
Для декодирования, форматирования и вывода изменений плагины вывода могут использовать практически всю обычную инфраструктуру сервера, включая вызов функций вывода. К отношениям разрешен доступ только на чтение, только если они были созданы утилитой qhb_bootstrap (или initdb) в схеме pg_catalog или были помечены как пользовательские таблицы каталога с помощью команд
ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
Обратите внимание, что обращение к пользовательским таблицам каталога или обычным системным таблицам каталога в плагинах вывода должно осуществляться только посредством функций сканирования systable_*. Обращение через функции сканирования heap_* вызовет ошибку. Кроме того, запрещены любые операции, ведущие к присвоению идентификатора транзакции. К таким операциями, среди прочих, относятся записи в таблицы, выполнение изменений DDL и вызов функции pg_current_xact_id().
Режимы вывода
Функции обратного вызова плагина вывода могут передавать данные потребителю практически в любых форматах. Для некоторых вариантов применения, например, просмотра изменений через SQL, возвращение данных в типах, которые могут содержать произвольные данные (например, bytea), может быть неудобным. Если плагин вывода выводит только текстовые данные в кодировке сервера, он может объявить это, установив в OutputPluginOptions.output_type значение OUTPUT_PLUGIN_TEXTUAL_OUTPUT вместо OUTPUT_PLUGIN_BINARY_OUTPUT в функции обратного вызова для запуска. В этом случае все данные должны быть в кодировке сервера, чтобы их могло содержать значение типа text. Это проверяется в сборках с включенными проверочными утверждениями.
Функции обратного вызова в плагине вывода
Плагин вывода уведомляется о происходящих изменениях через различные функции обратного вызова, которые он должен предоставить.
Параллельные транзакции декодируются в порядке фиксации, при этом между функциями
обратного вызова begin и commit декодируются только изменения, относящиеся к
определенной транзакции. Явно или неявно откатившиеся транзакции никогда не
декодируются. Успешные точки сохранения помещаются в содержащую их транзакцию в
том порядке, в котором они выполнялись в этой транзакции. Транзакция, подготовленная
для двухфазной фиксации с помощью команды PREPARE TRANSACTION
, тоже будет
декодирована, если плагином вывода были предоставлены требуемые для декодирования
функции обратного вызова. Возможно такое, что текущая декодируемая подготовленная
транзакция параллельно прерывается командой ROLLBACK PREPARED
. В этом случае
логическое декодирование этой транзакции тоже будет прервано. Все изменения такой
транзакции пропускаются после выявления прерывания и вызова функции обратного
вызова prepare_cb. Таким образом, даже в случае параллельного прерывания плагину
вывода предоставляется достаточно информации, чтобы он мог правильно обработать
ROLLBACK PREPARED
после ее декодирования.
Примечание
Декодироваться будут только те транзакции, которые уже были успешно сброшены на диск. В результате этогоCOMMIT
может не декодироваться в непосредственно следующей за ним функции pg_logical_slot_get_changes(), когда в параметре synchronous_commit установлено значение off (выключен).
Функция обратного вызова для запуска
Необязательная функция обратного вызова startup_cb вызывается, когда слот репликации создается или через него запрашивается потоковая передача изменений, независимо от количества изменений, готовых к отправке.
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
OutputPluginOptions *options,
bool is_init);
Параметр is_init будет равен true, когда слот репликации создается, и false в противном случае. Параметр options указывает на структуру параметров, которые могут устанавливать плагины вывода:
typedef struct OutputPluginOptions
{
OutputPluginOutputType output_type;
bool receive_rewrites;
} OutputPluginOptions;
В output_type следует установить значение OUTPUT_PLUGIN_TEXTUAL_OUTPUT или OUTPUT_PLUGIN_BINARY_OUTPUT. См. также подраздел Режимы вывода. Если receive_rewrites равен true, плагин вывода также будет вызываться для изменений, произведенных перезаписями кучи при определенных операциях DDL. Эти изменения представляют интерес для плагинов, проводящих репликацию DDL, но они требуют особой обработки.
Функция запуска должна проверить параметры, представленные в ctx->output_plugin_options. Если плагину вывода нужно иметь состояние, он может хранить его в ctx->output_plugin_private.
Функция обратного вызова для выключения
Необязательная функция обратного вызова shutdown_cb вызывается, когда ранее активный слот репликации перестает использоваться и может освободить ресурсы, занятые плагином вывода. При этом слот не всегда удаляется, прекращается только потоковая передача через него.
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
Функция обратного вызова для начала транзакции
Обязательная функция обратного вызова begin_cb вызывается, когда декодируется начало зафиксированной транзакции. Прерванные транзакции и их содержимое никогда не декодируются.
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
Параметр txn содержит метаинформацию о транзакции, например временную метку ее фиксации и ее идентификатор.
Функция обратного вызова для завершения транзакции
Обязательная функция обратного вызова commit_cb вызывается, когда декодируется фиксация транзакции. Перед этим для всех модифицированных строк (если таковые имеются) будут вызваны функции обратного вызова change_cb.
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
Функция обратного вызова для изменений
Обязательная функция обратного вызова change_cb вызывается для каждой отдельной
модификации строки в транзакции, производимой командами INSERT
, UPDATE
или
DELETE
. Даже если исходная команда модифицировала несколько строк сразу, эта
функция обратного вызова будет вызываться отдельно для каждой строки. Функция
change_cb может обращаться к системным или пользовательским таблицам каталога,
чтобы добавить в процессе вывода подробную информацию о модификации строки. В
случае декодирования подготовленной (но еще не зафиксированной) транзакции или
декодирования незафиксированной транзакции эта функция для изменений также может
выдать ошибку из-за одновременного отката этой самой транзакции. В этом случае
логическое декодирование этой прерванной транзакции корректно останавливается.
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
Параметры ctx и txn имеют то же содержимое, что и для функций обратного вызова begin_cb и commit_cb, но с дополнительным дескриптором отношений relation, указывающим на отношение, которому принадлежит строка, и структурой change, описывающей передаваемую модификацию строки.
Примечание
С помощью логического декодирования могут быть извлечены изменения только в пользовательских таблицах, не являющихся нежурналируемыми (см. UNLOGGED) или временными (см. TEMPORARY или TEMP).
Функция обратного вызова для опустошения
Функция обратного вызова truncate_cb вызывается для команды TRUNCATE
.
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
Параметры такие же, как для функции обратного вызова change_cb. Однако
поскольку операции TRUNCATE
в таблицах, связанных внешними ключами, должны
выполняться вместе, эта функция получает на вход не одно отношение, а массив
отношений. Подробную информацию см. на справочной странице команды TRUNCATE
.
Функция обратного вызова для фильтра источника
Необязательная функция обратного вызова filter_by_origin_cb вызывается, чтобы определить, интересуют ли плагин вывода данные, воспроизводимые из источника с заданным идентификатором (origin_id).
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
RepOriginId origin_id);
Параметр ctx имеет то же содержимое, что и для других функций обратного вызова. Доступна информация только об источнике. Чтобы показать, что изменения, поступающие из переданного узла, не представляют интереса, плагин возвращает true, вследствие чего они отфильтровываются; в противном случае он возвращает false. Другие функции обратного вызова для отфильтрованных транзакций и изменений вызываться не будут.
Это полезно при реализации каскадных или разнонаправленных решений репликации. В таких конфигурациях фильтрация по источнику позволяет предотвратить репликацию туда-обратно одних и тех же изменений. Хотя в транзакциях и изменениях тоже переносится информация об источнике, фильтрация посредством этой функции гораздо более эффективна.
Функция обратного вызова для произвольных сообщений
Необязательная функция обратного вызова message_cb вызывается, когда декодируется сообщение логического декодирования.
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
Параметр txn содержит метаинформацию о транзакции, например временную метку ее фиксации и ее идентификатор. Однако обратите внимание, что он может быть равен NULL, когда сообщение нетранзакционное, и транзакции, протоколирующей сообщение, еще не был присвоен идентификатор. В параметре lsn содержится позиция сообщения в WAL. Параметр transactional показывает, было ли сообщение передано как транзакционное или нет. Подобно функции обратного вызова для изменений, в случае декодирования подготовленной (но еще не зафиксированной) транзакции или декодирования незафиксированной транзакции, эта функция для сообщений тоже может выдать ошибку из-за одновременного отката этой самой транзакции. В этом случае логическое декодирование этой прерванной транзакции корректно останавливается. В параметре prefix передается произвольный префикс (завершающийся нулевым символом), который можно использовать для выделения сообщений, интересующих текущий плагин. И, наконец, в параметре message содержится само сообщение размером message_size байт.
Следует дополнительно позаботиться о том, чтобы префикс, интересующий плагин вывода, был уникальным. Обычно лучше всего использовать имя расширения или самого плагина вывода.
Функция обратного вызова для фильтра подготовки
Необязательная функция обратного вызова filter_prepare_cb вызывается, чтобы
определить, следует ли декодировать данные, составляющие текущую транзакцию с
двухфазной фиксацией, на этом этапе подготовки или позже, в виде обычной
однофазной транзакции во время выполнения команды COMMIT PREPARED
. Чтобы
показать, что декодирование нужно пропустить, возвращает true; в противном
случае — false. Когда функция обратного вызова не определена, предполагается
false (т. е. фильтрация отсутствует, все транзакции, использующие двухфазную
фиксацию, декодируются также в две фазы).
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid);
Параметр ctx имеет то же содержимое, что и для других функций обратного вызова.
Параметры xid и gid предоставляют два разных способа идентификации
транзакции. Последующая команда COMMIT PREPARED
или ROLLBACK PREPARED
передает
оба идентификатора, давая плагину вывода выбор, какой из них использовать.
Эта функция может вызываться несколько раз в течение одной декодируемой транзакции и при каждом вызове должна предоставлять один и тот же постоянный ответ для данной пары xid и gid.
Функция обратного вызова для начала подготовленной транзакции
Обязательная функция обратного вызова begin_prepare_cb вызывается, когда
декодируется начало подготовленной транзакции. Поле gid, входящее в параметр
txn, в этой функции можно применять для проверки того, получал ли уже этот
плагин данную команду PREPARE
, и если да, он может либо выдать ошибку, либо
пропустить оставшиеся изменения транзакции.
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
Функция обратного вызова для подготовленной транзакции
Обязательная функция обратного вызова prepare_cb вызывается, когда декодируется транзакция, подготовленная для двухфазной фиксации. Перед этим для всех модифицированных строк (если таковые имеются) будет вызвана функция обратного вызова change_cb. В этой функции можно применять поле gid, входящее в параметр txn.
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
Функция обратного вызова для фиксации подготовленной транзакции
Обязательная функция обратного вызова commit_prepared_cb вызывается, когда
декодируется команда COMMIT PREPARED
транзакции. В этой функции можно применять
поле gid, входящее в параметр txn.
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
Функция обратного вызова для отката подготовленной транзакции
Обязательная функция обратного вызова rollback_prepared_cb вызывается, когда
декодируется команда ROLLBACK PREPARED
транзакции. В этой функции можно применять
поле gid, входящее в параметр txn. Параметры prepare_end_lsn и
prepare_time можно использовать для проверки того, получал ли этот плагин
данную команду PREPARE TRANSACTION
, и если да, он может применить откат, а в
противном случае пропустить эту операцию. Одного поля gid для этого
недостаточно, поскольку на нижестоящем узле может быть подготовленная транзакция
с таким же идентификатором.
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time);
Функция обратного вызова для начала потока
Функция обратного вызова stream_start_cb вызывается, когда открывается блок передаваемых в потоке изменений из выполняющейся транзакции.
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
Функция обратного вызова для остановки потока
Функция обратного вызова stream_stop_cb вызывается, когда закрывается блок передаваемых в потоке изменений из выполняющейся транзакции.
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
Функция обратного вызова для прерывания потока
Функция обратного вызова stream_abort_cb вызывается, чтобы прервать ранее переданную в потоке транзакцию.
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
Функция обратного вызова для подготовки потока
Функция обратного вызова stream_prepare_cb вызывается, чтобы подготовить ранее переданную в потоке транзакцию как часть двухфазной фиксации.
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
Функция обратного вызова для фиксации потока
Функция обратного вызова stream_commit_cb вызывается, чтобы зафиксировать ранее переданную в потоке транзакцию.
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
Функция обратного вызова для изменений в потоке
Функция обратного вызова stream_change_cb вызывается, когда передается изменение в блоке поточно передаваемых изменений (границы которого устанавливают вызовы stream_start_cb и stream_stop_cb). Фактические изменения не отображаются, поскольку транзакция может позже прерваться, а изменения для прерванных транзакций не декодируются.
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
Функция обратного вызова для сообщений потока
Функция обратного вызова stream_message_cb вызывается, когда передается произвольное сообщение в блоке поточно передаваемых изменений (границы которого устанавливают вызовы stream_start_cb и stream_stop_cb). Содержимое транзакционных сообщений не отображается, поскольку может позже прерваться, а изменения для прерванных транзакций не декодируются.
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
Функция обратного вызова для опустошения потока
Функция обратного вызова stream_truncate_cb вызывается для команды TRUNCATE
в блоке поточно передаваемых изменений (границы которого устанавливают вызовы
stream_start_cb и stream_stop_cb).
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
Параметры аналогичны параметрам функции обратного вызова stream_change_cb.
Однако поскольку операции TRUNCATE
в таблицах, связанных внешними ключами,
должны выполняться одновременно, эта функция получает на вход не одно отношение,
а массив отношений. Подробную информацию см. на справочной странице команды
TRUNCATE
.
Функции для формирования вывода
Чтобы действительно сформировать вывод, плагины вывода могу записывать данные в выходной буфер StringInfo через ctx->out внутри функций обратного вызова begin_cb, commit_cb или change_cb. Прежде чем записывать данные в этот буфер, нужно вызвать OutputPluginPrepareWrite(ctx, last_write), а по завершении записи в буфер нужно вызвать OutputPluginWrite(ctx, last_write), чтобы сформировать запись. Параметр last_write показывает, была ли эта конкретная операция записи последней в данной функции обратного вызова.
Следующий пример показывает, как вывести данные для потребителя плагина вывода:
OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);
Запись вывода логического декодирования
Существует возможность добавлять другие методы вывода для логического декодирования. По сути, нужно предоставить три функции: одну для чтения WAL, одну для подготовки к записи и одну для записи вывода (см. подраздел Функции для формирования вывода).
/*-------------------------------------------------------------------------
* Впомогательные функции для использования логического декодирования и
* управления слотами логической репликации посредством SQL.
*-------------------------------------------------------------------------
*/
#include "qhb.h"
#include <unistd.h>
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "catalog/pg_type.h"
#include "fmgr.h"
#include "funcapi.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/message.h"
#include "storage/fd.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/regproc.h"
#include "utils/resowner.h"
/* Внутренние данные для выписываемых данных */
typedef struct DecodingOutputState
{
Tuplestorestate *tupstore;
TupleDesc tupdesc;
bool binary_output;
int64 returned_rows;
} DecodingOutputState;
/*
* Подготовка к записи плагина вывода.
*/
static void
LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
bool last_write)
{
resetStringInfo(ctx->out);
}
/*
* Выполнение записи плагина вывода в хранилище кортежей.
*/
static void
LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
bool last_write)
{
Datum values[3];
bool nulls[3];
DecodingOutputState *p;
/* Данные SQL могут быть только ограниченной длины... */
if (ctx->out->len > MaxAllocSize - VARHDRSZ)
elog(ERROR, "too much output for sql interface");
p = (DecodingOutputState *) ctx->output_writer_private;
memset(nulls, 0, sizeof(nulls));
values[0] = LSNGetDatum(lsn);
values[1] = TransactionIdGetDatum(xid);
/*
* Когда мы записываем текстовый вывод, добавление ctx->out происходит в
* кодировке базы данных.
*/
if (!p->binary_output)
Assert(pg_verify_mbstr(GetDatabaseEncoding(),
ctx->out->data, ctx->out->len,
false));
/* неприглядно, но cstring_to_text_with_len прекрасно работает для bytea */
values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
p->returned_rows++;
}
/*
* Функция-помощник для различных вызываемых через SQL функций логического декодирования.
*/
static Datum
pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
{
Name name;
XLogRecPtr upto_lsn;
int32 upto_nchanges;
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
XLogRecPtr end_of_wal;
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
ArrayType *arr;
Size ndim;
List *options = NIL;
DecodingOutputState *p;
CheckSlotPermissions();
CheckLogicalDecodingRequirements();
if (PG_ARGISNULL(0))
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("slot name must not be null")));
name = PG_GETARG_NAME(0);
if (PG_ARGISNULL(1))
upto_lsn = InvalidXLogRecPtr;
else
upto_lsn = PG_GETARG_LSN(1);
if (PG_ARGISNULL(2))
upto_nchanges = InvalidXLogRecPtr;
else
upto_nchanges = PG_GETARG_INT32(2);
if (PG_ARGISNULL(3))
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("options array must not be null")));
arr = PG_GETARG_ARRAYTYPE_P(3);
/* состояние, в котором должен записываться вывод */
p = palloc0(sizeof(DecodingOutputState));
p->binary_output = binary;
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
/* Деконструировать массив параметров */
ndim = ARR_NDIM(arr);
if (ndim > 1)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("array must be one-dimensional")));
}
else if (array_contains_nulls(arr))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("array must not contain nulls")));
}
else if (ndim == 1)
{
int nelems;
Datum *datum_opts;
int i;
Assert(ARR_ELEMTYPE(arr) == TEXTOID);
deconstruct_array_builtin(arr, TEXTOID, &datum_opts, NULL, &nelems);
if (nelems % 2 != 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("array must have even number of elements")));
for (i = 0; i < nelems; i += 2)
{
char *optname = TextDatumGetCString(datum_opts[i]);
char *opt = TextDatumGetCString(datum_opts[i + 1]);
options = lappend(options, makeDefElem(optname, (Node *) makeString(opt), -1));
}
}
InitMaterializedSRF(fcinfo, 0);
p->tupstore = rsinfo->setResult;
p->tupdesc = rsinfo->setDesc;
/*
* Вычисление текущего конца WAL.
*/
if (!RecoveryInProgress())
end_of_wal = GetFlushRecPtr(NULL);
else
end_of_wal = GetXLogReplayRecPtr(NULL);
ReplicationSlotAcquire(NameStr(*name), true);
PG_TRY();
{
/* перезапуск в функции confirmed_flush слота */
ctx = CreateDecodingContext(InvalidXLogRecPtr,
options,
false,
XL_ROUTINE(.page_read = read_local_xlog_page,
.segment_open = wal_segment_open,
.segment_close = wal_segment_close),
LogicalOutputPrepareWrite,
LogicalOutputWrite, NULL);
MemoryContextSwitchTo(oldcontext);
/*
* Проверить, записывает ли плагин вывода текстовый вывод, если это то,
* что нам нужно.
*/
if (!binary &&
ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
NameStr(MyReplicationSlot->data.plugin),
format_procedure(fcinfo->flinfo->fn_oid))));
ctx->output_writer_private = p;
/*
* Декодирование WAL должно начаться в restart_lsn, чтобы все
* xact, зафиксированные после выполнения confirmed_flush слота, могли быть
* собраны в буферы переупорядочивания.
*/
XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
/* аннулировать нереверсируемые строки */
InvalidateSystemCaches();
/* Декодировать, пока не кончатся записи */
while (ctx->reader->EndRecPtr < end_of_wal)
{
XLogRecord *record;
char *errm = NULL;
record = XLogReadRecord(ctx->reader, &errm);
if (errm)
elog(ERROR, "could not find record for logical decoding: %s", errm);
/*
* Показанные выше функции обратного вызова {begin_txn,change,commit_txn}_wrapper
* сохранят описание в нашем хранилище кортежей.
*/
if (record != NULL)
LogicalDecodingProcessRecord(ctx, ctx->reader);
/* check limits */
if (upto_lsn != InvalidXLogRecPtr &&
upto_lsn <= ctx->reader->EndRecPtr)
break;
if (upto_nchanges != 0 &&
upto_nchanges <= p->returned_rows)
break;
CHECK_FOR_INTERRUPTS();
}
/*
* Логическое декодирование могло бы затереть CurrentResourceOwner во время
* управления транзакциями, поэтому восстанавливаем значение исполнителя.
* (Это топорно, но прямо сейчас в очистке нет смысла.)
*/
CurrentResourceOwner = old_resowner;
/*
* В следующий раз начнем там, где закончили.
*/
if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
{
LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
/*
* Если изменился только confirmed_flush_lsn, слот не будет помечен
* функцией выше как «грязный». Ожидается, что вызывающие функции в
* интерфейсе передатчика WAL будут отслеживать свой прогресс и не
* нуждаются в том, чтобы его записывать. Но функции, использующие
* SQL-интерфейс, не могут указать собственные начальные позиции, и им
* труднее отслеживать свой прогресс, поэтому мы должны приложить больше
* усилий, чтобы сохранить для них этот прогресс.
*
* «Загрязним» слот, чтобы он записался при следующей контрольной точке.
* Мы все равно потеряем его позицию при сбое (это подтверждено), но это
* лучше, чем постоянно терять эту позицию даже при обычном перезапуске.
*/
ReplicationSlotMarkDirty();
}
/* освобожение контекста, вызов функции обратного вызова для выключения */
FreeDecodingContext(ctx);
ReplicationSlotRelease();
InvalidateSystemCaches();
}
PG_CATCH();
{
/* очистка всех реверсируемых строк */
InvalidateSystemCaches();
PG_RE_THROW();
}
PG_END_TRY();
return (Datum) 0;
}
/*
* Функция SQL, возвращающая поток изменений в виде текста, потребляющая данные.
*/
Datum
pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
{
return pg_logical_slot_get_changes_guts(fcinfo, true, false);
}
/*
* Функция SQL, возвращающая поток изменений в виде текста, только чуть позже.
*/
Datum
pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
{
return pg_logical_slot_get_changes_guts(fcinfo, false, false);
}
/*
* Функция SQL, возвращающая поток изменений в виде двоичных данных, потребляющая данные.
*/
Datum
pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
{
return pg_logical_slot_get_changes_guts(fcinfo, true, true);
}
/*
* Функция SQL, возвращающая поток изменений в виде двоичных данных, только чуть позже.
*/
Datum
pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
{
return pg_logical_slot_get_changes_guts(fcinfo, false, true);
}
/*
* Функция SQL для записи сообщения логического декодирования в WAL.
*/
Datum
pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
{
bool transactional = PG_GETARG_BOOL(0);
char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
bytea *data = PG_GETARG_BYTEA_PP(2);
bool flush = PG_GETARG_BOOL(3);
XLogRecPtr lsn;
lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
transactional, flush);
PG_RETURN_LSN(lsn);
}
Datum
pg_logical_emit_message_text(PG_FUNCTION_ARGS)
{
/* bytea и text совместимы */
return pg_logical_emit_message_bytea(fcinfo);
}
Поддержка синхронной репликации для логического декодирования
Обзор
Логическое декодирование можно применять для создания решений синхронной репликации с тем же пользовательским интерфейсом, что и у синхронной репликации поверх потоковой репликации. Для этого потоковая передача данных должна проводиться через интерфейс потоковой репликации (см. раздел Интерфейс протокола потоковой репликации). Клиенты должны передавать сообщения Standby status update (F) (см. раздел Протокол потоковой репликации), как это делают клиенты потоковой репликации.
Примечание
Синхронная реплика, получающая изменения через логическое декодирование, будет работать в рамках одной базы данных. Поскольку в настоящее время параметр synchronous_standby_names, напротив, устанавливается на уровне сервера, это означает, что данная методика не будет работать корректно при активном использовании нескольких баз данных.
Ограничения применимости
В конфигурации с синхронной репликацией может возникнуть взаимоблокировка, если транзакция в эксклюзивном режиме заблокирует таблицы каталога (в т. ч. пользовательские). Информацию о пользовательских таблицах каталога см. в подразделе Возможности. Это происходит потому, что логическое декодирование транзакций может заблокировать таблицы каталога при обращении к ним. Во избежание этого пользователи должны воздерживаться от эксклюзивной блокировки таблиц каталога (в т. ч. пользовательских). Такую блокировку могут вызвать следующие действия:
-
Установка явной блокировки (с помощью команды
LOCK
) для каталога pg_class в транзакции. -
Выполнение
CLUSTER
для каталога pg_class в транзакции. -
Выполнение
PREPARE TRANSACTION
после командыLOCK
для каталога pg_class, с разрешенным логическим декодированием двухфазных транзакций. -
Выполнение
PREPARE TRANSACTION
после командыCLUSTER
для каталога pg_trigger, с разрешенным логическим декодированием двухфазных транзакций. Это приведет к взаимоблокировке, только если у публикуемой таблицы есть триггер. -
Выполнение
TRUNCATE
для таблицы каталога (в т. ч. пользовательской) в транзакции.
Обратите внимание, что эти команды, способные вызвать взаимоблокировку, могут применяться не только к явно указанным выше таблицам системного каталога, но и к любой другой таблице каталога (в т. ч. пользовательской).
Потоковая передача больших транзакций для логического декодирования
Базовые функции обратного вызова в плагине вывода (например, begin_cb, change_cb, commit_cb и message_cb) вызываются, только когда транзакция действительно фиксируется. Изменения в любом случае декодируются из журнала транзакций, но передаются в плагин вывода только при фиксации (и отбрасываются, если транзакция прерывается).
Это означает, что, хотя декодирование происходит инкрементно и может сбрасывать данные на диск, чтобы контролировать потребление памяти, все декодированные изменения должны передаваться, когда транзакция окончательно фиксируется (или, точнее, когда фиксация декодируется из журнала транзакций). В зависимости от размера транзакции и пропускной способности сети время передачи может значительно увеличить задержку применения.
Чтобы уменьшить задержку применения, вызванную большими транзакциями, плагин вывода может предоставить дополнительную функцию обратного вызова для поддержки инкрементальной потоковой передачи выполняющихся транзакций. Имеется несколько обязательных функций потоковой передачи (stream_start_cb, stream_stop_cb, stream_abort_cb, stream_commit_cb и stream_change_cb) и две необязательных (stream_message_cb и stream_truncate_cb). Кроме того, если требуется поддержка потоковой передачи двухфазных комманд, нужно предоставить дополнительные функции обратного вызова. (Подробную информацию см. в разделе Поддержка двухфазной фиксации для логического декодирования.)
При потоковой передаче выполняющейся транзакции изменения (и сообщения) передаются
блоками, границы которых устанавливают функции обратного вызова stream_start_cb
и stream_stop_cb. По завершении передачи всех декодированных изменений
транзакция может быть зафиксирована функцией обратного вызова stream_commit_cb
(или, возможно, прервана функцией stream_abort_cb). Если поддерживается
двухфазная фиксация, транзакция может быть подготовлена функцией обратного вызова
stream_prepare_cb, зафиксирована (COMMIT PREPARED
) функцией обратного вызова
commit_prepared_cb или прервана rollback_prepared_cb.
Например, последовательность вызовов функций обратного вызова потоковой передачи для одной транзакции может выглядеть так:
stream_start_cb(...); <-- начало первого блока изменений
stream_change_cb(...);
stream_change_cb(...);
stream_message_cb(...);
stream_change_cb(...);
...
stream_change_cb(...);
stream_stop_cb(...); <-- конец первого блока изменений
stream_start_cb(...); <-- начало второго блока изменений
stream_change_cb(...);
stream_change_cb(...);
stream_change_cb(...);
...
stream_message_cb(...);
stream_change_cb(...);
stream_stop_cb(...); <-- конец второго блока изменений
[a. при использовании обычной фиксации]
stream_commit_cb(...); <-- фиксация переданной транзакции
[b. при использовании двухфазной фиксации]
stream_prepare_cb(...); <-- подготовка переданной транзакции
commit_prepared_cb(...); <-- фиксация подготовленной транзакции
Разумеется, рабочая последовательность вызовов этих функций может быть более сложной. В ней могут быть блоки нескольких поточно передаваемых транзакций, некоторые транзакции могут прерываться и т. д.
Подобно поведению сброса на диск, потоковая передача запускается, когда общее количество изменений, декодированных из WAL (для всех выполняющихся транзакций), превышает предел, определенный параметром logical_decoding_work_mem. В этот момент выбирается и поточно передается самая большая транзакция верхнего уровня (это определяется по объему памяти, который на данный момент занимают декодированные изменения). Однако в некоторых случаях все равно приходится сбросить данные на диск, даже если включена поточная передача, поскольку порог памяти уже превышен, а кортеж еще полностью не декодирован (например, декодировано добавление только в таблицу TOAST, но не в основную таблицу).
Даже при потоковой передаче больших транзакций изменения по-прежнему применяются в порядке фиксации, сохраняя те же гарантии, что и не в потоковом режиме.
Поддержка двухфазной фиксации для логического декодирования
С базовыми функциями обратного вызова в плагине вывода (например, begin_cb,
change_cb, commit_cb и message_cb) такие команды двухфазной фиксации,
как PREPARE TRANSACTION
, COMMIT PREPARED
и ROLLBACK PREPARED
, не декодируются.
При этом PREPARE TRANSACTION
игнорируется, COMMIT PREPARED
декодируется как
COMMIT
, а ROLLBACK PREPARED
декодируется как ROLLBACK
.
Для поддержки потоковой передачи двухфазных команд плагин вывода должен предоставить дополнительные функции обратного вызова. Имеется несколько обязательных функций для двухфазной фиксации (begin_prepare_cb, prepare_cb, commit_prepared_cb, rollback_prepared_cb и stream_prepare_cb) и одна необязательная (filter_prepare_cb).
Если плагин предоставляет функции обратного вызова для декодирования команд
двухфазной фиксации, то при выполнении PREPARE TRANSACTION
изменения этой
транзакции декодируются и передаются в плагин вывода, и вызывается функция обратного
вызова prepare_cb. Это отличается от базовой схемы декодирования, где изменения
передаются в плагин вывода только при фиксации транзакции. Начало подготовленной
транзакции обозначается вызовом функции обратного вызова begin_prepare_cb.
Когда подготовленная транзакция откатывается командой ROLLBACK PREPARED
,
вызывается функция обратного вызова rollback_prepared_cb, а когда подготовленная
команда фиксируется командой COMMIT PREPARED
, вызывается функция обратного вызова
commit_prepared_cb.
Плагин вывода может дополнительно определять правила фильтрации посредством filter_prepare_cb, чтобы декодировать в две фазы только определенную транзакцию. Этого можно добиться, сопоставляя с некоторым шаблоном в gid или производя поиск по xid.
Пользователям, желающим декодировать подготовленные транзакции, нужно учитывать следующие моменты:
-
Если подготовленная транзакция поставила эксклюзивные блокировки на таблицы каталога (в т. ч. пользовательские), то декодирование подготовки может заблокироваться и оставаться таковым вплоть до фиксации основной транзакции.
-
Решение логической репликации, которое формирует распределенную двухфазную фиксацию с использованием этой функциональности, может получить взаимоблокировку, если подготовленная транзакция поставила эксклюзивные блокировки на таблицы каталога (в т. ч. пользовательские). Во избежание этого в таких транзакциях следует воздержаться от блокировок таблиц каталога (например, явной командой
LOCK
). Подробную информацию см. в подразделе Ограничения применимости.