basic_archive — пример модуля архивирования WAL

basic_archive является примером модуля архивирования. Этот модуль копирует файлы завершенных сегментов WAL в заданный каталог. Возможно, это не особенно полезно, но такой модуль может служить отправной точкой в разработке собственного модуля архивирования. Дополнительную информацию о модулях архивирования см. в главе Модули архивирования.

Чтобы этот модуль работал, его нужно загрузить посредством archive_library, а также включить archive_mode.


Параметры конфигурации

basic_archive.archive_directory (string)
Каталог, куда сервер должен копировать файлы сегментов WAL. Этот каталог уже должен существовать. Если задана пустая строка (по умолчанию), архивирование WAL по сути останавливается, но если включен archive_mode, сервер будет накапливать файлы сегментов WAL, ожидая, что вскоре значение будет предоставлено.

Эти параметры следует устанавливать в файле qhb.conf. Обычное применение выглядит следующим образом:

# qhb.conf
archive_mode = 'on'
archive_library = 'basic_archive'
basic_archive.archive_directory = '/путь/к/каталогу/архива'

Примечания

При сбоях сервера в каталоге архива могут оставаться временные файлы с префиксом archtemp. Рекомендуется удалять такие файлы перед перезапуском сервера после сбоя. Подобные файлы можно удалять и во время работы сервера, если они не связаны ни с каким выполняющимся процессом архивирования, но делать это следует с особой осторожностью.


Пример

В следующем примере продемонстрирована реализация базовой библиотеки архивирования, примерно равнозначная следующей команде оболочки:

test ! -f /путь/к/месту/назначения && cp /путь/к/источнику /путь/к/месту/назначения

Первым существенным отличием этого модуля от приведенной выше команды является то, что модуль сначала копирует файл во временное место назначения, синхронизирует его с диском, а затем надежно перемещает его в конечное место назначение. Другим существенным отличием является то, что если /путь/к/месту/назначения уже существует, но его содержимое идентично таковому /пути/к/источнику, архивирование будет успешным, тогда как команда выше завершится ошибкой. Это позволяет избежать проблем в случае, если файл успешно архивируется, а затем, прежде чем произведется надежная запись об этом успехе, в системе происходит сбой.

#include "qhb.h"

#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>

#include "archive/archive_module.h"
#include "common/int.h"
#include "miscadmin.h"
#include "storage/copydir.h"
#include "storage/fd.h"
#include "utils/guc.h"

PG_MODULE_MAGIC;

static char *archive_directory = NULL;

static bool basic_archive_configured(ArchiveModuleState *state);
static bool basic_archive_file(ArchiveModuleState *state, const char *file, const char *path);
static bool check_archive_directory(char **newval, void **extra, GucSource source);
static bool compare_files(const char *file1, const char *file2);

static const ArchiveModuleCallbacks basic_archive_callbacks = {
	.startup_cb = NULL,
	.check_configured_cb = basic_archive_configured,
	.archive_file_cb = basic_archive_file,
	.shutdown_cb = NULL
};

/*
 * _PG_init
 *
 * Определяет GUC модуля.
 */
void
_PG_init(void)
{
   DefineCustomStringVariable("basic_archive.archive_directory",
                              gettext_noop("Archive file destination directory."),
                              NULL,
                              &archive_directory,
                              "",
                              PGC_SIGHUP,
                              0,
                              check_archive_directory, NULL, NULL);

   MarkGUCPrefixReserved("basic_archive");
}

/*
 * _PG_archive_module_init
 *
 * Возвращает имеющиеся в модуле функции обратного вызова для архивирования.
 */
const ArchiveModuleCallbacks *
_PG_archive_module_init(void)
{
   return &basic_archive_callbacks;
}

/*
 * check_archive_directory
 *
 * Проверяет, что предоставленный архивный каталог существует.
 */
static bool
check_archive_directory(char **newval, void **extra, GucSource source)
{
   struct stat st;

     /*
      * Значение по умолчанию — пустая строка, поэтому нам приходится принять это значение.
      * Наша функция обратного вызова check_configured также проверяет это и не
      * позволяет перейти к архивированию, если строка по-прежнему пуста.
      */
   if (*newval == NULL || *newval[0] == '\0')
       return true;

   /*
    * Убедимся, что путь к файлу не будет слишком длинным.  Документы показывают,
    * что имена архивируемых файлов могут иметь длину до 64 символов.
    */
   if (strlen(*newval) + 64 + 2 >= MAXPGPATH)
   {
       GUC_check_errdetail("Archive directory too long.");
       return false;
   }

   /*
    * Проводим базовую проверку работоспособности — что указанный архивный каталог
    * существует. В будущем он может быть удален, поэтому нам все равно нужно быть
    * готовыми к тому, что он не существует в актуальной логике архивирования.
    */
   if (stat(*newval, &st) != 0 || !S_ISDIR(st.st_mode))
   {
       GUC_check_errdetail("Specified archive directory does not exist.");
       return false;
   }

   return true;
}

/*
 * basic_archive_configured
 *
 * Проверяет, что каталог archive_directory не пуст.
 */
static bool
basic_archive_configured(ArchiveModuleState *state)
{
	if (archive_directory != NULL && archive_directory[0] != '\0')
		return true;

	arch_module_check_errdetail("%s is not set.",
								"basic_archive.archive_directory");
	return false;
}

/*
 * basic_archive_file
 *
 * Архивирует один файл.
 */
