Модули архивирования

QHB предоставляет инфраструктуру для создания пользовательских модулей для непрерывного архивирования (см. раздел Непрерывное архивирование и восстановление на момент времени (PITR)). Хотя архивировать с помощью команды оболочки (т. е. archive_command) гораздо проще, пользовательский модуль архивирования зачастую будет гораздо более производительным и устойчивым к ошибкам.

Если задать в параметре archive_library библиотеку архивирования пользовательского модуля, QHB будет передавать завершенные файлы WAL этому модулю, а сервер будет избегать повторного использования или удаления этих файлов WAL до тех пор, пока модуль не просигнализирует, что файлы были успешно заархивированы. По большому счету модуль сам решает, что делать с каждым файлом WAL, но многие рекомендации перечислены в подразделе Настройка архивирования WAL.

Модули архивирования должны состоять как минимум из функции инициализации (см. раздел Функции инициализации) и требуемых функций обратного вызова (см. раздел Функции обратного вызова модулей архивирования). Тем не менее модулям архивирования разрешено также делать гораздо большее (например, объявлять GUC (Global User Configuration, серверные конфигурационные параметры) и регистрировать фоновые рабочие процессы).



Функции инициализации

Библиотека архивирования загружается путем динамической загрузки разделяемой библиотеки с базовым именем, указанным в archive_library. Для нахождения библиотеки используется обычный путь поиска библиотек. Чтобы предоставить необходимые функции обратного вызова модуля архивирования и показать, что библиотека в действительности является модулем архивирования, она должна предоставить функцию с именем _PG_archive_module_init. Этой функции передается структура, заполняемая указателями на функции обратного вызова для отдельных действий.

typedef struct ArchiveModuleCallbacks
{
    ArchiveCheckConfiguredCB check_configured_cb;
    ArchiveFileCB archive_file_cb;
    ArchiveShutdownCB shutdown_cb;
} ArchiveModuleCallbacks;
typedef void (*ArchiveModuleInit) (struct ArchiveModuleCallbacks *cb);

Обязательной является только функция обратного вызова archive_file_cb. Остальные функции необязательны.



Функции обратного вызова модулей архивирования

Функции обратного вызова архивирования определяют фактическое поведение модуля при архивировании. Сервер будет вызывать их по мере надобности для обработки каждого отдельного файла WAL.

Функция обратного вызова для проверки

Функция обратного вызова check_configured_cb вызывается, чтобы определить, полностью ли настроен модуль и готов ли он принимать файлы WAL (например, что в его параметрах конфигурации установлены допустимые значения). Если функция check_configured_cb не определена, сервер всегда предполагает, что модуль настроен.

typedef bool (*ArchiveCheckConfiguredCB) (void);

Если возвращается true, сервер перейдет к архивированию файла путем вызова функции обратного вызова archive_file_cb. Если возвращается false, архивирование не выполняется, и архиватор выдаст в журнал сервера следующее сообщение:

WARNING:  archive_mode enabled, yet archiving is not configured
-- ПРЕДУПРЕЖДЕНИЕ: включен archive_mode, но архивирование не настроено

В последнем случае сервер будет периодически вызывать эту функцию, и архивирование начнется, только когда она вернет true.


Функция обратного вызова для архивирования

Функция обратного вызова archive_file_cb вызывается для архивирования одного файла WAL.

typedef bool (*ArchiveFileCB) (const char *file, const char *path);

Если возвращается true, сервер переходит к следующему этапу, как если бы файл был успешно заархивирован, то есть может переработать или удалить исходный файл WAL. Если возвращается false, сервер сохранит оригинальный файл WAL и повторит попытку архивирования позже. Аргумент file будет содержать только имя архивируемого файла WAL, тогда как аргумент path содержит полный путь к файлу WAL (в том числе имя файла).


Функция обратного вызова для выключения

Функция обратного вызова shutdown_cb вызывается, когда завершается процесс архиватора (например, после ошибки) или изменяется значение archive_library. Если функция shutdown_cb не определена, в таких ситуациях никакие специальные действия не предпринимаются.

typedef void (*ArchiveShutdownCB) (void);


Пример

В следующем примере продемонстрирована реализация базовой библиотеки архивирования, примерно равнозначная следующей команде оболочки:

test ! -f /путь/к/месту/назначения && cp /путь/к/источнику /путь/к/месту/назначения

