Планировщик асинхронных заданий. Прошлое, настоящее и будущее.

Прошлое или как появился планировщик.

Вкратце на вопрос, зачем я написал свой планировщик асинхронных заданий можно ответить так: потому, что я устал ждать, когда это сделает кто-то другой.

С 2007 года я занимался биллингом. Изначально он был написан на перле с базой постгрес, позднее его частично перевели на джангу. Большинство задач в биллинге можно было решать прямо в базе. Например, что-то посчитать, протарифицировать, включить, выключить. Но решались они внешними скриптами на перле и питоне, которые по сути выполняли всего лишь SQL-запросы в базе. Поэтому когда на очередном собеседовании летом 2018 меня спросили, что мне не хватает в постгресе, я не задумываясь на первом месте назвал асинхронные триггеры. (На втором месте - внешние ключи на партицированные таблицы. На третьем - мультимастер.)

Асинхронные триггеры - это как триггеры, но только выполняются асинхронно. Например, клиент в биллинге совершает платёж на свой лицевой счёт через платёжный шлюз. Платёжный шлюз запрашивает информацию у биллинга на существование такого лицевого счёта и на возможность пополнения баланса. В случае успешного ответа платёж совершается. Далее биллинг считает баланс для лицевого счёта и в зависимости от результата может совершать дальнейшие действия, например, включить телефон/интернет. Все эти действия по-сути являются просто SQL-запросами в базе и могут быть зашиты в хранимки и триггеры. Но обычные триггеры синхронны по отношению к запросу. Поэтому в таком случае платёжному шлюзу пришлось бы ждать, пока биллинг завершит все нужные действия, хотя они вовсе не обязательны для платёжного шлюза. А вот в случае асинхронных триггеров данные действия могут выполняться асинхронно, не тормозя платёжный шлюз, сколько бы их не было и как бы долго они не выполнялись.

Сначала я поискал, что есть готового для решения этого вопроса и нашёл 1) PGQ - асинхронные очереди прямо в базе 2) pg_cron - cron прямо в базе 3) pg_background - выполнение запроса в фоне асинхронно

и конечно же коммерческий планировщик pgpro_scheduler. Позже, когда я уже написал свой планировщик, я случайно нашёл ещё один открытый именно планировщик generic-scheduler.

Сначала я попытался заюзать асинхронные очереди PGQ. Для работы они требуют управляющего-тикера - внешнюю программу, которая нарезает задачи для выполнения исполнителям - тоже внешним программам, которые постоянно опрашивают базу на предмет новых задач для них. Подобная архитектура меня не устроила и первое, что я сделал - это перенёс тикера в базу в качестве фонового рабочего процесса. Так появилась первоначальная версия моего планировщика pgqbw. Но всё равно исполнители оставались внешними программами и более того, какое-то их количество должно быть всегда запущено, даже если задач для них совсем нет. Такой подход меня не устроил, мне бы хотелось динамически запускать исполнителей при наличии задач и останавливать их при отсутствии задач.

Подобным образом работает pg_cron, но на тот момент у него было много других ограничений, которые меня тоже не устраивали. Например, он проверял появление новых заданий не чаще чем раз в минуту (мой планировщик может проверять не чаще чем раз в миллисекунду). Также pg_cron не мог выполнять одновременно несколько задач (мой планировщик не имеет такого ограничения). Ещё pg_cron мог быть запущен только в одной базе для всего кластера (мой планировщик может запускаться в каждой базе да не по разу если надо). Кроме того, pg_cron - это был просто cron, т.е. какие-то периодические задачи, там нельзя было что-то выполнить один раз в определённую дату в определённое время.

Также мне не подошёл и pg_background, потому, что это просто асинхронное выполнение запроса в фоне, причём почти бесконтрольно. Поэтому, т.к. я не смог использовать найденные открытые решения так как мне надо, я решил написать свой планировщик асинхронных заданий и назвал его pg_task.

Работу над планировщиком я начал осенью 2018, в то время актуальная версия постгреса была 10, но вскоре уже сменилась на 11. Шло время, версии постгреса менялись, планировщик совершенствовался, я старался не отставать от новых версий постгреса. Осенью 2021, когда актуальной версией постгреса уже была 14, меня спросили, какие версии поддерживает планировщик. Тогда я подумал, что по-идее, он должен поддерживать и старые версии. Каково же было моё разочарование, когда планировщик не скомпилировался даже на предпредыдущей (12) версии! Я стал более подробно изучать этот вопрос и постепенно примерно за месяц мне удалось снизить минимально поддерживаемую версию аж до 9.4. Почему именно 9.4? Да потому, что уже в 9.3 в постгресе ещё не появились динамические фоновые процессы, как раз которые я очень активно использую в планировщике.

