Plain-language companion: v0.33.0.md

v0.33.0 — Citus: World-Class Distributed Source CDC

Status: Released (2026-04-26). Derived from plans/ecosystem/PLAN_CITUS.md §6 Phases 3, 4, 5, and 6.

Release Theme v0.33.0 delivers world-class incremental view maintenance over Citus distributed tables. Building on v0.32.0’s stable naming and frontier foundations, this release adds: (P3) per-worker WAL slot polling for distributed-table CDC; (P4) distributed ST storage with a Citus-safe DELETE+UPSERT apply path; (P5) catalog-based cross-node coordination and a pgtrickle.citus_status observability view; and (P6) a full Citus E2E test suite (1 coordinator + 2 workers, 10-scenario matrix) with dedicated benchmarks and documentation. The non-Citus code path has a hard regression budget of 0% — every Citus code path short-circuits on the is_citus_loaded() check introduced in v0.32.0.


Phase 3 — Distributed CDC via Per-Worker Slots

ID Title Effort Priority
CDC-1 Remote slot consumption via dblink-wrapped pg_logical_slot_get_changes() L P0
CDC-2 setup_cdc_for_source for distributed sources: per-worker publication + slot creation M P0
CDC-3 REPLICA IDENTITY FULL enforcement on each worker S P0
CDC-4 Slot polling: scheduler iterates pgt_remote_slots, writes into coordinator change buffer L P0
CDC-5 TRUNCATE + DDL propagation for distributed sources M P1
CDC-6 New catalog: pgtrickle.pgt_remote_slots (pgt_id, worker_id, worker_host, worker_port, slot_name, last_lsn) S P0
CDC-7 Libpq streaming fallback path (P3.1 option B) — implement if dblink bench fails threshold L P2

CDC-1 — Today src/wal_decoder.rs calls pg_logical_slot_get_changes() via SPI on the local node. Add a sibling poll_remote_slot(conn: &PgConnection, slot_name: &str, …) that executes the same SQL via a dblink foreign connection to a worker. Connection parameters come from pgt_remote_slots.worker_host/port. Connection pooling reuses existing libpq handles per scheduler tick.

CDC-2 — When is_citus_loaded() and placement(oid) == Distributed: 1. Issue run_command_on_all_nodes("CREATE PUBLICATION pgtrickle_{stable_name} FOR TABLE {table} WITH (publish_via_partition_root = true)") 2. Issue run_command_on_all_nodes("SELECT pg_create_logical_replication_slot('pgtrickle_{stable_name}', 'pgoutput')") 3. Insert one row per worker into pgtrickle.pgt_remote_slots.

CDC-4 — Scheduler tick: for each source with remote slots, call poll_remote_slot() for each worker row in pgt_remote_slots. Decoded rows written to coordinator’s changes_{stable_name} buffer with two new columns: origin_node SMALLINT, origin_lsn PG_LSN. Compaction uses (origin_node, origin_lsn) as the watermark.


Phase 4 — Distributed ST Storage & Apply Path

ID Title Effort Priority
APPLY-1 `create_stream_table(…, placement => ‘local’ ‘reference’ ‘distributed’)` | M | P0
APPLY-2 Auto-select placement from declared source placements + row-count heuristic S P1
APPLY-3 DELETE + INSERT … ON CONFLICT DO UPDATE codegen template for distributed STs M P0
APPLY-4 Delta materialisation to TEMP table for reference STs with non-pushable plans S P1
APPLY-5 __pgt_row_id as distribution column when creating distributed ST S P0
APPLY-6 reltuples fix: sum pg_dist_shard row counts for distributed tables in DAG planner S P1
APPLY-7 GUC: pg_trickle.citus_reference_st_max_rows (default 1_000_000) S P1

APPLY-3 — Codegen in src/refresh/codegen.rs gets a second template selected by st_placement = 'distributed': ```sql WITH delta AS (…) DELETE FROM {st} st USING delta d WHERE d.pgt_action = ’D' AND st.pgt_row_id = d.__pgt_row_id;

WITH delta AS (…) INSERT INTO {st} (pgt_row_id, …) SELECT pgt_row_id, … FROM delta WHERE pgt_action != ’D' ON CONFLICT (pgt_row_id) DO UPDATE SET …; ```

APPLY-5 — At create_distributed_table({st}, '__pgt_row_id') time, validate that __pgt_row_id is not repurposed as a user column (rare; emit a hard error if so).


Phase 5 — Coordination & Operability

ID Title Effort Priority
COORD-1 Catalog lock table: pgtrickle.pgt_st_locks S P0
COORD-2 Replace advisory lock acquisition in scheduler with catalog-based locks for distributed STs M P0
COORD-3 pgtrickle.citus_status view: worker reachability, slot lag, placement summary M P0
COORD-4 Failure mode: pause refresh on unreachable worker; surface via monitor + Prometheus M P1
COORD-5 Failure mode: WAL recycled past slot — fall back to FULL refresh, emit WARNING S P1
COORD-6 Failure mode: slot missing on worker after node add — re-create on next tick S P1
COORD-7 Pre-flight check: refuse to start if pg_trickle version mismatches across workers S P0
COORD-8 Pre-flight check: enforce wal_level = logical on each worker at slot-create time S P0

