basic_archive — пример модуля архивирования WAL
basic_archive является примером модуля архивирования. Этот модуль копирует файлы завершенных сегментов WAL в заданный каталог. Возможно, это не особенно полезно, но такой модуль может служить отправной точкой в разработке собственного модуля архивирования. Дополнительную информацию о модулях архивирования см. в главе Модули архивирования.
Чтобы этот модуль работал, его нужно загрузить посредством archive_library, а также включить archive_mode.
Параметры конфигурации
basic_archive.archive_directory (string)
Каталог, куда сервер должен копировать файлы сегментов WAL. Этот каталог уже
должен существовать. Если задана пустая строка (по умолчанию), архивирование WAL
по сути останавливается, но если включен archive_mode, сервер будет
накапливать файлы сегментов WAL, ожидая, что вскоре значение будет предоставлено.
Эти параметры следует устанавливать в файле qhb.conf. Обычное применение выглядит следующим образом:
# qhb.conf
archive_mode = 'on'
archive_library = 'basic_archive'
basic_archive.archive_directory = '/путь/к/каталогу/архива'
Примечания
При сбоях сервера в каталоге архива могут оставаться временные файлы с префиксом archtemp. Рекомендуется удалять такие файлы перед перезапуском сервера после сбоя. Подобные файлы можно удалять и во время работы сервера, если они не связаны ни с каким выполняющимся процессом архивирования, но делать это следует с особой осторожностью.
Пример
В следующем примере продемонстрирована реализация базовой библиотеки архивирования, примерно равнозначная следующей команде оболочки:
test ! -f /путь/к/месту/назначения && cp /путь/к/источнику /путь/к/месту/назначения
Первым существенным отличием этого модуля от приведенной выше команды является то, что модуль сначала копирует файл во временное место назначения, синхронизирует его с диском, а затем надежно перемещает его в конечное место назначение. Другим существенным отличием является то, что если /путь/к/месту/назначения уже существует, но его содержимое идентично таковому /пути/к/источнику, архивирование будет успешным, тогда как команда выше завершится ошибкой. Это позволяет избежать проблем в случае, если файл успешно архивируется, а затем, прежде чем произведется надежная запись об этом успехе, в системе происходит сбой.
#include "qhb.h"
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>
#include "archive/archive_module.h"
#include "common/int.h"
#include "miscadmin.h"
#include "storage/copydir.h"
#include "storage/fd.h"
#include "utils/guc.h"
PG_MODULE_MAGIC;
static char *archive_directory = NULL;
static bool basic_archive_configured(ArchiveModuleState *state);
static bool basic_archive_file(ArchiveModuleState *state, const char *file, const char *path);
static bool check_archive_directory(char **newval, void **extra, GucSource source);
static bool compare_files(const char *file1, const char *file2);
static const ArchiveModuleCallbacks basic_archive_callbacks = {
.startup_cb = NULL,
.check_configured_cb = basic_archive_configured,
.archive_file_cb = basic_archive_file,
.shutdown_cb = NULL
};
/*
* _PG_init
*
* Определяет GUC модуля.
*/
void
_PG_init(void)
{
DefineCustomStringVariable("basic_archive.archive_directory",
gettext_noop("Archive file destination directory."),
NULL,
&archive_directory,
"",
PGC_SIGHUP,
0,
check_archive_directory, NULL, NULL);
MarkGUCPrefixReserved("basic_archive");
}
/*
* _PG_archive_module_init
*
* Возвращает имеющиеся в модуле функции обратного вызова для архивирования.
*/
const ArchiveModuleCallbacks *
_PG_archive_module_init(void)
{
return &basic_archive_callbacks;
}
/*
* check_archive_directory
*
* Проверяет, что предоставленный архивный каталог существует.
*/
static bool
check_archive_directory(char **newval, void **extra, GucSource source)
{
struct stat st;
/*
* Значение по умолчанию — пустая строка, поэтому нам приходится принять это значение.
* Наша функция обратного вызова check_configured также проверяет это и не
* позволяет перейти к архивированию, если строка по-прежнему пуста.
*/
if (*newval == NULL || *newval[0] == '\0')
return true;
/*
* Убедимся, что путь к файлу не будет слишком длинным. Документы показывают,
* что имена архивируемых файлов могут иметь длину до 64 символов.
*/
if (strlen(*newval) + 64 + 2 >= MAXPGPATH)
{
GUC_check_errdetail("Archive directory too long.");
return false;
}
/*
* Проводим базовую проверку работоспособности — что указанный архивный каталог
* существует. В будущем он может быть удален, поэтому нам все равно нужно быть
* готовыми к тому, что он не существует в актуальной логике архивирования.
*/
if (stat(*newval, &st) != 0 || !S_ISDIR(st.st_mode))
{
GUC_check_errdetail("Specified archive directory does not exist.");
return false;
}
return true;
}
/*
* basic_archive_configured
*
* Проверяет, что каталог archive_directory не пуст.
*/
static bool
basic_archive_configured(ArchiveModuleState *state)
{
if (archive_directory != NULL && archive_directory[0] != '\0')
return true;
arch_module_check_errdetail("%s is not set.",
"basic_archive.archive_directory");
return false;
}
/*
* basic_archive_file
*
* Архивирует один файл.
*/
static bool
basic_archive_file(ArchiveModuleState *state, const char *file, const char *path)
{
char destination[MAXPGPATH];
char temp[MAXPGPATH + 256];
struct stat st;
struct timeval tv;
uint64 epoch; /* milliseconds */
ereport(DEBUG3,
(errmsg("archiving \"%s\" via basic_archive", file)));
snprintf(destination, MAXPGPATH, "%s/%s", archive_directory, file);
/*
* Сначала проверяем, не архивировался ли уже этот файл. Если он уже существует
* и имеет то же содержимое, что и файл, который мы пытаемся заархивировать,
* мы можем вернуть успешный результат (убедившись, что файл хранится на диске).
* Такой сценарий возможен, если сбой на сервере произошел после архивирования
* файла но до переименования его файла .ready в .done.
*
* Если архивный файл уже существует, но у него другое содержимое,
* что-то может быть не так, поэтому мы просто выдадим ошибку.
*/
if (stat(destination, &st) == 0)
{
if (compare_files(path, destination))
{
ereport(DEBUG3,
(errmsg("archive file \"%s\" already exists with identical contents",
destination)));
fsync_fname(destination, false);
fsync_fname(archive_directory, true);
return true;
}
ereport(ERROR,
(errmsg("archive file \"%s\" already exists", destination)));
}
else if (errno != ENOENT)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m", destination)));
/*
* Выберем достаточно уникальное имя для временного файла, чтобы исключить
* вероятность конфликта. Это помогает избежать проблем в случае, если временный
* файл не был удален после сбоя или если так вышло, что другой сервер архивирует
* файлы в тот же каталог.
*/
gettimeofday(&tv, NULL);
if (pg_mul_u64_overflow((uint64) 1000, (uint64) tv.tv_sec, &epoch) ||
pg_add_u64_overflow(epoch, (uint64) (tv.tv_usec / 1000), &epoch))
elog(ERROR, "could not generate temporary file name for archiving");
snprintf(temp, sizeof(temp), "%s/%s.%s.%d." UINT64_FORMAT,
archive_directory, "archtemp", file, MyProcPid, epoch);
/*
* Копируем файл во временное место назначения. Обратите внимание, что эта
* операция завершится ошибкой, если temp уже существует.
*/
copy_file(path, temp);
/*
* Синхронизируем временный файл с диском и переносим его в конечное место назначения.
* Обратите внимание, что это перепишет любой уже существующий файл, но это возможно,
* только если кто-то создал этот файл с момента выполнения вышеуказанной функции stat().
*/
(void) durable_rename(temp, destination, ERROR);
ereport(DEBUG1,
(errmsg("archived \"%s\" via basic_archive", file)));
return true;
}
/*
* compare_files
*
* Возвращает сведения о том, совпадает ли содержимое файлов.
*/
static bool
compare_files(const char *file1, const char *file2)
{
#define CMP_BUF_SIZE (4096)
char buf1[CMP_BUF_SIZE];
char buf2[CMP_BUF_SIZE];
int fd1;
int fd2;
bool ret = true;
fd1 = OpenTransientFile(file1, O_RDONLY | PG_BINARY);
if (fd1 < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m", file1)));
fd2 = OpenTransientFile(file2, O_RDONLY | PG_BINARY);
if (fd2 < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m", file2)));
for (;;)
{
int nbytes = 0;
int buf1_len = 0;
int buf2_len = 0;
while (buf1_len < CMP_BUF_SIZE)
{
nbytes = read(fd1, buf1 + buf1_len, CMP_BUF_SIZE - buf1_len);
if (nbytes < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m", file1)));
else if (nbytes == 0)
break;
buf1_len += nbytes;
}
while (buf2_len < CMP_BUF_SIZE)
{
nbytes = read(fd2, buf2 + buf2_len, CMP_BUF_SIZE - buf2_len);
if (nbytes < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m", file2)));
else if (nbytes == 0)
break;
buf2_len += nbytes;
}
if (buf1_len != buf2_len || memcmp(buf1, buf2, buf1_len) != 0)
{
ret = false;
break;
}
else if (buf1_len == 0)
break;
}
if (CloseTransientFile(fd1) != 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not close file \"%s\": %m", file1)));
if (CloseTransientFile(fd2) != 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not close file \"%s\": %m", file2)));
return ret;
}
Автор
Нейтан Боссарт (Nathan Bossart)