Первым существенным отличием этого модуля от приведенной выше команды является то, что модуль сначала копирует файл во временное место назначения, синхронизирует его с диском, а затем надежно перемещает его в конечное место назначение. Другим существенным отличием является то, что если /путь/к/месту/назначения уже существует, но его содержимое идентично таковому /пути/к/источнику, архивирование будет успешным, тогда как команда выше завершится ошибкой. Это позволяет избежать проблем в случае, если файл успешно архивируется, а затем, прежде чем произведется надежная запись об этом успехе, в системе происходит сбой.

#include "qhb.h"

#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>

#include "archive/archive_module.h"
#include "common/int.h"
#include "miscadmin.h"
#include "storage/copydir.h"
#include "storage/fd.h"
#include "utils/guc.h"
#include "utils/memutils.h"

PG_MODULE_MAGIC;

typedef struct BasicArchiveData
{
   MemoryContext context;
} BasicArchiveData;

static char *archive_directory = NULL;

static void basic_archive_startup(ArchiveModuleState *state);
static bool basic_archive_configured(ArchiveModuleState *state);
static bool basic_archive_file(ArchiveModuleState *state, const char *file, const char *path);
static void basic_archive_file_internal(const char *file, const char *path);
static bool check_archive_directory(char **newval, void **extra, GucSource source);
static bool compare_files(const char *file1, const char *file2);
static void basic_archive_shutdown(ArchiveModuleState *state);

static const ArchiveModuleCallbacks basic_archive_callbacks = {
   .startup_cb = basic_archive_startup,
   .check_configured_cb = basic_archive_configured,
   .archive_file_cb = basic_archive_file,
   .shutdown_cb = basic_archive_shutdown
};

/*
 * _PG_init
 *
 * Определяет GUC модуля.
 */
void
_PG_init(void)
{
   DefineCustomStringVariable("basic_archive.archive_directory",
                              gettext_noop("Archive file destination directory."),
                              NULL,
                              &archive_directory,
                              "",
                              PGC_SIGHUP,
                              0,
                              check_archive_directory, NULL, NULL);

   MarkGUCPrefixReserved("basic_archive");
}

/*
 * _PG_archive_module_init
 *
 * Возвращает имеющиеся в модуле функции обратного вызова для архивирования.
 */
const ArchiveModuleCallbacks *
_PG_archive_module_init(void)
{
   return &basic_archive_callbacks;
}

/*
 * basic_archive_startup
 *
 * Создает контекст памяти модуля.
 */
void
basic_archive_startup(ArchiveModuleState *state)
{
   BasicArchiveData *data;

   data = (BasicArchiveData *) MemoryContextAllocZero(TopMemoryContext,
                                                        sizeof(BasicArchiveData));
   data->context = AllocSetContextCreate(TopMemoryContext,
                                         "basic_archive",
                                         ALLOCSET_DEFAULT_SIZES);
   state->private_data = (void *) data;
}

/*
 * check_archive_directory
 *
 * Проверяет, что предоставленный архивный каталог существует.
 */
static bool
check_archive_directory(char **newval, void **extra, GucSource source)
{
   struct stat st;

     /*
      * Значение по умолчанию — пустая строка, поэтому нам приходится принять это значение.
      * Наша функция обратного вызова check_configured также проверяет это и не
      * позволяет перейти к архивированию, если строка по-прежнему пуста.
      */
   if (*newval == NULL || *newval[0] == '\0')
       return true;

   /*
    * Убедимся, что путь к файлу не будет слишком длинным.  Документы показывают,
    * что имена архивируемых файлов могут иметь длину до 64 символов.
    */
   if (strlen(*newval) + 64 + 2 >= MAXPGPATH)
   {
       GUC_check_errdetail("Archive directory too long.");
       return false;
   }

   /*
    * Проводим базовую проверку работоспособности — что указанный архивный каталог
    * существует. В будущем он может быть удален, поэтому нам все равно нужно быть
    * готовыми к тому, что он не существует в актуальной логике архивирования.
    */
   if (stat(*newval, &st) != 0 || !S_ISDIR(st.st_mode))
   {
       GUC_check_errdetail("Specified archive directory does not exist.");
       return false;
   }

   return true;
}

/*
 * basic_archive_configured
 *
 * Проверяет, что каталог archive_directory не пуст.
 */
static bool
basic_archive_configured(ArchiveModuleState *state)
{
   return archive_directory != NULL && archive_directory[0] != '\0';
}

