Plain-language companion: v0.28.0.md

v0.28.0 — Transactional Inbox & Outbox Patterns

Status: Released. Driven by PLAN_TRANSACTIONAL_OUTBOX_HELPER.md and PLAN_TRANSACTIONAL_INBOX_HELPER.md. Outbox helper moved here from v0.22.0 to ship alongside the inbox helper and production-grade advanced features as a complete transactional messaging solution.

Release Theme

This release delivers a complete, production-grade solution for the two most common event-driven integration patterns in microservice architectures. Part A (Essential) ships the Transactional Outbox (reliable atomic event publication) and Transactional Inbox (reliable idempotent event consumption) as zero-boilerplate SQL helpers. Part B (Advanced) adds Consumer Groups for coordinated multi-relay outbox polling with Kafka-style offset tracking, visibility timeouts, and lag monitoring — and Ordered Processing for the inbox, including per-aggregate sequence ordering, gap detection, priority queues, and partition-affinity helpers for competing workers. Together, Parts A and B let pg_trickle users build reliable, exactly-once event pipelines that scale from a single relay to multi-instance deployments, using nothing but PostgreSQL.

See PLAN_TRANSACTIONAL_OUTBOX_HELPER.md and PLAN_TRANSACTIONAL_INBOX_HELPER.md for the full architecture and API design.


Known Limitations in v0.28.0

Limitation Rationale Future Path
Outbox requires DIFFERENTIAL mode. enable_outbox() on IMMEDIATE-mode stream tables returns OutboxRequiresNotImmediateMode. Outbox writes one row per refresh cycle inside the refresh transaction. IMMEDIATE refreshes fire inside every source transaction; adding an outbox INSERT there imposes that cost on every application write. Post-1.0 opt-in GUC if demand justifies.
Ordering and priority are mutually exclusive per inbox. Calling both enable_inbox_ordering() and enable_inbox_priority() on the same inbox returns InboxOrderingPriorityConflict. Per-aggregate sequence ordering must surface the next message in sequence regardless of priority level; priority tiers violate that guarantee. Use separate inboxes per priority class, each with enable_inbox_ordering() applied independently.
Gap detection scales with pending message volume. The gaps_<inbox> stream table uses LEAD() over pending messages, which is O(N log N) in pending message count — not O(sequence range). This is a significant improvement over the generate_series approach; however, refresh time still grows with the pending backlog. Acceptable up to ~1M pending messages at a 30 s schedule. Above 10M pending messages, auto-refresh may be slow; use inbox_ordering_gaps() for on-demand checks. Post-v0.28.0: delta-based detection scanning only aggregates with recent activity.
Consumer groups provide at-least-once delivery per consumer instance, not exactly-once globally. Exactly-once is achieved by composition: relay uses broker idempotency keys; inbox uses ON CONFLICT (event_id) DO NOTHING. Three-layer deduplication is more resilient than a monolithic exactly-once guarantee. Design decision. Documented in PATTERNS.md and SQL_REFERENCE.md.
AUTO mode may fall back to FULL refresh while outbox is enabled. When AUTO refresh falls back to FULL, the outbox header row carries "full_refresh": true. If the number of current rows exceeds outbox_inline_threshold_rows, the claim-check path applies: rows land in outbox_delta_rows_<st> and the relay fetches via cursor. A pg_trickle_alert outbox_full_refresh event is emitted regardless of which path is taken. Relays must detect the full_refresh flag, apply snapshot semantics (upsert rather than publish-as-new), and handle either inline or claim-check payloads. AUTO refresh adapts to IVM cost at runtime; blocking the FULL fallback permanently would compromise the adaptation that makes AUTO useful. The sentinel flag preserves correctness; the claim-check path prevents memory exhaustion on large tables. Reference relay updated in OUTBOX-8 to demonstrate all combinations. Post-v0.28.0: consider a GUC to disable FULL fallback per ST when outbox is enabled.
next_<inbox> ordered ST scans all processed rows. The last_processed CTE in the aggregate-ordered ST runs MAX(sequence_num) GROUP BY aggregate_id over every processed row on each refresh. For inboxes with large volumes of processed history this grows without bound. A partial index (aggregate_id, sequence_num) WHERE processed_at IS NOT NULL is created by enable_inbox_ordering() to mitigate this at v0.28.0, making it an index-only scan. Scaling thresholds: < 100K rows → < 5 ms at 1 s schedule; 100K–1M → increase schedule to 5s; > 1M → increase to 10s–30s; > 10M → use inbox_ordering_gaps() on-demand only. Post-v0.28.0: introduce pgt_inbox_sequence_state catalog table updated atomically via advance_inbox_sequence(), making the CTE O(changed aggregates).
Global consumer monitoring STs created once, not reference-counted. pgt_consumer_status, pgt_consumer_group_lag, pgt_consumer_active_leases are auto-created on the first create_consumer_group() call. They must be created idempotently and torn down only when the last consumer group for an outbox is dropped. A single set of monitoring STs per outbox is correct and cheaper than per-group STs. Implementation: create_stream_table() called with if_not_exists := true; drop_consumer_group() decrements a reference count and drops STs at zero.
Outbox relay latency bounded by poll interval. Relays discover new outbox rows by polling. The pg_trickle extension emits pg_notify('pgtrickle_outbox_new', outbox_table_name) after each outbox INSERT (v0.28.0), but the pgtrickle-relay binary does not yet use LISTEN — it polls on the standard interval, so minimum relay latency today equals the poll interval (visibility_seconds). The NOTIFY is cheap (≈2 µs, inside the existing refresh transaction) and is emitted from v0.28.0 onwards so relay authors can begin using it immediately. The pgtrickle-relay CLI will subscribe to pgtrickle_outbox_new via LISTEN/NOTIFY in v0.29.0 for sub-100 ms wake-up (see the E2E latency benchmark in PLAN_RELAY_CLI.md §E.5).
replay_inbox_messages() accepts only explicit event ID lists. A free-form where_clause parameter was removed to eliminate SQL injection risk. EXPLAIN-based validation of dynamic SQL is insufficient; parameterised WHERE event_id = ANY($1) is the safe API. Operators who need filter-based replay should run a parameterised SELECT ARRAY_AGG(event_id) ... WHERE <condition> first, then pass the result to replay_inbox_messages().
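The two-step replay workflow above can be sketched in Python. This is a hedged illustration, not shipped code: the inbox table payment_inbox, the event_type filter, and psycopg-style %s placeholders are assumptions; only replay_inbox_messages() and its (name, event_ids) signature come from this plan.

```python
# Sketch of the injection-safe replay workflow: collect event IDs with a
# parameterised SELECT, then pass the explicit array to
# replay_inbox_messages(). No user input ever reaches the SQL text.

COLLECT_SQL = (
    "SELECT ARRAY_AGG(event_id) FROM pgtrickle.payment_inbox "
    "WHERE event_type = %s AND processed_at IS NOT NULL"
)
REPLAY_SQL = "SELECT pgtrickle.replay_inbox_messages(%s, %s)"

def replay_by_filter(cur, inbox_name, event_type):
    """Step 1: collect matching IDs; step 2: replay them by explicit list."""
    cur.execute(COLLECT_SQL, (event_type,))
    event_ids = cur.fetchone()[0] or []
    if event_ids:
        cur.execute(REPLAY_SQL, (inbox_name, event_ids))
    return event_ids
```

Because both statements are parameterised, this reproduces the flexibility of the removed where_clause parameter without reopening the injection surface.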

Part A — Essential Patterns

Transactional Outbox Helper (P2 — §9.12)

In plain terms: After each DIFFERENTIAL refresh cycle, pg_trickle writes a row to pgtrickle.outbox_<st> within the same transaction as the MERGE — either both succeed or neither does. For small deltas the row carries a versioned inline JSON payload {"v":1, "inserted":[…], "deleted":[…]}. For large deltas (above outbox_inline_threshold_rows, default 10 000 rows) the row carries a lightweight claim-check header {"v":1, "claim_check": true, …} and the actual rows land in the companion table pgtrickle.outbox_delta_rows_<st>, which the relay reads via a server-side cursor in bounded batches — constant memory regardless of delta size. Eliminates the dual-write problem for downstream event buses without a CDC connector or external replication slot.

Item Description Effort Ref
OUTBOX-1 Catalog + SQL functions. pgt_outbox_config catalog table. enable_outbox(name, retention_hours) / disable_outbox(name, if_exists) SQL functions. OutboxAlreadyEnabled, OutboxNotEnabled, OutboxRequiresNotImmediateMode error variants. 0.5d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §A.1–A.2
OUTBOX-2 Outbox table creation. pgtrickle.outbox_<st> with id BIGSERIAL, pgt_id UUID, refresh_id UUID, created_at, inserted_count INT, deleted_count INT, is_claim_check BOOLEAN DEFAULT false, payload JSONB. Index on created_at. Naming: 7-byte outbox_ prefix + up to 56-byte stream table name; collision resolution appends 7-char hex suffix derived from left(md5(name), 7). Final name stored in pgt_outbox_config.outbox_table_name. Also creates: (a) latest-row view pgtrickle.pgt_outbox_latest_<st> (ORDER BY id DESC LIMIT 1) for quick lag inspection and operational checks; (b) delta rows table pgtrickle.outbox_delta_rows_<st> with outbox_id BIGINT REFERENCES outbox_<st>(id), row_num INT, op CHAR(1) CHECK (op IN ('I','D')), payload JSONB, PRIMARY KEY (outbox_id, row_num) — populated only for claim-check entries. (Note: pgt_consumer_claim_check_acks is created in Part B / OUTBOX-B1, not here — it has no purpose without consumer groups.) All objects dropped alongside the outbox table. 1d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §A.3
OUTBOX-3 Refresh-path integration. After successful MERGE, if outbox is enabled, INSERT outbox row within the same transaction — unless outbox_skip_empty_delta = true (default) and inserted_count = 0 AND deleted_count = 0, in which case no INSERT or NOTIFY is issued, saving write amplification on quiet refresh cycles. In-memory outbox_enabled_set cache with DDL-triggered invalidation. Hot-path cost < 50 ns when disabled. Routing: if delta_row_count <= outbox_inline_threshold_rows, serialise Vec<DeltaRow> to inline JSONB as before. If delta_row_count > outbox_inline_threshold_rows, write is_claim_check = true header row first (no payload), then INSERT delta rows into outbox_delta_rows_<st> in batches controlled by outbox_claim_check_batch_size GUC (default 1 000 rows/call) — keeping Rust heap bounded regardless of delta size. Both writes are in the same transaction. FULL-refresh fallback: when AUTO mode falls back to FULL refresh, the outbox header row additionally carries "full_refresh": true; if row count exceeds the threshold the claim-check path applies; a pg_trickle_alert outbox_full_refresh event is emitted so relays apply snapshot semantics. NOTIFY: emit pg_notify('pgtrickle_outbox_new', outbox_table_name) inside the same transaction after the outbox INSERT, enabling relay authors to use LISTEN for sub-second wake-up (cost: ~2 µs per refresh; skipped when empty-delta skip applies). 1.5d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §A.4
OUTBOX-4 Versioned payload format — two paths. Inline path (small delta): {"v":1, "inserted":[…], "deleted":[…]} with to_jsonb() type mapping. Claim-check path (large delta): {"v":1, "claim_check": true, "inserted_count": N, "deleted_count": N, "refresh_id": "…"} — no row data in the outbox row itself; relay reads outbox_delta_rows_<st> via server-side cursor and calls outbox_rows_consumed(stream_table, outbox_id) when done. FULL-fallback payloads additionally set "full_refresh": true in the header; claim-check applies when the full-refresh row count exceeds the threshold. GUC outbox_inline_threshold_rows (default 10 000 rows) controls the routing threshold. No truncation path — data is never silently dropped. 0.5d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §A.5
OUTBOX-5 Retention drain. Scheduler cleanup step: batched DELETE on outbox_<st> with outbox_drain_batch_size GUC (default 10 000). Cascades to outbox_delta_rows_<st> via FK ON DELETE CASCADE — no separate drain step needed for delta rows. Per-ST or global outbox_retention_hours (default 24). last_drained_at / last_drained_count tracked in catalog. 0.5d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §A.6
OUTBOX-6 Lifecycle & cascade. drop_stream_table() cascades to outbox table + delta rows table + metadata. alter_stream_table() errors if column set changed while outbox enabled. outbox_status() monitoring function (includes claim_check_pending_count and storage_status fields). outbox_rows_consumed(stream_table TEXT, outbox_id BIGINT) SQL function: called by relay after cursor consumption to record per-group completion in pgt_consumer_claim_check_acks; idempotent. Note: stream_table takes the stream table name (as registered in pgt_stream_tables), not the outbox table name — the function resolves the outbox table via pgt_outbox_config. 8 Part-A GUCs (outbox_enabled, outbox_retention_hours, outbox_drain_batch_size, outbox_inline_threshold_rows, outbox_claim_check_batch_size, outbox_drain_interval_seconds, outbox_storage_critical_mb, outbox_skip_empty_delta) + 4 Part-B GUCs (consumer_dead_threshold_hours, consumer_stale_offset_threshold_days, consumer_cleanup_enabled, outbox_force_retention). 0.5d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §A.7–A.8
OUTBOX-7 Tests & benchmark. Unit: enable/disable/validation/naming/cascade. Integration: end-to-end inline outbox write; claim-check triggered at threshold boundary (N = threshold, N = threshold+1); delta rows populated atomically in same transaction; relay cursor-fetch returns all rows in order; outbox_rows_consumed() idempotency; retention drain cascades delta rows via FK; rollback on outbox INSERT failure leaves no orphan delta rows; pg_notify('pgtrickle_outbox_new', ...) emitted. Benchmark gates: (a) refresh_no_outbox vs refresh_outbox_inline vs refresh_outbox_claim_check — < 10 % overhead at inline threshold, < 25 % at large payloads; (b) poll_outbox() < 5 ms at 10K outbox rows; (c) commit_offset() < 10 ms with 10 concurrent relays; (d) consumer_lag() < 50 ms at 100K outbox rows; (e) E2E latency benchmark benches/e2e_outbox_latency.rs: p50 < 1.5 s (polling), p95 < 2.5 s (see PLAN_RELAY_CLI.md §E.5). 1.5d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §D
OUTBOX-8 Documentation & examples. SQL_REFERENCE.md: outbox API + both payload formats (inline and claim-check) + outbox_rows_consumed() + pgtrickle_outbox_new NOTIFY channel. CONFIGURATION.md: 7 GUCs (replacing outbox_max_payload_bytes with outbox_inline_threshold_rows; adding outbox_claim_check_batch_size and outbox_storage_critical_mb with tuning table). PATTERNS.md: Transactional Outbox section including claim-check relay pattern; WAL overhead analysis; backpressure guidance for dead consumers (outbox_storage_critical_mb alert workflow). Reference Python relay (examples/relay/outbox_relay.py) demonstrates both inline and claim-check paths. 1d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §C, §E
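The claim-check relay loop referenced in OUTBOX-8 can be sketched as follows. A hedged sketch, not the reference relay: the cursor name delta_cur and the publish callback are hypothetical, stream_table is assumed to come from trusted relay configuration (it is interpolated into the table name), and the 1 000-row batch mirrors the outbox_claim_check_batch_size default.

```python
BATCH_ROWS = 1_000  # fixed fetch size keeps relay memory bounded

def consume_claim_check(cur, stream_table, outbox_id, publish):
    """Drain one claim-check entry through a server-side cursor in
    fixed-size batches, then acknowledge with outbox_rows_consumed()."""
    cur.execute(
        f"DECLARE delta_cur CURSOR FOR "
        f"SELECT op, payload FROM pgtrickle.outbox_delta_rows_{stream_table} "
        f"WHERE outbox_id = %s ORDER BY row_num",
        (outbox_id,),
    )
    total = 0
    while True:
        cur.execute(f"FETCH FORWARD {BATCH_ROWS} FROM delta_cur")
        rows = cur.fetchall()
        if not rows:
            break
        for op, payload in rows:  # op is 'I' (insert) or 'D' (delete)
            publish(op, payload)
        total += len(rows)
    cur.execute("CLOSE delta_cur")
    # Record per-group completion so the retention guard may drain the rows.
    cur.execute("SELECT pgtrickle.outbox_rows_consumed(%s, %s)",
                (stream_table, outbox_id))
    return total
```

Note that outbox_rows_consumed() takes the stream table name, not the outbox table name, per OUTBOX-6.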

Outbox essential subtotal: ~7 days

Transactional Inbox Helper

In plain terms: create_inbox('payment_inbox') creates a production-grade inbox table with auto-managed stream tables for the pending-message queue, dead-letter queue, and processing statistics. Applications write to the inbox (ON CONFLICT DO NOTHING for dedup), process messages from the pending stream table, and pg_trickle handles DLQ routing, alerts, retention, and monitoring automatically. enable_inbox_tracking() adopts an existing inbox table into pg_trickle’s monitoring without schema changes.
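The writer side of the dedup guarantee can be sketched in Python. Assumptions are flagged inline: the payment_inbox name, psycopg-style placeholders, and the event dict shape are hypothetical; the column list follows the standard schema in INBOX-2.

```python
import json

# Idempotent inbox write: the PK on event_id plus ON CONFLICT DO NOTHING
# makes redelivered events no-ops. Table/column names per INBOX-2.
INSERT_SQL = (
    "INSERT INTO pgtrickle.payment_inbox "
    "(event_id, event_type, source, aggregate_id, payload, received_at) "
    "VALUES (%s, %s, %s, %s, %s, now()) "
    "ON CONFLICT (event_id) DO NOTHING"
)

def write_event(cur, event):
    """Write one event; return False when it was a duplicate delivery."""
    cur.execute(INSERT_SQL, (
        event["event_id"], event["event_type"], event.get("source"),
        event.get("aggregate_id"), json.dumps(event["payload"]),
    ))
    return cur.rowcount == 1  # 0 rows inserted means the event_id was seen
```

The rowcount check gives the writer a cheap duplicate signal without a separate SELECT.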

Item Description Effort Ref
INBOX-1 Catalog + create_inbox(). pgt_inbox_config catalog table with column mapping (id_column, processed_at_column, retry_count_column, error_column, received_at_column, event_type_column). create_inbox(name, schema, max_retries, schedule, with_dead_letter, with_stats, retention_hours) creates inbox table in the specified schema (default pgtrickle) + metadata. InboxAlreadyExists, InboxNotFound, InboxTableNotFound, InboxColumnMissing error variants. 1d PLAN_TRANSACTIONAL_INBOX_HELPER.md §A.1–A.3
INBOX-2 Inbox table DDL. Standard schema: event_id TEXT PK, event_type, source, aggregate_id, payload JSONB, received_at, processed_at, error, retry_count, trace_id. Partial indexes for pending, DLQ, and processed rows. Autovacuum tuning. 0.5d PLAN_TRANSACTIONAL_INBOX_HELPER.md §A.3
INBOX-3 Auto-created stream tables. Pending ST (WHERE processed_at IS NULL AND retry_count < max_retries, DIFFERENTIAL, user-defined schedule). DLQ ST (WHERE processed_at IS NULL AND retry_count >= max_retries, DIFFERENTIAL, 30 s). Stats ST (GROUP BY event_type with pending/processed/dead_letter/avg processing time — max_pending_age_sec removed from the materialised ST query to enable DIFFERENTIAL mode and eliminate the O(N) full scan every 10 s; use inbox_health() for oldest_pending_age_sec on demand). All STs use column-mapped SQL from pgt_inbox_config. 1d PLAN_TRANSACTIONAL_INBOX_HELPER.md §A.4
INBOX-4 enable_inbox_tracking(). Adopt existing table: validate columns exist with compatible types, validate PK/UNIQUE on id column, create stream tables using mapped column names, insert metadata with is_managed = false. Gracefully omit optional columns (source, aggregate_id, trace_id) if not present. 0.5d PLAN_TRANSACTIONAL_INBOX_HELPER.md §A.6
INBOX-5 DLQ alert mechanism. Post-refresh hook on DLQ stream table: when rows_inserted > 0, emit pg_trickle_alert event inbox_dlq_message per new entry (capped at inbox_dlq_alert_max_per_refresh, default 10; excess batched into summary alert). 0.5d PLAN_TRANSACTIONAL_INBOX_HELPER.md §A.5
INBOX-6 inbox_health() + inbox_status(). inbox_health(name) returns JSONB with pending_count, dead_letter_count, avg_processing_time_sec, oldest_pending_age_sec, throughput_per_sec, health_status (healthy/degraded/critical). inbox_status(name) returns tabular overview of all inboxes. 0.5d PLAN_TRANSACTIONAL_INBOX_HELPER.md §A.1
INBOX-7 Retention drain + replay_inbox_messages(). Processed message drain via scheduler (batched DELETE, inbox_processed_retention_hours default 72 h). DLQ messages kept forever by default (inbox_dlq_retention_hours default 0). replay_inbox_messages(name TEXT, event_ids TEXT[]) resets processed_at + retry_count for the specified message IDs using a parameterised WHERE event_id = ANY($1) — no free-form SQL accepted; eliminates injection surface entirely. 0.5d PLAN_TRANSACTIONAL_INBOX_HELPER.md §A.7–A.8
INBOX-8 drop_inbox() + lifecycle. drop_inbox(name, if_exists, cascade): always drops stream tables + metadata; drops inbox table only if cascade := true AND is_managed = true. DROP EXTENSION cascades managed tables; adopted tables survive. 6 GUCs (inbox_enabled, inbox_processed_retention_hours, inbox_dlq_retention_hours, inbox_drain_batch_size, inbox_drain_interval_seconds, inbox_dlq_alert_max_per_refresh). 0.5d PLAN_TRANSACTIONAL_INBOX_HELPER.md §A.9–A.10
INBOX-9 Tests & benchmark. Unit: create/drop/enable_tracking/replay/health. Integration: end-to-end inbox lifecycle, DLQ routing, DLQ alert, retention drain, concurrent processors with FOR UPDATE SKIP LOCKED, enable_inbox_tracking() with non-standard columns. Benchmark gates: (a) pending ST refresh < 5 ms at 100 pending, < 50 ms at 10K pending; (b) next_<inbox> ordered ST refresh at each threshold (100K/1M/10M processed rows) matches documented scaling table; (c) stats ST FULL refresh < 5 ms at 100K rows, < 50 ms at 1M rows; (d) backpressure indicator: inbox_health() returns degraded within 2 refresh cycles when oldest_pending_age_sec exceeds threshold. 1d PLAN_TRANSACTIONAL_INBOX_HELPER.md §D
INBOX-10 Documentation & examples. SQL_REFERENCE.md: inbox API. CONFIGURATION.md: 6 GUCs. PATTERNS.md: Transactional Inbox section + “Bidirectional Event Pipeline” (inbox → business logic → outbox) worked example. Reference examples: inbox_writer_nats.py, inbox_processor.py, webhook_receiver.py. 0.5d PLAN_TRANSACTIONAL_INBOX_HELPER.md §C, §E
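The pending/DLQ split that the auto-created stream tables implement (INBOX-3) reduces to one predicate pair. A minimal sketch; the max_retries default of 3 is an illustrative assumption, not the create_inbox() default.

```python
def classify(message, max_retries=3):
    """Mirror the pending/DLQ stream-table predicates from INBOX-3."""
    if message["processed_at"] is not None:
        return "processed"
    if message["retry_count"] >= max_retries:
        # DLQ ST: WHERE processed_at IS NULL AND retry_count >= max_retries
        return "dead_letter"
    # Pending ST: WHERE processed_at IS NULL AND retry_count < max_retries
    return "pending"
```

A processor that fails a message only increments retry_count; the message then migrates from the pending ST to the DLQ ST automatically on the next refresh.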

Inbox essential subtotal: ~6.5 days

Shared Infrastructure (Part A)

Item Description Effort Ref
SHARED-1 Upgrade SQL. sql/pg_trickle--0.27.0--0.28.0.sql: create pgt_outbox_config and pgt_inbox_config catalog tables, register all new SQL functions. 0.5d
SHARED-2 PATTERNS.md integration guide. New “Event-Driven Integration Patterns” chapter in docs/PATTERNS.md covering: when to use outbox vs inbox vs both, transport comparison (NATS/Kafka/pgmq), bidirectional pipeline (inbox → business logic → outbox), and competing consumer patterns (FOR UPDATE SKIP LOCKED). 0.5d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md, PLAN_TRANSACTIONAL_INBOX_HELPER.md
SHARED-3 E2E integration test. Full pipeline: inbox receives event → processor creates business entity → outbox captures delta → verify end-to-end exactly-once delivery. 0.5d

Part A subtotal: ~15 days


Part B — Production Patterns

Consumer Groups for Outbox

In plain terms: Multiple relay processes can share a single outbox table safely using consumer groups — the same concept as Kafka consumer groups, but implemented entirely in PostgreSQL. Each group has its own offset pointer. Relays call poll_outbox() to claim a batch under a visibility timeout (like SQS), then call commit_offset() when done. If a relay crashes, its lease expires and another relay picks up the batch. consumer_lag() shows how far behind each consumer is. Dead relays are reaped automatically after 24 h.
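The poll → lease → commit lifecycle can be modelled in a few lines of in-memory Python. This is an illustration of the semantics only — the real helpers are SQL functions backed by pgt_consumer_offsets and pgt_consumer_leases; the class name and single-lease simplification are hypothetical.

```python
class OutboxGroupSim:
    """In-memory model of one consumer group's poll/commit semantics."""

    def __init__(self, outbox_ids, visibility_seconds=30):
        self.outbox_ids = sorted(outbox_ids)
        self.committed = 0       # last committed outbox id for the group
        self.lease = None        # (consumer, last_id, visibility_until)
        self.visibility_seconds = visibility_seconds

    def poll(self, consumer, batch_size, now):
        """Claim the next batch unless another consumer holds a live lease."""
        if self.lease is not None and now < self.lease[2]:
            return []
        batch = [i for i in self.outbox_ids if i > self.committed][:batch_size]
        if batch:
            self.lease = (consumer, batch[-1], now + self.visibility_seconds)
        return batch

    def commit(self, consumer, last_id):
        """Monotonically advance the offset and release the lease."""
        if last_id <= self.committed:
            return False         # offset regression rejected
        self.committed = last_id
        self.lease = None
        return True
```

If the leaseholder crashes, its lease simply expires and the next poll redelivers the same batch — the at-least-once behavior described above, with exactly-once restored downstream by broker idempotency keys and the inbox dedup.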

Item Description Effort Ref
OUTBOX-B1 Consumer group catalog + lifecycle. pgt_consumer_groups + pgt_consumer_offsets + pgt_consumer_leases catalog tables. Also creates pgt_consumer_claim_check_acks (tracks per-group cursor-consumption completion for claim-check retention drain safety; not created in Part A since it has no purpose without consumer groups). create_consumer_group(name, outbox, auto_offset_reset) / drop_consumer_group(name) SQL functions; drop_consumer_group() decrements a per-outbox reference count and drops per-outbox monitoring STs when count reaches zero. auto_offset_reset values: latest (default) or earliest. ConsumerGroupAlreadyExists, ConsumerGroupNotFound error variants. 1d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B.2–B.3
OUTBOX-B2 poll_outbox() with visibility timeout and lease management. Returns next batch for (group, consumer_id) using FOR UPDATE SKIP LOCKED. Acquires lease in pgt_consumer_leases with configurable visibility_seconds (default 30). Auto-registers new consumer_id on first call based on auto_offset_reset. Skips rows already leased by other consumers. 1.5d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B.4
OUTBOX-B2a extend_lease() — lease renewal for long-running relays. extend_lease(group, consumer, extension_seconds INT DEFAULT 30) extends the visibility_until of all active leases held by the named consumer, returning the new visibility_until timestamp. Prevents spurious re-delivery when broker publish or business logic takes longer than the original visibility_seconds. Calling consumer_heartbeat() does not extend leases — heartbeat and lease lifetime are separate concerns. 0.5d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B.4
OUTBOX-B3 commit_offset() + seek_offset(). commit_offset(group, consumer, last_offset) monotonically advances offset, releases lease, rejects regression with warning. seek_offset(group, consumer, new_offset) resets to any position and clears leases; emits pg_trickle_alert event consumer_seeked. 0.5d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B.4, §B.6
OUTBOX-B4 Heartbeat + liveness. consumer_heartbeat(group, consumer) updates last_heartbeat_at (liveness only — does not extend active leases; use extend_lease() for that). Consumer is healthy when last_heartbeat_at > now() - 60 s. pg_trickle_alert event consumer_unhealthy when consumer transitions healthy → unhealthy. consumer_lag() live SQL function (always-fresh, suitable for ad-hoc inspection) exposes per-consumer healthy boolean, current lag, and offset. 0.5d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B.5
OUTBOX-B5 Monitoring stream tables. Three auto-created STs on first create_consumer_group(): pgt_consumer_status (per-consumer offset + heartbeat timestamp, FULL mode, 5 s — FULL because pgt_consumer_offsets is updated on every heartbeat and offset commit; at typical relay poll rates most rows change between refreshes, making FULL simpler than DIFFERENTIAL for this small table), pgt_consumer_group_lag (per-group aggregate lag, DIFFERENTIAL, 10 s), pgt_consumer_active_leases (current leases filtered by visibility_until > now(), FULL mode, 5 s — FULL because the filter changes every cycle as leases expire). Use consumer_lag() for ad-hoc inspection of live health data including heartbeat_age_sec; use pgt_consumer_group_lag ST for Grafana dashboards and alerting rules (materialized every 10 s). 1d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B.7
OUTBOX-B6 Dead consumer auto-cleanup. Scheduler step (GUC consumer_cleanup_enabled, default true): reap consumers with last_heartbeat_at < now() - consumer_dead_threshold_hours (GUC, default 24 h), release their leases. Remove from offsets if also last_commit_at < now() - consumer_stale_offset_threshold_days (GUC, default 7 d). Emit pg_trickle_alert event consumer_reaped. 0.5d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B.9
OUTBOX-B7 Retention safety guard. When consumer groups are enabled, retention drain refuses to delete outbox_<st> rows with id > MIN(last_offset across all consumers) to prevent silent data loss for slow relays. For claim-check rows, additionally waits until all consumer groups that have polled past that outbox_id have called outbox_rows_consumed() for it — preventing delta rows from being cascade-deleted via FK before the relay finishes cursor consumption. GUC outbox_force_retention (default false) allows operator override for permanently abandoned consumers. 0.5d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B.6
OUTBOX-B8 Tests. Integration: multi-relay group creation, visibility timeout expiry + re-poll, commit_offset idempotency, seek_offset replay, heartbeat → unhealthy transition, dead consumer reaping, retention guard prevents early drain. Benchmark: poll_outbox latency < 5 ms at 10K outbox rows. 1.5d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §D
OUTBOX-B9 Documentation & reference relay. SQL_REFERENCE.md: consumer group API + delivery guarantee section (at-least-once per consumer; exactly-once by composition). CONFIGURATION.md: consumer_cleanup_enabled, outbox_force_retention, consumer_dead_threshold_hours (default 24), consumer_stale_offset_threshold_days (default 7) GUCs. Reference Python relay with group coordination (examples/relay/outbox_relay.py). Rust equivalent (examples/relay/outbox_relay.rs). PATTERNS.md: multi-relay competing consumers section + claim-check large delta handling guide (server-side cursor consumption, outbox_rows_consumed(), bounded-memory relay loop) + latest-state consumer section (dedup view). 1d PLAN_TRANSACTIONAL_OUTBOX_HELPER.md §B, §C

Consumer groups subtotal: ~8 days

Ordered Processing for Inbox

In plain terms: For financial, order management, and audit-trail use-cases, messages about the same entity (customer, order, account) must be processed in the order they were produced. enable_inbox_ordering() creates a next_<inbox> stream table that surfaces only the next expected message per aggregate — preventing out-of-order processing automatically. Gap detection alerts when a message is missing too long. Priority queues let critical messages use a 1-second refresh schedule while background messages use 30 seconds. Worker partition affinity reduces contention when multiple processors share an inbox.
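The selection rule behind next_<inbox> — surface only the message whose sequence number immediately follows the last processed one — can be sketched with in-memory data. Names and data shapes here are illustrative, not the ST's SQL.

```python
def next_surfaced(pending, last_processed_seq):
    """Per aggregate, surface only the pending message whose sequence_num
    equals last_processed_seq + 1 (aggregates never seen default to 0)."""
    surfaced = {}
    for aggregate_id, sequence_num, event_id in pending:
        expected = last_processed_seq.get(aggregate_id, 0) + 1
        if sequence_num == expected:
            surfaced[aggregate_id] = event_id  # at most one per aggregate
    return surfaced
```

An out-of-order arrival (sequence 4 while 3 is still unprocessed) simply stays invisible to the processor until its predecessor completes, which is what prevents out-of-order processing without any processor-side coordination.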

Item Description Effort Ref
INBOX-B1 enable_inbox_ordering() + aggregate-ordered stream table. pgt_inbox_ordering_config catalog table. enable_inbox_ordering(inbox, aggregate_id_col, sequence_num_col) creates next_<inbox> ST: DISTINCT ON (aggregate_id) selecting only the row where sequence_num = last_processed_seq + 1. Ensures only the next expected message per aggregate is surfaced. disable_inbox_ordering(inbox) drops the ST + config row. Mutually exclusive with enable_inbox_priority() — returns InboxOrderingPriorityConflict if priority is already enabled on this inbox (and vice versa). 1.5d PLAN_TRANSACTIONAL_INBOX_HELPER.md §B.2–B.3
INBOX-B2 Gap detection stream table + alert. gaps_<inbox> ST uses a LEAD() window function (O(N log N)) to detect missing sequence numbers by comparing adjacent sequences in the pending-only messages. Uses FULL refresh mode (contains now() in gap_age_sec). Emits pg_trickle_alert event inbox_ordering_gap when new gaps appear. inbox_ordering_gaps(inbox_name) SQL function for ad-hoc inspection. 30 s refresh schedule. Scales to 1M+ pending messages without the O(sequence_range) blowup of generate_series. 1d PLAN_TRANSACTIONAL_INBOX_HELPER.md §B.4
INBOX-B3 enable_inbox_priority() + tier-based stream tables. pgt_inbox_priority_config catalog table. enable_inbox_priority(inbox, priority_col, tiers JSONB) creates one pending_<inbox>_<tier> ST per priority tier with per-tier schedule and WHERE priority BETWEEN min AND max. Default 3 tiers: critical (1–2, 1 s), normal (3–6, 5 s), background (7–9, 30 s). Original pending_<inbox> preserved as unified view. disable_inbox_priority(inbox, if_exists) drops all tier STs + config row; original unified pending_<inbox> is restored. 1d PLAN_TRANSACTIONAL_INBOX_HELPER.md §B.5
INBOX-B4 inbox_is_my_partition() helper. Boolean-returning SQL function with signature inbox_is_my_partition(aggregate_id TEXT, worker_id INT, total_workers INT) RETURNS BOOLEAN. Evaluates abs(hashtext(aggregate_id)) % total_workers = worker_id inline in the WHERE clause. Advisory only — workers can still process any message; the condition makes each worker prefer its subset for cache locality. Composable with prepared statements and ORMs without SQL string interpolation. Documented in PATTERNS.md with Python + SQL usage example. 0.5d PLAN_TRANSACTIONAL_INBOX_HELPER.md §B.6
INBOX-B5 Tests. Integration: ordered ST surfaces only next-sequence messages; out-of-order arrivals withheld until preceding sequence processed; gap detection fires alert after configurable delay; priority tier routing; partition affinity correctness (no messages lost). Benchmark gate: gaps_<inbox> ST refresh at 1M messages across 10K aggregates must complete in < 1 s at 30 s schedule (uses LEAD() window function; O(N log N) not O(sequence_range)). Chaos: processor crash mid-processing + replay recovery; concurrent processors with FOR UPDATE SKIP LOCKED (no duplicate processing at 10 concurrent workers). 1.5d PLAN_TRANSACTIONAL_INBOX_HELPER.md §D
INBOX-B6 Documentation & examples. SQL_REFERENCE.md: ordering + priority API. CONFIGURATION.md: ordering GUCs. PATTERNS.md: per-aggregate ordering, gap recovery, priority queue, and competing workers with partition affinity sections. Reference examples/inbox/inbox_processor_ordered.py. 0.5d PLAN_TRANSACTIONAL_INBOX_HELPER.md §B, §C
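The advisory partition check from INBOX-B4 can be mimicked client-side for illustration. Note the loud caveat: zlib.crc32 stands in for PostgreSQL's hashtext(), so this analog does NOT produce the same assignments as the SQL helper — it only demonstrates the hash-modulo scheme.

```python
import zlib

def is_my_partition(aggregate_id: str, worker_id: int, total_workers: int) -> bool:
    """Client-side analog of inbox_is_my_partition(). Like the SQL helper,
    it is advisory: any worker may still process any message; each worker
    merely prefers its own subset for cache locality."""
    return zlib.crc32(aggregate_id.encode()) % total_workers == worker_id
```

Whatever hash is used, every aggregate maps to exactly one preferred worker, so competing processors rarely contend for the same aggregate's row locks.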

Ordered processing subtotal: ~6 days

Shared Infrastructure (Part B)

Item Description Effort Ref
SHARED-B1 Upgrade SQL additions. Extend sql/pg_trickle--0.27.0--0.28.0.sql: create pgt_consumer_groups, pgt_consumer_offsets, pgt_consumer_leases, pgt_inbox_ordering_config, pgt_inbox_priority_config tables; register all Part B SQL functions. (Note: pgt_consumer_claim_check_acks is created dynamically by create_consumer_group() at runtime, not in the upgrade script — it has no purpose without consumer groups.) 0.5d
SHARED-B2 Advanced PATTERNS.md sections. Add to “Event-Driven Integration Patterns” chapter: competing relays with consumer groups, ordered inbox processing end-to-end, priority queues (when to use), partition-affinity for high-throughput inboxes, claim-check large delta handling guide (when triggered, cursor consumption loop, outbox_rows_consumed(), interaction with full_refresh flag), latest-state consumer pattern (dedup view), and FULL-refresh fallback handling for relay authors. Note in PATTERNS.md: add Grafana dashboard panel recommendations for consumer lag (pgt_consumer_group_lag ST), DLQ growth rate (dlq_<inbox> ST), inbox pending backlog (pending_<inbox> ST), and inbox throughput (stats_<inbox> ST). 0.5d
SHARED-B3 Advanced E2E tests. (1) Multi-relay group test: 3 relays share one outbox group, verify each row published exactly once, simulate relay crash + visibility timeout redelivery. (2) Ordered inbox test: publish 10 messages out-of-order per aggregate, verify processor receives them in sequence order. (3) Concurrent stress: 10 relay workers + 100K outbox rows; verify < 0.1% duplicate rate at broker. 1d
SHARED-B4 dbt adapter updates. Add outbox_enabled, consumer_group and inbox_config properties to dbt model config; add pgtrickle_outbox_config and pgtrickle_create_inbox macros; update dbt-pgtrickle docs and integration tests. 0.5d dbt-pgtrickle/AGENTS.md

Part B subtotal: ~17.5 days


Implementation Phases

Phase Description Duration
A-SHARED Upgrade SQL, shared Part A catalog infrastructure Day 1
A-OUTBOX Outbox helper: catalog, table DDL, refresh-path hook, payload format, retention, lifecycle, GUCs Days 1–5
A-INBOX Inbox helper: catalog, table DDL, stream tables, enable_inbox_tracking, DLQ alerts, health, replay, retention, lifecycle, GUCs Days 5–11
A-TEST Part A integration tests, E2E pipeline test, benchmarks Days 11–13
A-DOC Part A documentation, PATTERNS.md guide, reference examples Days 13–14
B-OUTBOX Consumer groups: catalog, poll_outbox, commit_offset, seek_offset, heartbeat, monitoring STs, dead consumer cleanup, retention guard Days 14–22
B-INBOX Ordered processing: enable_inbox_ordering, gap detection, priority queues, worker partition helper Days 22–28
B-TEST Part B integration tests, multi-relay E2E, ordered inbox E2E Days 28–31
B-DOC Part B documentation, advanced PATTERNS.md sections, reference relay implementations Days 31–33

v0.28.0 total: ~6–7 weeks solo, or ~4–5 weeks with two developers working the Part A (essential patterns) and Part B (production patterns) tracks in parallel
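The next-expected-sequence rule behind the B-INBOX ordered-processing phase can be sketched in a few lines of Python. This is an illustration of the semantics only, assuming sequences start at 1 per aggregate; in the extension the same rule is expressed declaratively by the next_<inbox> stream table, not by application-side buffering.

```python
from collections import defaultdict

class OrderedInboxBuffer:
    """Sketch of per-aggregate sequence ordering: release messages only
    in contiguous sequence order, withholding future sequences until the
    gap before them is filled."""

    def __init__(self, first_sequence=1):
        # Next sequence number each aggregate is waiting for.
        self.next_expected = defaultdict(lambda: first_sequence)
        # Out-of-order messages held back, keyed by sequence number.
        self.held = defaultdict(dict)

    def push(self, aggregate_id, sequence_num, message):
        """Accept one message; return the (possibly empty) list of
        messages that are now deliverable in order."""
        self.held[aggregate_id][sequence_num] = message
        released = []
        while self.next_expected[aggregate_id] in self.held[aggregate_id]:
            seq = self.next_expected[aggregate_id]
            released.append(self.held[aggregate_id].pop(seq))
            self.next_expected[aggregate_id] = seq + 1
        return released
```

This also shows why ordering and priority tiers conflict: the buffer must surface whichever message carries the next sequence number, regardless of any priority attached to it.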

Exit criteria:

- [x] OUTBOX-½: enable_outbox() creates outbox table + pgt_outbox_latest_<st> view with correct schema; catalog row present
- [x] OUTBOX-1: enable_outbox() on IMMEDIATE-mode stream table returns OutboxRequiresNotImmediateMode with clear message
- [x] OUTBOX-2: Naming collision resolution: truncation + hex suffix tested end-to-end; final name stored in catalog
- [x] OUTBOX-3/CC: Initial load (first refresh, all rows as "inserted") above outbox_inline_threshold_rows uses claim-check path; outbox_delta_rows_<st> populated atomically; no data loss
- [x] OUTBOX-3/CC: Bulk source update (many rows changed in one cycle) above threshold uses claim-check path; relay cursor returns all inserted + deleted rows correctly
- [x] OUTBOX-3: Refresh populates outbox payload within same transaction; rollback on outbox INSERT failure leaves no orphan delta rows
- [x] OUTBOX-4: Small deltas (≤ outbox_inline_threshold_rows) produce inline {"v":1, "inserted":[…], "deleted":[…]}; large deltas produce claim-check header {"v":1, "claim_check": true, …} with rows in outbox_delta_rows_<st>; relay cursor consumption + outbox_rows_consumed() documented + tested; no truncation path exists
- [x] OUTBOX-5: Retention drain removes rows older than outbox_retention_hours; respects batch size
- [x] OUTBOX-6: drop_stream_table() cascades to outbox + latest-row view; outbox_status() returns correct data
- [x] OUTBOX-7: Benchmark shows < 10% overhead vs baseline at small payloads
- [x] INBOX-½: create_inbox() creates inbox table + 3 stream tables + metadata
- [x] INBOX-3: Pending ST reflects unprocessed messages; DLQ ST reflects poisoned messages
- [x] INBOX-4: enable_inbox_tracking() works with non-standard column names on existing tables
- [x] INBOX-5: pg_trickle_alert fires when new DLQ entries appear
- [x] INBOX-6: inbox_health() returns correct health status; inbox_status() lists all inboxes
- [x] INBOX-7: replay_inbox_messages() resets messages by explicit event_ids array (no where_clause); uses parameterised WHERE event_id = ANY($1) with no dynamic SQL; retention drain respects DLQ; processor crash + replay recovery path documented
- [x] INBOX-8: drop_inbox(cascade := true) drops managed table; preserves adopted tables
- [x] SHARED-3: End-to-end inbox → business logic → outbox pipeline test passes
- [x] SHARED-4: dbt adapter updated with outbox_enabled and inbox_config properties; integration tests pass
- [x] OUTBOX-B1: create_consumer_group() creates group + offset + lease tables; idempotent re-create
- [x] OUTBOX-¾: FULL-refresh fallback writes "full_refresh": true in header; claim-check applies when row count exceeds outbox_inline_threshold_rows; reference relay handles all four combinations (inline/claim-check × differential/full-refresh) correctly
- [x] OUTBOX-B2: poll_outbox() returns correct batch; no overlap between concurrent relays; visibility timeout expires and row re-delivered
- [x] OUTBOX-B2a: extend_lease() extends visibility_until for all active consumer leases; re-delivery does not occur when relay calls extend_lease before timeout
- [x] OUTBOX-B3: commit_offset() advances monotonically; seek_offset() enables replay from any position
- [x] OUTBOX-B4: Heartbeat tracks liveness; consumer_unhealthy alert fires on timeout
- [x] OUTBOX-B5: Three monitoring STs (status/FULL, group lag/DIFFERENTIAL, active leases/FULL) created idempotently (second create_consumer_group() does not fail); refreshed correctly; dropped when last group is dropped (reference count reaches zero)
- [x] OUTBOX-B6: Dead relay reaped after consumer_dead_threshold_hours (default 24 h, configurable); leases released; consumer_reaped alert emitted
- [x] OUTBOX-B7: Retention drain respects MIN(last_offset); outbox_force_retention override works
- [x] OUTBOX-B8: Multi-relay group E2E: each outbox row published exactly once across 3 concurrent relays
- [x] OUTBOX-B8b: Concurrent relay stress test: 10 relays, 100K outbox rows, < 0.1% duplicate rate before broker dedup; 0% after
- [x] INBOX-B1: next_<inbox> ST surfaces only next expected sequence per aggregate; withholds future sequences; partial index (aggregate_id, sequence_num) WHERE processed_at IS NOT NULL created by enable_inbox_ordering()
- [x] INBOX-B1: enable_inbox_ordering() + enable_inbox_priority() together returns InboxOrderingPriorityConflict with clear message
- [x] INBOX-B2: gaps_<inbox> ST detects missing sequences using LEAD() window function; inbox_ordering_gap alert fires; gap detection benchmark passes (< 1 s at 10K aggregates, 1M messages; O(N log N) not O(sequence_range))
- [x] INBOX-B3: Priority tier STs refresh at configured schedules; messages route to correct tier
- [x] INBOX-B3: disable_inbox_priority() drops all tier STs + config row; unified pending_<inbox> is restored
- [x] INBOX-B1: disable_inbox_ordering() drops next_<inbox> ST + config row; inbox resumes normal pending behaviour
- [x] INBOX-B4: inbox_is_my_partition(aggregate_id, worker_id, total_workers) returns BOOLEAN; no messages lost across N workers; usable in prepared statements without SQL interpolation
- [x] SHARED-B3: Ordered inbox E2E: 10 out-of-order arrivals per aggregate delivered to processor in order
- [x] SHARED-B4: dbt adapter updated with consumer group and inbox ordering properties
- [x] Extension upgrade path tested (0.27.0 → 0.28.0): sql/pg_trickle--0.27.0--0.28.0.sql validated by scripts/check_upgrade_completeness.sh
- [x] just check-version-sync passes
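The INBOX-B4 partition-affinity property above (every aggregate claimed by exactly one of N workers, none lost) can be illustrated with a small Python sketch. The real inbox_is_my_partition() is a SQL helper; this version only shows the assignment rule, and the choice of MD5 as the stable hash is an assumption of the sketch, not the extension's documented behaviour.

```python
import hashlib

def inbox_is_my_partition(aggregate_id, worker_id, total_workers):
    """Sketch of a stable modulo-hash partition test: hash the aggregate
    id, reduce it to a bucket, and claim the message only if the bucket
    matches this worker. A cryptographic hash keeps the assignment stable
    across processes and restarts, unlike Python's built-in hash()."""
    digest = hashlib.md5(str(aggregate_id).encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % total_workers
    return bucket == worker_id
```

Because the predicate is a pure function of (aggregate_id, worker_id, total_workers), each worker can apply it as a parameterised filter in its polling query, which is what makes it safe to use in prepared statements without SQL interpolation.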