Встроенная очередь сообщений
Описание
Очереди сообщений представляет собой подсистему СУБД для отправки типизированных сообщений и их получения заинтересованными (подписанными) клиентами. Существует два вида очередей: персистентные и in-memory. Если последние хранят сообщения до обработки всеми подписанными клиентами исключительно в памяти, то персистентные могут сохранять данные на диске, благодаря чему данные будут доступны после перезапуска базы.
На данный момент реализована только in-memory очередь. Она хранит элементы в буфере фиксированного размера, и если тот заполнится, функция qhb_enqueue будет перезаписывать наиболее старые элементы.
Необходимые системные настройки
Так как в качестве брокера очередей используется выделенный системный процесс СУБД, то необходимо учитывать максимальное разрешенное количество данных процессов, задаваемое параметром конфигурации max_worker_processes. Значение следует установить с учетом количества брокеров, которые будут обслуживать очереди на данном кластере QHB.
В случае, если невозможно запустить дополнительные серверные процессы, функция start_mq_broker() вернет ошибку.
Запуск брокера очереди сообщений при старте СУБД
Для запуска брокера очереди сообщений при старте СУБД в файл qhb.conf необходимо добавить параметр with_mq_broker с указанием, для каких баз данных и от имени каких пользователей производить запуск. Важно, чтобы как база данных, так и пользователь существовали, а также чтобы у пользователя были необходимые права для работы с базой данных, в противном случае старт СУБД будет прерван.
# Добавление автоматического запуска брокера очереди сообщений для баз данных dbname1 и db_name2
echo with_mq_broker = 'db_owner1:db_name1, db_owner2:db_name2' >> $PGDATA/qhb.conf;
# Добавление автоматического запуска брокера очереди сообщений только для базы данных dbname1
echo with_mq_broker = 'db_owner1:db_name1' >> $PGDATA/qhb.conf;
Функции для работы с очередью сообщений
| Имя | Описание |
|---|---|
| status_mq_broker() | Запрос статуса брокера |
| start_mq_broker() | Запуск брокера очереди сообщений во время работы СУБД |
| stop_mq_broker() | Остановка брокера очереди сообщений |
| qhb_create_queue(name,typeoid) | Создание очереди указанного типа |
| qhb_delete_queue(name) | Удаление очереди |
| qhb_register(queue,subscriber,callback) | Регистрация подписчика для обработки новых сообщений |
| qhb_unregister(queue,subscriber) | Удаление подписчика |
| qhb_enqueue(queue,data) | Добавление элемента в очередь |
| qhb_dequeue() | Получение элемента из очереди (доступно только из подписанного обработчика) |
Брокер очереди сообщений
Брокер очереди сообщений — это отдельный процесс, который предоставляет внутренний интерфейс для работы с очередью. Для того чтобы с ней работать, требуется явно запустить данный процесс с помощью функции start_mq_broker(). Помимо брокера также запускается процесс worker, который фактически вызывает переданные ему обработчики и в случае возникновения каких-либо ошибок способен перезапуститься. Оба процесса реализованы как встроенные BackgroundWorker, а следовательно, их PID можно найти в таблице pg_stat_activity.
Процедура обработки сообщений
Сообщения доставляются подписчикам асинхронно с помощью вызова указанной пользователем процедуры. Для того чтобы обрабатывать новые сообщения, необходимо зарегистрировать процедуру обработки с помощью функции qhb_register. Сама процедура может быть создана с помощью команды CREATE PROCEDURE и должна иметь пустое множество аргументов, к примеру:
CREATE PROCEDURE callback()
LANGUAGE plpgsql AS $$
BEGIN
CREATE TABLE IF NOT EXISTS my_table (x TEXT);
INSERT INTO my_table
SELECT x FROM qhb_dequeue() as (x TEXT);
END; $$;
Элемент безусловно удаляется из очереди до вызова обработчика, даже если обработчик не вызывает qhb_dequeue. Каждый вызов qhb_dequeue в пределах одного вызова обработчика будет возвращать один и тот же результат вне зависимости от того, сколько раз она была вызвана.
Любые ошибки, возникшие в процедуре, протоколируются и далее никак не обрабатываются (в том числе процедура обработки не удаляется из списка подписчиков). Возникновение ошибки в одном из обработчиков не влияет на получение сообщений другими обработчиками.
ВАЖНО!
Не существует никаких ограничений для использования функций для работы с очередью в процедуре обработки сообщений. То есть в обработчике сообщений кроме получения сообщения из очереди при помощи qhb_dequeue можно также добавить сообщение в очередь, создать и/или удалить очередь, зарегистрировать или удалить подписчика и т. п. Использовать отличные от qhb_dequeue функции для работы с очередью в обработчике нужно очень внимательно и осторожно — добавление сообщения в очередь в обработчике сообщения может привести к неуправляемому поведению системы, неконтролируемым побочным эффектам и потере сообщений в очереди из-за их перезаписи. При реализации процедуры обработки сообщения лучшей практикой будет избегать использования каких-либо функций работы с очередью кроме qhb_dequeue.
Типизация
Очередь является строго типизированной и может содержать элементы только одного типа; данный тип явно указывается при создании очереди. К примеру, очередь с именем my_queue1 и типом int4 можно создать следующим образом:
SELECT qhb_create_queue(name := 'my_queue1', typeoid := oid)
FROM pg_type
WHERE typname = 'int4';
Поддерживаются все типы данных, которые реализуют RECEIVE и SEND функции.
При использовании enqueue или dequeue с некорректным типом будет выдана соответствующая ошибка.
Поддержка транзакционности
Очередь поддерживает транзакционность СУБД: отправка сообщения в очередь через
qhb_enqueue внутри транзакции выполнит саму отправку только после успешного
завершения текущей транзакции. При выполнении команды ROLLBACK или необработанном
исключении внутри транзакции данные в очередь доставлены не будут и далее будут
игнорироваться (обработчик при этом также остается зарегистрированным).
Внутреннее состояние очереди сообщений
Состояние очереди хранится для каждой базы данных отдельно в системных таблицах qhb_queue и qhb_queue_consumer. Первая содержит список очередей и их типы, вторая содержит список подписанных обработчиков и их соответствие с очередью. Данные таблицы рекомендуется использовать только в информационных целях, поскольку их изменение непосредственно ведет к несогласованности.