Plan: CDC Mode / Refresh Mode Interaction Gaps
Date: 2026-03-07 · Status: DONE · Last Updated: 2026-03-08

G1–G6 implemented — the per-table cdc_mode override, explicit IMMEDIATE + cdc_mode => 'wal' rejection, WAL slot advancement, adaptive-fallback cleanup, pgt_cdc_status, and the defensive differential-baseline guard all shipped in v0.2.3.
1. Problem Statement
pg_trickle has four refresh modes (AUTO, FULL, DIFFERENTIAL, IMMEDIATE)
and four CDC modes (auto, trigger, wal, plus the internal transitioning
state). Not all combinations are explicitly validated, and several edge cases
can produce surprising behavior: incorrect results, resource leaks, or opaque
errors.
This plan addresses six specific gaps, ordered by user impact.
Gap Summary
| # | Gap | Severity | Effort |
|---|-----|----------|--------|
| G1 | No per-table cdc_mode override | High | Medium |
| G2 | IMMEDIATE + wal GUC — no validation | Medium | Small |
| G3 | FULL refresh does not advance WAL slot | Medium | Small |
| G4 | AUTO→FULL adaptive fallback does not flush change buffers | Medium | Medium |
| G5 | TRANSITIONING state lacks user-facing query ergonomics | Low | Small |
| G6 | DIFFERENTIAL without initialization baseline | Medium | Small |
2. Current State
Refresh Modes
| Mode | Mechanism | CDC Path |
|------|-----------|----------|
| AUTO | Attempts DIFFERENTIAL; falls back to FULL when the query is non-differentiable or change ratio > threshold | Standard CDC (trigger/WAL) |
| FULL | Truncate + reload | Standard CDC |
| DIFFERENTIAL | Delta query over change buffers | Standard CDC |
| IMMEDIATE | Statement-level IVM triggers (in-transaction) | IVM triggers; bypasses CDC entirely |
CDC Modes
| Mode | Set By | Scope |
|------|--------|-------|
| auto (default GUC) | pg_trickle.cdc_mode | Cluster-wide |
| trigger | GUC | Cluster-wide |
| wal | GUC | Cluster-wide |
| transitioning | Internal state machine | Per-source in pgt_dependencies.cdc_mode |
Key Code Locations
| Component | File | Entry Point |
|-----------|------|-------------|
| CDC mode GUC | src/config.rs:88–106 | PGS_CDC_MODE |
| CDC setup | src/api.rs:2319 | setup_cdc_for_source() |
| IVM vs CDC branch | src/api.rs:452–470 | setup_trigger_infrastructure() |
| Full refresh | src/refresh.rs:1011 | execute_full_refresh() |
| Differential refresh | src/refresh.rs:1260 | execute_differential_refresh() |
| Adaptive fallback | src/refresh.rs:1476–1554 | Change ratio check |
| Scheduler dispatch | src/scheduler.rs:1240–1395 | RefreshAction match |
| Buffer cleanup | src/refresh.rs:128 | drain_pending_cleanups() |
| dbt materialization | dbt-pgtrickle/macros/materializations/stream_table.sql | N/A |
3. Gap Details & Implementation Plans
G1: Per-Table cdc_mode Override ✅
Problem. cdc_mode is a cluster-wide GUC (pg_trickle.cdc_mode). In
mixed environments — some tables have a PK (WAL-capable), others don’t — users
must choose the lowest common denominator (trigger) globally or manually
ALTER TABLE ... REPLICA IDENTITY FULL on every keyless table. There is no way
to say “use WAL for this table and triggers for that one.”
Current code path. setup_cdc_for_source() (src/api.rs:2341) reads
config::pg_trickle_cdc_mode() — the global GUC — for every source. The
catalog (pgt_dependencies.cdc_mode) already stores a per-source CDC mode, but
only as a result of the state machine (never as user input).
dbt impact. The stream_table materialization does not expose cdc_mode
as a model config key. Users of dbt-pgtrickle have no way to set this per
model.
Design
Add an optional cdc_mode parameter to create_stream_table() and
alter_stream_table(). When set, it overrides the global GUC for all source
tables of that stream table. When NULL (default), the global GUC applies
(preserving backward compatibility).
Implementation Steps
SQL API —
create_stream_table()(src/api.rs:33–41)- Add parameter:
cdc_mode: default!(Option<&str>, "NULL") - Pass through to
create_stream_table_impl().
- Add parameter:
SQL API —
alter_stream_table()(src/api.rs)- Add parameter:
cdc_mode: default!(Option<&str>, "NULL") - When non-NULL on ALTER, re-evaluate CDC setup for all sources:
trigger→wal: validate PK/REPLICA IDENTITY, create slot, begin transition.wal→trigger: drop slot, recreate trigger.- Same mode: no-op.
- Add parameter:
Catalog —
pgt_stream_tables(sql/upgrade migration)- Add column:
requested_cdc_mode TEXT DEFAULT NULL CHECK (requested_cdc_mode IN ('trigger', 'wal', 'auto', NULL)) - This stores the user’s intent;
pgt_dependencies.cdc_modecontinues to store the effective state per source.
- Add column:
CDC setup (
src/api.rs:2319–2400)- Modify
setup_cdc_for_source()to accept an optionalcdc_mode_overrideparameter. WhenSome(...), use it instead of reading the GUC. - Update the PK/REPLICA IDENTITY validation to use the effective mode.
- Modify
dbt adapter (
dbt-pgtrickle/macros/materializations/stream_table.sql)- Add config key
cdc_mode(defaultnone). - Thread it into
pgtrickle_create_stream_table()andpgtrickle_alter_stream_table()macro calls.
- Add config key
Upgrade migration (
sql/pg_trickle--<prev>--<next>.sql)ALTER TABLE pgtrickle.pgt_stream_tables ADD COLUMN requested_cdc_mode TEXT DEFAULT NULL ...- Update
create_stream_tableandalter_stream_tablefunction signatures.
Documentation
docs/SQL_REFERENCE.md: Document new parameter.docs/CONFIGURATION.md: Clarify GUC vs per-table precedence.
Tests
- Unit test:
cdc_mode_overrideparameter parsing. - Integration test: create ST with
cdc_mode => 'trigger'while global GUC isauto; verify trigger is used, no slot created. - Integration test: create ST with
cdc_mode => 'wal'on a keyless table without REPLICA IDENTITY FULL; verify error. - E2E test: alter existing ST’s
cdc_modefromtriggertowal; verify transition completes. - dbt integration test: model config
cdc_mode: 'trigger'.
- Unit test:
G2: Explicit Validation of IMMEDIATE + WAL CDC ✅
Problem. If a user sets pg_trickle.cdc_mode = 'wal' and creates a stream
table with refresh_mode = 'IMMEDIATE', the system silently bypasses WAL
entirely (the is_immediate() branch in setup_trigger_infrastructure()
skips CDC setup). This is correct behavior, but confusing: the user asked for
WAL and got IVM triggers with no feedback.
With G1 (per-table cdc_mode), the risk increases: a user could explicitly
write cdc_mode => 'wal', refresh_mode => 'IMMEDIATE', which is an
incoherent configuration.
Current code path. setup_trigger_infrastructure() (src/api.rs:452)
has an if refresh_mode.is_immediate() { ... } else { ... } branch. The
else branch calls setup_cdc_for_source(). The if branch calls
ivm::setup_ivm_triggers(). No validation rejects the combination.
Progress (2026-03-08). Phase 1 is implemented: when the cluster-wide
pg_trickle.cdc_mode GUC is 'wal' and a stream table is created or altered
to refresh_mode = 'IMMEDIATE', pg_trickle now emits an INFO explaining that
WAL CDC is ignored and that statement-level IVM triggers will be used instead.
The explicit rejection path is still pending because there is not yet a
per-table cdc_mode override surface; that arrives with G1.
Implementation Steps