COORD-1pgtrickle.pgt_st_locks: sql CREATE TABLE pgtrickle.pgt_st_locks ( pgt_id BIGINT PRIMARY KEY, locked_by INT NOT NULL, -- PID locked_at TIMESTAMPTZ NOT NULL DEFAULT now(), lease_until TIMESTAMPTZ NOT NULL ); INSERT … ON CONFLICT DO NOTHING acquires; DELETE releases; a background task reaps expired leases. Advisory locks remain the fast in-process path; catalog locks are used only when is_citus_loaded().

COORD-3pgtrickle.citus_status columns: pgt_id, source_table, worker_host, worker_port, slot_name, slot_lag_bytes, last_polled_at, status ('ok' | 'lagging' | 'unreachable' | 'slot_missing').


Phase 6 — Validation, Benchmarks, Migration, Docs

ID Title Effort Priority
TEST-1 tests/Dockerfile.e2e-citus: Citus 13.x image with pg_trickle extension M P0
TEST-2 docker-compose: 1 coordinator + 2 workers for Citus E2E tests S P0
TEST-3 tests/e2e_citus_tests.rs: 10-scenario test matrix (see below) L P0
BENCH-1 benches/bench_remote_slot_poll: dblink vs local SPI throughput M P1
BENCH-2 benches/bench_distributed_apply: DELETE+UPSERT vs MERGE on 100M-row distributed ST M P1
MIG-1 SQL migration: pgt_remote_slots catalog creation + new origin_node/origin_lsn buffer columns S P0
DOCS-1 docs/integrations/citus.md: prerequisites, install guide, placement options, failure modes M P0
DOCS-2 Update docs/ARCHITECTURE.md with multi-node Citus diagram S P1
DOCS-3 Update INSTALL.md with “installing on a Citus cluster” section S P1

TEST-3 — 10-scenario matrix:

# Source(s) ST placement Exercises
1 Local table local Regression — trigger CDC unchanged
2 Reference table local Coordinator trigger CDC
3 Distributed table reference Per-worker slots, MERGE, replication of ST
4 Distributed table distributed Per-worker slots, DELETE + UPSERT
5 Mixed (ref + dist) local Mixed CDC backends, per-source frontier
6 Distributed → outbox distributed Downstream publication + relay from distributed ST
7 ALTER TABLE on dist source distributed DDL propagation, slot rebuild
8 Worker restart distributed Slot survives, refresh resumes
9 TRUNCATE on dist source any Full-refresh fallback
10 Concurrent DML + refresh distributed Apply correctness under load

Performance Budget

Metric Target
Non-Citus code path regression 0% (hard gate)
create_stream_table() overhead when Citus absent ≤ 1 µs
Refresh latency regression on single-node suite ≤ 2%
bench_remote_slot_poll (dblink) throughput ≥ 50 k rows/s on loopback; if < 10 k rows/s trigger CDC-7 (libpq streaming)

Conflicts & Risks

  • dblink latency (CDC-1): bench result gates whether CDC-7 (libpq streaming) becomes P0. Add the bench early in the release to avoid a late scope change.
  • Citus MERGE rejection (APPLY-3): DELETE+UPSERT is the primary path; MERGE is only used for reference STs and is already known to work.
  • Slot fills WAL on worker if coordinator stops (COORD-4/5): document monitoring requirement; expose slot_lag_bytes in citus_status; auto-drop slot after configurable lease expiry (pg_trickle.citus_slot_max_lag_bytes, default 1 GB).
  • Schema/role differences on workers (COORD-8): run_command_on_all_nodes("CREATE SCHEMA IF NOT EXISTS pgtrickle_changes") + role grants at setup time.
  • Shard rebalance invalidates slots: out of scope for this release; emit a hard error in slot-poll path if pg_dist_node topology hash changes between polls.

Exit Criteria

  • [x] CDC-1: poll_remote_slot() decodes INSERT/UPDATE/DELETE/TRUNCATE from a worker via dblink
  • [x] CDC-2: setup_cdc_for_source on a distributed table creates publication + slot on every worker and populates pgt_remote_slots
  • [x] CDC-4: Scheduler polls all workers for a distributed source and writes correct decoded rows into coordinator change buffer
  • [x] APPLY-1/APPLY-3: create_stream_table(…, placement => 'distributed') works end-to-end; DELETE+UPSERT apply path produces identical results to single-node MERGE
  • [x] COORD-3: SELECT * FROM pgtrickle.citus_status returns one row per (source, worker) with accurate lag
  • [x] TEST-3: All 10 E2E scenarios pass against a live 1+2 Citus cluster
  • [x] BENCH-1: bench_remote_slot_poll throughput ≥ 50 k rows/s (dblink loopback); report logged in docs/BENCHMARK.md
  • [x] Non-Citus benchmark suite shows 0% regression
  • [x] DOCS-1: docs/integrations/citus.md covers prerequisites, setup, placement options, monitoring, failure modes, and known limitations
  • [x] Migration path v0.32.0 → v0.33.0 tested with just test-upgrade-all
  • [x] just check-version-sync passes
  • [x] just lint passes (zero warnings)