В то время я ещё не слышал про гринплам, поэтому минимально поддерживаемая версия планировщиком 9.4 - это приятное случайное совпадение. Летом 2022 я узнал про гринплам и, когда осенью 2022 я уже порядочно изучил его, то я подумал, почему бы не сделать в планировщике поддержку и гринплама. Всего за каких-то буквально пару часов планировщик уже работал в 6 и 7 версиях гринплама. Это удалось сделать так быстро благодаря хорошо продуманной архитектуре планировщика, а также тому, что ранее мне удалось сделать поддержку планировщиком версии 9.4 постгреса, на основе которой как раз работает гринплам 6 версии.

В процессе работы над планировщиком я понял, что он может выполнять не только собственно любые SQL-запросы, типа SELECT, UPDATE, DELETE, VACUUM, CREATE DATABASE и т.д., но и более широкие возможности, реализуемые через различные расширения. Например, для биллинга я прикрутил к постгресу шаблонизатор pg_mustach, с помощью которого можно шаблонизировать счета абонентов прямо в базе, а именно, получать готовый красивый HTML-документ. Затем, с помощью преобразования из HTML в PDF pg_htmldoc можно этот HTML-документ преобразовать в PDF-документ прямо в базе. И наконец, с помощью pg_curl этот PDF-документ послать абоненту биллинга по электронной почте. Да, libcurl может делать не только HTTP-запросы, но и в том числе отправлять электронную почту. Я даже как-то его обучил выполнять команды по SSH, чтобы постгрес отключал и включал интернет по SSH на цисках и микротиках. К сожалению, мой пул-реквест отклонили, т.к. автору курла не понравился синтаксис URL. И поэтому для выполнения SSH-команд из постгреса я стал использовать расширение PLSH.

Также, мне всегда было интересно посмотреть в реальном времени, что же происходит в таблице с заданиями, поэтому я подружил пару плагинов для nginx. Один из них nginx-push-stream-module добавляет в nginx веб-сокеты, а второй - ngx_pq_module - соединяет nginx напрямую с постгрес. И теперь можно передавать асинхронные уведомления LISTEN / NOTIFY, которые могут генерироваться триггерами таблицы с заданиями, прямо в браузер через веб-сокет. Таким образом, дописав необходимый фронтенд на ява-скрипте, можно в реальном времени видеть выполнение заданий, т.е. изменения соответствующей таблицы.

Настоящее или что умеет планировщик.

Вкратце, планировщик асинхронных заданий pg_task может выполнять любые SQL-запросы в любое заданное время асинхронно.

Чтобы запустить планировщик достаточно прописать его в shared_preload_libraries conf shared_preload_libraries = 'pg_task' и перезагрузить кластер постгрес. При этом, по-умолчанию, планировщик запустит новый фоновый рабочий процесс pg_work в базе postgres от пользователя postgres и создаст в ней таблицу с заданиями task в схеме public, а затем будет проверять наличие заданий для выполнения в этой таблице каждую секунду.

Чтобы выполнить некий SQL-запрос асинхронно как можно быстрее, достаточно вставить его в текстовое поле input таблицы с заданиями: sql INSERT INTO task (input) VALUES ('SELECT now()'); -- выполнить запрос локально как можно быстрее При этом, при очередной проверке заданий для выполнения (которая происходит каждую секунду, по-умолчанию), планировщик увидит это новое задание и запустит новый фоновый рабочей процесс pg_task (с той же базой и пользователем, что и у pg_work), который и выполнит запрос, записав результат обратно в таблицу с заданиями в текстовое поле output.

Если же запрос нужно выполнить в удалённой базе, то параметры подключения к ней можно передать в текстовое поле remote: sql INSERT INTO task (input, remote) VALUES ('SELECT now()', 'user=user host=host'); -- выполнить запрос удалённо как можно быстрее При этом, опять же, при очередной проверке планировщик подключится к указанной удалённой базе, выполнит в ней запрос и запишет результат обратно в таблицу.