1. Phase 1 — INFO for implicit global-GUC mismatch (src/api.rs)
   - If refresh_mode = 'IMMEDIATE' and the effective cdc_mode comes from the global GUC with value 'wal', emit:

     ```text
     INFO: cdc_mode 'wal' has no effect for IMMEDIATE refresh mode — using IVM triggers
     ```

   - Implement in both create_stream_table_impl() and alter_stream_table_impl().
2. Phase 2 — Explicit rejection once G1 exists (src/api.rs)
   - After parsing refresh_mode and determining an explicit per-table cdc_mode override:

     ```rust
     if refresh_mode.is_immediate() && effective_cdc_mode == "wal" {
         return Err(PgTrickleError::InvalidArgument(
             "refresh_mode = 'IMMEDIATE' is incompatible with cdc_mode = 'wal'. \
              IMMEDIATE uses in-transaction IVM triggers; WAL-based CDC is async. \
              Use cdc_mode = 'trigger' or 'auto', or choose a different refresh_mode."
                 .to_string(),
         ));
     }
     ```

   - Same check in alter_stream_table_impl() when altering refresh mode or CDC mode.
Tests

- Phase 1 E2E test: GUC wal + IMMEDIATE create path succeeds and does not install CDC triggers or WAL slots.
- Phase 1 E2E test: GUC wal + ALTER ... refresh_mode = 'IMMEDIATE' succeeds, leaves no WAL slots behind, and preserves synchronous IMMEDIATE propagation on subsequent DML.
- Integration test: explicit cdc_mode => 'wal' + IMMEDIATE → error.
- Integration test: GUC wal + IMMEDIATE (no per-table override) → success with INFO log.
G3: FULL Refresh Does Not Advance WAL Slot ✅
Problem. When execute_full_refresh() runs, it truncates the stream table
and reloads from the defining query. The result is correct and a new frontier
is stored (scheduler handles this at src/scheduler.rs:1261–1273). However,
the change buffer rows consumed during prior differential cycles — and any new
rows that accumulated during the full refresh — remain in the WAL slot’s
unacknowledged window.
For trigger-based CDC this is benign: buffer rows are pruned by frontier-based
cleanup. For WAL-based CDC, the logical replication slot’s confirmed_flush_lsn
is only advanced by the WAL decoder polling loop, not by the refresh
executor. If the scheduler happens to do repeated FULL refreshes (e.g., the
table is in AUTO mode with a high change ratio), the slot may retain WAL
segments that are never needed.
This causes:
- WAL segment bloat (pg_wal/ grows).
- pg_replication_slots.active_pid shows stale lag.
- Monitoring false alarms on replication lag.
Implementation Steps

