Встроенная очередь сообщений

Описание

Очереди сообщений представляет собой подсистему СУБД для отправки типизированных сообщений и их получения заинтересованными (подписанными) клиентами. Существует два вида очередей: персистентные и in-memory. Если последние хранят сообщения до обработки всеми подписанными клиентами исключительно в памяти, то персистентные могут сохранять данные на диске, благодаря чему данные будут доступны после перезапуска базы.

На данный момент реализована только in-memory очередь. Она хранит элементы в буфере фиксированного размера, и если тот заполнится, функция qhb_enqueue будет перезаписывать наиболее старые элементы.

Необходимые системные настройки

Так как Брокер Очередей используется выделенный системный процесс СУБД, то необходимо учитывать разрешённое максимальное количество данных процессов задаваемое системной настройкой max_worker_processes.
Следует установить значение, с учётом количества брокеров, которые будут обсуживать очереди на данном кластере QHB.

В случае, если невозможно запустить дополнительные серверные процессы, функция start_mq_broker() вернёт ошибку.

Функции для работы с очередью сообщений

ИмяОписание
start_mq_broker()Запуск брокера очереди сообщений
stop_mq_broker()Остановка брокера очереди сообщений
qhb_create_queue(name,typeoid)Создание очереди указанного типа
qhb_delete_queue(name)Удаление очереди
qhb_register(queue,subscriber,callback)Регистрация подписчика для обработки новых сообщений
qhb_unregister(queue,subscriber)Удаление подписчика
qhb_enqueue(queue,data)Добавление элемента в очередь
qhb_dequeue()Получение элемента из очереди (доступно только из подписанного обработчика)

Брокер очереди сообщений

Брокер очереди сообщений — это отдельный процесс, который предоставляет внутренний интерфейс для работы с очередью. Для того чтобы с ней работать, требуется явно запустить данный процесс с помощью функции start_mq_broker(). Помимо брокера также запускается процесс worker, который фактически вызывает переданные ему обработчики и в случае возникновения каких-либо ошибок способен перезапустится. Оба процесса реализованы как встроенные BackgroundWorker, а следовательно, их pid можно обнаружить в таблице pg_stat_activity.

Процедура обработки сообщений

Сообщения доставляются подписчикам асинхронно с помощью вызова указанной пользователем процедуры. Для того чтобы обрабатывать новые сообщения, необходимо зарегистрировать процедуру-обработчик с помощью функции qhb_register. Сама процедура может быть создана с помощью CREATE PROCEDURE и должна иметь пустое множество аргументов, к примеру:

CREATE PROCEDURE callback()
LANGUAGE plpgsql AS $$
BEGIN
    CREATE TABLE IF NOT EXISTS my_table (x TEXT);
    INSERT INTO my_table
    SELECT x FROM qhb_dequeue() as (x TEXT);
END; $$;

Любые ошибки, возникшие в процедуре, логируются и далее никак не обрабатываются (в том числе процедура-обработчик не удаляется из списка подписчиков). Возникновение ошибки в одном из обработчиков не влияет на получение сообщений другими обработчиками.

Типизация

Очередь является строго типизированной и может содержать элементы только одного типа; данный тип явно указывается при создании очереди. К примеру, очередь с именем my_queue1 и типом int4 можно создать следующим образом:

SELECT qhb_create_queue(name := 'my_queue1', typeoid := oid)
FROM pg_type
WHERE typname = 'int4';

Список поддерживаемых типов ограничен следующим перечнем: cstring, int4, int8, bool и text. При использовании enqueue или dequeue с некорректным типом будет выдана соответствующая ошибка.

Поддержка транзакционности

Очередь поддерживает транзакционность СУБД: отправка сообщения в очередь через qhb_enqueue внутри транзакции выполнит саму отправку только после успешного завершения текущей транзакции. При выполнении команды ROLLBACK или необработанном исключении внутри транзакции данные в очередь доставлены не будут и далее будут игнорироваться (обработчик при этом также остается зарегистрированным).

Внутреннее состояние очереди сообщений

Состояние очереди хранится для каждой базы данных отдельно в системных таблицах qhb_queue и qhb_queue_consumer. Первая содержит список очередей и их типы, вторая содержит список подписанных обработчиков и их соответствие с очередью. Данные таблицы рекомендуется использовать только в информационных целях, поскольку их изменение непосредственно ведет к неконсистентному состоянию.