При выполнении запросов планировщик записывает текущий статус выполнения в перечисляемое поле state, значениями которого могут быть - PLAN - задача запланирована для выполнения (по-умолчанию) - TAKE - задача взята для выполнения - WORK - задача выполняется - DONE - задача выполнена - STOP - задачу не надо выполнять

Перед выполнением запроса планировщик записывает фактическое время начала выполнения в поле start, а также идентификатор обрабатывающего запрос процесса в целочисленное поле pid.

После выполнения запроса планировщик записывает фактическое время окончания в поле stop.

Если при выполнении запроса произойдёт какая-то ошибка, то сообщение об ошибке планировщик запишет в текстовое поле error: sql INSERT INTO task (input) VALUES ('SELECT 1/0'); INSERT INTO task (input, remote) VALUES ('SELECT 1/0', 'user=user host=host'); Чтобы выполнить запрос в указанное время, достаточно указать его в поле plan (по-умолчанию там будет текущее время): sql INSERT INTO task (plan, input) VALUES (now() + '5 min':INTERVAL, 'SELECT now()'); -- выполнить запрос через 5 минут INSERT INTO task (plan, input) VALUES ('2029-07-01 12:51:00', 'SELECT now()'); -- выполнить запрос в указанное время в указанную дату При этом планировщик будет выполнять такие запросы только при наступлении указанного времени.

Для автоматического повторения запроса через равные промежутки времени, можно указать требуемый интервал в поле repeat: sql INSERT INTO task (repeat, input) VALUES ('5 min', 'SELECT now()'); -- повторять запрос каждые 5 минут При этом планировщик будет каждые 5 минут выполнять запрос (автоматически создавая новую задачу для этого, в которой сохраняются все нужные поля).

По-умолчанию, планировщик выполняет одновременно только одну задачу, но если нужно выполнять несколько, то это можно указать в целочисленном поле max (по-умолчанию, там будет 0): sql INSERT INTO task (max, input) SELECT 1, 'SELECT pg_sleep(10)' FROM generate_series(1, 10); -- выполнить 10 запросов, одновременно выполняя по два запроса Здесь число 1 в поле max означает запускать одну задачу параллельно уже запущенной, т.е. всего будет выполняться 2 задачи одновременно.

Если, пока выполняются задачи как выше, в поле max указать число 2 sql INSERT INTO task (max, input) VALUES (2, 'SELECT now()'); -- выполнить запрос параллельно к двум уже запущенным то планировщик запустит новую задачу параллельно к двум уже запущенным, несмотря на то, что еще не выполнились все задачи у которых в поле max было число 1. Также, это можно рассматривать как некое подобие приоритета выполнения в задачах.

Здесь нужно упомянуть ещё то, что планировщик выполняет задачи в порядки их поступления последовательно (при прочих равных условиях), т.е. упорядочивая таблицу по автоинкрементирующемуся целочисленному полю id.

Задачи можно группировать, задавая разные текстовые поля group (по-умолчанию, там будет строка group): sql INSERT INTO task (group, max, input) SELECT 'one', 1, 'SELECT pg_sleep(10)' FROM generate_series(1, 10); -- выполнить 10 запросов, одновременно выполняя по два запроса в одной группе INSERT INTO task (group, max, input) SELECT 'two', 1, 'SELECT pg_sleep(10)' FROM generate_series(1, 10); -- выполнить 10 запросов, одновременно выполняя по два запроса в другой группе Т.е. будет выполняться одновременно 4 запроса, 2 для одной группы и 2 для другой. На самом деле, группировка выполяется по целочисленному полю hash, которое вычисляется как хэш от полей group и remote.

Т.о. планировщик может выполнять запросы последовательно и/или параллельно. Но есть ещё и третий возможный вариант - антипараллельное выполнение запросов. Это когда запросы выполняются последовательно один за одним (в группе), но с заданными промежутками-паузами между выполнением запросов. Конечно, это всегда можно сделать, указывая необходимое планируемое время выполнения запросов, но гораздо проще задать паузу как отрицательное число миллисекунд в поле max: sql INSERT INTO task (max, input) SELECT -5000, 'SELECT pg_sleep(10)' FROM generate_series(1, 10); -- выполнить 10 запросов антипараллельно, т.е. последовательно выполняя их с паузой 5 секунд между выполнениями Здесь отрицательное число в поле max означает количество миллисекунд паузы между выполнением запросов.