1. Add helper: advance_slot_to_current() (src/wal_decoder.rs)
   - New function:

     ```rust
     pub fn advance_slot_to_current(slot_name: &str) -> Result<(), PgTrickleError> {
         Spi::run_with_args(
             "SELECT pg_replication_slot_advance($1, pg_current_wal_lsn())",
             &[slot_name.into()],
         )
         .map_err(|e| PgTrickleError::SpiError(e.to_string()))
     }
     ```

2. Call after FULL refresh in scheduler (src/scheduler.rs:1261–1275)
   - After execute_full_refresh() succeeds and the new frontier is stored, advance all WAL-mode slots for this ST's sources:

     ```rust
     // Advance WAL slots past the current LSN since full refresh
     // made all prior change-buffer data irrelevant.
     for dep in deps.iter().filter(|d| d.cdc_mode == CdcMode::Wal) {
         if let Some(ref slot) = dep.slot_name {
             if let Err(e) = wal_decoder::advance_slot_to_current(slot) {
                 log!("pg_trickle: failed to advance slot {}: {}", slot, e);
             }
         }
     }
     ```

3. Also flush change buffer tables after FULL (src/refresh.rs:1011)
   - At the end of execute_full_refresh(), truncate change buffers for the ST's sources so the next differential cycle doesn't reprocess stale rows:

     ```rust
     for oid in &source_oids {
         let _ = Spi::run(&format!(
             "TRUNCATE {change_schema}.changes_{}",
             oid.to_u32()
         ));
     }
     ```

   - This is safe because the FULL refresh already materialized the complete state. Note: if a source is shared by multiple STs, the truncate must be conditional (only if all co-tracking STs also received a full refresh or their frontier is being reset). Use a safe cleanup approach: delete only rows with lsn <= new_frontier_lsn rather than TRUNCATE.