/*
 * basic_archive_file
 *
 * Архивирует один файл.
 */
static bool
basic_archive_file(ArchiveModuleState *state, const char *file, const char *path)
{
   sigjmp_buf  local_sigjmp_buf;
   MemoryContext oldcontext;
   BasicArchiveData *data = (BasicArchiveData *) state->private_data;
   MemoryContext basic_archive_context = data->context;

   /*
    * Мы выполняем basic_archive_file_internal() в собственном контексте памяти,
    * чтобы его можно было с легкостью сбросить во время восстановления после
    * ошибки (тем самым избежав утечек памяти).
    */
   oldcontext = MemoryContextSwitchTo(basic_archive_context);

   /*
    * Поскольку архиватор оперирует на самом нижнем уровне стека исключений, ошибки
    * уровня ERROR превращаются в FATAL и приводят к перезапуску процесса архиватора.
    * Однако при использовании в случае проблем функции ereport(ERROR, ...) их легко
    * кодировать и решать.  Таким образом, мы создает свой обработчик исключений,
    * чтобы ловить ошибки ERROR и возвращать false, вместо того чтобы перезапускать
    * архиватор при каждой неудаче.
    */
   if (sigsetjmp(local_sigjmp_buf, 1) != 0)
   {
       /* Поскольку мы не используем PG_TRY, нужно сбросить стек ошибок вручную */
       error_context_stack = NULL;

       /* Предотвращаем прерывания во время очистки */
       HOLD_INTERRUPTS();

       /* Сообщаем об ошибке и очищаем ErrorContext для следующего раза */
       EmitErrorReport();
       FlushErrorState();

       /* Закрываем все файлы, оставленные открытыми copy_file() или compare_files() */
       AtEOSubXact_Files(false, InvalidSubTransactionId, InvalidSubTransactionId);

       /* Сбрасываем наш контекст памяти и опять переключаемся на оригинальный */
       MemoryContextSwitchTo(oldcontext);
       MemoryContextReset(basic_archive_context);

       /* Удаляем наш обработчик исключений */
       PG_exception_stack = NULL;

       /* Теперь мы снова можем разрешить прерывания */
       RESUME_INTERRUPTS();

       /* Сообщаем об ошибке, чтобы архиватор снова попытался обработать этот файл */
       return false;
   }

   /* Включаем наш обработчик исключений */
   PG_exception_stack = &local_sigjmp_buf;

   /* Архивируем файл! */
   basic_archive_file_internal(file, path);

   /* Удаляем наш обработчик исключений */
   PG_exception_stack = NULL;

   /* Сбрасываем наш контекст памяти и опять переключаемся на оригинальный */
   MemoryContextSwitchTo(oldcontext);
   MemoryContextReset(basic_archive_context);

   return true;
}

static void
basic_archive_file_internal(const char *file, const char *path)
{
   char        destination[MAXPGPATH];
   char        temp[MAXPGPATH + 256];
   struct stat st;
   struct timeval tv;
   uint64      epoch;          /* миллисекунды*/

   ereport(DEBUG3,
           (errmsg("archiving \"%s\" via basic_archive", file)));

   snprintf(destination, MAXPGPATH, "%s/%s", archive_directory, file);

   /*
    * Сначала проверяем, не архивировался ли уже этот файл. Если он уже существует
    * и имеет то же содержимое, что и файл, который мы пытаемся заархивировать,
    * мы можем вернуть успешный результат (убедившись, что файл хранится на диске).
    * Такой сценарий возможен, если сбой на сервере произошел после архивирования
    * файла но до переименования его файла .ready в .done.
    *
    * Если архивный файл уже существует, но у него другое содержимое,
    * что-то может быть не так, поэтому мы просто выдадим ошибку.
    */
   if (stat(destination, &st) == 0)
   {
       if (compare_files(path, destination))
       {
           ereport(DEBUG3,
                   (errmsg("archive file \"%s\" already exists with identical contents",
                           destination)));

           fsync_fname(destination, false);
           fsync_fname(archive_directory, true);

           return;
       }

       ereport(ERROR,
               (errmsg("archive file \"%s\" already exists", destination)));
   }
   else if (errno != ENOENT)
       ereport(ERROR,
               (errcode_for_file_access(),
                errmsg("could not stat file \"%s\": %m", destination)));

   /*
    * Выберем достаточно уникальное имя для временного файла, чтобы исключить
    * вероятность конфликта. Это помогает избежать проблем в случае, если временный
    * файл не был удален после сбоя или если так вышло, что другой сервер архивирует
    * файлы в тот же каталог.
    */
   gettimeofday(&tv, NULL);
   if (pg_mul_u64_overflow((uint64) 1000, (uint64) tv.tv_sec, &epoch) ||
       pg_add_u64_overflow(epoch, (uint64) (tv.tv_usec / 1000), &epoch))
       elog(ERROR, "could not generate temporary file name for archiving");

   snprintf(temp, sizeof(temp), "%s/%s.%s.%d." UINT64_FORMAT,
            archive_directory, "archtemp", file, MyProcPid, epoch);

   /*
    * Копируем файл во временное место назначения. Обратите внимание, что эта
    * операция завершится ошибкой, если temp уже существует.
    */
   copy_file(path, temp);

   /*
    * Синхронизируем временный файл с диском и переносим его в конечное место назначения.
    * Обратите внимание, что это перепишет любой уже существующий файл, но это возможно,
    * только если кто-то создал этот файл с момента выполнения вышеуказанной функции stat().
    */
   (void) durable_rename(temp, destination, ERROR);

   ereport(DEBUG1,
           (errmsg("archived \"%s\" via basic_archive", file)));
}