При автоматическом повторении запросов, а также при антипараллельном выполнении запросов с паузами, могут возникнуть интересные ситуации, когда продолжительность запроса больше, чем интервал автоматического повтора или чем пауза между антипараллельными выполнениями. В таких ситуациях есть два варианта: 1) отсчитывать следующий повтор (или паузу) после окончания выполнения запроса, при этом начало следующей задачи будет всё время сдвигаться, "дрейфовать" 2) либо вычислять повтор (или, соответственно, паузу) от запланированного времени предыдущей задачи, так, что начало следующей задачи будет чётко отстоять от запланированного времени предыдущей задачи на кратный интервал повтора (паузы)

Эти варианты можно задать в булевском поле drift (по-умолчанию, там будет false).

Существуют ещё несколько полезных полей в таблице с заданиями.

Поле active используется для указания интервала (относительно запланированного времени выполнения), когда задача ещё активна, т.е актуальна (по-умолчанию, там будет 1 час). Если в течении этого времени планировщик не успеет выполнить задачу, то в дальнейшем (когда до неё дойдёт очередь), эта задача не будет выполняться, а будет сразу записана ошибка о том, что задача просрочена. При этом, очевидно, если задача была повторяющаяся, то будет создано очередное повторение для задачи, которое в дальнейшем уже может успеть выполниться за определённое время.

Если указано булевское поле delete (а по-умолчанию оно указано и там true) и в результате выполнения запроса нет ошибок и сам результат пустой, то после выполнения планировщик удалит запись о задаче из таблицы с заданиями. И точно также, если задача была повторяющаяся, то будет создано очередное повторение для задачи.

Также можно ограничить время выполнения задачи, указав неотрицательный интервал в поле timeout (по умолчанию там 0, что означает неограниченное время выполнения): sql INSERT INTO task (timeout, input) VALUES ('5 sec', 'SELECT pg_sleep(10)'); -- выполнить запрос с таймаутом 5 секунд В данном случае возникнет ошибка, т.к. время выполнения запроса будет больше таймаута.

Если требуется выполнить много запросов в одной и той же группе, то логичнее будет не запускать для каждого запроса новый фоновый рабочий процесс, а обрабатывать в одном таком процессе множество запросов. Для управления такими ситуациями используются поле live, в котором можно задать интервал жизни фонового рабочего процесса, и/или неотрицательное целочисленное поле count, в котором можно ограничить количество запросов, обрабатываемых фоновым рабочим процессом. Нулевые значение по-умолчанию в обоих этих полях означают то, что фоновый рабочий процесс обработает только один запрос.

Для кастомного форматирования результатов выполнения запросов можно использовать следующие поля.

Если задано булево поле header (по-умолчанию true), то в результатах будут показываться заголовки.

Если задано булево поле string (по-умолчанию true), то в результатах будут квотироваться только строки.

Поле delimiter (по-умолчанию это знак табуляции) задаёт разделитель между колонками в результатах.

Поле quote (по-умолчанию это пустая строка) задаёт символ кавычек для квотирования.

Поле escape (по-умолчанию это пустая строка) задаёт символ для экранирования.

И, наконец, поле null (по-умолчанию это \N) задаёт символ для NULL.

Как уже упоминалось выше, планировщик может быть запущен одновременно в нескольких базах. Для управления этим используется переменная pg_task.json, которая может быть задана только в конфигурационном файле. Но зато применять её изменения можно даже просто при перечитывании конфигурации. В этой переменной хранится таблица конфигурации планировщика в сериализованном в json(b) виде, которая состоит из следующих колонок: - data - название базы данных, в которой должен быть запущен планировщик (по-умолчанию это postgres) - reset - интервал сброса зависших заданий (по-умолчанию это 1 час) - schema - название схемы, в которой планировщик создаст и будет использовать таблицу с заданиями (по-умолчанию это public) - table - название самой таблицы с заданиями (по-умолчанию это task) - sleep - интервал между проверками на новые задания (по-умолчанию это 1 секунда) - user - имя пользователя, от которого будет работать планировщик (по-умолчанию это postgres)

Т.к. переменная pg_task.json хранит json-значение таблицы, то можно опускать некоторые колонки, которые при этом будут заполняться значениями по-умолчанию. Также, если указанная база и/или пользователь не существуют, то они будут созданы планировщиком.