Tests

- Integration test: create a WAL-mode ST, insert rows, trigger a FULL refresh, check that pg_replication_slots.confirmed_flush_lsn has advanced.
- Integration test: verify the change buffer is empty after a FULL refresh.
Progress (2026-03-08). Implemented in Unreleased. advance_slot_to_current(slot_name)
added to src/wal_decoder.rs; uses pg_replication_slot_advance($1, pg_current_wal_lsn())
and silently skips if the slot does not exist. The shared
post_full_refresh_cleanup() helper in src/refresh.rs iterates all
WAL/TRANSITIONING StDependency entries, calls advance_slot_to_current() for
each, then calls cleanup_change_buffers_by_frontier(). It is invoked from
scheduler.rs after store_frontier() in the Full, Reinitialize, and
empty-prev Differential arms.
G4: AUTO→FULL Adaptive Fallback — Change Buffer Cleanup ✅
Problem. When execute_differential_refresh() detects that the change
ratio exceeds PGS_DIFFERENTIAL_MAX_CHANGE_RATIO (default 15%), it falls back
to execute_manual_full_refresh() (src/refresh.rs:1476–1554). The buffer
cleanup (drain_pending_cleanups() + cleanup_change_buffers_by_frontier())
runs before the fallback decision at line 1331, operating only on rows
≤ the safe frontier LSN.
When FULL runs, it recomputes the entire table from scratch, making all pending change buffer rows irrelevant. But these rows are not flushed — they persist in the buffer and will be picked up by the next scheduler tick, which may:

1. See changes → attempt DIFFERENTIAL again.
2. Exceed the threshold again → fall back to FULL again.
3. Loop indefinitely on bulk-loaded data.
This can also cause a “change ratio ping-pong” where the scheduler alternates between DIFFERENTIAL (small delta) and FULL (accumulated stale delta pushes ratio over threshold).
Implementation Steps

1. Flush change buffers after adaptive fallback (src/refresh.rs)
   - In the adaptive fallback path (after execute_manual_full_refresh() returns), delete all change buffer rows up to the new frontier LSN:

     ```rust
     if should_fallback {
         let result = execute_manual_full_refresh(st, schema, table_name, source_oids);
         // After successful FULL, clear stale deltas to prevent
         // the next DIFFERENTIAL from re-triggering fallback.
         if result.is_ok() {
             cleanup_change_buffers_by_frontier(&change_schema, &catalog_source_oids);
         }
         return result;
     }
     ```

2. Deduplicate with G3 — the FULL-refresh cleanup logic from G3 applies here too. Factor it out into a shared post_full_refresh_cleanup() helper called from both the scheduled FULL path and the adaptive fallback path.

Tests
- Integration test: bulk INSERT that triggers adaptive fallback → verify change buffer is empty after refresh.
- Integration test: after fallback FULL, insert one row → next cycle should succeed as DIFFERENTIAL without hitting the ratio threshold.
Progress (2026-03-08). Implemented in Unreleased. G3 and G4 share the
post_full_refresh_cleanup() helper in src/refresh.rs. In the adaptive
fallback path inside execute_differential_refresh(), the helper is called
after a successful adaptive FULL (TRUNCATE-needed path and ratio-threshold
path). This prevents stale change-buffer rows from pushing the ratio over the
threshold again on the next scheduler tick, eliminating the ping-pong cycle.
G5: TRANSITIONING State User Visibility ✅
Problem. The TRANSITIONING CDC state is tracked in
pgt_dependencies.cdc_mode and is visible in
pgtrickle.pgt_stream_table_sources. However:
- There is no simple way to query “which stream tables are currently transitioning?” without joining across catalog tables.
- The pgtrickle.pg_stat_stream_tables monitoring view does not surface per-source CDC mode.
- No NOTIFY/event is emitted when a transition starts or completes.
This makes it difficult to debug slow transitions or stuck states.
Implementation Steps