/*
 * compare_files
 *
 * Возвращает сведения о том, совпадает ли содержимое файлов.
 */
static bool
compare_files(const char *file1, const char *file2)
{
#define CMP_BUF_SIZE (4096)
   char        buf1[CMP_BUF_SIZE];
   char        buf2[CMP_BUF_SIZE];
   int         fd1;
   int         fd2;
   bool        ret = true;

   fd1 = OpenTransientFile(file1, O_RDONLY | PG_BINARY);
   if (fd1 < 0)
       ereport(ERROR,
               (errcode_for_file_access(),
                errmsg("could not open file \"%s\": %m", file1)));

   fd2 = OpenTransientFile(file2, O_RDONLY | PG_BINARY);
   if (fd2 < 0)
       ereport(ERROR,
               (errcode_for_file_access(),
                errmsg("could not open file \"%s\": %m", file2)));

   for (;;)
   {
       int         nbytes = 0;
       int         buf1_len = 0;
       int         buf2_len = 0;

       while (buf1_len < CMP_BUF_SIZE)
       {
           nbytes = read(fd1, buf1 + buf1_len, CMP_BUF_SIZE - buf1_len);
           if (nbytes < 0)
               ereport(ERROR,
                       (errcode_for_file_access(),
                        errmsg("could not read file \"%s\": %m", file1)));
           else if (nbytes == 0)
               break;

           buf1_len += nbytes;
       }

       while (buf2_len < CMP_BUF_SIZE)
       {
           nbytes = read(fd2, buf2 + buf2_len, CMP_BUF_SIZE - buf2_len);
           if (nbytes < 0)
               ereport(ERROR,
                       (errcode_for_file_access(),
                        errmsg("could not read file \"%s\": %m", file2)));
           else if (nbytes == 0)
               break;

           buf2_len += nbytes;
       }

       if (buf1_len != buf2_len || memcmp(buf1, buf2, buf1_len) != 0)
       {
           ret = false;
           break;
       }
       else if (buf1_len == 0)
           break;
   }

   if (CloseTransientFile(fd1) != 0)
       ereport(ERROR,
               (errcode_for_file_access(),
                errmsg("could not close file \"%s\": %m", file1)));

   if (CloseTransientFile(fd2) != 0)
       ereport(ERROR,
               (errcode_for_file_access(),
                errmsg("could not close file \"%s\": %m", file2)));

   return ret;
}

/*
 * basic_archive_shutdown
 *
 * Освобождает наше выделенное состояние.
 */
static void
basic_archive_shutdown(ArchiveModuleState *state)
{
   BasicArchiveData *data = (BasicArchiveData *) state->private_data;
   MemoryContext basic_archive_context;

   /*
    * Если мы не сохранили указатель на наше выделенное состояние, нам не нужно
    * ничего очищать.
    */
   if (data == NULL)
       return;

   basic_archive_context = data->context;
   Assert(CurrentMemoryContext != basic_archive_context);

   if (MemoryContextIsValid(basic_archive_context))
       MemoryContextDelete(basic_archive_context);
   data->context = NULL;

   /*
    * Наконец освобождаем состояние.
    */
   pfree(data);
   state->private_data = NULL;
}