Например следующая настройка conf pg_task.json = '[{"data":"database1"},{"data":"database2","user":"username2"},{"data":"database3","schema":"schema3"},{"data":"database4","table":"table4"},{"data":"database5","sleep":100}]' запустит планировщик в базе database1 под пользователем database1 (т.к. если не указан пользователь, то его имя берётся как имя базы), в базе database2 под пользователем username2, в базе database3 под пользователем database3 в схеме schema3, в базе database4 под пользователем database4 с таблицей table4, и наконец в базе database5 под пользователем database5 и с интервалом проверки задач в 100 миллисекунд, при этом остальные неуказанные поля будут браться по-умолчанию.

В планировщике применяется многоуровневая система значений по-умолчанию, в отдельных случаях может доходить до 5 уровней! Например, значение поля drift можно задать при вставке в таблицу с заданиями, либо в текущей сессии, либо для текущего пользователя, либо для текущей базы, либо в конфигурационном файле.

Будущее или как устроен планировщик.

Вкратце, планировщик асинхронных заданий pg_task представляет из себя фоновый рабочий процесс, который запускает ещё фоновые рабочие процессы, которые следят за таблицами с заданиями и запускают фоновые рабочие процессы, которые выполняют задачи.

На логическую структуру моего планировщика во многом повлиял планировщик задач из замечательного и очень необычного интересного питоновского веб-фреймворка web2py. В том планировщике задачи и информация об их запусках хранятся в двух разных таблицах, причём повторяющаяся задача имеет одну запись в таблице задач и много записей в таблице запусков. Я же всё уместил в одну таблицу с заданиями, при этом повторение задания - это просто новая запись в таблице с заданиями. Такую идею мне подсказал один человек, правда он говорил не о планировщике, а об организации бизнес процессов между отделами при передачи задач между ними. Что мне очень не нравится в планировщике web2py, так это то, что там должны быть постоянно запущены рабочие, которые выполняют задачи. Мне так и не удалось сделать, чтобы эти рабочие динамически запускались и останавливались в зависимости от количества задач, зато удалось это сделать как раз в своём планировщике.

Итак, планировщик создаёт и работает всего с одной таблицей заданий со следующими колонками:

| Название | Тип | Нулевой? | По-умолчанию | Описание | | --- | --- | --- | --- | --- | | id | bigserial | NOT NULL | автоинкремент | идентификатор - первичный ключ | | parent | bigint | NULL | pg_task.id | идентификатор родительской задачи (если такая есть), подобно внешнему ключу, но без явного ограничения для производительности | | plan | timestamptz | NOT NULL | CURRENT_TIMESTAMP | планируемая дата и время запуска задачи | | start | timestamptz | NULL | | актуальная дата и время запуска задачи | | stop | timestamptz | NULL | | актуальная дата и время завершения задачи | | active | interval | NOT NULL | pg_task.active | положительный период после запланированного времени, в течении которого задача актуальна для выполнения | | live | interval | NOT NULL | pg_task.live | неотрицательный период жизни текущего фонового рабочего процесса для выполнения нескольких задач перед выходом | | repeat | interval | NOT NULL | pg_task.repeat | неотрицательный период автоматического повторения задачи | | timeout | interval | NOT NULL | pg_task.timeout | неотрицательный период разрешённого времени выполнения задачи | | count | int | NOT NULL | pg_task.count | неотрицательное количество задач, выполняемое текущим фоновым рабочим процессом перед выходом | | hash | int | NOT NULL | генерируется по group и remote | хэш для группировки задач | | max | int | NOT NULL | pg_task.max | максимальное число одновременно выполняемых в группе задач, отрицательное значение означает паузу в миллисекундах между выполнениями задач в группе | | pid | int | NULL | | идентификатор процесса, выполняющего задачу | | state | enum state (PLAN, TAKE, WORK, DONE, STOP) | NOT NULL | PLAN | состояние задачи | | delete | bool | NOT NULL | pg_task.delete | удалять задачу если нет результата и нет ошибки? | | drift | bool | NOT NULL | pg_task.drift | вычислять следующее время относительно времени завершения вместо времени планирования? | | header | bool | NOT NULL | pg_task.header | показывать заголовки в результатах? | | string | bool | NOT NULL | pg_task.string | квотировать только строки в результатах? | | delimiter | char | NOT NULL | pg_task.delimiter | разделитель между колонками в результатах | | escape | char | NOT NULL | pg_task.escape | символ экранирования в результатах | | quote | char | NOT NULL | pg_task.quote | символ кавычек в результатах | | data | text | NULL | | некоторые пользовательские данные | | error | text | NULL | | полученные ошибки при выполнении запроса | | group | text | NOT NULL | pg_task.group | имя для группировки задач | | input | text | NOT NULL | | SQL-команда(ы) для выполнения | | null | text | NOT NULL | pg_task.null | заполнитель для NULL в результатах | | output | text | NULL | | полученный результат(ы) | | remote | text | NULL | | строка для подключения к удалённой базе (при необходимости) |

