Plain-language companion: v0.28.0.md
v0.28.0 — Transactional Inbox & Outbox Patterns
Status: Released. Driven by PLAN_TRANSACTIONAL_OUTBOX_HELPER.md and PLAN_TRANSACTIONAL_INBOX_HELPER.md. Outbox helper moved here from v0.22.0 to ship alongside the inbox helper and production-grade advanced features as a complete transactional messaging solution.
Release Theme

This release delivers a complete, production-grade solution for the two most common event-driven integration patterns in microservice architectures. Part A (Essential) ships the Transactional Outbox (reliable atomic event publication) and Transactional Inbox (reliable idempotent event consumption) as zero-boilerplate SQL helpers. Part B (Advanced) adds Consumer Groups for coordinated multi-relay outbox polling with Kafka-style offset tracking, visibility timeouts, and lag monitoring — and Ordered Processing for the inbox, including per-aggregate sequence ordering, gap detection, priority queues, and partition-affinity helpers for competing workers. Together, Parts A and B let pg_trickle users build reliable, exactly-once event pipelines that scale from a single relay to multi-instance deployments, using nothing but PostgreSQL.
See PLAN_TRANSACTIONAL_OUTBOX_HELPER.md and PLAN_TRANSACTIONAL_INBOX_HELPER.md for the full architecture and API design.
Known Limitations in v0.28.0
| Limitation | Rationale | Future Path |
|---|---|---|
| Outbox requires DIFFERENTIAL mode. enable_outbox() on IMMEDIATE-mode stream tables returns OutboxRequiresNotImmediateMode. | Outbox writes one row per refresh cycle inside the refresh transaction. IMMEDIATE refreshes fire inside every source transaction; adding an outbox INSERT there would impose that cost on every application write. | Post-1.0 opt-in GUC if demand justifies. |
| Ordering and priority are mutually exclusive per inbox. Calling both enable_inbox_ordering() and enable_inbox_priority() on the same inbox returns InboxOrderingPriorityConflict. | Per-aggregate sequence ordering must surface the next message in sequence regardless of priority level; priority tiers would violate that guarantee. | Use separate inboxes per priority class, each with enable_inbox_ordering() applied independently. |
| Gap detection degrades above ~100K aggregates. The gaps_<inbox> stream table uses LEAD() over pending messages, which is O(N log N) in pending message count — not O(sequence range). This is a significant improvement over the generate_series approach, but refresh time still scales with pending message volume. | Acceptable up to ~1M pending messages at a 30 s schedule. Above 10M pending messages, auto-refresh may be slow; use inbox_ordering_gaps() for on-demand checks. | Post-v0.28.0: delta-based detection scanning only aggregates with recent activity. |
| Consumer groups provide at-least-once delivery per consumer instance, not exactly-once globally. | Exactly-once is achieved by composition: the relay uses broker idempotency keys; the inbox uses ON CONFLICT (event_id) DO NOTHING. Three-layer deduplication is more resilient than a monolithic exactly-once guarantee. | Design decision. Documented in PATTERNS.md and SQL_REFERENCE.md. |
| AUTO mode may fall back to FULL refresh while outbox is enabled. When AUTO refresh falls back to FULL, the outbox header row carries "full_refresh": true. If the number of current rows exceeds outbox_inline_threshold_rows, the claim-check path applies: rows land in outbox_delta_rows_<st> and the relay fetches via cursor. A pg_trickle_alert outbox_full_refresh event is emitted regardless of which path is taken. Relays must detect the full_refresh flag, apply snapshot semantics (upsert rather than publish-as-new), and handle either inline or claim-check payloads. | AUTO refresh adapts to IVM cost at runtime; permanently blocking the FULL fallback would compromise the adaptation that makes AUTO useful. The sentinel flag preserves correctness; the claim-check path prevents memory exhaustion on large tables. | Reference relay updated in OUTBOX-8 to demonstrate all combinations. Post-v0.28.0: consider a GUC to disable FULL fallback per ST when outbox is enabled. |
| next_<inbox> ordered ST scans all processed rows. The last_processed CTE in the aggregate-ordered ST runs MAX(sequence_num) GROUP BY aggregate_id over every processed row on each refresh. For inboxes with a large volume of processed history this grows without bound. | A partial index on (aggregate_id, sequence_num) WHERE processed_at IS NOT NULL, created by enable_inbox_ordering(), mitigates this at v0.28.0 by making it an index-only scan. Scaling thresholds: < 100K rows → < 5 ms at a 1 s schedule; 100K–1M → increase the schedule to 5 s; > 1M → increase to 10–30 s; > 10M → use inbox_ordering_gaps() on demand only. | Post-v0.28.0: introduce a pgt_inbox_sequence_state catalog table updated atomically via advance_inbox_sequence(), making the CTE O(changed aggregates). |
| Global consumer monitoring STs are created once, not reference-counted. pgt_consumer_status, pgt_consumer_group_lag, and pgt_consumer_active_leases are auto-created on the first create_consumer_group() call. They must be created idempotently and torn down only when the last consumer group for an outbox is dropped. | A single set of monitoring STs per outbox is correct and cheaper than per-group STs. | Implementation: create_stream_table() called with if_not_exists := true; drop_consumer_group() decrements a reference count and drops the STs at zero. |
| Outbox relay latency is bounded by the poll interval. Relays discover new outbox rows by polling. The pg_trickle extension emits pg_notify('pgtrickle_outbox_new', outbox_table_name) after each outbox INSERT (v0.28.0), but the pgtrickle-relay binary does not yet use LISTEN — it starts polling on the standard interval. Minimum relay latency today equals the poll interval (visibility_seconds). | The NOTIFY is cheap (≈2 µs, inside the existing refresh transaction) and is emitted from v0.28.0 onwards so relay authors can begin using it immediately. The pgtrickle-relay CLI will use LISTEN/NOTIFY in v0.29.0. | v0.29.0 relay: subscribe to pgtrickle_outbox_new for sub-100 ms wake-up (see the E2E latency benchmark in PLAN_RELAY_CLI.md §E.5). |
| replay_inbox_messages() accepts only explicit event ID lists. A free-form where_clause parameter was removed to eliminate SQL injection risk. | EXPLAIN-based validation of dynamic SQL is insufficient; a parameterised WHERE event_id = ANY($1) is the safe API. | Operators who need filter-based replay should run a parameterised SELECT ARRAY_AGG(event_id) ... WHERE <condition> first, then pass the result to replay_inbox_messages(). |
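The two-step filter-based replay workaround can be sketched as follows. This is an illustrative in-memory model, not the extension's implementation: the dict stands in for the inbox table, and replay_inbox_messages() here mirrors the documented semantics of resetting processed_at and retry_count for an explicit ID list only.

```python
from datetime import datetime, timezone

# Stand-in for the inbox table; field names follow the standard inbox schema.
inbox = {
    "evt-1": {"event_type": "payment.failed", "processed_at": datetime.now(timezone.utc), "retry_count": 3},
    "evt-2": {"event_type": "payment.ok", "processed_at": datetime.now(timezone.utc), "retry_count": 0},
}

def select_event_ids(table, predicate):
    """Step 1: the parameterised SELECT ARRAY_AGG(event_id) ... WHERE <condition>."""
    return [eid for eid, row in table.items() if predicate(row)]

def replay_inbox_messages(table, event_ids):
    """Step 2: WHERE event_id = ANY($1) — explicit IDs only, no free-form SQL."""
    hits = [eid for eid in event_ids if eid in table]
    for eid in hits:
        table[eid]["processed_at"] = None
        table[eid]["retry_count"] = 0
    return len(hits)

ids = select_event_ids(inbox, lambda r: r["event_type"] == "payment.failed")
replayed = replay_inbox_messages(inbox, ids)
```

The point of the split is that the arbitrary condition runs in an ordinary parameterised SELECT under the operator's own privileges; only the resulting ID array crosses into the replay function.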
Part A — Essential Patterns
Transactional Outbox Helper (P2 — §9.12)
In plain terms: After each DIFFERENTIAL refresh cycle, pg_trickle writes a row to pgtrickle.outbox_<st> within the same transaction as the MERGE — either both succeed or neither does. For small deltas the row carries a versioned inline JSON payload {"v":1, "inserted":[…], "deleted":[…]}. For large deltas (above outbox_inline_threshold_rows, default 10 000 rows) the row carries a lightweight claim-check header {"v":1, "claim_check": true, …} and the actual rows land in the companion table pgtrickle.outbox_delta_rows_<st>, which the relay reads via a server-side cursor in bounded batches — constant memory regardless of delta size. This eliminates the dual-write problem for downstream event buses without a CDC connector or external replication slot.
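The inline-vs-claim-check routing decision can be sketched in a few lines. This is a hypothetical rendering of the documented behaviour, not the extension's code; the header field names follow the documented payload format.

```python
def route_outbox_payload(inserted, deleted, threshold=10_000):
    """Return (header_payload, claim_check_rows).

    Inline path: full row data in the header, no companion rows.
    Claim-check path: counts-only header; rows go to outbox_delta_rows_<st>.
    """
    if len(inserted) + len(deleted) <= threshold:
        return {"v": 1, "inserted": inserted, "deleted": deleted}, []
    rows = [("I", r) for r in inserted] + [("D", r) for r in deleted]
    header = {"v": 1, "claim_check": True,
              "inserted_count": len(inserted), "deleted_count": len(deleted)}
    return header, rows

small, small_rows = route_outbox_payload([{"id": 1}], [], threshold=10)
big, big_rows = route_outbox_payload([{"id": i} for i in range(11)], [], threshold=10)
```

Either way the header row and any delta rows are written in the refresh transaction, so a relay never observes a header without its rows.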
| Item | Description | Effort | Ref |
|---|---|---|---|
| OUTBOX-1 | Catalog + SQL functions. pgt_outbox_config catalog table. enable_outbox(name, retention_hours) / disable_outbox(name, if_exists) SQL functions. OutboxAlreadyEnabled, OutboxNotEnabled, OutboxRequiresNotImmediateMode error variants. | 0.5d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §A.1–A.2 |
| OUTBOX-2 | Outbox table creation. pgtrickle.outbox_<st> with id BIGSERIAL, pgt_id UUID, refresh_id UUID, created_at, inserted_count INT, deleted_count INT, is_claim_check BOOLEAN DEFAULT false, payload JSONB. Index on created_at. Naming: 7-byte outbox_ prefix + up to 56-byte stream table name; collision resolution appends a 7-char hex suffix derived from left(md5(name), 7). Final name stored in pgt_outbox_config.outbox_table_name. Also creates: (a) latest-row view pgtrickle.pgt_outbox_latest_<st> (ORDER BY id DESC LIMIT 1) for quick lag inspection and operational checks; (b) delta rows table pgtrickle.outbox_delta_rows_<st> with outbox_id BIGINT REFERENCES outbox_<st>(id), row_num INT, op CHAR(1) CHECK (op IN ('I','D')), payload JSONB, PRIMARY KEY (outbox_id, row_num) — populated only for claim-check entries. (Note: pgt_consumer_claim_check_acks is created in Part B / OUTBOX-B1, not here — it has no purpose without consumer groups.) All objects dropped alongside the outbox table. | 1d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §A.3 |
| OUTBOX-3 | Refresh-path integration. After successful MERGE, if outbox is enabled, INSERT outbox row within the same transaction — unless outbox_skip_empty_delta = true (default) and inserted_count = 0 AND deleted_count = 0, in which case no INSERT or NOTIFY is issued, saving write amplification on quiet refresh cycles. In-memory outbox_enabled_set cache with DDL-triggered invalidation. Hot-path cost < 50 ns when disabled. Routing: if delta_row_count <= outbox_inline_threshold_rows, serialise Vec<DeltaRow> to inline JSONB as before. If delta_row_count > outbox_inline_threshold_rows, write is_claim_check = true header row first (no payload), then INSERT delta rows into outbox_delta_rows_<st> in batches controlled by outbox_claim_check_batch_size GUC (default 1 000 rows/call) — keeping Rust heap bounded regardless of delta size. Both writes are in the same transaction. FULL-refresh fallback: when AUTO mode falls back to FULL refresh, the outbox header row additionally carries "full_refresh": true; if row count exceeds the threshold the claim-check path applies; a pg_trickle_alert outbox_full_refresh event is emitted so relays apply snapshot semantics. NOTIFY: emit pg_notify('pgtrickle_outbox_new', outbox_table_name) inside the same transaction after the outbox INSERT, enabling relay authors to use LISTEN for sub-second wake-up (cost: ~2 µs per refresh; skipped when empty-delta skip applies). | 1.5d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §A.4 |
| OUTBOX-4 | Versioned payload format — two paths. Inline path (small delta): {"v":1, "inserted":[…], "deleted":[…]} with to_jsonb() type mapping. Claim-check path (large delta): {"v":1, "claim_check": true, "inserted_count": N, "deleted_count": N, "refresh_id": "…"} — no row data in the outbox row itself; relay reads outbox_delta_rows_<st> via server-side cursor and calls outbox_rows_consumed(stream_table, outbox_id) when done. FULL-fallback payloads additionally set "full_refresh": true in the header; claim-check applies when the full-refresh row count exceeds the threshold. GUC outbox_inline_threshold_rows (default 10 000 rows) controls the routing threshold. No truncation path — data is never silently dropped. | 0.5d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §A.5 |
| OUTBOX-5 | Retention drain. Scheduler cleanup step: batched DELETE on outbox_<st> with outbox_drain_batch_size GUC (default 10 000). Cascades to outbox_delta_rows_<st> via FK ON DELETE CASCADE — no separate drain step needed for delta rows. Per-ST or global outbox_retention_hours (default 24). last_drained_at / last_drained_count tracked in catalog. | 0.5d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §A.6 |
| OUTBOX-6 | Lifecycle & cascade. drop_stream_table() cascades to outbox table + delta rows table + metadata. alter_stream_table() errors if column set changed while outbox enabled. outbox_status() monitoring function (includes claim_check_pending_count and storage_status fields). outbox_rows_consumed(stream_table TEXT, outbox_id BIGINT) SQL function: called by relay after cursor consumption to record per-group completion in pgt_consumer_claim_check_acks; idempotent. Note: stream_table takes the stream table name (as registered in pgt_stream_tables), not the outbox table name — the function resolves the outbox table via pgt_outbox_config. 8 Part-A GUCs (outbox_enabled, outbox_retention_hours, outbox_drain_batch_size, outbox_inline_threshold_rows, outbox_claim_check_batch_size, outbox_drain_interval_seconds, outbox_storage_critical_mb, outbox_skip_empty_delta) + 4 Part-B GUCs (consumer_dead_threshold_hours, consumer_stale_offset_threshold_days, consumer_cleanup_enabled, outbox_force_retention). | 0.5d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §A.7–A.8 |
| OUTBOX-7 | Tests & benchmark. Unit: enable/disable/validation/naming/cascade. Integration: end-to-end inline outbox write; claim-check triggered at threshold boundary (N = threshold, N = threshold+1); delta rows populated atomically in same transaction; relay cursor-fetch returns all rows in order; outbox_rows_consumed() idempotency; retention drain cascades delta rows via FK; rollback on outbox INSERT failure leaves no orphan delta rows; pg_notify('pgtrickle_outbox_new', ...) emitted. Benchmark gates: (a) refresh_no_outbox vs refresh_outbox_inline vs refresh_outbox_claim_check — < 10 % overhead at inline threshold, < 25 % at large payloads; (b) poll_outbox() < 5 ms at 10K outbox rows; (c) commit_offset() < 10 ms with 10 concurrent relays; (d) consumer_lag() < 50 ms at 100K outbox rows; (e) E2E latency benchmark benches/e2e_outbox_latency.rs: p50 < 1.5 s (polling), p95 < 2.5 s (see PLAN_RELAY_CLI.md §E.5). | 1.5d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §D |
| OUTBOX-8 | Documentation & examples. SQL_REFERENCE.md: outbox API + both payload formats (inline and claim-check) + outbox_rows_consumed() + pgtrickle_outbox_new NOTIFY channel. CONFIGURATION.md: 7 GUCs (replacing outbox_max_payload_bytes with outbox_inline_threshold_rows; adding outbox_claim_check_batch_size and outbox_storage_critical_mb with tuning table). PATTERNS.md: Transactional Outbox section including claim-check relay pattern; WAL overhead analysis; backpressure guidance for dead consumers (outbox_storage_critical_mb alert workflow). Reference Python relay (examples/relay/outbox_relay.py) demonstrates both inline and claim-check paths. | 1d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §C, §E |
Outbox essential subtotal: ~7 days
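The bounded-memory claim-check consumption loop described in OUTBOX-4 can be sketched against a fetch callback instead of a live server-side cursor. This is an illustrative sketch: fetch_batch stands in for FETCH on the cursor, and the publish/ack callbacks stand in for broker publish and outbox_rows_consumed().

```python
def consume_claim_check(fetch_batch, publish, ack, batch_size=1000):
    """Drain one claim-check entry in bounded batches.

    fetch_batch(n) returns up to n (op, payload) delta rows, [] when exhausted
    (analogous to FETCH <n> FROM <cursor> over outbox_delta_rows_<st>).
    """
    total = 0
    while True:
        batch = fetch_batch(batch_size)
        if not batch:
            break
        for op, payload in batch:      # op is 'I' (insert) or 'D' (delete)
            publish(op, payload)
        total += len(batch)
    ack()                              # outbox_rows_consumed(stream_table, outbox_id)
    return total

rows = [("I", {"id": i}) for i in range(2500)]
published, acked = [], []

def fake_fetch(n, _state={"pos": 0}):
    start = _state["pos"]
    _state["pos"] = start + n
    return rows[start:start + n]

count = consume_claim_check(fake_fetch,
                            lambda op, p: published.append((op, p)),
                            lambda: acked.append(True))
```

Only one batch is resident at a time, so relay memory stays constant regardless of delta size; the ack after the loop is what lets the retention guard (OUTBOX-B7) safely drain the delta rows.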
Transactional Inbox Helper
In plain terms: create_inbox('payment_inbox') creates a production-grade inbox table with auto-managed stream tables for the pending-message queue, dead-letter queue, and processing statistics. Applications write to the inbox (ON CONFLICT DO NOTHING for dedup), process messages from the pending stream table, and pg_trickle handles DLQ routing, alerts, retention, and monitoring automatically. enable_inbox_tracking() adopts an existing inbox table into pg_trickle's monitoring without schema changes.
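The idempotent-write half of the pattern is small enough to model directly. A minimal sketch, with a dict standing in for the inbox table and its ON CONFLICT (event_id) DO NOTHING primary-key dedup:

```python
def inbox_write(table, event_id, payload):
    """Returns True if inserted, False if the event was a duplicate
    (the ON CONFLICT (event_id) DO NOTHING behaviour)."""
    if event_id in table:
        return False
    table[event_id] = {"payload": payload, "processed_at": None, "retry_count": 0}
    return True

inbox = {}
first = inbox_write(inbox, "evt-42", {"amount": 10})
second = inbox_write(inbox, "evt-42", {"amount": 10})  # broker re-delivery: ignored
```

Because duplicates are swallowed at the PK, at-least-once delivery from the broker becomes effectively-once processing downstream.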
| Item | Description | Effort | Ref |
|---|---|---|---|
| INBOX-1 | Catalog + create_inbox(). pgt_inbox_config catalog table with column mapping (id_column, processed_at_column, retry_count_column, error_column, received_at_column, event_type_column). create_inbox(name, schema, max_retries, schedule, with_dead_letter, with_stats, retention_hours) creates inbox table in the specified schema (default pgtrickle) + metadata. InboxAlreadyExists, InboxNotFound, InboxTableNotFound, InboxColumnMissing error variants. | 1d | PLAN_TRANSACTIONAL_INBOX_HELPER.md §A.1–A.3 |
| INBOX-2 | Inbox table DDL. Standard schema: event_id TEXT PK, event_type, source, aggregate_id, payload JSONB, received_at, processed_at, error, retry_count, trace_id. Partial indexes for pending, DLQ, and processed rows. Autovacuum tuning. | 0.5d | PLAN_TRANSACTIONAL_INBOX_HELPER.md §A.3 |
| INBOX-3 | Auto-created stream tables. Pending ST (WHERE processed_at IS NULL AND retry_count < max_retries, DIFFERENTIAL, user-defined schedule). DLQ ST (WHERE processed_at IS NULL AND retry_count >= max_retries, DIFFERENTIAL, 30 s). Stats ST (GROUP BY event_type with pending/processed/dead_letter/avg processing time — max_pending_age_sec removed from the materialised ST query to enable DIFFERENTIAL mode and eliminate the O(N) full scan every 10 s; use inbox_health() for oldest_pending_age_sec on demand). All STs use column-mapped SQL from pgt_inbox_config. | 1d | PLAN_TRANSACTIONAL_INBOX_HELPER.md §A.4 |
| INBOX-4 | enable_inbox_tracking(). Adopt existing table: validate columns exist with compatible types, validate PK/UNIQUE on id column, create stream tables using mapped column names, insert metadata with is_managed = false. Gracefully omit optional columns (source, aggregate_id, trace_id) if not present. | 0.5d | PLAN_TRANSACTIONAL_INBOX_HELPER.md §A.6 |
| INBOX-5 | DLQ alert mechanism. Post-refresh hook on DLQ stream table: when rows_inserted > 0, emit pg_trickle_alert event inbox_dlq_message per new entry (capped at inbox_dlq_alert_max_per_refresh, default 10; excess batched into summary alert). | 0.5d | PLAN_TRANSACTIONAL_INBOX_HELPER.md §A.5 |
| INBOX-6 | inbox_health() + inbox_status(). inbox_health(name) returns JSONB with pending_count, dead_letter_count, avg_processing_time_sec, oldest_pending_age_sec, throughput_per_sec, health_status (healthy/degraded/critical). inbox_status(name) returns tabular overview of all inboxes. | 0.5d | PLAN_TRANSACTIONAL_INBOX_HELPER.md §A.1 |
| INBOX-7 | Retention drain + replay_inbox_messages(). Processed message drain via scheduler (batched DELETE, inbox_processed_retention_hours default 72 h). DLQ messages kept forever by default (inbox_dlq_retention_hours default 0). replay_inbox_messages(name TEXT, event_ids TEXT[]) resets processed_at + retry_count for the specified message IDs using a parameterised WHERE event_id = ANY($1) — no free-form SQL accepted; eliminates injection surface entirely. | 0.5d | PLAN_TRANSACTIONAL_INBOX_HELPER.md §A.7–A.8 |
| INBOX-8 | drop_inbox() + lifecycle. drop_inbox(name, if_exists, cascade): always drops stream tables + metadata; drops inbox table only if cascade := true AND is_managed = true. DROP EXTENSION cascades managed tables; adopted tables survive. 6 GUCs (inbox_enabled, inbox_processed_retention_hours, inbox_dlq_retention_hours, inbox_drain_batch_size, inbox_drain_interval_seconds, inbox_dlq_alert_max_per_refresh). | 0.5d | PLAN_TRANSACTIONAL_INBOX_HELPER.md §A.9–A.10 |
| INBOX-9 | Tests & benchmark. Unit: create/drop/enable_tracking/replay/health. Integration: end-to-end inbox lifecycle, DLQ routing, DLQ alert, retention drain, concurrent processors with FOR UPDATE SKIP LOCKED, enable_inbox_tracking() with non-standard columns. Benchmark gates: (a) pending ST refresh < 5 ms at 100 pending, < 50 ms at 10K pending; (b) next_<inbox> ordered ST refresh at each threshold (100K/1M/10M processed rows) matches documented scaling table; (c) stats ST FULL refresh < 5 ms at 100K rows, < 50 ms at 1M rows; (d) backpressure indicator: inbox_health() returns degraded within 2 refresh cycles when oldest_pending_age_sec exceeds threshold. | 1d | PLAN_TRANSACTIONAL_INBOX_HELPER.md §D |
| INBOX-10 | Documentation & examples. SQL_REFERENCE.md: inbox API. CONFIGURATION.md: 6 GUCs. PATTERNS.md: Transactional Inbox section + "Bidirectional Event Pipeline" (inbox → business logic → outbox) worked example. Reference examples: inbox_writer_nats.py, inbox_processor.py, webhook_receiver.py. | 0.5d | PLAN_TRANSACTIONAL_INBOX_HELPER.md §C, §E |
Inbox essential subtotal: ~6.5 days
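The routing predicates behind the auto-created stream tables in INBOX-3 reduce to one classification function. A sketch of that logic (the real STs express it as WHERE clauses over the mapped columns):

```python
def classify(message, max_retries=5):
    """Mirror of the ST predicates: pending vs dead-letter vs processed."""
    if message["processed_at"] is not None:
        return "processed"
    if message["retry_count"] >= max_retries:
        # DLQ ST: processed_at IS NULL AND retry_count >= max_retries
        return "dead_letter"
    # Pending ST: processed_at IS NULL AND retry_count < max_retries
    return "pending"

states = [classify({"processed_at": None, "retry_count": 0}),
          classify({"processed_at": None, "retry_count": 5}),
          classify({"processed_at": "2025-01-01", "retry_count": 2})]
```

Because the two predicates partition the unprocessed rows, a message moves from the pending ST to the DLQ ST automatically once its retry_count crosses max_retries, with no application-side routing code.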
Shared Infrastructure (Part A)
| Item | Description | Effort | Ref |
|---|---|---|---|
| SHARED-1 | Upgrade SQL. sql/pg_trickle--0.23.0--0.24.0.sql: create pgt_outbox_config and pgt_inbox_config catalog tables, register all new SQL functions. | 0.5d | — |
| SHARED-2 | PATTERNS.md integration guide. New "Event-Driven Integration Patterns" chapter in docs/PATTERNS.md covering: when to use outbox vs inbox vs both, transport comparison (NATS/Kafka/pgmq), bidirectional pipeline (inbox → business logic → outbox), and competing consumer patterns (FOR UPDATE SKIP LOCKED). | 0.5d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md, PLAN_TRANSACTIONAL_INBOX_HELPER.md |
| SHARED-3 | E2E integration test. Full pipeline: inbox receives event → processor creates business entity → outbox captures delta → verify end-to-end exactly-once delivery. | 0.5d | — |
Part A subtotal: ~15 days
Part B — Production Patterns
Consumer Groups for Outbox
In plain terms: Multiple relay processes can share a single outbox table safely using consumer groups — the same concept as Kafka consumer groups, but implemented entirely in PostgreSQL. Each group has its own offset pointer. Relays call poll_outbox() to claim a batch under a visibility timeout (like SQS), then call commit_offset() when done. If a relay crashes, its lease expires and another relay picks up the batch. consumer_lag() shows how far behind each consumer is. Dead relays are reaped automatically after 24 h.
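The lease lifecycle can be illustrated with a toy model and an injected clock. This is a deliberately simplified sketch of the documented poll/commit semantics, not the SQL implementation: one lease per group, illustrative names throughout.

```python
class OutboxGroup:
    """Toy model of one consumer group over one outbox."""

    def __init__(self, rows, visibility_seconds=30):
        self.rows = rows                  # (id, payload) pairs, id ascending
        self.visibility = visibility_seconds
        self.last_offset = 0              # last committed outbox id
        self.lease = None                 # (consumer, last_id, expires_at)

    def poll(self, consumer, now, batch=10):
        """poll_outbox(): claim the next batch unless a live lease holds it."""
        if self.lease and self.lease[2] > now:
            return []                     # leased by another consumer, skip
        claimed = [r for r in self.rows if r[0] > self.last_offset][:batch]
        if claimed:
            self.lease = (consumer, claimed[-1][0], now + self.visibility)
        return claimed

    def commit(self, consumer, last_id):
        """commit_offset(): monotonic advance only; releases the lease."""
        if last_id <= self.last_offset:
            return False                  # offset regression rejected
        self.last_offset, self.lease = last_id, None
        return True

g = OutboxGroup([(i, f"evt-{i}") for i in range(1, 6)])
batch_a = g.poll("relay-a", now=0)        # relay-a claims, then "crashes"
batch_b_early = g.poll("relay-b", now=10) # lease still live: nothing returned
batch_b = g.poll("relay-b", now=31)       # lease expired: batch redelivered
committed = g.commit("relay-b", batch_b[-1][0])
```

The crash scenario is the whole point: relay-a never commits, so after visibility_seconds the same batch becomes claimable by relay-b, which is exactly the at-least-once guarantee the limitations table describes.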
| Item | Description | Effort | Ref |
|---|---|---|---|
| OUTBOX-B1 | Consumer group catalog + lifecycle. pgt_consumer_groups + pgt_consumer_offsets + pgt_consumer_leases catalog tables. Also creates pgt_consumer_claim_check_acks (tracks per-group cursor-consumption completion for claim-check retention drain safety; not created in Part A since it has no purpose without consumer groups). create_consumer_group(name, outbox, auto_offset_reset) / drop_consumer_group(name) SQL functions; drop_consumer_group() decrements a per-outbox reference count and drops per-outbox monitoring STs when count reaches zero. auto_offset_reset values: latest (default) or earliest. ConsumerGroupAlreadyExists, ConsumerGroupNotFound error variants. | 1d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B.2–B.3 |
| OUTBOX-B2 | poll_outbox() with visibility timeout and lease management. Returns next batch for (group, consumer_id) using FOR UPDATE SKIP LOCKED. Acquires lease in pgt_consumer_leases with configurable visibility_seconds (default 30). Auto-registers new consumer_id on first call based on auto_offset_reset. Skips rows already leased by other consumers. | 1.5d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B.4 |
| OUTBOX-B2a | extend_lease() — lease renewal for long-running relays. extend_lease(group, consumer, extension_seconds INT DEFAULT 30) extends the visibility_until of all active leases held by the named consumer, returning the new visibility_until timestamp. Prevents spurious re-delivery when broker publish or business logic takes longer than the original visibility_seconds. Calling consumer_heartbeat() does not extend leases — heartbeat and lease lifetime are separate concerns. | 0.5d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B.4 |
| OUTBOX-B3 | commit_offset() + seek_offset(). commit_offset(group, consumer, last_offset) monotonically advances offset, releases lease, rejects regression with warning. seek_offset(group, consumer, new_offset) resets to any position and clears leases; emits pg_trickle_alert event consumer_seeked. | 0.5d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B.4, §B.6 |
| OUTBOX-B4 | Heartbeat + liveness. consumer_heartbeat(group, consumer) updates last_heartbeat_at (liveness only — does not extend active leases; use extend_lease() for that). Consumer is healthy when last_heartbeat_at > now() - 60 s. pg_trickle_alert event consumer_unhealthy when consumer transitions healthy → unhealthy. consumer_lag() live SQL function (always-fresh, suitable for ad-hoc inspection) exposes per-consumer healthy boolean, current lag, and offset. | 0.5d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B.5 |
| OUTBOX-B5 | Monitoring stream tables. Three auto-created STs on first create_consumer_group(): pgt_consumer_status (per-consumer offset + heartbeat timestamp, FULL mode, 5 s — FULL because pgt_consumer_offsets is updated on every heartbeat and offset commit; at typical relay poll rates most rows change between refreshes, making FULL simpler than DIFFERENTIAL for this small table), pgt_consumer_group_lag (per-group aggregate lag, DIFFERENTIAL, 10 s), pgt_consumer_active_leases (current leases filtered by visibility_until > now(), FULL mode, 5 s — FULL because the filter changes every cycle as leases expire). Use consumer_lag() for ad-hoc inspection of live health data including heartbeat_age_sec; use pgt_consumer_group_lag ST for Grafana dashboards and alerting rules (materialized every 10 s). | 1d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B.7 |
| OUTBOX-B6 | Dead consumer auto-cleanup. Scheduler step (GUC consumer_cleanup_enabled, default true): reap consumers with last_heartbeat_at < now() - consumer_dead_threshold_hours (GUC, default 24 h), release their leases. Remove from offsets if also last_commit_at < now() - consumer_stale_offset_threshold_days (GUC, default 7 d). Emit pg_trickle_alert event consumer_reaped. | 0.5d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B.9 |
| OUTBOX-B7 | Retention safety guard. When consumer groups are enabled, retention drain refuses to delete outbox_<st> rows with id > MIN(last_offset across all consumers) to prevent silent data loss for slow relays. For claim-check rows, additionally waits until all consumer groups that have polled past that outbox_id have called outbox_rows_consumed() for it — preventing delta rows from being cascade-deleted via FK before the relay finishes cursor consumption. GUC outbox_force_retention (default false) allows operator override for permanently abandoned consumers. | 0.5d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B.6 |
| OUTBOX-B8 | Tests. Integration: multi-relay group creation, visibility timeout expiry + re-poll, commit_offset idempotency, seek_offset replay, heartbeat → unhealthy transition, dead consumer reaping, retention guard prevents early drain. Benchmark: poll_outbox latency < 5 ms at 10K outbox rows. | 1.5d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §D |
| OUTBOX-B9 | Documentation & reference relay. SQL_REFERENCE.md: consumer group API + delivery guarantee section (at-least-once per consumer; exactly-once by composition). CONFIGURATION.md: consumer_cleanup_enabled, outbox_force_retention, consumer_dead_threshold_hours (default 24), consumer_stale_offset_threshold_days (default 7) GUCs. Reference Python relay with group coordination (examples/relay/outbox_relay.py). Rust equivalent (examples/relay/outbox_relay.rs). PATTERNS.md: multi-relay competing consumers section + claim-check large delta handling guide (server-side cursor consumption, outbox_rows_consumed(), bounded-memory relay loop) + latest-state consumer section (dedup view). | 1d | PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B, §C |
Consumer groups subtotal: ~8 days
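The retention safety guard in OUTBOX-B7 is, at its core, a one-line cutoff rule. A sketch of that rule in isolation, with illustrative names (the real guard additionally waits on claim-check acks before draining delta rows):

```python
def drain_cutoff(consumer_offsets, newest_outbox_id, force_retention=False):
    """Highest outbox id that retention drain may delete.

    Never drain past the slowest consumer group's committed offset,
    unless outbox_force_retention is set for abandoned consumers.
    """
    if force_retention or not consumer_offsets:
        return newest_outbox_id
    return min(consumer_offsets.values())

cutoff = drain_cutoff({"analytics": 900, "search": 450}, newest_outbox_id=1000)
forced = drain_cutoff({"analytics": 900, "search": 450}, 1000, force_retention=True)
```

Here the slow "search" group pins the drain at id 450 even though "analytics" has reached 900; only the operator override releases the remaining rows.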
Ordered Processing for Inbox
In plain terms: For financial, order-management, and audit-trail use-cases, messages about the same entity (customer, order, account) must be processed in the order they were produced. enable_inbox_ordering() creates a next_<inbox> stream table that surfaces only the next expected message per aggregate — preventing out-of-order processing automatically. Gap detection alerts when a message is missing too long. Priority queues let critical messages use a 1-second refresh schedule while background messages use 30 seconds. Worker partition affinity reduces contention when multiple processors share an inbox.
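The selection rule behind the next_<inbox> stream table can be rendered in pure Python. A sketch under the documented semantics (per aggregate, surface only the pending message whose sequence_num is one past the last processed sequence); names are illustrative:

```python
def next_messages(last_processed, pending):
    """last_processed: {aggregate_id: max processed sequence_num};
    pending: list of (aggregate_id, sequence_num, payload) rows.
    Returns at most one surfaced message per aggregate."""
    out = {}
    for agg, seq, payload in pending:
        if seq == last_processed.get(agg, 0) + 1:
            out[agg] = (seq, payload)   # the DISTINCT ON (aggregate_id) winner
    return out

surfaced = next_messages(
    {"order-1": 2},
    [("order-1", 4, "d"),   # out of order: withheld until seq 3 is processed
     ("order-1", 3, "c"),   # next expected for order-1: surfaced
     ("order-2", 1, "a")],  # first message for a new aggregate: surfaced
)
```

Out-of-order arrivals simply never match the `last + 1` predicate, so they sit in the pending set until their predecessor is processed and the next refresh surfaces them.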
| Item | Description | Effort | Ref |
|---|---|---|---|
| INBOX-B1 | enable_inbox_ordering() + aggregate-ordered stream table. pgt_inbox_ordering_config catalog table. enable_inbox_ordering(inbox, aggregate_id_col, sequence_num_col) creates next_<inbox> ST: DISTINCT ON (aggregate_id) selecting only the row where sequence_num = last_processed_seq + 1. Ensures only the next expected message per aggregate is surfaced. disable_inbox_ordering(inbox) drops the ST + config row. Mutually exclusive with enable_inbox_priority() — returns InboxOrderingPriorityConflict if priority is already enabled on this inbox (and vice versa). | 1.5d | PLAN_TRANSACTIONAL_INBOX_HELPER.md §B.2–B.3 |
| INBOX-B2 | Gap detection stream table + alert. gaps_<inbox> ST uses a LEAD() window function (O(N log N)) to detect missing sequence numbers by comparing adjacent sequences in the pending-only messages. Uses FULL refresh mode (contains now() in gap_age_sec). Emits pg_trickle_alert event inbox_ordering_gap when new gaps appear. inbox_ordering_gaps(inbox_name) SQL function for ad-hoc inspection. 30 s refresh schedule. Scales to 1M+ pending messages without the O(sequence_range) blowup of generate_series. | 1d | PLAN_TRANSACTIONAL_INBOX_HELPER.md §B.4 |
| INBOX-B3 | enable_inbox_priority() + tier-based stream tables. pgt_inbox_priority_config catalog table. enable_inbox_priority(inbox, priority_col, tiers JSONB) creates one pending_<inbox>_<tier> ST per priority tier with per-tier schedule and WHERE priority BETWEEN min AND max. Default 3 tiers: critical (1–2, 1 s), normal (3–6, 5 s), background (7–9, 30 s). Original pending_<inbox> preserved as unified view. disable_inbox_priority(inbox, if_exists) drops all tier STs + config row; original unified pending_<inbox> is restored. | 1d | PLAN_TRANSACTIONAL_INBOX_HELPER.md §B.5 |
| INBOX-B4 | inbox_is_my_partition() helper. Boolean-returning SQL function with signature inbox_is_my_partition(aggregate_id TEXT, worker_id INT, total_workers INT) RETURNS BOOLEAN. Evaluates abs(hashtext(aggregate_id)) % total_workers = worker_id inline in the WHERE clause. Advisory only — workers can still process any message; the condition makes each worker prefer its subset for cache locality. Composable with prepared statements and ORMs without SQL string interpolation. Documented in PATTERNS.md with Python + SQL usage example. | 0.5d | PLAN_TRANSACTIONAL_INBOX_HELPER.md §B.6 |
| INBOX-B5 | Tests. Integration: ordered ST surfaces only next-sequence messages; out-of-order arrivals withheld until preceding sequence processed; gap detection fires alert after configurable delay; priority tier routing; partition affinity correctness (no messages lost). Benchmark gate: gaps_<inbox> ST refresh at 1M messages across 10K aggregates must complete in < 1 s at 30 s schedule (uses LEAD() window function; O(N log N) not O(sequence_range)). Chaos: processor crash mid-processing + replay recovery; concurrent processors with FOR UPDATE SKIP LOCKED (no duplicate processing at 10 concurrent workers). | 1.5d | PLAN_TRANSACTIONAL_INBOX_HELPER.md §D |
| INBOX-B6 | Documentation & examples. SQL_REFERENCE.md: ordering + priority API. CONFIGURATION.md: ordering GUCs. PATTERNS.md: per-aggregate ordering, gap recovery, priority queue, and competing workers with partition affinity sections. Reference examples/inbox/inbox_processor_ordered.py. | 0.5d | PLAN_TRANSACTIONAL_INBOX_HELPER.md §B, §C |
Ordered processing subtotal: ~6 days
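The LEAD()-style gap detection from INBOX-B2 amounts to sorting each aggregate's pending sequences and comparing neighbours, which is O(N log N) in pending count rather than O(sequence range). A pure-Python sketch of that core (the real ST also tracks gap age to drive the alert):

```python
from collections import defaultdict

def detect_gaps(pending):
    """pending: iterable of (aggregate_id, sequence_num) over pending rows only.
    Returns {aggregate_id: [(gap_start, gap_end), ...]} for aggregates with gaps."""
    by_agg = defaultdict(list)
    for agg, seq in pending:
        by_agg[agg].append(seq)
    gaps = {}
    for agg, seqs in by_agg.items():
        seqs.sort()
        # Adjacent-pair comparison: the LEAD(sequence_num) OVER (...) analogue.
        found = [(a + 1, b - 1) for a, b in zip(seqs, seqs[1:]) if b - a > 1]
        if found:
            gaps[agg] = found
    return gaps

gaps = detect_gaps([("acct-1", 1), ("acct-1", 2), ("acct-1", 5), ("acct-2", 7)])
```

Note that an aggregate with a single pending row (acct-2 above) reports no gap: with only pending rows in view there is no adjacent pair to compare, which matches the LEAD()-over-pending approach.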
Shared Infrastructure (Part B)
| Item | Description | Effort | Ref |
|---|---|---|---|
| SHARED-B1 | Upgrade SQL additions. Extend sql/pg_trickle--0.23.0--0.24.0.sql: create pgt_consumer_groups, pgt_consumer_offsets, pgt_consumer_leases, pgt_inbox_ordering_config, pgt_inbox_priority_config tables; register all Part B SQL functions. (Note: pgt_consumer_claim_check_acks is created dynamically by create_consumer_group() at runtime, not in the upgrade script — it has no purpose without consumer groups.) |
0.5d | — |
| SHARED-B2 | Advanced PATTERNS.md sections. Add to “Event-Driven Integration Patterns” chapter: competing relays with consumer groups, ordered inbox processing end-to-end, priority queues (when to use), partition-affinity for high-throughput inboxes, claim-check large delta handling guide (when triggered, cursor consumption loop, outbox_rows_consumed(), interaction with full_refresh flag), latest-state consumer pattern (dedup view), and FULL-refresh fallback handling for relay authors. Note in PATTERNS.md: add Grafana dashboard panel recommendations for consumer lag (pgt_consumer_group_lag ST), DLQ growth rate (dlq_<inbox> ST), inbox pending backlog (pending_<inbox> ST), and inbox throughput (stats_<inbox> ST). |
0.5d | — |
| SHARED-B3 | Advanced E2E tests. (1) Multi-relay group test: 3 relays share one outbox group, verify each row published exactly once, simulate relay crash + visibility timeout redelivery. (2) Ordered inbox test: publish 10 messages out-of-order per aggregate, verify processor receives them in sequence order. (3) Concurrent stress: 10 relay workers + 100K outbox rows; verify < 0.1% duplicate rate at broker. | 1d | — |
| SHARED-B4 | dbt adapter updates. Add outbox_enabled, consumer_group, and inbox_config properties to dbt model config; add pgtrickle_outbox_config and pgtrickle_create_inbox macros; update dbt-pgtrickle docs and integration tests. | 0.5d | dbt-pgtrickle/AGENTS.md |
Part B subtotal: ~17.5 days
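The partition-affinity helper referenced throughout Part B (`inbox_is_my_partition(aggregate_id, worker_id, total_workers)`) can be sketched as a stable hash modulo the worker count. This Python simulation is illustrative only — the real helper is SQL, and `is_my_partition` is a hypothetical name; the key property is that the hash must be stable across processes:

```python
# Hypothetical sketch of partition affinity for competing inbox workers:
# a stable hash of the aggregate id modulo total_workers pins every
# aggregate to exactly one of N workers, so no two workers process the
# same aggregate concurrently and no aggregate is orphaned.
import zlib

def is_my_partition(aggregate_id: str, worker_id: int, total_workers: int) -> bool:
    # crc32 is stable across processes and runs, unlike Python's salted hash().
    return zlib.crc32(aggregate_id.encode()) % total_workers == worker_id

# Every aggregate maps to exactly one worker out of 3: no loss, no overlap.
aggregates = [f"order-{i}" for i in range(100)]
owners = {a: [w for w in range(3) if is_my_partition(a, w, 3)] for a in aggregates}
assert all(len(ws) == 1 for ws in owners.values())
```

Because the check is a pure function of its three arguments, the SQL version can be used in prepared statements without interpolating worker identity into query text (exit criterion INBOX-B4).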
Implementation Phases
| Phase | Description | Duration |
|---|---|---|
| A-SHARED | Upgrade SQL, shared Part A catalog infrastructure | Day 1 |
| A-OUTBOX | Outbox helper: catalog, table DDL, refresh-path hook, payload format, retention, lifecycle, GUCs | Days 1–5 |
| A-INBOX | Inbox helper: catalog, table DDL, stream tables, enable_inbox_tracking, DLQ alerts, health, replay, retention, lifecycle, GUCs | Days 5–11 |
| A-TEST | Part A integration tests, E2E pipeline test, benchmarks | Days 11–13 |
| A-DOC | Part A documentation, PATTERNS.md guide, reference examples | Days 13–14 |
| B-OUTBOX | Consumer groups: catalog, poll_outbox, commit_offset, seek_offset, heartbeat, monitoring STs, dead consumer cleanup, retention guard | Days 14–22 |
| B-INBOX | Ordered processing: enable_inbox_ordering, gap detection, priority queues, worker partition helper | Days 22–28 |
| B-TEST | Part B integration tests, multi-relay E2E, ordered inbox E2E | Days 28–31 |
| B-DOC | Part B documentation, advanced PATTERNS.md sections, reference relay implementations | Days 31–33 |
v0.28.0 total: ~6–7 weeks solo, or ~4–5 weeks with two developers working the Part A (essential patterns) and Part B (production patterns) tracks in parallel
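The outbox payload format exercised by the OUTBOX-3/OUTBOX-4 criteria below (inline `{"v":1, "inserted":[…], "deleted":[…]}` for small deltas, a claim-check header for large ones, plus the `full_refresh` flag) can be sketched as follows. This is a hypothetical Python illustration of the decision rule — `build_payload` and `row_count` are invented names; the real outbox writes these payloads from SQL inside the refresh transaction:

```python
# Sketch of the inline-vs-claim-check decision: deltas at or below
# outbox_inline_threshold_rows are embedded in the message body; larger
# deltas emit only a claim-check header, leaving the rows in
# outbox_delta_rows_<st> for cursor-based consumption by the relay.
import json

def build_payload(inserted, deleted, threshold, full_refresh=False):
    total = len(inserted) + len(deleted)
    if total <= threshold:
        payload = {"v": 1, "inserted": inserted, "deleted": deleted}
    else:
        # Claim-check header: rows stay server-side, never truncated.
        payload = {"v": 1, "claim_check": True, "row_count": total}
    if full_refresh:
        # FULL-refresh fallback is flagged so relays can resync (OUTBOX-3/4).
        payload["full_refresh"] = True
    return json.dumps(payload)

small = build_payload([{"id": 1}], [], threshold=100)
large = build_payload([{"id": i} for i in range(200)], [], threshold=100)
```

A reference relay therefore has four cases to handle: inline vs. claim-check, crossed with differential vs. full-refresh — exactly the matrix named in exit criterion OUTBOX-3/4.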
Exit criteria:
- [x] OUTBOX-1/2: enable_outbox() creates outbox table + pgt_outbox_latest_<st> view with correct schema; catalog row present
- [x] OUTBOX-1: enable_outbox() on IMMEDIATE-mode stream table returns OutboxRequiresNotImmediateMode with clear message
- [x] OUTBOX-2: Naming collision resolution: truncation + hex suffix tested end-to-end; final name stored in catalog
- [x] OUTBOX-3/CC: Initial load (first refresh, all rows as "inserted") above outbox_inline_threshold_rows uses claim-check path; outbox_delta_rows_<st> populated atomically; no data loss
- [x] OUTBOX-3/CC: Bulk source update (many rows changed in one cycle) above threshold uses claim-check path; relay cursor returns all inserted + deleted rows correctly
- [x] OUTBOX-3: Refresh populates outbox payload within same transaction; rollback on outbox INSERT failure leaves no orphan delta rows
- [x] OUTBOX-4: Small deltas (≤ outbox_inline_threshold_rows) produce inline {"v":1, "inserted":[…], "deleted":[…]}; large deltas produce claim-check header {"v":1, "claim_check": true, …} with rows in outbox_delta_rows_<st>; relay cursor consumption + outbox_rows_consumed() documented + tested; no truncation path exists
- [x] OUTBOX-5: Retention drain removes rows older than outbox_retention_hours; respects batch size
- [x] OUTBOX-6: drop_stream_table() cascades to outbox + latest-row view; outbox_status() returns correct data
- [x] OUTBOX-7: Benchmark shows < 10% overhead vs baseline at small payloads
- [x] INBOX-1/2: create_inbox() creates inbox table + 3 stream tables + metadata
- [x] INBOX-3: Pending ST reflects unprocessed messages; DLQ ST reflects poisoned messages
- [x] INBOX-4: enable_inbox_tracking() works with non-standard column names on existing tables
- [x] INBOX-5: pg_trickle_alert fires when new DLQ entries appear
- [x] INBOX-6: inbox_health() returns correct health status; inbox_status() lists all inboxes
- [x] INBOX-7: replay_inbox_messages() resets messages by explicit event_ids array (no where_clause); uses parameterised WHERE event_id = ANY($1) — no dynamic SQL; retention drain respects DLQ; processor crash + replay recovery path documented
- [x] INBOX-8: drop_inbox(cascade := true) drops managed table; preserves adopted tables
- [x] SHARED-3: End-to-end inbox → business logic → outbox pipeline test passes
- [x] SHARED-4: dbt adapter updated with outbox_enabled and inbox_config properties; integration tests pass
- [x] OUTBOX-3/4: FULL-refresh fallback writes "full_refresh": true in header; claim-check applies when row count exceeds outbox_inline_threshold_rows; reference relay handles all four combinations (inline/claim-check × differential/full-refresh) correctly
- [x] OUTBOX-B1: create_consumer_group() creates group + offset + lease tables; idempotent re-create
- [x] OUTBOX-B2: poll_outbox() returns correct batch; no overlap between concurrent relays; visibility timeout expires and row re-delivered
- [x] OUTBOX-B2a: extend_lease() extends visibility_until for all active consumer leases; re-delivery does not occur when relay calls extend_lease before timeout
- [x] OUTBOX-B3: commit_offset() advances monotonically; seek_offset() enables replay from any position
- [x] OUTBOX-B4: Heartbeat tracks liveness; consumer_unhealthy alert fires on timeout
- [x] OUTBOX-B5: Three monitoring STs (status/FULL, group lag/DIFFERENTIAL, active leases/FULL) created idempotently (second create_consumer_group() does not fail); refreshed correctly; dropped when last group is dropped (reference count reaches zero)
- [x] OUTBOX-B6: Dead relay reaped after consumer_dead_threshold_hours (default 24 h, configurable); leases released; consumer_reaped alert emitted
- [x] OUTBOX-B7: Retention drain respects MIN(last_offset); outbox_force_retention override works
- [x] OUTBOX-B8: Multi-relay group E2E: each outbox row published exactly once across 3 concurrent relays
- [x] OUTBOX-B8b: Concurrent relay stress test: 10 relays, 100K outbox rows, < 0.1% duplicate rate before broker dedup; 0% after
- [x] INBOX-B1: next_<inbox> ST surfaces only next expected sequence per aggregate; withholds future sequences; partial index (aggregate_id, sequence_num) WHERE processed_at IS NOT NULL created by enable_inbox_ordering()
- [x] INBOX-B1: enable_inbox_ordering() + enable_inbox_priority() together returns InboxOrderingPriorityConflict with clear message
- [x] INBOX-B2: gaps_<inbox> ST detects missing sequences using LEAD() window function; inbox_ordering_gap alert fires; gap detection benchmark passes (< 1 s at 10K aggregates, 1M messages; O(N log N) not O(sequence_range))
- [x] INBOX-B3: Priority tier STs refresh at configured schedules; messages route to correct tier
- [x] INBOX-B3: disable_inbox_priority() drops all tier STs + config row; unified pending_<inbox> is restored
- [x] INBOX-B1: disable_inbox_ordering() drops next_<inbox> ST + config row; inbox resumes normal pending behaviour
- [x] INBOX-B4: inbox_is_my_partition(aggregate_id, worker_id, total_workers) returns BOOLEAN; no messages lost across N workers; usable in prepared statements without SQL interpolation
- [x] SHARED-B3: Ordered inbox E2E: 10 out-of-order arrivals per aggregate delivered to processor in order
- [x] SHARED-B4: dbt adapter updated with consumer group and inbox ordering properties
- [x] Extension upgrade path tested (0.27.0 → 0.28.0) — sql/pg_trickle--0.27.0--0.28.0.sql validated by scripts/check_upgrade_completeness.sh
- [x] just check-version-sync passes
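The INBOX-B2 gap-detection criterion (LEAD()-based, O(N log N) in message count rather than O(sequence_range)) has a straightforward Python analogue: sort each aggregate's sequence numbers once and compare neighbours, mirroring what LEAD() does over a partitioned, ordered window. `find_gaps` and its return shape are illustrative, not the helper's actual API:

```python
# Hypothetical Python equivalent of the LEAD()-based detection backing the
# gaps_<inbox> stream table: one sort per aggregate plus a neighbour scan,
# O(N log N) in message count, independent of how large the sequence range is.
from collections import defaultdict

def find_gaps(rows):
    """rows: iterable of (aggregate_id, sequence_num) seen messages.
    Returns {aggregate_id: [(gap_start, gap_end), ...]} for missing ranges."""
    by_agg = defaultdict(list)
    for agg, seq in rows:
        by_agg[agg].append(seq)
    gaps = {}
    for agg, seqs in by_agg.items():
        seqs.sort()
        # A neighbour pair (a, b) with b - a > 1 means sequences a+1..b-1 are missing.
        agg_gaps = [(a + 1, b - 1) for a, b in zip(seqs, seqs[1:]) if b - a > 1]
        if agg_gaps:
            gaps[agg] = agg_gaps
    return gaps

print(find_gaps([("acct-1", 1), ("acct-1", 2), ("acct-1", 5), ("acct-2", 1)]))
# → {'acct-1': [(3, 4)]}
```

Each detected range would drive the `inbox_ordering_gap` alert, and the neighbour-pair formulation is what keeps the 10K-aggregate / 1M-message benchmark under one second regardless of how sparse the sequence numbers are.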