Встроенная очередь сообщений
Описание
Очереди сообщений представляет собой подсистему СУБД для отправки типизированных сообщений и их получения заинтересованными (подписанными) клиентами. Существует два вида очередей: персистентные и in-memory. Если последние хранят сообщения до обработки всеми подписанными клиентами исключительно в памяти, то персистентные могут сохранять данные на диске, благодаря чему данные будут доступны после перезапуска базы.
На данный момент реализована только in-memory очередь. Она хранит элементы в буфере фиксированного размера, и если тот заполнится, функция qhb_enqueue будет перезаписывать наиболее старые элементы.
Необходимые системные настройки
Так как Брокер Очередей используется выделенный системный процесс СУБД, то необходимо
учитывать разрешённое максимальное количество данных процессов задаваемое системной настройкой
max_worker_processes.
Следует установить значение, с учётом количества брокеров, которые будут обсуживать очереди
на данном кластере QHB.
В случае, если невозможно запустить дополнительные серверные процессы, функция start_mq_broker()
вернёт ошибку.
Запуск брокера очереди сообщений при старте СУБД
Для запуска брокера очереди сообщений при старте СУБД в файл qhb.conf необходимо добавить параметр with_mq_broker с указанием для каких баз данных и от имени каких пользователей производить запуск. Важно, чтобы как база данных, так и пользователь существовали, а так же чтобы у пользователя были необходимые права для работы с базой данных, в противном случае старт СУБД будет прерван.
# Добавление автоматического запуска брокера очереди сообщений для баз данных dbname1 и db_name2
echo with_mq_broker = 'db_owner1:db_name1, db_owner2:db_name2' >> $PGDATA/qhb.conf;
# Добавление автоматического запуска брокера очереди сообщений только для базы данных dbname1
echo with_mq_broker = 'db_owner1:db_name1' >> $PGDATA/qhb.conf;
Функции для работы с очередью сообщений
Имя | Описание |
---|---|
status_mq_broker() | Запрос статуса брокера |
start_mq_broker() | Запуск брокера очереди сообщений во время работы СУБД |
stop_mq_broker() | Остановка брокера очереди сообщений |
qhb_create_queue(name,typeoid) | Создание очереди указанного типа |
qhb_delete_queue(name) | Удаление очереди |
qhb_register(queue,subscriber,callback) | Регистрация подписчика для обработки новых сообщений |
qhb_unregister(queue,subscriber) | Удаление подписчика |
qhb_enqueue(queue,data) | Добавление элемента в очередь |
qhb_dequeue() | Получение элемента из очереди (доступно только из подписанного обработчика) |
Брокер очереди сообщений
Брокер очереди сообщений — это отдельный процесс, который предоставляет внутренний интерфейс для работы с очередью. Для того чтобы с ней работать, требуется явно запустить данный процесс с помощью функции start_mq_broker(). Помимо брокера также запускается процесс worker, который фактически вызывает переданные ему обработчики и в случае возникновения каких-либо ошибок способен перезапустится. Оба процесса реализованы как встроенные BackgroundWorker, а следовательно, их pid можно обнаружить в таблице pg_stat_activity.
Процедура обработки сообщений
Сообщения доставляются подписчикам асинхронно с помощью вызова указанной пользователем
процедуры. Для того чтобы обрабатывать новые сообщения, необходимо зарегистрировать
процедуру обработки с помощью функции qhb_register. Сама процедура может быть
создана с помощью CREATE PROCEDURE
и должна иметь пустое множество аргументов, к
примеру:
CREATE PROCEDURE callback()
LANGUAGE plpgsql AS $$
BEGIN
CREATE TABLE IF NOT EXISTS my_table (x TEXT);
INSERT INTO my_table
SELECT x FROM qhb_dequeue() as (x TEXT);
END; $$;
Элемент безусловно удаляется из очереди до вызова обработчика, даже если обработчик
не вызывает qhb_dequeue
. Каждый вызов qhb_dequeue
в пределах одного вызова обработчика
будет возвращать один и тот же результат вне зависимости от того, сколько раз она была вызвана.
Любые ошибки, возникшие в процедуре, логируются и далее никак не обрабатываются (в том числе процедура обработки не удаляется из списка подписчиков). Возникновение ошибки в одном из обработчиков не влияет на получение сообщений другими обработчиками.
ВАЖНО Не существует никаких ограничений для использования функций для работы с очередью в процедуре обработки сообщений. То есть в обработчике сообщений кроме получения сообщения из очереди при помощи
qhb_dequeue
можно также добавить сообщение в очередь, создать и / или удалить очередь, зарегистрировать или удалить подписчика и т.п. Использовать отличные отqhb_dequeue
функции для работы с очередью в обработчике нужно очень внимательно и осторожно - добавление сообщения в очередь в обработчике сообщения может привести к неуправляемому поведению системы, неконтролируемым побочным эффектам и потере сообщений в очереди из-за их перезаписи. При реализации процедуры обработки сообщения лучшей практикой будет избегать использования каких-либо функций работы с очередью кромеqhb_dequeue
.
Типизация
Очередь является строго типизированной и может содержать элементы только одного типа; данный тип явно указывается при создании очереди. К примеру, очередь с именем my_queue1 и типом int4 можно создать следующим образом:
SELECT qhb_create_queue(name := 'my_queue1', typeoid := oid)
FROM pg_type
WHERE typname = 'int4';
Список поддерживаемых типов ограничен следующим перечнем:
Класс типа | Название конкретного типа |
---|---|
Числовой тип | int2, int4, int8 |
Тип с плавающей запятой | float4, float8 |
Логический тип | bool |
Строковый тип | text, cstring, bpchar, varchar, mvarchar |
Временной тип | timestamp, timestamptz, time, timetz, date, interval |
Тип char
напрямую не поддерживается. Вместо него следует использовать аналогичный тип bpchar
.
При использовании enqueue или dequeue с некорректным типом будет выдана соответствующая ошибка.
Поддержка транзакционности
Очередь поддерживает транзакционность СУБД: отправка сообщения в очередь через qhb_enqueue внутри транзакции выполнит саму отправку только после успешного
завершения текущей транзакции. При выполнении команды ROLLBACK
или необработанном
исключении внутри транзакции данные в очередь доставлены не будут и далее будут
игнорироваться (обработчик при этом также остается зарегистрированным).
Внутреннее состояние очереди сообщений
Состояние очереди хранится для каждой базы данных отдельно в системных таблицах qhb_queue и qhb_queue_consumer. Первая содержит список очередей и их типы, вторая содержит список подписанных обработчиков и их соответствие с очередью. Данные таблицы рекомендуется использовать только в информационных целях, поскольку их изменение непосредственно ведет к неконсистентному состоянию.