1. Add CDC mode to pg_stat_stream_tables (src/monitor.rs)
   - Add a cdc_modes column (text array) showing the distinct CDC modes across all sources. Example: {wal}, {trigger,wal}, {transitioning,wal}.
   - Alternatively, add a scalar cdc_status column with a summary: 'wal', 'trigger', 'mixed', 'transitioning'.
2. Add convenience view (sql/ upgrade migration)

   ```sql
   CREATE VIEW pgtrickle.pgt_cdc_status AS
   SELECT st.pgt_schema,
          st.pgt_name,
          d.source_relid,
          c.relname AS source_name,
          d.cdc_mode,
          d.slot_name
   FROM pgtrickle.pgt_dependencies d
   JOIN pgtrickle.pgt_stream_tables st ON st.pgt_id = d.pgt_id
   JOIN pg_class c ON c.oid = d.source_relid
   WHERE d.source_type = 'TABLE';
   ```

3. NOTIFY on transition events (src/wal_decoder.rs or src/scheduler.rs)
   - When CDC mode changes from TRIGGER → TRANSITIONING:

     ```sql
     NOTIFY pgtrickle_cdc_transition,
       '{"source_oid": 12345, "from": "trigger", "to": "transitioning"}'
     ```

   - When TRANSITIONING → WAL:

     ```sql
     NOTIFY pgtrickle_cdc_transition,
       '{"source_oid": 12345, "from": "transitioning", "to": "wal"}'
     ```

4. Documentation — update docs/SQL_REFERENCE.md with the new view and NOTIFY channel.

Tests
- Integration test: create an ST with auto mode on a wal_level = logical cluster → verify pgt_cdc_status shows TRANSITIONING then WAL.
- Integration test: verify the NOTIFY payload is emitted.
Progress (2026-03-08). Implemented in Unreleased. pgtrickle.pgt_cdc_status
view added to src/lib.rs (the pg_trickle_monitoring_views extension_sql!
block) alongside a cdc_modes text-array column in pg_stat_stream_tables.
NOTIFY on transitions (TRIGGER → TRANSITIONING via finish_wal_transition()
and TRANSITIONING → WAL via complete_wal_transition()) was already
implemented by emit_cdc_transition_notify() in src/wal_decoder.rs in
prior work.
G6: DIFFERENTIAL Without Initialization Baseline ✅
Progress (2026-03-08). Completed in Unreleased. The low-level
execute_differential_refresh() path now rejects unpopulated stream tables
and empty previous frontiers before any SPI work begins, and E2E coverage
continues to verify that manual refresh for initialize => false falls back
to FULL rather than surfacing that internal guard.
Problem. If execute_differential_refresh() is called on a stream table
that has is_populated = false (never initialized), the frontier defaults to
'0/0'::pg_lsn. This means the delta query scans the entire change buffer
from the beginning of WAL, which:
- Is semantically wrong: a delta applied to an empty table is not the same as
a full materialization (aggregates, JOINs, etc. produce different results).
- Is prohibitively slow on large buffers.
Current mitigation. The manual refresh path
(execute_manual_differential_refresh() at src/api.rs:1898) checks
st.is_populated and falls back to FULL. The scheduler path
(src/scheduler.rs:1294–1340) checks prev_frontier.is_empty() and falls
back to FULL. So in practice, this gap is mitigated.
Residual risk. execute_differential_refresh() itself has no guard — it
trusts its callers. A future caller could skip the check.
Implementation Steps