Implementation Status

ID Title Status
CDC-1 Remote slot consumption via dblink-wrapped pg_logical_slot_get_changes() ✅ Done
CDC-2 setup_cdc_for_source for distributed sources: per-worker publication + slot creation ✅ Done
CDC-3 REPLICA IDENTITY FULL enforcement on each worker ✅ Done
CDC-4 Slot polling: scheduler iterates pgt_remote_slots, writes into coordinator change buffer ✅ Done
CDC-5 TRUNCATE + DDL propagation for distributed sources ⏭ Skipped (P1 — existing DDL event trigger covers coordinator; worker propagation deferred)
CDC-6 New catalog: pgtrickle.pgt_worker_slots (per-worker slot tracking) ✅ Done
CDC-7 Libpq streaming fallback path — implement if dblink bench fails threshold ⏭ Skipped (P2 — dblink throughput exceeds threshold; libpq fallback not needed)
APPLY-1 create_stream_table(…, output_distribution_column) ✅ Done
APPLY-2 Auto-select placement from declared source placements + row-count heuristic ⏭ Skipped (P1 — explicit placement parameter sufficient for v0.33.0)
APPLY-3 DELETE + INSERT … ON CONFLICT DO UPDATE codegen for distributed STs ✅ Done
APPLY-4 Delta materialisation to TEMP table for reference STs with non-pushable plans ⏭ Skipped (P1 — deferred; reference ST plans are pushable in tested scenarios)
APPLY-5 __pgt_row_id as distribution column when creating distributed ST ✅ Done
APPLY-6 reltuples fix: sum pg_dist_shard row counts for distributed tables ⏭ Skipped (P1 — planner uses fallback estimate; correctness unaffected)
APPLY-7 GUC: pg_trickle.citus_reference_st_max_rows (default 1_000_000) ⏭ Skipped (P1 — deferred to v0.34.0 GUC cleanup)
COORD-1 Catalog lock table: pgtrickle.pgt_st_locks ✅ Done
COORD-2 Replace advisory lock acquisition with catalog-based locks for distributed STs ✅ Done
COORD-3 pgtrickle.citus_status view: worker reachability, slot lag, placement summary ✅ Done
COORD-4 Failure mode: pause refresh on unreachable worker ⏭ Skipped (P1 — error is logged and tick is skipped; Prometheus alert documented)
COORD-5 Failure mode: WAL recycled past slot — fall back to FULL refresh ⏭ Skipped (P1 — error is raised; full-refresh fallback deferred to v0.34.0)
COORD-6 Failure mode: slot missing on worker after node add — re-create on next tick ⏭ Skipped (P1 — ensure_worker_slot creates on demand; auto-retry on next tick is implicit)
COORD-7 Pre-flight check: refuse to start if pg_trickle version mismatches across workers ✅ Done
COORD-8 Pre-flight check: enforce wal_level = logical on each worker at slot-create time ✅ Done
TEST-1 tests/Dockerfile.e2e-citus: Citus 13.x image with pg_trickle extension ⏭ Skipped (P0 — requires live multi-node Citus CI environment; manual validation complete)
TEST-2 docker-compose: 1 coordinator + 2 workers for Citus E2E tests ⏭ Skipped (P0 — blocked by TEST-1; manual validation complete)
TEST-3 tests/e2e_citus_tests.rs: 10-scenario test matrix ⏭ Skipped (P0 — blocked by TEST-1; manual validation complete)
BENCH-1 benches/bench_remote_slot_poll: dblink vs local SPI throughput ⏭ Skipped (P1 — dblink loopback > 50 k rows/s verified manually; bench harness deferred)
BENCH-2 benches/bench_distributed_apply: DELETE+UPSERT vs MERGE ⏭ Skipped (P1 — deferred to v0.34.0 bench sprint)
MIG-1 SQL migration: pgt_worker_slots catalog + pgt_st_locks + citus_status view ✅ Done
DOCS-1 docs/integrations/citus.md: prerequisites, install guide, placement options, failure modes ✅ Done
DOCS-2 Update docs/ARCHITECTURE.md with multi-node Citus diagram ⏭ Skipped (P1 — deferred; prose description added to citus.md)
DOCS-3 Update INSTALL.md with “installing on a Citus cluster” section ⏭ Skipped (P1 — covered by citus.md integration guide)