Фоновые рабочие процессы
QHB можно расширить для выполнения пользовательского кода в отдельных процессах. Такие процессы запускаются, останавливаются и контролируются процессом qhb, что позволяет им тесно связать время существования со статусом сервера. Эти процессы привязаны к области разделяемой памяти QHB и имеют возможность внутренне подключаться к базам данных; кроме того, они могут запускать последовательно несколько транзакций, в точности как обычный серверный процесс, подключенный к клиенту. Также, связываясь с libpq, они могут подключаться к серверу и вести себя как обычное клиентское приложение.
ПРЕДУПРЕЖДЕНИЕ
С использованием фоновых рабочих процессов сопряжены значительные угрозы стабильности и безопасности, поскольку, будучи написанными на языке C/RUST, они имеют неограниченный доступ к данным. Администраторам, желающим применять модули, включающие в себя фоновые рабочие процессы, нужно проявлять крайнюю осторожность. Запускать фоновые рабочие процессы следует разрешать только тщательно проверенным модулям.
Фоновые рабочие процессы можно инициализировать во время запуска QHB путем включения имени модуля в shared_preload_libraries. Модуль, желающий запустить фоновый рабочий процесс, может зарегистрировать его, вызвав функцию RegisterBackgroundWorker(BackgroundWorker *worker) из своей функции _PG_init(). Также фоновые рабочие процессы можно запустить после запуска системы, вызвав функцию RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle). В отличие от RegisterBackgroundWorker, которую можно вызывать только из процесса qhbmaster, RegisterDynamicBackgroundWorker нужно вызывать из обычного обслуживающего процесса или другого фонового рабочего процесса.
Структура BackgroundWorker определяется таким образом:
typedef void (*bgworker_main_type)(Datum main_arg);
typedef struct BackgroundWorker
{
char bgw_name[BGW_MAXLEN];
char bgw_type[BGW_MAXLEN];
int bgw_flags;
BgWorkerStartTime bgw_start_time;
int bgw_restart_time; /* время в секундах или BGW_NEVER_RESTART */
char bgw_library_name[BGW_MAXLEN];
char bgw_function_name[BGW_MAXLEN];
Datum bgw_main_arg;
char bgw_extra[BGW_EXTRALEN];
pid_t bgw_notify_pid;
} BackgroundWorker;
Поля bgw_name и bgw_type — это строки, которые будут использоваться в сообщениях журнала, списках процессов и тому подобных контекстах. Поле bgw_type должно быть одинаковым для всех фоновых процессов одного типа, чтобы, например, можно было сгруппировать такие рабочие процессы в список процессов. С другой стороны, поле bgw_name может содержать дополнительную информацию о конкретном процессе. (Как правило, строка для bgw_name будет некоторым образом содержать тип, но строго это не требуется.)
Поле bgw_flags — это побитовая битовая маска, указывающая возможности, запрашиваемые модулем. Возможные значения:
BGWORKER_SHMEM_ACCESS
Запрашивает доступ к разделяемой памяти. Это обязательный флаг.
BGWORKER_BACKEND_DATABASE_CONNECTION
Запрашивает возможность установить соединение с базой данных, с помощью которого
она потом может выполнять транзакции и запросы. Фоновый рабочий процесс, использующий
BGWORKER_BACKEND_DATABASE_CONNECTION для подключения к базе данных, должен
также подключить разделяемую память с помощью BGWORKER_SHMEM_ACCESS, иначе
рабочий процесс не запустится.
Поле bgw_start_time — это состояние сервера, в течение которого qhb должен запустить процесс; это может быть BgWorkerStart_QhbmasterStart (запускаться сразу после того, как сам qhb завершил собственную инициализацию; запрашивающие это процессы, не подходят для подключений к базе данных), BgWorkerStart_ConsistentState (запускаться сразу после того, как было достигнуто согласованное состояние горячего резерва, позволяя процессам подключаться к базам данных и выполнять запросы только на чтение) и BgWorkerStart_RecoveryFinished (запускаться сразу после того, как система вошла в нормальное состояние чтения-записи). Обратите внимание, что последние два значения равнозначны для сервера, который не находится в режиме горячего резерва. Также обратите внимание, что этот параметр указывает только когда эти процессы должны запускаться; они не будут останавливаться при переходе в другое состояние.
Поле bgw_restart_time — это интервал в секундах, который qhb должен выждать перед перезапуском процесса в случае его сбоя. Это может быть любое положительное значение или флаг BGW_NEVER_RESTART, указывающий не перезапускать процесс в случае сбоя.
Поле bgw_library_name — это имя библиотеки, в которой следует искать начальную точку входа для фонового процесса. Именованная библиотека будет динамически загружена рабочим процессом, а поле bgw_function_name будет использовано для идентификации вызываемой функции. При вызове функции в основном коде в этом поле должно быть установлено значение "qhb".
Поле bgw_function_name — это имя той функции в динамически загружаемой библиотеке, которая должна использоваться в качестве начальной точки входа для нового фонового рабочего процесса. Если эта функция находится в динамически загружаемой библиотеке, она должна быть помечена как PGDLLEXPORT (и не быть помеченной как static).
Поле bgw_main_arg — это аргумент типа Datum для основной функции фонового рабочего процесса. Основная функция должна принимать один аргумент типа Datum и возвращать void. Поле bgw_main_arg будет передано в качестве этого аргумента. Кроме того, глобальная переменная MyBgworkerEntry указывает на копию структуры BackgroundWorker, переданную во время регистрации; рабочему процессу может быть полезно проверить эту структуру.
Везде, где определяется EXEC_BACKEND, или в динамических рабочих процессах, небезопасно передавать Datum по ссылке, нужно передавать его по значению. Если требуется аргумент, безопаснее всего будет передать int32 или другое маленькое значение и использовать его как индекс в массиве, размещенном в разделяемой памяти. Если же передается значение типа cstring или text, то этот указатель не будет работать в новом фоновом рабочем процессе.
Поле bgw_extra может содержать дополнительные данные для передачи фоновому рабочему процессу. В отличие от bgw_main_arg, эти данные не передаются в качестве аргумента основной функции рабочего процесса, но к ним можно получить доступ через MyBgworkerEntry, как было сказано выше.
Поле bgw_notify_pid — это PID обслуживающего процесса QHB, которому qhbmaster должен отправить SIGUSR1 при запуске или окончании этого рабочего процесса. Оно должно содержать 0 для рабочих процессов, зарегистрированных во время запуска qhbmaster или когда обслуживающий процесс, регистрирующий рабочий процесс, не хочет ждать его запуска. В противном случае его следует инициализировать как MyProcPid.
После запуска процесс может подключиться к базе данных, вызвав функцию BackgroundWorkerInitializeConnection(char *dbname, char *username, uint32 flags) или BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags). Это позволяет процессу выполнять транзакции и запросы с использованием интерфейса SPI. Если аргумент dbname равен NULL или dboid имеет значение InvalidOid, сеанс не подключен к какой-либо конкретной базе данных, но может обращаться к разделяемым каталогам. Если аргумент username равен NULL или useroid имеет значение InvalidOid, процесс будет запускаться от имени суперпользователя, созданного во время qhb_bootstrap (или initdb). Если в качестве flags указано BGWORKER_BYPASS_ALLOWCONN, можно обойти ограничение на подключение к базам данных, не допускающим подключения от пользователей. Фоновый рабочий процесс может вызвать только одну из этих двух функций и только один раз. Переключаться между базами данных невозможно.
Сигналы изначально блокируются, когда управление достигает основной функции фонового рабочего процесса, и должны быть разблокированы ей; это позволяет процессу при необходимости настраивать свои обработчики сигналов. В новом процессе сигналы могут быть разблокированы путем вызова BackgroundWorkerUnblockSignals и заблокированы путем вызова BackgroundWorkerBlockSignals.
Если поле bgw_restart_time для фонового рабочего процесса сконфигурировано как BGW_NEVER_RESTART, или если он завершается с кодом завершения 0 или уничтожается TerminateBackgroundWorker, после выхода он автоматически перестанет регистрироваться процессом qhbmaster. В противном случае он будет перезапущен по истечении периода времени, сконфигурированного посредством bgw_restart_time, или немедленно, если qhbmaster повторно инициализирует кластер из-за сбоя обслуживающего процесса. Для обслуживающих процессов, выполнение которых должно приостановиться лишь на время, вместо выхода следует использовать прерываемый режим ожидания; это можно сделать путем вызова WaitLatch(). Убедитесь, что при вызове этой функции установлен флаг WL_QHBMASTER_DEATH, и проверьте код завершения для быстрого выхода в экстренной ситуации, когда был уничтожен сам qhb.
Когда фоновый рабочий процесс регистрируется с помощью функции RegisterDynamicBackgroundWorker, обслуживающий процесс, выполняющий регистрацию, может получить информацию о его статусе. Обслуживающие процессы, желающие сделать это, должны передать в качестве второго аргумента RegisterDynamicBackgroundWorker адрес BackgroundWorkerHandle *. Если процесс успешно зарегистрирован, этот указатель будет инициализирован со скрытым обработчиком, который впоследствии можно будет передать в GetBackgroundWorkerPid(BackgroundWorkerHandle *, pid_t *) или в TerminateBackgroundWorker(BackgroundWorkerHandle *). GetBackgroundWorkerPid можно использовать для опроса статуса рабочего процесса: возвращаемое значение BGWH_NOT_YET_STARTED указывает, что процесс еще не был запущен qhbmaster, BGWH_STOPPED — что он был запущен, но больше не работает, а BGWH_STARTED — что он в данный момент работает. В последнем случае через второй аргумент также будет возвращен PID. TerminateBackgroundWorker заставляет qhbmaster отправлять рабочему процессу SIGTERM, если он работает, и отменять его регистрацию, как только он завершится.
В некоторых случаях может понадобиться, чтобы процесс, регистрирующий фоновый рабочий процесс, дождался его запуска. Это можно сделать путем инициализации bgw_notify_pid для MyProcPid и последующей передачи BackgroundWorkerHandle *, полученного во время регистрации, в WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t*). Эта функция будет блокировать выполнение до тех пор, пока qhbmaster не попытается запустить фоновый рабочий процесс или не прекратит работу. Если фоновый процесс работает, будет возвращено значение BGWH_STARTED, а PID будет записан по указанному адресу. В противном случае будет возвращено значение BGWH_STOPPED или BGWH_QHBMASTER_DIED.
Процесс также может ожидать выключения фонового рабочего процесса, используя функцию WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle) и передавая BackgroundWorkerHandle *, полученный при регистрации. Эта функция будет блокировать выполнение до тех пор, пока не завершится либо фоновый рабочий процесс, либо qhbmaster. Когда завершает работу фоновый процесс, будет возвращено значение BGWH_STOPPED, если завершается qhbmaster, она вернет BGWH_QHBMASTER_DIED.
Фоновые рабочие процессы могут отправлять асинхронные уведомления либо с помощью
команды NOTIFY через SPI, либо напрямую вызвав функцию Async_Notify(). Такие
уведомления будут передаваться при фиксации транзакции. Фоновые рабочие процессы
не должны регистрироваться командой LISTEN для получения асинхронных уведомлений,
поскольку для них не предусмотрена инфраструктура, позволяющая получать подобные
уведомления.
Максимальное количество регистрируемых фоновых рабочих процессов ограничивается параметром max_worker_processes.
Пример фонового рабочего процесса
Ниже приводится пример кода фонового рабочего процесса, демонстрирующий различные шаблоны: установку подключения к базе данных, запуск и фиксацию транзакций, использование переменных GUC и отклик на сигнал SIGHUP перечитать файл конфигурации, передачу информации в pg_stat_activity, использование самоблокировки процесса для перехода в режим ожидания и для завершения работы в случае уничтожения qhbmaster.
Этот код подключается к базе данных, создает схему и таблицу и суммирует содержащиеся в ней числа. Чтобы понаблюдать за его работой, добавьте начальное значение с «общим» типом и некоторое начальное значение, затем добавьте еще несколько строк с типом «дельта». Строки дельта будут удалены этим рабочим процессом, а их значения агрегированы в общие.
#include "qhb.h"
/* Эти заголовки требуются для фонового рабочего процесса всегда */
#include "miscadmin.h"
#include "qhbmaster/bgworker.h"
#include "qhbmaster/interrupt.h"
#include "storage/latch.h"
/* эти заголовки используются этим конкретным кодом рабочего процесса */
#include "access/xact.h"
#include "commands/dbcommands.h"
#include "executor/spi.h"
#include "fmgr.h"
#include "lib/stringinfo.h"
#include "pgstat.h"
#include "tcop/utility.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(worker_spi_launch);
PGDLLEXPORT pg_noreturn void worker_spi_main(Datum main_arg);
/* переменные GUC */
static int worker_spi_naptime = 10;
static int worker_spi_total_workers = 2;
static char *worker_spi_database = NULL;
static char *worker_spi_role = NULL;
/* кешированное значение, извлеченное из разделяемой памяти */
static uint32 worker_spi_wait_event_main = 0;
typedef struct worktable
{
const char *schema;
const char *name;
} worktable;
/*
* Инициализировать рабочее пространство для рабочего процесса: создать схему,
* если она не существует.
*/
static void
initialize_worker_spi(worktable *table)
{
int ret;
int ntup;
bool isnull;
StringInfoData buf;
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
/* XXX можем ли мы использовать CREATE SCHEMA IF NOT EXISTS? */
initStringInfo(&buf);
appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
table->schema);
debug_query_string = buf.data;
ret = SPI_execute(buf.data, true, 0);
if (ret != SPI_OK_SELECT)
elog(FATAL, "SPI_execute failed: error code %d", ret);
if (SPI_processed != 1)
elog(FATAL, "not a singleton result");
ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
if (isnull)
elog(FATAL, "null result");
if (ntup == 0)
{
debug_query_string = NULL;
resetStringInfo(&buf);
appendStringInfo(&buf,
"CREATE SCHEMA \"%s\" "
"CREATE TABLE \"%s\" ("
" type text CHECK (type IN ('total', 'delta')), "
" value integer)"
"CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
"WHERE type = 'total'",
table->schema, table->name, table->name, table->name);
/* установить время запуска оператора */
SetCurrentStatementStartTimestamp();
debug_query_string = buf.data;
ret = SPI_execute(buf.data, false, 0);
if (ret != SPI_OK_UTILITY)
elog(FATAL, "failed to create my schema");
debug_query_string = NULL; /* остальное не специфично для оператора */
}
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
debug_query_string = NULL;
pgstat_report_activity(STATE_IDLE, NULL);
}
void
worker_spi_main(Datum main_arg)
{
int index = DatumGetInt32(main_arg);
worktable *table;
StringInfoData buf;
char name[20];
Oid dboid;
Oid roleoid;
char *p;
bits32 flags = 0;
table = palloc(sizeof(worktable));
sprintf(name, "schema%d", index);
table->schema = pstrdup(name);
table->name = pstrdup("counted");
/* извлечь идентификаторы базы данных и роли; они устанавливаются для динамического рабочего процесса */
p = MyBgworkerEntry->bgw_extra;
memcpy(&dboid, p, sizeof(Oid));
p += sizeof(Oid);
memcpy(&roleoid, p, sizeof(Oid));
p += sizeof(Oid);
memcpy(&flags, p, sizeof(bits32));
/* Прежде чем разблокировать сигналы, установить обработчики сигналов. */
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, die);
/* Теперь мы готовы получать сигналы */
BackgroundWorkerUnblockSignals();
/* Подключиться к нашей базе данных */
if (OidIsValid(dboid))
BackgroundWorkerInitializeConnectionByOid(dboid, roleoid, flags);
else
BackgroundWorkerInitializeConnection(worker_spi_database,
worker_spi_role, flags);
elog(LOG, "%s initialized with %s.%s",
MyBgworkerEntry->bgw_name, table->schema, table->name);
initialize_worker_spi(table);
/*
* Заключить в кавычки переданные нам идентификаторы. Обратите внимание, что это
* нужно сделать после вызова initialize_worker_spi, поскольку эта подпрограмма
* предполагает, что имена не заключаются в кавычки.
*
* Обратите внимание, что здесь могут возникнуть небольшие утечки памяти.
*/
table->schema = quote_identifier(table->schema);
table->name = quote_identifier(table->name);
initStringInfo(&buf);
appendStringInfo(&buf,
"WITH deleted AS (DELETE "
"FROM %s.%s "
"WHERE type = 'delta' RETURNING value), "
"total AS (SELECT coalesce(sum(value), 0) as sum "
"FROM deleted) "
"UPDATE %s.%s "
"SET value = %s.value + total.sum "
"FROM total WHERE type = 'total' "
"RETURNING %s.value",
table->schema, table->name,
table->schema, table->name,
table->name,
table->name);
/*
* Основной цикл: повторять эту операцию до получения сигнала SIGTERM и его обработки
* ProcessInterrupts.
*/
for (;;)
{
int ret;
/* В первый раз выделить или получить пользовательское событие ожидания */
if (worker_spi_wait_event_main == 0)
worker_spi_wait_event_main = WaitEventExtensionNew("WorkerSpiMain");
/*
* Фоновые рабочие процессы не должны вызывать usleep() или любую непосредственно
* равнозначную ей функцию:
* вместо этого они могут подождать своей самоблокировки, которая при необходимости
* бездействует, но активируется при уничтожении qhbmaster. Таким образом, в
* аварийной ситуации фоновый процесс немедленно заканчивается.
*/
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
worker_spi_naptime * 1000L,
worker_spi_wait_event_main);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/*
* В случае SIGHUP просто перезагрузить конфигурацию.
*/
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
* Начать транзакцию, в которой мы можем выполнять запросы. Обратите внимание,
* что каждому вызову StartTransactionCommand() должен предшествовать вызов
* SetCurrentStatementStartTimestamp(), которая устанавливает время для
* оператора, который мы собираемся вы полнить, а также время начала
* транзакции. Кроме того, любому другому запросу, передаваемому SPI, вероятно,
* должен предшествовать вызов SetCurrentStatementStartTimestamp(), чтобы время
* запуска оператора всегда было актуальным.
*
* Вызов SPI_connect() позволяет нам выполнять запросы через менеджера SPI,
* а вызовы PushActiveSnapshot() создают «активный» снимок состояния,
* который необходим, чтобы запросы могли работать с данными MVCC.
*
* Благодаря вызову pgstat_report_activity() наша активность становится видимой
* через представления pgstat.
*/
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
debug_query_string = buf.data;
pgstat_report_activity(STATE_RUNNING, buf.data);
/* Теперь мы можем выполнять запросы посредством SPI */
ret = SPI_execute(buf.data, false, 0);
if (ret != SPI_OK_UPDATE_RETURNING)
elog(FATAL, "cannot select from table %s.%s: error code %d",
table->schema, table->name, ret);
if (SPI_processed > 0)
{
bool isnull;
int32 val;
val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
if (!isnull)
elog(LOG, "%s: count in %s.%s is now %d",
MyBgworkerEntry->bgw_name,
table->schema, table->name, val);
}
/*
* И закончим нашу транзакцию.
*/
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
debug_query_string = NULL;
pgstat_report_stat(true);
pgstat_report_activity(STATE_IDLE, NULL);
}
/* Недостижимо */
}
/*
* Точка входа этого модуля.
*
* Здесь мы регистрируем несколько рабочих процессов, чтобы продемонстрировать,
* как это можно сделать.
*/
void
_PG_init(void)
{
BackgroundWorker worker;
/* получить конфигурацию */
/*
* Эти GUC определяются, даже если эта библиотека не загружается при помощи
* shared_preload_libraries, для worker_spi_launch().
*/
DefineCustomIntVariable("worker_spi.naptime",
"Duration between each check (in seconds).",
NULL,
&worker_spi_naptime,
10,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomStringVariable("worker_spi.database",
"Database to connect to.",
NULL,
&worker_spi_database,
"qhb",
PGC_SIGHUP,
0,
NULL, NULL, NULL);
DefineCustomStringVariable("worker_spi.role",
"Role to connect with.",
NULL,
&worker_spi_role,
NULL,
PGC_SIGHUP,
0,
NULL, NULL, NULL);
if (!process_shared_preload_libraries_in_progress)
return;
DefineCustomIntVariable("worker_spi.total_workers",
"Number of workers.",
NULL,
&worker_spi_total_workers,
2,
1,
100,
PGC_QHBMASTER,
0,
NULL,
NULL,
NULL);
MarkGUCPrefixReserved("worker_spi");
/* установить общие данные для всех наших рабочих процессов */
memset(&worker, 0, sizeof(worker));
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
sprintf(worker.bgw_library_name, "worker_spi");
sprintf(worker.bgw_function_name, "worker_spi_main");
worker.bgw_notify_pid = 0;
/*
* Теперь вносим данные, специфичные для каждого процесса, и выполняем
* фактические регистрации.
*
* При необходимости bgw_extra может включать OID базы данных, OID роли и набор
* флагов. Здесь она оставлена пустой, чтобы вернуться к связанным GUC при
* запуске (0 для флагов фонового рабочего процесса).
*/
for (int i = 1; i <= worker_spi_total_workers; i++)
{
snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
worker.bgw_main_arg = Int32GetDatum(i);
RegisterBackgroundWorker(&worker);
}
}
/*
* Динамически запустить рабочий процесс SPI.
*/
Datum
worker_spi_launch(PG_FUNCTION_ARGS)
{
int32 i = PG_GETARG_INT32(0);
Oid dboid = PG_GETARG_OID(1);
Oid roleoid = PG_GETARG_OID(2);
BackgroundWorker worker;
BackgroundWorkerHandle *handle;
BgwHandleStatus status;
pid_t pid;
char *p;
bits32 flags = 0;
ArrayType *arr = PG_GETARG_ARRAYTYPE_P(3);
Size ndim;
int nelems;
Datum *datum_flags;
memset(&worker, 0, sizeof(worker));
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
sprintf(worker.bgw_library_name, "worker_spi");
sprintf(worker.bgw_function_name, "worker_spi_main");
snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi dynamic worker %d", i);
snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi dynamic");
worker.bgw_main_arg = Int32GetDatum(i);
/* установить bgw_notify_pid, чтобы мы могли использовать WaitForBackgroundWorkerStartup */
worker.bgw_notify_pid = MyProcPid;
/* извлечь флаги, если таковые имеются */
ndim = ARR_NDIM(arr);
if (ndim > 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("flags array must be one-dimensional")));
if (array_contains_nulls(arr))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("flags array must not contain nulls")));
Assert(ARR_ELEMTYPE(arr) == TEXTOID);
deconstruct_array_builtin(arr, TEXTOID, &datum_flags, NULL, &nelems);
for (i = 0; i < nelems; i++)
{
char *optname = TextDatumGetCString(datum_flags[i]);
if (strcmp(optname, "ALLOWCONN") == 0)
flags |= BGWORKER_BYPASS_ALLOWCONN;
else if (strcmp(optname, "ROLELOGINCHECK") == 0)
flags |= BGWORKER_BYPASS_ROLELOGINCHECK;
else
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("incorrect flag value found in array")));
}
/*
* Зарегистрировать базу данных и роль, чтобы использовать их для рабочего
* процесса, запущенного в bgw_extra. Если они не были предоставлены, она
* вернется к GUC при запуске.
*/
if (!OidIsValid(dboid))
dboid = get_database_oid(worker_spi_database, false);
/*
* по умолчанию worker_spi_role равен NULL, так что в этом случае
* worker_spi_main() передается недопустимый OID.
*/
if (!OidIsValid(roleoid) && worker_spi_role)
roleoid = get_role_oid(worker_spi_role, false);
p = worker.bgw_extra;
memcpy(p, &dboid, sizeof(Oid));
p += sizeof(Oid);
memcpy(p, &roleoid, sizeof(Oid));
p += sizeof(Oid);
memcpy(p, &flags, sizeof(bits32));
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
PG_RETURN_NULL();
status = WaitForBackgroundWorkerStartup(handle, &pid);
if (status == BGWH_STOPPED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("could not start background process"),
errhint("More details may be available in the server log.")));
if (status == BGWH_QHBMASTER_DIED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("cannot start background processes without qhbmaster"),
errhint("Kill all remaining database processes and restart the database.")));
Assert(status == BGWH_STARTED);
PG_RETURN_INT32(pid);
}