Встроенная очередь сообщений

Описание

Очереди сообщений представляет собой подсистему СУБД для отправки типизированных сообщений и их получения заинтересованными (подписанными) клиентами. Существует два вида очередей: персистентные и in-memory. Если последние хранят сообщения до обработки всеми подписанными клиентами исключительно в памяти, то персистентные могут сохранять данные на диске, благодаря чему данные будут доступны после перезапуска базы.

На данный момент реализована только in-memory очередь. Она хранит элементы в буфере фиксированного размера, и если тот заполнится, функция qhb_enqueue будет перезаписывать наиболее старые элементы.


Необходимые системные настройки

Так как в качестве брокера очередей используется выделенный системный процесс СУБД, то необходимо учитывать максимальное разрешенное количество данных процессов, задаваемое параметром конфигурации max_worker_processes. Значение следует установить с учетом количества брокеров, которые будут обслуживать очереди на данном кластере QHB.

В случае, если невозможно запустить дополнительные серверные процессы, функция start_mq_broker() вернет ошибку.


Запуск брокера очереди сообщений при старте СУБД

Для запуска брокера очереди сообщений при старте СУБД в файл qhb.conf необходимо добавить параметр with_mq_broker с указанием, для каких баз данных и от имени каких пользователей производить запуск. Важно, чтобы как база данных, так и пользователь существовали, а также чтобы у пользователя были необходимые права для работы с базой данных, в противном случае старт СУБД будет прерван.

# Добавление автоматического запуска брокера очереди сообщений для баз данных dbname1 и db_name2
echo with_mq_broker = 'db_owner1:db_name1, db_owner2:db_name2' >> $PGDATA/qhb.conf;

# Добавление автоматического запуска брокера очереди сообщений только для базы данных dbname1
echo with_mq_broker = 'db_owner1:db_name1' >> $PGDATA/qhb.conf;

Функции для работы с очередью сообщений

ИмяОписание
status_mq_broker()Запрос статуса брокера
start_mq_broker()Запуск брокера очереди сообщений во время работы СУБД
stop_mq_broker()Остановка брокера очереди сообщений
qhb_create_queue(name,typeoid)Создание очереди указанного типа
qhb_delete_queue(name)Удаление очереди
qhb_register(queue,subscriber,callback)Регистрация подписчика для обработки новых сообщений
qhb_unregister(queue,subscriber)Удаление подписчика
qhb_enqueue(queue,data)Добавление элемента в очередь
qhb_dequeue()Получение элемента из очереди (доступно только из подписанного обработчика)

Брокер очереди сообщений

Брокер очереди сообщений — это отдельный процесс, который предоставляет внутренний интерфейс для работы с очередью. Для того чтобы с ней работать, требуется явно запустить данный процесс с помощью функции start_mq_broker(). Помимо брокера также запускается процесс worker, который фактически вызывает переданные ему обработчики и в случае возникновения каких-либо ошибок способен перезапуститься. Оба процесса реализованы как встроенные BackgroundWorker, а следовательно, их PID можно найти в таблице pg_stat_activity.


Процедура обработки сообщений

Сообщения доставляются подписчикам асинхронно с помощью вызова указанной пользователем процедуры. Для того чтобы обрабатывать новые сообщения, необходимо зарегистрировать процедуру обработки с помощью функции qhb_register. Сама процедура может быть создана с помощью команды CREATE PROCEDURE и должна иметь пустое множество аргументов, к примеру:

CREATE PROCEDURE callback()
LANGUAGE plpgsql AS $$
BEGIN
    CREATE TABLE IF NOT EXISTS my_table (x TEXT);
    INSERT INTO my_table
    SELECT x FROM qhb_dequeue() as (x TEXT);
END; $$;

Элемент безусловно удаляется из очереди до вызова обработчика, даже если обработчик не вызывает qhb_dequeue. Каждый вызов qhb_dequeue в пределах одного вызова обработчика будет возвращать один и тот же результат вне зависимости от того, сколько раз она была вызвана.

Любые ошибки, возникшие в процедуре, протоколируются и далее никак не обрабатываются (в том числе процедура обработки не удаляется из списка подписчиков). Возникновение ошибки в одном из обработчиков не влияет на получение сообщений другими обработчиками.

ВАЖНО!
Не существует никаких ограничений для использования функций для работы с очередью в процедуре обработки сообщений. То есть в обработчике сообщений кроме получения сообщения из очереди при помощи qhb_dequeue можно также добавить сообщение в очередь, создать и/или удалить очередь, зарегистрировать или удалить подписчика и т. п. Использовать отличные от qhb_dequeue функции для работы с очередью в обработчике нужно очень внимательно и осторожно — добавление сообщения в очередь в обработчике сообщения может привести к неуправляемому поведению системы, неконтролируемым побочным эффектам и потере сообщений в очереди из-за их перезаписи. При реализации процедуры обработки сообщения лучшей практикой будет избегать использования каких-либо функций работы с очередью кроме qhb_dequeue.


Типизация

Очередь является строго типизированной и может содержать элементы только одного типа; данный тип явно указывается при создании очереди. К примеру, очередь с именем my_queue1 и типом int4 можно создать следующим образом:

SELECT qhb_create_queue(name := 'my_queue1', typeoid := oid)
FROM pg_type
WHERE typname = 'int4';

Поддерживаются все типы данных, которые реализуют RECEIVE и SEND функции.

При использовании enqueue или dequeue с некорректным типом будет выдана соответствующая ошибка.


Поддержка транзакционности

Очередь поддерживает транзакционность СУБД: отправка сообщения в очередь через qhb_enqueue внутри транзакции выполнит саму отправку только после успешного завершения текущей транзакции. При выполнении команды ROLLBACK или необработанном исключении внутри транзакции данные в очередь доставлены не будут и далее будут игнорироваться (обработчик при этом также остается зарегистрированным).


Внутреннее состояние очереди сообщений

Состояние очереди хранится для каждой базы данных отдельно в системных таблицах qhb_queue и qhb_queue_consumer. Первая содержит список очередей и их типы, вторая содержит список подписанных обработчиков и их соответствие с очередью. Данные таблицы рекомендуется использовать только в информационных целях, поскольку их изменение непосредственно ведет к несогласованности.