static bool
basic_archive_file(ArchiveModuleState *state, const char *file, const char *path)
{
	char		destination[MAXPGPATH];
	char		temp[MAXPGPATH + 256];
	struct stat st;
	struct timeval tv;
	uint64		epoch;			/* milliseconds */

	ereport(DEBUG3,
			(errmsg("archiving \"%s\" via basic_archive", file)));

	snprintf(destination, MAXPGPATH, "%s/%s", archive_directory, file);

   /*
    * Сначала проверяем, не архивировался ли уже этот файл. Если он уже существует
    * и имеет то же содержимое, что и файл, который мы пытаемся заархивировать,
    * мы можем вернуть успешный результат (убедившись, что файл хранится на диске).
    * Такой сценарий возможен, если сбой на сервере произошел после архивирования
    * файла но до переименования его файла .ready в .done.
    *
    * Если архивный файл уже существует, но у него другое содержимое,
    * что-то может быть не так, поэтому мы просто выдадим ошибку.
    */
   if (stat(destination, &st) == 0)
   {
       if (compare_files(path, destination))
       {
           ereport(DEBUG3,
                   (errmsg("archive file \"%s\" already exists with identical contents",
                           destination)));

           fsync_fname(destination, false);
           fsync_fname(archive_directory, true);

           return true;
       }

       ereport(ERROR,
               (errmsg("archive file \"%s\" already exists", destination)));
   }
   else if (errno != ENOENT)
       ereport(ERROR,
               (errcode_for_file_access(),
                errmsg("could not stat file \"%s\": %m", destination)));

   /*
    * Выберем достаточно уникальное имя для временного файла, чтобы исключить
    * вероятность конфликта. Это помогает избежать проблем в случае, если временный
    * файл не был удален после сбоя или если так вышло, что другой сервер архивирует
    * файлы в тот же каталог.
    */
   gettimeofday(&tv, NULL);
   if (pg_mul_u64_overflow((uint64) 1000, (uint64) tv.tv_sec, &epoch) ||
       pg_add_u64_overflow(epoch, (uint64) (tv.tv_usec / 1000), &epoch))
       elog(ERROR, "could not generate temporary file name for archiving");

   snprintf(temp, sizeof(temp), "%s/%s.%s.%d." UINT64_FORMAT,
            archive_directory, "archtemp", file, MyProcPid, epoch);

   /*
    * Копируем файл во временное место назначения. Обратите внимание, что эта
    * операция завершится ошибкой, если temp уже существует.
    */
   copy_file(path, temp);

   /*
    * Синхронизируем временный файл с диском и переносим его в конечное место назначения.
    * Обратите внимание, что это перепишет любой уже существующий файл, но это возможно,
    * только если кто-то создал этот файл с момента выполнения вышеуказанной функции stat().
    */
	(void) durable_rename(temp, destination, ERROR);

	ereport(DEBUG1,
			(errmsg("archived \"%s\" via basic_archive", file)));

	return true;
}

/*
 * compare_files
 *
 * Возвращает сведения о том, совпадает ли содержимое файлов.
 */
static bool
compare_files(const char *file1, const char *file2)
{
#define CMP_BUF_SIZE (4096)
   char        buf1[CMP_BUF_SIZE];
   char        buf2[CMP_BUF_SIZE];
   int         fd1;
   int         fd2;
   bool        ret = true;

   fd1 = OpenTransientFile(file1, O_RDONLY | PG_BINARY);
   if (fd1 < 0)
       ereport(ERROR,
               (errcode_for_file_access(),
                errmsg("could not open file \"%s\": %m", file1)));

   fd2 = OpenTransientFile(file2, O_RDONLY | PG_BINARY);
   if (fd2 < 0)
       ereport(ERROR,
               (errcode_for_file_access(),
                errmsg("could not open file \"%s\": %m", file2)));

   for (;;)
   {
       int         nbytes = 0;
       int         buf1_len = 0;
       int         buf2_len = 0;

       while (buf1_len < CMP_BUF_SIZE)
       {
           nbytes = read(fd1, buf1 + buf1_len, CMP_BUF_SIZE - buf1_len);
           if (nbytes < 0)
               ereport(ERROR,
                       (errcode_for_file_access(),
                        errmsg("could not read file \"%s\": %m", file1)));
           else if (nbytes == 0)
               break;

           buf1_len += nbytes;
       }

       while (buf2_len < CMP_BUF_SIZE)
       {
           nbytes = read(fd2, buf2 + buf2_len, CMP_BUF_SIZE - buf2_len);
           if (nbytes < 0)
               ereport(ERROR,
                       (errcode_for_file_access(),
                        errmsg("could not read file \"%s\": %m", file2)));
           else if (nbytes == 0)
               break;

           buf2_len += nbytes;
       }

       if (buf1_len != buf2_len || memcmp(buf1, buf2, buf1_len) != 0)
       {
           ret = false;
           break;
       }
       else if (buf1_len == 0)
           break;
   }

   if (CloseTransientFile(fd1) != 0)
       ereport(ERROR,
               (errcode_for_file_access(),
                errmsg("could not close file \"%s\": %m", file1)));

   if (CloseTransientFile(fd2) != 0)
       ereport(ERROR,
               (errcode_for_file_access(),
                errmsg("could not close file \"%s\": %m", file2)));

   return ret;
}

Автор

Нейтан Боссарт (Nathan Bossart)