Встроенная очередь сообщений

Описание

Очереди сообщений представляет собой подсистему СУБД для отправки типизированных сообщений и их получения заинтересованными (подписанными) клиентами. Существует два вида очередей: персистентные и in-memory. Если последние хранят сообщения до обработки всеми подписанными клиентами исключительно в памяти, то персистентные могут сохранять данные на диске, благодаря чему данные будут доступны после перезапуска базы.

На данный момент реализована только in-memory очередь. Она хранит элементы в буфере фиксированного размера, и если тот заполнится, функция qhb_enqueue будет перезаписывать наиболее старые элементы.

Необходимые системные настройки

Так как Брокер Очередей используется выделенный системный процесс СУБД, то необходимо учитывать разрешённое максимальное количество данных процессов задаваемое системной настройкой max_worker_processes.
Следует установить значение, с учётом количества брокеров, которые будут обсуживать очереди на данном кластере QHB.

В случае, если невозможно запустить дополнительные серверные процессы, функция start_mq_broker() вернёт ошибку.

Запуск брокера очереди сообщений при старте СУБД

Для запуска брокера очереди сообщений при старте СУБД в файл qhb.conf необходимо добавить параметр with_mq_broker с указанием для каких баз данных и от имени каких пользователей производить запуск. Важно, чтобы как база данных, так и пользователь существовали, а так же чтобы у пользователя были необходимые права для работы с базой данных, в противном случае старт СУБД будет прерван.

# Добавление автоматического запуска брокера очереди сообщений для баз данных dbname1 и db_name2
echo with_mq_broker = 'db_owner1:db_name1, db_owner2:db_name2' >> $PGDATA/qhb.conf;

# Добавление автоматического запуска брокера очереди сообщений только для базы данных dbname1
echo with_mq_broker = 'db_owner1:db_name1' >> $PGDATA/qhb.conf;

Функции для работы с очередью сообщений

ИмяОписание
status_mq_broker()Запрос статуса брокера
start_mq_broker()Запуск брокера очереди сообщений во время работы СУБД
stop_mq_broker()Остановка брокера очереди сообщений
qhb_create_queue(name,typeoid)Создание очереди указанного типа
qhb_delete_queue(name)Удаление очереди
qhb_register(queue,subscriber,callback)Регистрация подписчика для обработки новых сообщений
qhb_unregister(queue,subscriber)Удаление подписчика
qhb_enqueue(queue,data)Добавление элемента в очередь
qhb_dequeue()Получение элемента из очереди (доступно только из подписанного обработчика)

Брокер очереди сообщений

Брокер очереди сообщений — это отдельный процесс, который предоставляет внутренний интерфейс для работы с очередью. Для того чтобы с ней работать, требуется явно запустить данный процесс с помощью функции start_mq_broker(). Помимо брокера также запускается процесс worker, который фактически вызывает переданные ему обработчики и в случае возникновения каких-либо ошибок способен перезапустится. Оба процесса реализованы как встроенные BackgroundWorker, а следовательно, их pid можно обнаружить в таблице pg_stat_activity.

Процедура обработки сообщений

Сообщения доставляются подписчикам асинхронно с помощью вызова указанной пользователем процедуры. Для того чтобы обрабатывать новые сообщения, необходимо зарегистрировать процедуру обработки с помощью функции qhb_register. Сама процедура может быть создана с помощью CREATE PROCEDURE и должна иметь пустое множество аргументов, к примеру:

CREATE PROCEDURE callback()
LANGUAGE plpgsql AS $$
BEGIN
    CREATE TABLE IF NOT EXISTS my_table (x TEXT);
    INSERT INTO my_table
    SELECT x FROM qhb_dequeue() as (x TEXT);
END; $$;

Элемент безусловно удаляется из очереди до вызова обработчика, даже если обработчик не вызывает qhb_dequeue. Каждый вызов qhb_dequeue в пределах одного вызова обработчика будет возвращать один и тот же результат вне зависимости от того, сколько раз она была вызвана.

Любые ошибки, возникшие в процедуре, логируются и далее никак не обрабатываются (в том числе процедура обработки не удаляется из списка подписчиков). Возникновение ошибки в одном из обработчиков не влияет на получение сообщений другими обработчиками.

ВАЖНО Не существует никаких ограничений для использования функций для работы с очередью в процедуре обработки сообщений. То есть в обработчике сообщений кроме получения сообщения из очереди при помощи qhb_dequeue можно также добавить сообщение в очередь, создать и / или удалить очередь, зарегистрировать или удалить подписчика и т.п. Использовать отличные от qhb_dequeue функции для работы с очередью в обработчике нужно очень внимательно и осторожно - добавление сообщения в очередь в обработчике сообщения может привести к неуправляемому поведению системы, неконтролируемым побочным эффектам и потере сообщений в очереди из-за их перезаписи. При реализации процедуры обработки сообщения лучшей практикой будет избегать использования каких-либо функций работы с очередью кроме qhb_dequeue.

Типизация

Очередь является строго типизированной и может содержать элементы только одного типа; данный тип явно указывается при создании очереди. К примеру, очередь с именем my_queue1 и типом int4 можно создать следующим образом:

SELECT qhb_create_queue(name := 'my_queue1', typeoid := oid)
FROM pg_type
WHERE typname = 'int4';

Список поддерживаемых типов ограничен следующим перечнем:

Класс типаНазвание конкретного типа
Числовой типint2, int4, int8
Тип с плавающей запятойfloat4, float8
Логический типbool
Строковый типtext, cstring, bpchar, varchar, mvarchar
Временной типtimestamp, timestamptz, time, timetz, date, interval

Тип char напрямую не поддерживается. Вместо него следует использовать аналогичный тип bpchar.

При использовании enqueue или dequeue с некорректным типом будет выдана соответствующая ошибка.

Поддержка транзакционности

Очередь поддерживает транзакционность СУБД: отправка сообщения в очередь через qhb_enqueue внутри транзакции выполнит саму отправку только после успешного завершения текущей транзакции. При выполнении команды ROLLBACK или необработанном исключении внутри транзакции данные в очередь доставлены не будут и далее будут игнорироваться (обработчик при этом также остается зарегистрированным).

Внутреннее состояние очереди сообщений

Состояние очереди хранится для каждой базы данных отдельно в системных таблицах qhb_queue и qhb_queue_consumer. Первая содержит список очередей и их типы, вторая содержит список подписанных обработчиков и их соответствие с очередью. Данные таблицы рекомендуется использовать только в информационных целях, поскольку их изменение непосредственно ведет к неконсистентному состоянию.