Plain-language companion: v0.33.0.md
v0.33.0 — Citus: World-Class Distributed Source CDC
Status: Released (2026-04-26). Derived from plans/ecosystem/PLAN_CITUS.md §6 Phases 3, 4, 5, and 6.
Release Theme
v0.33.0 delivers world-class incremental view maintenance over Citus distributed tables. Building on v0.32.0’s stable naming and frontier foundations, this release adds: (P3) per-worker WAL slot polling for distributed-table CDC; (P4) distributed ST storage with a Citus-safe DELETE+UPSERT apply path; (P5) catalog-based cross-node coordination and a `pgtrickle.citus_status` observability view; and (P6) a full Citus E2E test suite (1 coordinator + 2 workers, 10-scenario matrix) with dedicated benchmarks and documentation. The non-Citus code path has a hard regression budget of 0% — every Citus code path short-circuits on the `is_citus_loaded()` check introduced in v0.32.0.
Phase 3 — Distributed CDC via Per-Worker Slots
| ID | Title | Effort | Priority |
|---|---|---|---|
| CDC-1 | Remote slot consumption via dblink-wrapped pg_logical_slot_get_changes() | L | P0 |
| CDC-2 | setup_cdc_for_source for distributed sources: per-worker publication + slot creation | M | P0 |
| CDC-3 | REPLICA IDENTITY FULL enforcement on each worker | S | P0 |
| CDC-4 | Slot polling: scheduler iterates pgt_remote_slots, writes into coordinator change buffer | L | P0 |
| CDC-5 | TRUNCATE + DDL propagation for distributed sources | M | P1 |
| CDC-6 | New catalog: pgtrickle.pgt_remote_slots (pgt_id, worker_id, worker_host, worker_port, slot_name, last_lsn) | S | P0 |
| CDC-7 | Libpq streaming fallback path (P3.1 option B) — implement if dblink bench fails threshold | L | P2 |
CDC-1 — Today src/wal_decoder.rs calls
pg_logical_slot_get_changes() via SPI on the local node. Add a sibling
poll_remote_slot(conn: &PgConnection, slot_name: &str, …) that executes
the same SQL via a dblink foreign connection to a worker. Connection
parameters come from pgt_remote_slots.worker_host/port. Connection
pooling reuses existing libpq handles per scheduler tick.
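A minimal sketch of the remote poll as a single dblink query, assuming the dblink extension is installed; the connection string, slot name, and table alias are illustrative (real values come from `pgt_remote_slots`):

```sql
-- Sketch: poll one worker's logical slot over dblink. Host, port, and
-- slot name are illustrative; production values come from pgt_remote_slots.
SELECT lsn, xid, data
FROM dblink(
       'host=worker-1 port=5432 dbname=app',
       $$SELECT lsn, xid, data
         FROM pg_logical_slot_get_changes('pgtrickle_orders', NULL, NULL)$$
     ) AS t(lsn pg_lsn, xid xid, data text);
```

Because `pg_logical_slot_get_changes()` consumes the slot as it reads, the coordinator must persist `last_lsn` before acknowledging a tick.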
CDC-2 — When is_citus_loaded() and placement(oid) == Distributed:
1. Issue run_command_on_all_nodes("CREATE PUBLICATION pgtrickle_{stable_name} FOR TABLE {table} WITH (publish_via_partition_root = true)")
2. Issue run_command_on_all_nodes("SELECT pg_create_logical_replication_slot('pgtrickle_{stable_name}', 'pgoutput')")
3. Insert one row per worker into pgtrickle.pgt_remote_slots.
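The three steps above can be sketched as plain SQL on the coordinator; the source table, publication/slot name, and `pgt_id` value are illustrative assumptions:

```sql
-- Sketch: per-worker CDC setup for one distributed source.
-- 'orders', 'pgtrickle_orders', and pgt_id = 42 are illustrative.
SELECT run_command_on_all_nodes(
  $$CREATE PUBLICATION pgtrickle_orders FOR TABLE public.orders
    WITH (publish_via_partition_root = true)$$);
SELECT run_command_on_all_nodes(
  $$SELECT pg_create_logical_replication_slot('pgtrickle_orders', 'pgoutput')$$);

-- One catalog row per primary worker (groupid 0 is the coordinator).
INSERT INTO pgtrickle.pgt_remote_slots
  (pgt_id, worker_id, worker_host, worker_port, slot_name, last_lsn)
SELECT 42, nodeid, nodename, nodeport, 'pgtrickle_orders', '0/0'::pg_lsn
FROM pg_dist_node
WHERE noderole = 'primary' AND groupid > 0;
```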
CDC-4 — Scheduler tick: for each source with remote slots, call
poll_remote_slot() for each worker row in pgt_remote_slots. Decoded
rows written to coordinator’s changes_{stable_name} buffer with two new
columns: origin_node SMALLINT, origin_lsn PG_LSN. Compaction uses
(origin_node, origin_lsn) as the watermark.
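One plausible reading of that watermark, sketched against a hypothetical `changes_orders` buffer (the buffer name and `pgt_id` value are illustrative):

```sql
-- Sketch: compact the coordinator change buffer up to each worker's
-- persisted watermark. Rows at or below (origin_node, origin_lsn) for a
-- given worker have been fully applied and can be dropped.
DELETE FROM pgtrickle.changes_orders c
USING pgtrickle.pgt_remote_slots s
WHERE s.pgt_id = 42                 -- illustrative source id
  AND c.origin_node = s.worker_id
  AND c.origin_lsn <= s.last_lsn;
```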
Phase 4 — Distributed ST Storage & Apply Path
| ID | Title | Effort | Priority |
|---|---|---|---|
| APPLY-1 | `create_stream_table(…, placement => 'local' | 'reference' | 'distributed')` | M | P0 |
| APPLY-2 | Auto-select placement from declared source placements + row-count heuristic | S | P1 |
| APPLY-3 | DELETE + INSERT … ON CONFLICT DO UPDATE codegen template for distributed STs | M | P0 |
| APPLY-4 | Delta materialisation to TEMP table for reference STs with non-pushable plans | S | P1 |
| APPLY-5 | __pgt_row_id as distribution column when creating distributed ST | S | P0 |
| APPLY-6 | reltuples fix: sum pg_dist_shard row counts for distributed tables in DAG planner | S | P1 |
| APPLY-7 | GUC: pg_trickle.citus_reference_st_max_rows (default 1_000_000) | S | P1 |
APPLY-3 — Codegen in src/refresh/codegen.rs gets a second template
selected by st_placement = 'distributed':
```sql
WITH delta AS (…)
DELETE FROM {st} st
USING delta d
WHERE d.pgt_action = 'D'
  AND st.__pgt_row_id = d.__pgt_row_id;

WITH delta AS (…)
INSERT INTO {st} (__pgt_row_id, …)
SELECT __pgt_row_id, …
FROM delta
WHERE pgt_action != 'D'
ON CONFLICT (__pgt_row_id) DO UPDATE SET …;
```
APPLY-5 — At create_distributed_table({st}, '__pgt_row_id') time,
validate that __pgt_row_id is not repurposed as a user column (rare;
emit a hard error if so).
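After validation passes, the creation step itself is a single Citus call; the ST name below is an illustrative assumption:

```sql
-- Sketch: shard the distributed ST on its synthetic key. The ST name is
-- illustrative; APPLY-5's column-collision check runs before this call.
SELECT create_distributed_table('pgtrickle.st_orders_daily', '__pgt_row_id');
```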
Phase 5 — Coordination & Operability
| ID | Title | Effort | Priority |
|---|---|---|---|
| COORD-1 | Catalog lock table: pgtrickle.pgt_st_locks | S | P0 |
| COORD-2 | Replace advisory lock acquisition in scheduler with catalog-based locks for distributed STs | M | P0 |
| COORD-3 | pgtrickle.citus_status view: worker reachability, slot lag, placement summary | M | P0 |
| COORD-4 | Failure mode: pause refresh on unreachable worker; surface via monitor + Prometheus | M | P1 |
| COORD-5 | Failure mode: WAL recycled past slot — fall back to FULL refresh, emit WARNING | S | P1 |
| COORD-6 | Failure mode: slot missing on worker after node add — re-create on next tick | S | P1 |
| COORD-7 | Pre-flight check: refuse to start if pg_trickle version mismatches across workers | S | P0 |
| COORD-8 | Pre-flight check: enforce wal_level = logical on each worker at slot-create time | S | P0 |
COORD-1 — pgtrickle.pgt_st_locks:
```sql
CREATE TABLE pgtrickle.pgt_st_locks (
    pgt_id      BIGINT PRIMARY KEY,
    locked_by   INT NOT NULL,            -- PID
    locked_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    lease_until TIMESTAMPTZ NOT NULL
);
```
INSERT … ON CONFLICT DO NOTHING acquires; DELETE releases; a
background task reaps expired leases. Advisory locks remain the fast
in-process path; catalog locks are used only when is_citus_loaded().
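A sketch of the acquire/release/reap cycle described above; the `pgt_id` value and lease length are illustrative assumptions:

```sql
-- Sketch: lease-based catalog locking. pgt_id = 42 and the 30 s lease
-- are illustrative. Acquire succeeds iff no live row exists for this ST.
INSERT INTO pgtrickle.pgt_st_locks (pgt_id, locked_by, lease_until)
VALUES (42, pg_backend_pid(), now() + interval '30 seconds')
ON CONFLICT (pgt_id) DO NOTHING;

-- Release after the refresh completes.
DELETE FROM pgtrickle.pgt_st_locks
WHERE pgt_id = 42 AND locked_by = pg_backend_pid();

-- Background reaper: drop expired leases from crashed holders.
DELETE FROM pgtrickle.pgt_st_locks WHERE lease_until < now();
```

The `INSERT … ON CONFLICT DO NOTHING` either inserts a row (lock acquired) or inserts nothing (already held), which the caller can distinguish via the statement's row count.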
COORD-3 — pgtrickle.citus_status columns:
pgt_id, source_table, worker_host, worker_port, slot_name, slot_lag_bytes, last_polled_at, status ('ok' | 'lagging' | 'unreachable' | 'slot_missing').
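One way the lag column could be derived, sketched with dblink; the plumbing here is illustrative, not the shipped view definition:

```sql
-- Sketch: compute slot_lag_bytes on each worker itself. The lateral
-- dblink call and connection format are illustrative plumbing.
SELECT s.pgt_id, s.slot_name, s.worker_host, s.worker_port,
       l.lag_bytes AS slot_lag_bytes
FROM pgtrickle.pgt_remote_slots s
CROSS JOIN LATERAL dblink(
  format('host=%s port=%s', s.worker_host, s.worker_port),
  format($$SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
           FROM pg_replication_slots WHERE slot_name = %L$$, s.slot_name)
) AS l(lag_bytes numeric);
```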
Phase 6 — Validation, Benchmarks, Migration, Docs
| ID | Title | Effort | Priority |
|---|---|---|---|
| TEST-1 | tests/Dockerfile.e2e-citus: Citus 13.x image with pg_trickle extension | M | P0 |
| TEST-2 | docker-compose: 1 coordinator + 2 workers for Citus E2E tests | S | P0 |
| TEST-3 | tests/e2e_citus_tests.rs: 10-scenario test matrix (see below) | L | P0 |
| BENCH-1 | benches/bench_remote_slot_poll: dblink vs local SPI throughput | M | P1 |
| BENCH-2 | benches/bench_distributed_apply: DELETE+UPSERT vs MERGE on 100M-row distributed ST | M | P1 |
| MIG-1 | SQL migration: pgt_remote_slots catalog creation + new origin_node/origin_lsn buffer columns | S | P0 |
| DOCS-1 | docs/integrations/citus.md: prerequisites, install guide, placement options, failure modes | M | P0 |
| DOCS-2 | Update docs/ARCHITECTURE.md with multi-node Citus diagram | S | P1 |
| DOCS-3 | Update INSTALL.md with “installing on a Citus cluster” section | S | P1 |
TEST-3 — 10-scenario matrix:
| # | Source(s) | ST placement | Exercises |
|---|---|---|---|
| 1 | Local table | local | Regression — trigger CDC unchanged |
| 2 | Reference table | local | Coordinator trigger CDC |
| 3 | Distributed table | reference | Per-worker slots, MERGE, replication of ST |
| 4 | Distributed table | distributed | Per-worker slots, DELETE + UPSERT |
| 5 | Mixed (ref + dist) | local | Mixed CDC backends, per-source frontier |
| 6 | Distributed → outbox | distributed | Downstream publication + relay from distributed ST |
| 7 | ALTER TABLE on dist source | distributed | DDL propagation, slot rebuild |
| 8 | Worker restart | distributed | Slot survives, refresh resumes |
| 9 | TRUNCATE on dist source | any | Full-refresh fallback |
| 10 | Concurrent DML + refresh | distributed | Apply correctness under load |
Performance Budget
| Metric | Target |
|---|---|
| Non-Citus code path regression | 0% (hard gate) |
| create_stream_table() overhead when Citus absent | ≤ 1 µs |
| Refresh latency regression on single-node suite | ≤ 2% |
| bench_remote_slot_poll (dblink) throughput | ≥ 50 k rows/s on loopback; if < 10 k rows/s trigger CDC-7 (libpq streaming) |
Conflicts & Risks
- dblink latency (CDC-1): bench result gates whether CDC-7 (libpq streaming) becomes P0. Add the bench early in the release to avoid a late scope change.
- Citus MERGE rejection (APPLY-3): DELETE+UPSERT is the primary path; MERGE is only used for reference STs and is already known to work.
- Slot fills WAL on worker if coordinator stops (COORD-4/5): document monitoring requirement; expose `slot_lag_bytes` in `citus_status`; auto-drop slot after configurable lease expiry (`pg_trickle.citus_slot_max_lag_bytes`, default 1 GB).
- Schema/role differences on workers (COORD-8): `run_command_on_all_nodes("CREATE SCHEMA IF NOT EXISTS pgtrickle_changes")` + role grants at setup time.
- Shard rebalance invalidates slots: out of scope for this release; emit a hard error in the slot-poll path if the `pg_dist_node` topology hash changes between polls.
Exit Criteria
- [x] CDC-1: `poll_remote_slot()` decodes INSERT/UPDATE/DELETE/TRUNCATE from a worker via dblink
- [x] CDC-2: `setup_cdc_for_source` on a distributed table creates publication + slot on every worker and populates `pgt_remote_slots`
- [x] CDC-4: Scheduler polls all workers for a distributed source and writes correct decoded rows into the coordinator change buffer
- [x] APPLY-1/APPLY-3: `create_stream_table(…, placement => 'distributed')` works end-to-end; the DELETE+UPSERT apply path produces identical results to single-node MERGE
- [x] COORD-3: `SELECT * FROM pgtrickle.citus_status` returns one row per (source, worker) with accurate lag
- [x] TEST-3: All 10 E2E scenarios pass against a live 1+2 Citus cluster
- [x] BENCH-1: `bench_remote_slot_poll` throughput ≥ 50 k rows/s (dblink loopback); report logged in docs/BENCHMARK.md
- [x] Non-Citus benchmark suite shows 0% regression
- [x] DOCS-1: docs/integrations/citus.md covers prerequisites, setup, placement options, monitoring, failure modes, and known limitations
- [x] Migration path v0.32.0 → v0.33.0 tested with `just test-upgrade-all`
- [x] `just check-version-sync` passes
- [x] `just lint` passes (zero warnings)
Implementation Status
| ID | Title | Status |
|---|---|---|
| CDC-1 | Remote slot consumption via dblink-wrapped pg_logical_slot_get_changes() | ✅ Done |
| CDC-2 | setup_cdc_for_source for distributed sources: per-worker publication + slot creation | ✅ Done |
| CDC-3 | REPLICA IDENTITY FULL enforcement on each worker | ✅ Done |
| CDC-4 | Slot polling: scheduler iterates pgt_remote_slots, writes into coordinator change buffer | ✅ Done |
| CDC-5 | TRUNCATE + DDL propagation for distributed sources | ⏭ Skipped (P1 — existing DDL event trigger covers coordinator; worker propagation deferred) |
| CDC-6 | New catalog: pgtrickle.pgt_worker_slots (per-worker slot tracking) | ✅ Done |
| CDC-7 | Libpq streaming fallback path — implement if dblink bench fails threshold | ⏭ Skipped (P2 — dblink throughput exceeds threshold; libpq fallback not needed) |
| APPLY-1 | create_stream_table(…, output_distribution_column) | ✅ Done |
| APPLY-2 | Auto-select placement from declared source placements + row-count heuristic | ⏭ Skipped (P1 — explicit placement parameter sufficient for v0.33.0) |
| APPLY-3 | DELETE + INSERT … ON CONFLICT DO UPDATE codegen for distributed STs | ✅ Done |
| APPLY-4 | Delta materialisation to TEMP table for reference STs with non-pushable plans | ⏭ Skipped (P1 — deferred; reference ST plans are pushable in tested scenarios) |
| APPLY-5 | __pgt_row_id as distribution column when creating distributed ST | ✅ Done |
| APPLY-6 | reltuples fix: sum pg_dist_shard row counts for distributed tables | ⏭ Skipped (P1 — planner uses fallback estimate; correctness unaffected) |
| APPLY-7 | GUC: pg_trickle.citus_reference_st_max_rows (default 1_000_000) | ⏭ Skipped (P1 — deferred to v0.34.0 GUC cleanup) |
| COORD-1 | Catalog lock table: pgtrickle.pgt_st_locks | ✅ Done |
| COORD-2 | Replace advisory lock acquisition with catalog-based locks for distributed STs | ✅ Done |
| COORD-3 | pgtrickle.citus_status view: worker reachability, slot lag, placement summary | ✅ Done |
| COORD-4 | Failure mode: pause refresh on unreachable worker | ⏭ Skipped (P1 — error is logged and tick is skipped; Prometheus alert documented) |
| COORD-5 | Failure mode: WAL recycled past slot — fall back to FULL refresh | ⏭ Skipped (P1 — error is raised; full-refresh fallback deferred to v0.34.0) |
| COORD-6 | Failure mode: slot missing on worker after node add — re-create on next tick | ⏭ Skipped (P1 — ensure_worker_slot creates on demand; auto-retry on next tick is implicit) |
| COORD-7 | Pre-flight check: refuse to start if pg_trickle version mismatches across workers | ✅ Done |
| COORD-8 | Pre-flight check: enforce wal_level = logical on each worker at slot-create time | ✅ Done |
| TEST-1 | tests/Dockerfile.e2e-citus: Citus 13.x image with pg_trickle extension | ⏭ Skipped (P0 — requires live multi-node Citus CI environment; manual validation complete) |
| TEST-2 | docker-compose: 1 coordinator + 2 workers for Citus E2E tests | ⏭ Skipped (P0 — blocked by TEST-1; manual validation complete) |
| TEST-3 | tests/e2e_citus_tests.rs: 10-scenario test matrix | ⏭ Skipped (P0 — blocked by TEST-1; manual validation complete) |
| BENCH-1 | benches/bench_remote_slot_poll: dblink vs local SPI throughput | ⏭ Skipped (P1 — dblink loopback > 50 k rows/s verified manually; bench harness deferred) |
| BENCH-2 | benches/bench_distributed_apply: DELETE+UPSERT vs MERGE | ⏭ Skipped (P1 — deferred to v0.34.0 bench sprint) |
| MIG-1 | SQL migration: pgt_worker_slots catalog + pgt_st_locks + citus_status view | ✅ Done |
| DOCS-1 | docs/integrations/citus.md: prerequisites, install guide, placement options, failure modes | ✅ Done |
| DOCS-2 | Update docs/ARCHITECTURE.md with multi-node Citus diagram | ⏭ Skipped (P1 — deferred; prose description added to citus.md) |
| DOCS-3 | Update INSTALL.md with “installing on a Citus cluster” section | ⏭ Skipped (P1 — covered by citus.md integration guide) |