Почти все значения по-умолчанию в таблице с заданиями берутся из следующих переменных

| Название | Тип | По-умолчанию | Уровень | Описание | | --- | --- | --- | --- | --- | | pg_task.delete | bool | true | конфигурация, база, пользователь, сессия | удалять задачу если нет результата и нет ошибки? | | pg_task.drift | bool | false | конфигурация, база, пользователь, сессия | вычислять следующее время относительно времени завершения вместо времени планирования? | | pg_task.header | bool | true | конфигурация, база, пользователь, сессия | показывать заголовки в результатах? | | pg_task.string | bool | true | конфигурация, база, пользователь, сессия | квотировать только строки в результатах? | | pg_conf.close | int | 60 * 1000 | конфигурация, база, супер-пользователь | закрывать фоновые рабочие процессы в pg_conf в течении указанных миллисекунд | | pg_conf.fetch | int | 10 | конфигурация, база, супер-пользователь | обрабатывать в pg_conf указанное чило строк за раз | | pg_conf.restart | int | 60 | конфигурация, база, супер-пользователь | интервал в секундах для перезапуска pg_conf при ошибках | | pg_task.count | int | 0 | конфигурация, база, пользователь, сессия | неотрицательное количество задач, выполняемое текущим фоновым рабочим процессом перед выходом | | pg_task.fetch | int | 100 | конфигурация, база, пользователь | обрабатывать указанное число задач за раз | | pg_task.id | bigint | 0 | сессия | идентификатор текущий задачи (только для чтения) | | pg_task.limit | int | 1000 | конфигурация, база, пользователь | ограничить число обрабатываемых за раз задач | | pg_task.max | int | 0 | конфигурация, база, пользователь, сессия | максимальное число одновременно выполняемых в группе задач, отрицательное значение означает паузу в миллисекундах между выполнениями задач в группе | | pg_task.sleep | int | 1000 | конфигурация, база, пользователь | период в миллисекундах для проверки наличия новых задач | | pg_work.close | int | 60 * 1000 | конфигурация, база, супер-пользователь | закрывать фоновые рабочие процессы в pg_work в течении указанных миллисекунд | | pg_work.fetch | int | 100 | конфигурация, база, супер-пользователь | обрабатывать в pg_work указанное чило строк за раз | | pg_work.restart | int | 60 | конфигурация, база, супер-пользователь | интервал в секундах для перезапуска pg_work при ошибках | | pg_task.active | interval | 1 hour | конфигурация, база, пользователь, сессия | положительный период после запланированного времени, в течении которого задача актуальна для выполнения | | pg_task.data | text | postgres | конфигурация | название базы данных для таблицы с заданиями | | pg_task.delimiter | char | \t | конфигурация, база, пользователь, сессия | разделитель между колонками в результатах | | pg_task.escape | char | | конфигурация, база, пользователь, сессия | символ экранирования в результатах | | pg_task.group | text | group | конфигурация, база, пользователь, сессия | имя для группировки задач | | pg_task.idle | int | 60 | конфигурация, база, пользователь | количество проверок на новые задачи перед глубоким сном в случае неактивности | | pg_task.json | json | [{"data":"postgres"}] | конфигурация | конфигурация json, возможные колонки: data, reset, schema, table, sleep и user | | pg_task.live | interval | 0 sec | конфигурация, база, пользователь, сессия | неотрицательный период жизни текущего фонового рабочего процесса для выполнения нескольких задач перед выходом | | pg_task.null | text | \N | конфигурация, база, пользователь, сессия | заполнитель для NULL в результатах (и логах) | | pg_task.quote | char | | конфигурация, база, пользователь, сессия | символ кавычек в результатах | | pg_task.repeat | interval | 0 sec | конфигурация, база, пользователь, сессия | неотрицательный период автоматического повторения задачи | | pg_task.reset | interval | 1 hour | конфигурация, база, пользователь | период сброса зависших задач | | pg_task.schema | text | public | конфигурация, база, пользователь | название схемы для таблицы с заданиями | | pg_task.table | text | task | конфигурация, база, пользователь | название таблицы с заданиями | | pg_task.timeout | interval | 0 sec | конфигурация, база, пользователь, сессия | неотрицательный период разрешённого времени выполнения задачи | | pg_task.user | text | postgres | конфигурация | имя пользователя для запуска планировщика |