1. Defensive check in execute_differential_refresh() (src/refresh.rs)
   - Add an early return at the top of the function:

     ```rust
     if !st.is_populated {
         return Err(PgTrickleError::InvalidArgument(format!(
             "Cannot run DIFFERENTIAL refresh on unpopulated stream table {}.{} — \
              a FULL refresh is required first.",
             st.pgt_schema, st.pgt_name
         )));
     }
     ```

   - Callers already handle errors by falling back to FULL or marking for reinit, so this is safe.
2. Also guard on empty frontier (belt-and-suspenders)
   - After the is_populated check:

     ```rust
     if prev_frontier.is_empty() {
         return Err(PgTrickleError::InvalidArgument(format!(
             "Cannot run DIFFERENTIAL refresh on {}.{} — no previous frontier exists.",
             st.pgt_schema, st.pgt_name
         )));
     }
     ```
Tests

- Unit tests: call execute_differential_refresh() with is_populated = false and with prev_frontier.is_empty() → both return InvalidArgument before any SPI work.
- Integration/E2E test: create an ST with initialize => false, attempt a manual DIFFERENTIAL refresh → verify it still falls back to FULL and populates the stream table.
4. Implementation Order
The gaps have interdependencies. Recommended order:
Phase 1 (quick wins — small, independent, high value):
├── G6: Defensive check in execute_differential_refresh()
├── G2: Validate IMMEDIATE + WAL combination
└── G3: Advance WAL slot after FULL refresh
Phase 2 (buffer hygiene):
└── G4: Flush change buffers on adaptive fallback
(shares cleanup helper with G3)
Phase 3 (observability):
└── G5: TRANSITIONING visibility (view + NOTIFY)
Phase 4 (feature):
└── G1: Per-table cdc_mode override
(largest change — SQL API, catalog, dbt, migration, docs)
Effort Estimates
| Phase | Gaps | Complexity |
|---|---|---|
| 1 | G6, G2, G3 | ~3–5 files changed, no migration |
| 2 | G4 | ~2 files changed, refactor shared cleanup |
| 3 | G5 | ~3 files + migration, new view |
| 4 | G1 | ~8–10 files + migration + dbt macro changes |
5. Migration Strategy
Phases 1–2 require no SQL migration (pure Rust logic changes).
Phase 3 requires an upgrade migration for the convenience view.
Phase 4 requires an upgrade migration for the new
pgt_stream_tables.requested_cdc_mode column and updated function
signatures.
All changes are backward compatible: existing stream tables continue to work
without any user action. The per-table cdc_mode defaults to NULL (inherit
from GUC), and the new validations only reject configurations that were
already broken or undefined.
6. Open Questions

- G1 granularity: Should per-table cdc_mode live on the stream table (applying to all its sources) or on individual source dependencies? The former is simpler; the latter handles the case where one ST joins a PK table with a keyless table.
- G3 shared sources: When a source table is tracked by multiple STs and only one does a FULL refresh, we cannot TRUNCATE the shared change buffer. Should we use per-ST frontier-based DELETE instead? Or maintain per-ST buffer tables (a major architectural change)?
- G4 ping-pong prevention: Beyond buffer cleanup, should we add a backoff mechanism? E.g., after an adaptive fallback, force the next N cycles to use FULL to let the buffer stabilize.
- G5 NOTIFY volume: On a cluster with hundreds of stream tables, CDC transition NOTIFYs could be noisy. Should this be gated behind a GUC (e.g., pg_trickle.notify_cdc_transitions = on)?
7. Related Plans
- PLAN_HYBRID_CDC.md — The original trigger→WAL transition design. G1 and G2 extend this.
- PLAN_TRANSACTIONAL_IVM.md — IMMEDIATE mode design. G2 adds validation at the boundary between IMMEDIATE and CDC.
- PLAN_BOOTSTRAP_GATING.md — Bootstrap readiness. G6's defensive check complements this by preventing incorrect differential refreshes on uninitialized tables.
- PLAN_REFRESH_MODE_DEFAULT.md — AUTO mode default behavior. G4 addresses the buffer cleanup gap in AUTO’s fallback.