На самом деле, структура таблицы не является жёстким требованием. Пользователь может сам добавить в таблицу нужные ему колонки или изменить у стандартных колонок значения по-умолчанию, или вообще сам создать таблицу для заданий, главное, чтобы было соответствие со стандартными полями в плане приведения типов без явного указания.

Основным принципом при проектировании планировщика было использовать по возможности как можно меньше кода на C и по возможности большинство действий производить на SQL (через SPI). Даже межпроцессное взаимодействие происходит на SQL через представление pg_locks, благодаря чему не пришлось его адаптировать под разные конкретные версии постгреса. Таже, все повторяющиеся запросы выполняются через подготовленные операторы, а там, где результатов ожидается больше одной строки - используются курсоры.

В планировщике реализовано множество необычных и неординарных решений. Т.к. это фактически моё первое расширение для постгреса, то многих вещей я не знал, когда начал делать свой планировщик, и поэтому навелосипедил много такого, что, как я понял уже много позже, обычно не делают (или вообще нельзя делать) в расширениях, но я-то это уже сделал, это работает и от этого довольно сложно отказаться из-за удобности.

Например, я не знал, что для использования расширения необходимо сначала его создать командой CREATE EXTENSION, и поэтому планировщик можно использовать без этой команды. Можно сказать, что планировщик - это единственное расширение, которое для общения с пользователем не использует эту команду. Более того, именно благодаря такому подходу, планировщик может сам создавать необходимые базы и пользователей, а также запускаться в каждой базе в нескольких экземплярах.

Также, я не знал, что нельзя запускать новый фоновый рабочий процесс при изменении какого-нибудь конфигурационного параметра, а взял и сделал это. Все другие расширения постоянно держат включенным фоновый рабочий процесс, который следит за нужными конфигурационными параметрами и при их изменениях производит какие-либо действия.

Физически, вкратце, планировщик состоит из четырёх основных частей: 1) init - инициализация, вспомогательные функции и запуск фонового рабочего процесса конфигурации pg_conf 2) pg_conf - фоновый рабочий процесс, который по конфигурационному параметру pg_task.json, а также по параметрам, заданным для баз и пользователей, определяет, в каких базах и под какими пользователями и с какими параметрами запустить управляющие pg_work 3) pg_work - фоновый рабочий процесс, который периодически опрашивает таблицу с заданиями и при наличии задач, запускает их выполнение в удалённой базе через libpq или запускает фоновый рабочий процесс pg_task для выполнения задачи в локальной базе 4) pg_task - фоновый рабочий процесс, который выполняет задачу в локальной базе (той же самой, что и у pg_work, который его запустил)

Итак, после установки планировщика в shared_preload_libraries и перезапуска кластера, планировщик инициализирует свои многочисленные конфигурационные переменные и запускает фоновый рабочий процесс конфигурации pg_conf (для гринплама это происходит только на координаторе), который с помощью блокировок в первую очередь убеждается, что он запущен только в единственном экземпляре. Также, он запускает расширенный перехват сигнала SIGUSR2, а также устанавливает обработчик стандартного пользовательского таймаута. Затем pg_conf подключается к базе postgres под пользователем по-умолчанию и превращает конфигурационный параметр pg_task.json в таблицу с помощью функции jsonb_to_recordset, из которой с помощью SPI курсоров извлекает строки (в заранее инициализированный список), определяющие, в каких базах и с какими параметрами должны быть запущены управляющие фоновые рабочие процессы pg_work. При этом, какие процессы уже запущены определяется с помощью блокировок. Далее, для всех строк запускается функция, которая сначала проверяет существование пользователя и базы, и если их нет, то создаёт их, а потом запускает как раз соответствующие фоновые рабочие процессы pg_work.

Параметры (такие как название базы, имя пользователя, схема, название таблицы и другие) в фоновый рабочий процесс передаются через динамическую разделяемую память, которая требует очень внимательного и осторожного обращения. Во-первых, надо не освободить память раньше, чем дочерний процесс её прочитал. А во-вторых, надо освободить память не только после завершения дочернего процесса, но и в случае его неожиданного падения. Как раз для этих целей используется расширенный перехват сигнала SIGUSR2 и пользовательский таймаут. А именно, после успешного прочтения параметров из динамической разделяемой памяти дочерний процесс посылает родительскому сигнал SIGUSR2, по которому последний как раз и может освободить соответствующую память. Этого не происходит при падении дочернего процесса, до того как он успел прочитать разделяемую память, и в таком случае как раз используется пользовательский таймаут, по которому родительский процесс также может освободить память.

Теперь в игру вступает управляющий фоновый рабочий процесс pg_work, который использует ту же тактику для передачи параметров своим рабочим процессам. В первую очередь он подключается к динамической разделяемой памяти, идентификатор которой ему передал в аргументе родительский процесс, и прочитывает оттуда параметры. Потом он посылает сигнал SIGUSR2 родительскому процессу об успешном прочтении разделяемой памяти, и тот освобождает соответствующую память, которую он может найти по идентификатору дочернего процесса, как раз передаваемому при расширенном перехвате сигналов. Затем, с помощью блокировки pg_work убеждается, что он запущен только в единственном экземпляре (эта же блокировка используется в pg_conf для проверки, какие процессы уже запущены). Далее pg_work подключается к нужной базе с нужными параметрами и создаёт в ней схему и таблицу с заданиями, если они ещё не были до этого созданы, и запускает главный цикл проверки наличия новых заданий. Т.к. некоторые задания могут выполняться в удалённой базе через libpq, то в этом же цикле и происходит асинхронная обработка соответствующих событий.

Как было сказано выше, в главном цикле наличие новых заданий проверяется периодически, по-умолчанию, каждую секунду. Но недавно я сделал существенную оптимизацию, позволяющую процессу pg_work как бы "заснуть" при некотором периоде отсутствия новых заданий, по-умолчанию это минута. При этом при вставке новых записей в таблицу с заданиями, управляющий pg_work будится сигналом, посылаемым через триггер, и снова начинает проверять наличие новых заданий каждую секунду в течении минуты. Если новых заданий нет, то управляющий засыпает надолго, а если есть - до самого ближайшего по времени.

Итак, каждую секунду управляющий процесс pg_work запускает довольно сложный подготовленный запрос с помощью SPI курсора, чтобы получить задания для выполнения. Если задачу нужно выполнить удалённо, то она запускается через libpq, и соответствующие события добавляются для обработки в главный цикл. А если же задачу нужно выполнить локально, то запускается фоновый рабочий процесс pg_task, которому передаются параметры также через динамическую разделяемую память. Отдельный рабочий процесс необходим для локальных задач, т.к. постгрес выполняет запросы синхронно, а также чтобы можно было выполнять несколько заданий одновременно. Для удалённых задач отдельный процесс не обязателен, т.к. он и так создаётся при подключении через libpq.

Наконец, фоновый рабочий процесс pg_task выполняет задачу (или несколько) в локальной базе следующим образом. Во-первых, он использует блокировку, чтобы убедиться, что он запущен только в единственном экземпляре (эта же блокировка используется в pg_work для проверки, какие задачи уже запущены). Во-вторых, pg_task получает информацию о задаче с помощью подготовленного SPI запроса (pg_work получает информацию об удалённой задаче с помощью той же функции). Планировщик выполняет запрос из задачи с помощью функции exec_simple_query, которая в постгресе является приватной, т.е. объявлена как static. Поэтому пришлось настроить копирование файла, содержащего эту функцию, соответствующей версии постгрес (в т.ч. для гринплама - соответствующей версии гринплама). Для тех, кому это не очень нравится, я создал отдельную ветку spi, в которой выполнение запроса задачи происходит через SPI с соответствующими ограничениями на возможные выполняемые запросы. Для перехвата исключений используется стандартная конструкция PG_TRY - PG_CATCH, а само сообщение об ошибке перехватывается кастомным emit_log_hook для записи в соответствующее поле error таблицы с заданиями.

Ближайшее будущее планировщика я вижу в том, чтобы сделать как можно больше разнообразных тестов, потому что те, которые я пока сделал покрывают лишь совсем малую часть возможностей планировщика.