PLAN: DAG Pipeline Test Suite — Comprehensive Coverage
Status: Complete
Date: 2026-03-04 (coverage map updated 2026-03-04)
Branch: e2e_pipeline_dag_tests
Scope: Close all remaining gaps in multi-layer DAG pipeline E2E tests — multi-cycle cascades, mixed refresh modes, operational mid-pipeline changes, auto-refresh propagation, wide topologies, error resilience, and concurrent DML during pipeline refresh.
Table of Contents
- Implementation Status
- Motivation
- Current Coverage Map
- Gap Analysis
- Test Group 1: Multi-Cycle DAG Cascades
- Test Group 2: Mixed Refresh Modes
- Test Group 3: Operational Mid-Pipeline Changes
- Test Group 4: Auto-Refresh Chain Propagation
- Test Group 5: Wide Topologies (Fan-Out & Fan-In)
- Test Group 6: Error Resilience in Pipelines
- Test Group 7: Concurrent DML During Pipeline Refresh
- Test Group 8: IMMEDIATE Mode Cascades
- Implementation Roadmap
- Infrastructure & Helpers
- File Organization
- Success Criteria
Implementation Status
| Phase | Group | File | Tests | Status |
|---|---|---|---|---|
| 1 | Multi-Cycle DAG Cascades | `e2e_multi_cycle_dag_tests.rs` | 6 | Done |
| 2 | Mixed Refresh Modes | `e2e_mixed_mode_dag_tests.rs` | 5 | Done |
| 2 | Auto-Refresh Chain Propagation | `e2e_dag_autorefresh_tests.rs` | 5 | Done |
| 2 | Wide Topologies | `e2e_dag_topology_tests.rs` | 5 | Done |
| 3 | Operational Mid-Pipeline Changes | `e2e_dag_operations_tests.rs` | 7 | Done |
| 3 | Error Resilience | `e2e_dag_error_tests.rs` | 4 | Done |
| 3 | Concurrent DML | `e2e_dag_concurrent_tests.rs` | 3 | Done |
| 3 | IMMEDIATE Mode Cascades | `e2e_dag_immediate_tests.rs` | 4 | Done |
| — | Total | — | 39 | 39 done / 0 remaining |
Implementation Complete
All 39 tests across 8 groups have been implemented. No remaining work.
Phase 3 (P2) — Implemented 2026-03-04:
| Group | File | Tests | Description |
|---|---|---|---|
| 3 — Operational Mid-Pipeline | `e2e_dag_operations_tests.rs` | 7 | SUSPEND/ALTER/DROP on intermediate nodes, suspend-resume cycle |
| 6 — Error Resilience | `e2e_dag_error_tests.rs` | 4 | Failure isolation, error recovery, consecutive_errors tracking |
| 7 — Concurrent DML | `e2e_dag_concurrent_tests.rs` | 3 | DML between refreshes, concurrent inserts, rollback safety |
| 8 — IMMEDIATE Cascades | `e2e_dag_immediate_tests.rs` | 4 | 2-layer + 3-layer IMMEDIATE, mixed IMMED+DIFF, rollback |
Motivation
The DAG (Directed Acyclic Graph) is the core orchestration structure in
pg_trickle. Stream tables form dependency chains: base table → ST₁ → ST₂ →
ST₃, with optional diamonds (fan-in) and fan-out. The refresh scheduler
traverses these DAGs in topological order (Kahn’s algorithm — src/dag.rs),
and correctness depends on every layer seeing a consistent upstream state.
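The traversal described above can be sketched in a few lines. This is a self-contained illustration of Kahn's algorithm, not the code in src/dag.rs; the function name and edge-list representation are assumptions made for the sketch.

```rust
use std::collections::{HashMap, VecDeque};

/// Kahn's algorithm over an edge list of (upstream, downstream) pairs.
/// Returns the nodes in dependency order, or None if a cycle exists.
fn topo_order(nodes: &[&str], edges: &[(&str, &str)]) -> Option<Vec<String>> {
    let mut indegree: HashMap<&str, usize> = nodes.iter().map(|&n| (n, 0)).collect();
    let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
    for &(up, down) in edges {
        adj.entry(up).or_default().push(down);
        *indegree.entry(down).or_default() += 1;
    }
    // Seed with nodes that have no upstream dependencies (the base tables).
    let mut queue: VecDeque<&str> =
        nodes.iter().copied().filter(|&n| indegree[n] == 0).collect();
    let mut order = Vec::new();
    while let Some(n) = queue.pop_front() {
        order.push(n.to_string());
        for &d in adj.get(n).map(|v| v.as_slice()).unwrap_or(&[]) {
            let deg = indegree.get_mut(d).unwrap();
            *deg -= 1;
            if *deg == 0 {
                queue.push_back(d); // all upstreams processed; d is ready
            }
        }
    }
    // Fewer nodes than expected means a cycle blocked the walk.
    (order.len() == nodes.len()).then_some(order)
}
```

A node is scheduled only after every node it depends on, which is exactly the "consistent upstream state" guarantee the tests below rely on.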
What existing tests cover well
| Existing File | Layers | Modes | Cycles | Topology |
|---|---|---|---|---|
| `e2e_pipeline_dag_tests.rs` | 3–4 | DIFF only | 1 per test | Linear, diamond, fan-out |
| `e2e_getting_started_tests.rs` | 3 | DIFF only | 1 per test | Linear (recursive CTE) |
| `e2e_cascade_regression_tests.rs` | 2–3 | DIFF only | 1–2 | Linear |
| `e2e_multi_cycle_tests.rs` | 1 | DIFF only | 4–8 | Single ST |
| `e2e_diamond_tests.rs` | 2–3 | DIFF only | 0–1 | Diamond |
| `e2e_ivm_tests.rs` | 2 | IMMED | 1 | Linear |
Identified gaps
- Multi-cycle on multi-layer DAGs — the biggest gap. `e2e_multi_cycle_tests.rs` only tests single-layer STs. No test runs 4+ mutation-refresh cycles through a 3+ layer pipeline.
- Mixed refresh modes — No test creates a pipeline where one layer is FULL, another DIFFERENTIAL, another IMMEDIATE.
- Operational mid-pipeline changes — No test SUSPENDs, ALTERs, or DROPs an intermediate ST while data is flowing through the chain.
- Auto-refresh chain propagation — `e2e_cascade_regression_tests.rs` has 2-layer auto-refresh tests but no 3+ layer scheduler-driven cascade.
- Wide fan-out — `e2e_pipeline_dag_tests.rs` has some fan-out but no test with 4+ leaves sharing one root.
- IMMEDIATE mode multi-layer — `test_ivm_cascading_immediate_sts` is 2-layer and requires explicit refresh for the second layer.
- Error resilience — No test verifies that an error in one pipeline layer doesn’t corrupt sibling or downstream layers.
- Concurrent DML — `e2e_concurrent_tests.rs` tests concurrent refresh on the same ST, but not DML racing with a multi-layer pipeline refresh.
Current Coverage Map
Post-implementation state — all 39 tests from this branch are now merged. Legend: ✅ covered · ⚠️ partial · ❌ not covered
Existing tests by topology and behavior
| Behavior | Linear 2L | Linear 3L | Diamond | Fan-Out | Wide (4+L / 4+ leaves) |
|---|---|---|---|---|---|
| Initial population | ✅ | ✅ | ✅ | ✅ | ✅ ¹ |
| INSERT cascade | ✅ | ✅ | ✅ | ✅ | ✅ |
| UPDATE cascade | ✅ | ✅ | ✅ ² | ✅ | ⚠️ ³ |
| DELETE cascade | ✅ | ✅ | ✅ | ✅ | ✅ |
| Multi-cycle (4+) | ✅ | ✅ | ✅ | ✅ | ⚠️ ³ |
| Mixed modes (FULL/DIFF/IMMED) | ✅ | ✅ | ⚠️ ⁴ | ✅ | ❌ |
| Mid-pipeline SUSPEND | ✅ | ✅ | ❌ | ❌ | ❌ |
| Mid-pipeline ALTER (schedule/mode/query) | ✅ | ✅ | ❌ | ❌ | ❌ |
| Mid-pipeline DROP | ✅ | ✅ | ❌ | ❌ | ❌ |
| Auto-refresh cascade | ✅ | ✅ | ✅ | ❌ | ❌ |
| Error in middle layer | ✅ | ✅ | ❌ | ❌ | ❌ |
| Concurrent DML | ✅ | ✅ | ❌ | ❌ | ❌ |
| IMMEDIATE propagation | ✅ | ✅ | ❌ | ❌ | ❌ |
| Rollback safety | ✅ | ✅ | ❌ | ❌ | ❌ |
¹ test_fanout_4_leaves and test_deep_linear_5_layers cover wide/deep initial population.
² test_mc_dag_diamond_multi_cycle includes INSERT+DELETE over 5 cycles; UPDATE is implicitly covered via group-elimination-revival pattern.
³ Wide topologies are covered for INSERT and DELETE isolation (test_wide_fanout_deletion_isolation) but not explicit UPDATE or multi-cycle stress.
⁴ No test creates a diamond where the two intermediate legs use different refresh modes.
Key architecture paths — all closed
| Code Path | Location | Previously | Now |
|---|---|---|---|
| `has_stream_table_source_changes()` with 3+ topo levels | `src/scheduler.rs` | 2-layer only | ✅ `test_autorefresh_3_layer_cascade`, `test_autorefresh_no_spurious_3_layer` |
| `resolve_calculated_schedule()` with deep chains | `src/dag.rs` | unit tests only | ✅ `test_autorefresh_calculated_schedule`, `test_autorefresh_staggered_schedules` |
| `determine_refresh_action()` FULL→DIFF transition in cascade | `src/refresh.rs` | not at pipeline level | ✅ `test_alter_mode_mid_pipeline_diff_to_full`, `test_mixed_mode_alter_mid_pipeline` |
| Diamond consistency group update after ALTER | `src/dag.rs` | create only | ✅ `test_alter_mode_mid_pipeline_diff_to_full` |
| Topological ordering with mixed TABLE/STREAM_TABLE sources | `src/scheduler.rs` | 2-layer only | ✅ `test_multi_source_diamond` |
| `pg_sys::BeginInternalSubTransaction` atomic diamond group path | `src/scheduler.rs` | untested (was broken via SPI) | ✅ `test_autorefresh_diamond_cascade` (EC-13 fix) |
Test file inventory
| File | Tests | Topology | Cycles | Notes |
|---|---|---|---|---|
| `e2e_pipeline_dag_tests.rs` | 14 | Linear, diamond, fan-out | 1 | Pre-existing: broad single-cycle coverage |
| `e2e_cascade_regression_tests.rs` | 8 | Linear 2–3L | 1–2 | Pre-existing: regression guard |
| `e2e_diamond_tests.rs` | 15 | Diamond | 0–1 | Pre-existing + EC-13 default update |
| `e2e_multi_cycle_tests.rs` | 5 | Single-ST | 4–8 | Pre-existing: single-layer multi-cycle |
| `e2e_multi_cycle_dag_tests.rs` | 6 | Linear 3L + diamond | 4–10 | New: multi-cycle at pipeline level |
| `e2e_mixed_mode_dag_tests.rs` | 5 | Linear 2–3L + immediate | 1–3 | New: FULL/DIFF/IMMED combinations |
| `e2e_dag_autorefresh_tests.rs` | 5 | Linear 3L + diamond | scheduler | New: scheduler-driven cascade |
| `e2e_dag_topology_tests.rs` | 5 | Fan-out, deep (5L), multi-source | 1 | New: wide/deep topologies |
| `e2e_dag_operations_tests.rs` | 7 | Linear 2–3L | 1–2 | New: SUSPEND/ALTER/DROP mid-pipeline |
| `e2e_dag_error_tests.rs` | 4 | Linear 2–3L | 1–2 | New: error isolation and recovery |
| `e2e_dag_concurrent_tests.rs` | 3 | Linear 2–3L | 1–2 | New: DML racing pipeline refresh |
| `e2e_dag_immediate_tests.rs` | 4 | Linear 2–3L + mixed | 1–2 | New: IMMEDIATE mode cascades |
| New total | 39 | | | |
Remaining gaps (acknowledged, no current plan)
| Gap | Why not covered now | Priority |
|---|---|---|
| Wide topology (4+ leaves) + multi-cycle | High container load; single-cycle is adequate | P3 |
| Diamond + mixed modes | Combinatorial; base diamond + mixed linear covers the logic | P3 |
| Diamond + mid-pipeline ALTER/SUSPEND | Uncommon operational path | P3 |
| Auto-refresh on fan-out (≥4 leaves) | No scheduler policy drives fan-out distinctly | P3 |
| IMMEDIATE + diamond | Not a supported combination today | P3 |
Gap Analysis
Priority Rating
| Priority | Meaning | Gap Groups |
|---|---|---|
| P0 | Bugs hide here; drift accumulates silently | Group 1 (multi-cycle DAG) |
| P1 | Missing coverage for shipped features | Groups 2, 4, 5 |
| P2 | Edge cases in operational workflows | Groups 3, 6, 7, 8 |
Test Group 1: Multi-Cycle DAG Cascades
Priority: P0 — Highest
Target file: tests/e2e_multi_cycle_dag_tests.rs (new)
Rationale: This is the intersection of two tested dimensions (multi-cycle + multi-layer) that has zero coverage. Delta drift accumulates over cycles and the defining characteristic of IVM is that it converges to the correct answer across many updates. A single mutation-then-check does not catch cumulative errors.
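The convergence invariant behind these tests can be illustrated with a toy model (plain Rust, not pg_trickle code): maintaining a grouped SUM by applying per-cycle deltas must agree with recomputing from the full history after every cycle, including the group-elimination and revival behavior that test 1.4 exercises.

```rust
use std::collections::HashMap;

/// Apply one batch of (group, delta) changes to an incrementally
/// maintained SUM aggregate, removing groups whose sum reaches zero
/// (the "group elimination" behavior the DAG tests exercise).
fn apply_delta(agg: &mut HashMap<String, i64>, delta: &[(&str, i64)]) {
    for &(grp, d) in delta {
        let e = agg.entry(grp.to_string()).or_insert(0);
        *e += d;
        if *e == 0 {
            agg.remove(grp);
        }
    }
}

/// Ground truth: recompute the aggregate from the full history of batches.
fn recompute(history: &[Vec<(&str, i64)>]) -> HashMap<String, i64> {
    let mut agg: HashMap<String, i64> = HashMap::new();
    for batch in history {
        for &(grp, d) in batch {
            *agg.entry(grp.to_string()).or_insert(0) += d;
        }
    }
    agg.retain(|_, v| *v != 0); // eliminated groups do not appear at all
    agg
}
```

Checking `apply_delta` against `recompute` after every batch is the in-miniature analogue of calling assert_st_matches_query at every layer, every cycle: a drift bug shows up as a divergence at some cycle N, which a single mutate-then-check test would never reach.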
Schema: Reuse the Nexmark-inspired base tables
auctions, bidders, bids (base tables)
→ auction_bids (L1: JOIN + agg)
→ bidder_stats (L2: agg on L1)
→ category_metrics (L3: agg on L2)
Tests
| # | Test Name | Description | Cycles | DML Operations |
|---|---|---|---|---|
| 1.1 | `test_mc_dag_insert_heavy_10_cycles` | 10 cycles of INSERT-only DML into all 3 base tables; full pipeline refresh each cycle; `assert_st_matches_query` at all 3 layers | 10 | INSERT |
| 1.2 | `test_mc_dag_mixed_dml_5_cycles` | 5 cycles with mixed INSERT/UPDATE/DELETE at the base; pipeline refresh; check all layers | 5 | INSERT+UPDATE+DELETE |
| 1.3 | `test_mc_dag_noop_cycle_no_drift` | Insert data, refresh pipeline, then run 5 no-op refresh cycles (no DML). Assert `data_timestamp` is stable and contents unchanged at all layers. | 5 | none |
| 1.4 | `test_mc_dag_group_elimination_revival` | Delete all data for a group → refresh → group disappears at all layers → insert new data for the same group → refresh → group reappears at all layers | 4 | DELETE then INSERT |
| 1.5 | `test_mc_dag_bulk_mutation_stress` | Single cycle with large batch DML (100 INSERTs, 50 UPDATEs, 30 DELETEs) followed by pipeline refresh | 1 | bulk mixed |
| 1.6 | `test_mc_dag_diamond_multi_cycle` | Diamond topology: base → L1a, base → L1b, L1a+L1b → L2. Run 5 cycles of mixed DML; verify L2 converges each cycle. | 5 | INSERT+DELETE |
Implementation pattern
```rust
#[tokio::test]
async fn test_mc_dag_insert_heavy_10_cycles() {
    let db = E2eDb::new().await.with_extension().await;

    // Setup base tables + pipeline (reusable helpers)
    setup_nexmark_base(&db).await;
    create_nexmark_pipeline(&db).await;
    let queries = nexmark_defining_queries(); // HashMap<&str, &str>

    for cycle in 1..=10 {
        // Insert new data each cycle
        db.execute(&format!(
            "INSERT INTO bidders (name) VALUES ('cycle_{cycle}_bidder')"
        )).await;
        db.execute(&format!(
            "INSERT INTO auctions (title, category, seller_id) \
             VALUES ('item_{cycle}', 'cat_a', 1)"
        )).await;
        db.execute(&format!(
            "INSERT INTO bids (auction_id, bidder_id, amount) \
             VALUES ({cycle}, 1, {cycle} * 10)"
        )).await;

        // Refresh in topological order
        db.refresh_st("auction_bids").await;
        db.refresh_st("bidder_stats").await;
        db.refresh_st("category_metrics").await;

        // DBSP invariant at every layer
        for (name, query) in &queries {
            db.assert_st_matches_query(name, query).await;
        }
    }
}
```
What this catches
- Cumulative delta drift — Errors that only manifest after N cycles of incremental changes
- Change buffer cleanup — Each cycle’s CDC entries are consumed and don’t leak into the next
- data_timestamp monotonicity — Upstream timestamps advance correctly through the chain
- Prepared statement cache — Delta SQL prepared plans survive across cycles (PG generic plan threshold)
Test Group 2: Mixed Refresh Modes
Priority: P1
Target file: tests/e2e_mixed_mode_dag_tests.rs (new)
Rationale: Real-world pipelines mix modes: a FULL-mode root (recursive CTE), DIFFERENTIAL middle layers, and possibly an IMMEDIATE leaf. The scheduler code (determine_refresh_action, check_upstream_changes) has different paths for each mode, but no E2E test exercises them in a single pipeline.
Key architecture consideration
When a DIFFERENTIAL ST depends on a FULL-mode upstream ST, the upstream
does not have a CDC change buffer (FULL mode does a complete recompute).
The scheduler uses data_timestamp comparison for STREAM_TABLE
dependencies (see has_stream_table_source_changes() in
src/scheduler.rs), which works regardless of upstream refresh mode. But
this path has never been tested with mixed modes.
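Stripped of catalog access, the check reduces to a timestamp comparison. A minimal sketch with a hypothetical signature (the real logic lives in `has_stream_table_source_changes()`):

```rust
/// An ST has pending upstream work whenever any upstream stream table's
/// data_timestamp is newer than its own. This holds for FULL,
/// DIFFERENTIAL, and IMMEDIATE upstreams alike, because every mode
/// advances data_timestamp on a successful refresh, even when no CDC
/// change buffer exists (the FULL-mode case).
fn has_upstream_changes(own_ts: i64, upstream_ts: &[i64]) -> bool {
    upstream_ts.iter().any(|&ts| ts > own_ts)
}
```

The mixed-mode tests below verify that this mode-independence actually holds end to end, not just in the comparison itself.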
Tests
| # | Test Name | Pipeline | Description |
|---|---|---|---|
| 2.1 | `test_mixed_full_then_diff_2_layer` | base →[FULL] L1 →[DIFF] L2 | FULL root feeds DIFFERENTIAL leaf. Mutate base, refresh L1 (FULL recompute), refresh L2 (delta against L1). |
| 2.2 | `test_mixed_diff_then_full_2_layer` | base →[DIFF] L1 →[FULL] L2 | DIFFERENTIAL root feeds FULL leaf. Verify L2 does a full recompute regardless. |
| 2.3 | `test_mixed_3_layer_full_diff_diff` | base →[FULL] L1 →[DIFF] L2 →[DIFF] L3 | 3-layer mixed. FULL root, two DIFFERENTIAL downstream. |
| 2.4 | `test_mixed_mode_alter_mid_pipeline` | base →[DIFF] L1 →[DIFF] L2, then ALTER L1 to FULL | Alter middle layer from DIFF→FULL, verify the chain still converges. |
| 2.5 | `test_mixed_immediate_leaf` | base →[DIFF] L1 →[IMMED] L2 | DIFFERENTIAL root with IMMEDIATE leaf. Insert into base, refresh L1, verify L2 is updated. |
Implementation considerations
The `create_st()` helper defaults to `"DIFFERENTIAL"`. For FULL mode:

```rust
db.create_st("layer1", query, "1m", "FULL").await;
```

For IMMEDIATE mode (no schedule needed):

```rust
db.execute(
    "SELECT pgtrickle.create_stream_table('imm_layer', $$...$$, NULL, 'IMMEDIATE')"
).await;
```
What this catches
- `determine_refresh_action()` dispatching to FULL vs DIFFERENTIAL in a cascade context
- `has_stream_table_source_changes()` comparing `data_timestamp` when upstream mode differs
- Reinit behavior when ALTERing mode mid-pipeline
- IMMEDIATE trigger cascade with a DIFFERENTIAL upstream
Test Group 3: Operational Mid-Pipeline Changes
Priority: P2
Target file: tests/e2e_dag_operations_tests.rs (new)
Rationale: Operators SUSPEND, ALTER (schedule/mode/query), and DROP
stream tables in production. If this happens to a middle node in a DAG,
the downstream STs must behave predictably. The scheduler must skip
SUSPENDED STs and the DAG must be rebuilt after DROP.
Key architecture paths exercised
- `DAG_REBUILD_SIGNAL` — incremented on ALTER/DROP; scheduler rebuilds its in-memory `StDag`
- `status = 'SUSPENDED'` — scheduler skips SUSPENDED STs in the topological walk
- `determine_refresh_action()` returning `NoAction` for SUSPENDED upstream
- DROP cascade — `drop_stream_table()` refuses if dependents exist (unless they’re dropped first)
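How these paths interact can be modeled in miniature: one scheduler tick walks the topological order, skips SUSPENDED nodes, and refreshes a node only when its upstream's data_timestamp has advanced. A toy simulation (all names hypothetical, not the scheduler's actual types):

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq)]
enum Status { Active, Suspended }

/// One scheduler tick: walk STs in topological order, skip SUSPENDED
/// nodes, and refresh a node only when its upstream data_timestamp is
/// newer than its own. A suspended middle node keeps a stale timestamp,
/// so its downstream sees no upstream change and is effectively skipped too.
fn tick(
    order: &[&str],
    status: &HashMap<&str, Status>,
    upstream: &HashMap<&str, &str>,
    ts: &mut HashMap<String, i64>,
) -> Vec<String> {
    let mut refreshed = Vec::new();
    for &st in order {
        if status[st] == Status::Suspended {
            continue; // scheduler skips SUSPENDED STs
        }
        let up_ts = ts[upstream[st]];
        if up_ts > ts[st] {
            ts.insert(st.to_string(), up_ts); // "refresh": adopt upstream timestamp
            refreshed.push(st.to_string());
        }
    }
    refreshed
}
```

This is exactly the shape of test 3.1: with the middle layer suspended, a tick refreshes only the first layer; after resume, one tick carries the change all the way down.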
Tests
| # | Test Name | Operation | Topology | Description |
|---|---|---|---|---|
| 3.1 | `test_suspend_middle_layer_blocks_downstream` | SUSPEND | A→B→C | Suspend B. Insert into A, refresh A. Verify C’s `data_timestamp` doesn’t advance. Resume B, refresh B+C, verify convergence. |
| 3.2 | `test_alter_schedule_mid_pipeline` | ALTER schedule | A→B→C | Alter B’s schedule from 1m to 5s. Verify `DAG_REBUILD_SIGNAL` fires, scheduler picks up the new config, chain converges. |
| 3.3 | `test_alter_mode_mid_pipeline_diff_to_full` | ALTER mode | A→B→C | Change B from DIFFERENTIAL to FULL. Verify the next refresh of B does a full recompute and C still sees correct data. |
| 3.4 | `test_alter_query_mid_pipeline` | ALTER query | A→B→C | Change B’s defining query (add a WHERE filter). Verify reinit; C reflects the filtered data after refresh. |
| 3.5 | `test_drop_leaf_keeps_pipeline_intact` | DROP leaf | A→B→C | Drop C. Verify A and B are unaffected and can still be refreshed. |
| 3.6 | `test_drop_middle_layer_blocked_by_dependents` | DROP middle | A→B→C | Attempt to drop B while C depends on it. Verify error. Drop C first, then B succeeds. |
| 3.7 | `test_suspend_resume_cycle_data_consistency` | SUSPEND+RESUME | A→B | Suspend B, mutate A several times (no refresh on B), resume B, single refresh on B. Verify B has the cumulative changes. |
Implementation pattern
```rust
#[tokio::test]
async fn test_suspend_middle_layer_blocks_downstream() {
    let db = E2eDb::new().await.with_extension().await;

    // base → layer_a → layer_b → layer_c
    setup_3_layer_pipeline(&db).await;

    // Suspend the middle layer
    db.alter_st("layer_b", "status => 'SUSPENDED'").await;
    let (status, _, _, _) = db.pgt_status("layer_b").await;
    assert_eq!(status, "SUSPENDED");

    // Mutate base and refresh layer_a
    db.execute("INSERT INTO base_t (grp, val) VALUES ('new', 100)").await;
    db.refresh_st("layer_a").await;

    // Record layer_c's data_timestamp before attempting refresh
    let ts_before: String = db.query_scalar(
        "SELECT data_timestamp::text FROM pgtrickle.pgt_stream_tables \
         WHERE pgt_name = 'layer_c'"
    ).await;

    // Refresh of layer_b is skipped (SUSPENDED); a manual refresh of a
    // SUSPENDED ST should return an error or no-op, so layer_c sees no
    // upstream change and must not advance.
    let _ = db.try_execute(
        "SELECT pgtrickle.refresh_stream_table('layer_c')"
    ).await;
    let ts_after: String = db.query_scalar(
        "SELECT data_timestamp::text FROM pgtrickle.pgt_stream_tables \
         WHERE pgt_name = 'layer_c'"
    ).await;
    assert_eq!(ts_before, ts_after, "layer_c must not advance while layer_b is suspended");

    // Resume and verify convergence
    db.alter_st("layer_b", "status => 'ACTIVE'").await;
    db.refresh_st("layer_b").await;
    db.refresh_st("layer_c").await;
    db.assert_st_matches_query("layer_c", LAYER_C_QUERY).await;
}
```
Test Group 4: Auto-Refresh Chain Propagation
Priority: P1
Target file: tests/e2e_dag_autorefresh_tests.rs (new)
Rationale: The scheduler processes STs in topological order within each
tick (src/scheduler.rs). For ST-on-ST dependencies it uses
has_stream_table_source_changes() (data_timestamp comparison). This path
has only been tested with 2-layer cascades
(e2e_cascade_regression_tests.rs). Deeper chains exercise the
scheduler’s ordering guarantee more rigorously.
Critical: Must use E2eDb::new_on_postgres_db()
The background worker only connects to the postgres database. All tests
in this group must use new_on_postgres_db() and
configure_fast_scheduler().
Tests
| # | Test Name | Layers | Description |
|---|---|---|---|
| 4.1 | `test_autorefresh_3_layer_cascade` | 3 | base → L1 → L2 → L3, all with a 1s schedule. Insert into base, wait for L3 to auto-refresh, verify correctness at all layers. |
| 4.2 | `test_autorefresh_diamond_cascade` | 3 | Diamond: base → L1a + L1b → L2, all 1s. Insert into base, wait for L2. |
| 4.3 | `test_autorefresh_calculated_schedule` | 2 | L1 (schedule 1s) → L2 (schedule CALCULATED). L2 should refresh whenever L1 has pending changes. |
| 4.4 | `test_autorefresh_no_spurious_3_layer` | 3 | Like `test_no_spurious_cascade_after_noop_upstream_refresh` but extended to 3 layers. No DML → all 3 `data_timestamp`s remain stable across 2+ scheduler ticks. |
| 4.5 | `test_autorefresh_staggered_schedules` | 3 | L1=1s, L2=3s, L3=1s. Verify L3 doesn’t refresh until L2 has caught up. |
Implementation pattern
```rust
use std::time::Duration;

#[tokio::test]
async fn test_autorefresh_3_layer_cascade() {
    let db = E2eDb::new_on_postgres_db().await.with_extension().await;
    configure_fast_scheduler(&db).await;

    db.execute("CREATE TABLE auto3_src (id SERIAL PRIMARY KEY, val INT)").await;
    db.execute("INSERT INTO auto3_src VALUES (1, 10), (2, 20)").await;

    db.create_st("auto3_l1", "SELECT id, val FROM auto3_src", "1s", "DIFFERENTIAL").await;
    db.execute(
        "SELECT pgtrickle.create_stream_table('auto3_l2', \
         $$SELECT id, val * 2 AS doubled FROM auto3_l1$$, '1s', 'DIFFERENTIAL')"
    ).await;
    db.execute(
        "SELECT pgtrickle.create_stream_table('auto3_l3', \
         $$SELECT id, doubled + 1 AS result FROM auto3_l2$$, '1s', 'DIFFERENTIAL')"
    ).await;

    // Wait for initial scheduler stabilization
    db.wait_for_auto_refresh("auto3_l3", Duration::from_secs(30)).await;

    // Mutate and wait for the cascade to reach the deepest layer
    db.execute("INSERT INTO auto3_src VALUES (3, 30)").await;
    let refreshed = db.wait_for_auto_refresh("auto3_l3", Duration::from_secs(60)).await;
    assert!(refreshed, "auto3_l3 should auto-refresh after base mutation");

    // Verify correctness at all layers
    db.assert_st_matches_query("auto3_l1", "SELECT id, val FROM auto3_src").await;
    db.assert_st_matches_query("auto3_l2",
        "SELECT id, val * 2 AS doubled FROM auto3_src").await;
    db.assert_st_matches_query("auto3_l3",
        "SELECT id, val * 2 + 1 AS result FROM auto3_src").await;
}
```
Timing considerations
Auto-refresh tests are inherently slower (wall-clock polling). Each test
should use generous timeouts (30–60s) and short schedules (1s). The
configure_fast_scheduler() helper sets
pg_trickle.scheduler_interval_ms = 100 and
pg_trickle.min_schedule_seconds = 1.
Test Group 5: Wide Topologies (Fan-Out & Fan-In)
Priority: P1
Target file: tests/e2e_dag_topology_tests.rs (new)
Rationale: Most tests use linear chains or simple diamonds. Real-world
schemas have wide fan-out (one source → many STs) and deep fan-in (many
sources converging through several layers).
Tests
| # | Test Name | Topology | Description |
|---|---|---|---|
| 5.1 | `test_fanout_4_leaves` | base → L1, L2, L3, L4 | One base table, 4 independent leaf STs. Insert into base, verify all 4 are independently correct. |
| 5.2 | `test_fanout_then_converge` | base → L1a, L1b, L1c → L2 (JOIN all) | Wide fan-out at L1, fan-in at L2 (3-way join). |
| 5.3 | `test_deep_linear_5_layers` | base → L1 → L2 → L3 → L4 → L5 | 5-layer linear chain. Verify propagation to the deepest layer. |
| 5.4 | `test_multi_source_diamond` | base_a, base_b → L1 (JOIN) → L2, L3 | Two base tables join at L1; L1 fans out to L2, L3. Mutate base_a, verify L2 and L3. Then mutate base_b, verify again. |
| 5.5 | `test_wide_fanout_deletion_isolation` | base → L1..L6 | Delete data relevant only to L3’s query. Verify L1, L2, L4, L5, L6 are unaffected. |
Implementation considerations
For the 5-layer chain (test 5.3), the interim STs must add some transformation at each level so we’re testing actual delta propagation rather than just projection:
```
L1: SELECT id, val FROM base_t                                          -- passthrough
L2: SELECT id, val * 2 AS v2 FROM l1                                    -- arithmetic
L3: SELECT id, SUM(v2) AS total FROM l2 GROUP BY id                     -- aggregate
L4: SELECT id, total, RANK() OVER (ORDER BY total DESC) AS rnk FROM l3  -- window
L5: SELECT id, total FROM l4 WHERE rnk <= 10                            -- filter (TopK)
```
This exercises scan → project → aggregate → window → filter operators across the chain, covering the full DVM operator repertoire in one pipeline.
Test Group 6: Error Resilience in Pipelines
Priority: P2
Target file: tests/e2e_dag_error_tests.rs (new)
Rationale: If a middle layer’s refresh fails (e.g., division by zero in
the defining query after a data change, or an out-of-memory), the layers
above and below should not be corrupted. The error should be recorded in
the catalog (consecutive_errors, status), and recovery after data
correction should work.
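The error bookkeeping these tests assert can be sketched as a small state machine. This is a toy model, not the extension's code; the field name mirrors the catalog column and the maximum mirrors the max_consecutive_errors GUC from test 6.3.

```rust
#[derive(Debug, PartialEq)]
enum Status { Active, Suspended }

struct StState {
    consecutive_errors: u32,
    status: Status,
}

/// Record one refresh outcome. Success resets the error counter;
/// failure increments it, and the ST is suspended once the configured
/// maximum is reached.
fn record_refresh(st: &mut StState, succeeded: bool, max_errors: u32) {
    if succeeded {
        st.consecutive_errors = 0;
    } else {
        st.consecutive_errors += 1;
        if st.consecutive_errors >= max_errors {
            st.status = Status::Suspended;
        }
    }
}
```

Test 6.2 exercises the reset branch (a success after failures clears the counter) and test 6.3 exercises the suspension branch.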
Tests
| # | Test Name | Description |
|---|---|---|
| 6.1 | `test_error_in_middle_layer_does_not_corrupt_siblings` | A→B, A→C. Engineer B’s query to fail on certain data (e.g., division by zero). Insert triggering data. Refresh A (succeeds), refresh B (fails), refresh C (succeeds). Verify C is correct and B has `consecutive_errors > 0`. |
| 6.2 | `test_error_recovery_after_data_fix` | A→B→C. B fails on bad data. Fix the data at the base. Refresh A, then B (succeeds now), then C. Verify full convergence. |
| 6.3 | `test_suspended_on_max_errors` | A→B. Set the `max_consecutive_errors` GUC. Trigger repeated failures. Verify B transitions to SUSPENDED after N failures. |
| 6.4 | `test_error_in_leaf_does_not_affect_upstream` | A→B→C. C’s refresh fails. Verify A and B are still correct and continue to refresh normally. |
Implementation considerations
To engineer a deterministic failure, give the ST a defining query that divides by a column:

```sql
SELECT id, val / denom AS ratio FROM source_table
```

Insert a row with `denom = 0` to trigger a division-by-zero error. (Note that wrapping the divisor in `nullif(denom, 0)` would yield NULL instead of an error, so the bare division is deliberate.) The refresh should fail, and `try_execute` can capture the error:

```rust
let result = db.try_execute(
    "SELECT pgtrickle.refresh_stream_table('faulty_st')"
).await;
assert!(result.is_err(), "refresh should fail on division by zero");
```
Test Group 7: Concurrent DML During Pipeline Refresh
Priority: P2
Target file: tests/e2e_dag_concurrent_tests.rs (new)
Rationale: In production, DML continues while the scheduler refreshes
the pipeline. This tests that CDC triggers correctly capture changes that
arrive between refresh steps.
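The expected buffering behavior can be modeled with a watermark: a refresh consumes only the changes visible when it began, and later arrivals stay queued for the next cycle. A toy sketch (the LSN-style ordering and function name are assumptions, not the extension's CDC format):

```rust
/// A toy CDC change buffer: each entry is (lsn, row). A refresh consumes
/// only entries at or below the watermark captured when it began; rows
/// committed later stay buffered for the next cycle instead of being
/// lost or double-applied.
fn consume_up_to(buffer: &mut Vec<(u64, String)>, watermark: u64) -> Vec<String> {
    let consumed: Vec<String> = buffer
        .iter()
        .filter(|&&(lsn, _)| lsn <= watermark)
        .map(|(_, row)| row.clone())
        .collect();
    buffer.retain(|&(lsn, _)| lsn > watermark); // keep late arrivals queued
    consumed
}
```

Test 7.1's expectation maps directly onto this: the "between" insert lands above the first refresh's watermark and is picked up, exactly once, by the next cycle.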
Tests
| # | Test Name | Description |
|---|---|---|
| 7.1 | `test_dml_between_layer_refreshes` | A→B→C. Insert, refresh A, insert more, refresh B, refresh C. The second insert should appear in the change buffer for the next cycle, not corrupt the current one. |
| 7.2 | `test_concurrent_insert_during_pipeline_refresh` | Spawn a task that continuously inserts rows. In the main task, run 5 full pipeline refreshes. After both complete, do a final refresh. Verify convergence. |
| 7.3 | `test_rollback_between_refreshes` | A→B. Insert, refresh A. Start a transaction, insert more, ROLLBACK. Refresh B. Verify B reflects only the committed data. |
Implementation pattern
```rust
#[tokio::test]
async fn test_dml_between_layer_refreshes() {
    let db = E2eDb::new().await.with_extension().await;
    setup_2_layer_pipeline(&db).await;

    // Insert and refresh L1 only
    db.execute("INSERT INTO base_t VALUES (10, 'first')").await;
    db.refresh_st("layer1").await;

    // More DML arrives between the L1 and L2 refreshes
    db.execute("INSERT INTO base_t VALUES (11, 'between')").await;

    // Refresh L2 — it sees L1's state, which doesn't include row 11 yet.
    // LAYER2_QUERY is written against the base tables (which already contain
    // row 11), so don't assert full equality at this point.
    db.refresh_st("layer2").await;

    // Refresh L1 again to pick up row 11, then L2; now the chain converges
    db.refresh_st("layer1").await;
    db.refresh_st("layer2").await;
    db.assert_st_matches_query("layer2", LAYER2_QUERY).await;
}
```
Test Group 8: IMMEDIATE Mode Cascades
Priority: P2
Target file: tests/e2e_dag_immediate_tests.rs (new)
Rationale: test_ivm_cascading_immediate_sts (in e2e_ivm_tests.rs)
shows that a 2-layer IMMEDIATE cascade currently requires explicit refresh
for the second layer — the statement-level trigger only fires for the
directly affected ST. This behavior should be documented via tests, and
deeper IMMEDIATE cascades should be verified.
Key architecture note
IMMEDIATE mode uses statement-level AFTER triggers (src/ivm.rs). When
the base table changes, the trigger refreshes ST₁. But ST₁’s internal
update is done via MERGE, which may or may not fire ST₂’s trigger
(depending on whether ST₂’s trigger is on the materialized table that
ST₁ writes to). Current behavior: it does NOT cascade automatically.
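This one-hop behavior can be captured in a toy simulation (hypothetical names, not src/ivm.rs): the trigger on the mutated table refreshes its direct dependents, and deliberately does not recurse.

```rust
use std::collections::HashMap;

/// Simulate the documented one-hop IMMEDIATE behavior: a statement-level
/// trigger on the mutated table refreshes its direct dependents, and the
/// MERGE those refreshes perform does not fire further triggers, so this
/// function deliberately does not call itself on the refreshed STs.
fn fire_immediate(
    mutated: &str,
    dependents: &HashMap<&str, Vec<&str>>,
    refreshed: &mut Vec<String>,
) {
    for &st in dependents.get(mutated).map(|v| v.as_slice()).unwrap_or(&[]) {
        refreshed.push(st.to_string()); // refreshed via the trigger
        // No recursive fire_immediate(st, ...): the cascade stops after one hop.
    }
}
```

Test 8.1 pins exactly this: after a base mutation, the first IMMEDIATE layer is fresh and the second is not.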
Tests
| # | Test Name | Description |
|---|---|---|
| 8.1 | `test_immediate_2_layer_explicit_refresh` | Document current behavior: base → IMMED_A → IMMED_B. Insert into base; A is auto-refreshed, B is NOT (needs explicit refresh). |
| 8.2 | `test_immediate_with_differential_downstream` | base → IMMED_A → DIFF_B. Insert into base, A auto-refreshes. Manual refresh of B picks up A’s changes. |
| 8.3 | `test_immediate_3_layer_propagation` | base → IMMED_A → IMMED_B → IMMED_C. Test how far the cascade propagates automatically vs. requiring explicit refresh. |
| 8.4 | `test_immediate_rollback_no_side_effects` | base → IMMED_A. Insert in a transaction that is rolled back. Verify A is unchanged. |
Implementation Roadmap
Phase 1: Multi-Cycle DAG (P0) — ✅ COMPLETE
| Step | Task | Status |
|---|---|---|
| 1.1 | Create `tests/e2e_multi_cycle_dag_tests.rs` with shared setup helpers | ✅ |
| 1.2 | Implement tests 1.1–1.4 (core multi-cycle cascade coverage) | ✅ |
| 1.3 | Implement tests 1.5–1.6 (stress + diamond multi-cycle) | ✅ |
| 1.4 | Run full test suite, fix any discovered issues | ✅ |
Phase 2: Mixed Modes + Auto-Refresh (P1) — ✅ COMPLETE
| Step | Task | Status |
|---|---|---|
| 2.1 | Create `tests/e2e_mixed_mode_dag_tests.rs`, implement tests 2.1–2.3 | ✅ |
| 2.2 | Implement tests 2.4–2.5 (alter + IMMEDIATE leaf) | ✅ |
| 2.3 | Create `tests/e2e_dag_autorefresh_tests.rs`, implement tests 4.1–4.2 | ✅ |
| 2.4 | Implement tests 4.3–4.5 (CALCULATED, no-spurious, staggered) | ✅ |
| 2.5 | Create `tests/e2e_dag_topology_tests.rs`, implement tests 5.1–5.5 | ✅ |
Phase 3: Operations + Error + Concurrent (P2) — ✅ COMPLETE
| Step | Task | Status |
|---|---|---|
| 3.1 | Create `tests/e2e_dag_operations_tests.rs`, implement tests 3.1–3.4 | ✅ |
| 3.2 | Implement tests 3.5–3.7 (DROP cascade, suspend-resume) | ✅ |
| 3.3 | Create `tests/e2e_dag_error_tests.rs`, implement tests 6.1–6.4 | ✅ |
| 3.4 | Create `tests/e2e_dag_concurrent_tests.rs`, implement tests 7.1–7.3 | ✅ |
| 3.5 | Create `tests/e2e_dag_immediate_tests.rs`, implement tests 8.1–8.4 | ✅ |
Phase 4: CI Integration + Documentation — ✅ COMPLETE
| Step | Task | Status |
|---|---|---|
| 4.1 | `justfile` glob `e2e_*` auto-picks up new files — no changes needed | ✅ |
| 4.2 | `cargo check --tests`, `cargo fmt`, `cargo clippy --tests` — all clean | ✅ |
Total estimated effort: ~15 hours (all complete)
Infrastructure & Helpers
Reusable setup helpers
Each test file should define module-local setup helpers for its pipeline schemas. These keep tests concise and ensure consistent table structures.
```rust
/// Create a 3-layer linear pipeline: base → L1 (agg) → L2 (project) → L3 (filter)
async fn setup_3_layer_pipeline(db: &E2eDb) {
    db.execute("CREATE TABLE base_t (id SERIAL PRIMARY KEY, grp TEXT, val INT)").await;
    db.execute("INSERT INTO base_t (grp, val) VALUES ('a', 10), ('b', 20), ('c', 30)").await;

    db.create_st(
        "layer1",
        "SELECT grp, SUM(val) AS total FROM base_t GROUP BY grp",
        "1m",
        "DIFFERENTIAL",
    ).await;
    db.execute(
        "SELECT pgtrickle.create_stream_table('layer2', \
         $$SELECT grp, total * 2 AS doubled FROM layer1$$, NULL, 'DIFFERENTIAL')"
    ).await;
    db.execute(
        "SELECT pgtrickle.create_stream_table('layer3', \
         $$SELECT grp, doubled FROM layer2 WHERE doubled > 30$$, NULL, 'DIFFERENTIAL')"
    ).await;
}
```
Defining queries map
For assert_st_matches_query, we need the raw defining query for each ST, evaluated against the base tables. The helper returns a list of (name, query) pairs:

```rust
fn layer_queries() -> Vec<(&'static str, &'static str)> {
    vec![
        ("layer1", "SELECT grp, SUM(val) AS total FROM base_t GROUP BY grp"),
        ("layer2", "SELECT grp, SUM(val) * 2 AS doubled FROM base_t GROUP BY grp"),
        ("layer3", "SELECT grp, SUM(val) * 2 AS doubled FROM base_t GROUP BY grp \
                    HAVING SUM(val) * 2 > 30"),
    ]
}
```
Important: The validation query for downstream STs must be written in
terms of the base tables, not the intermediate STs, so that
assert_st_matches_query compares the ST’s materialized contents against
the ground truth.
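One common way to implement such a ground-truth comparison is a two-sided EXCEPT ALL. The sketch below is a hypothetical builder (the real assert_st_matches_query may differ in detail); using EXCEPT ALL in both directions catches duplicate-count mismatches as well as missing or extra rows.

```rust
/// Build a query returning rows present on one side but not the other.
/// The ST matches its defining query iff this query returns zero rows.
/// EXCEPT ALL (rather than EXCEPT) also catches multiplicity mismatches.
fn build_mismatch_query(st_name: &str, defining_query: &str) -> String {
    format!(
        "(SELECT * FROM {st} EXCEPT ALL ({q})) \
         UNION ALL \
         (({q}) EXCEPT ALL SELECT * FROM {st})",
        st = st_name,
        q = defining_query,
    )
}
```

Asserting a zero row count on the built query is then equivalent to bag equality between the materialized ST and its ground truth.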
E2eDb helper API reference
| Method | Usage |
|---|---|
| `E2eDb::new()` | New container with isolated database (manual refresh tests) |
| `E2eDb::new_on_postgres_db()` | Uses the postgres database (required for bgworker/auto-refresh tests) |
| `db.create_st(name, query, schedule, mode)` | Create + initialize ST |
| `db.refresh_st(name)` | Manual refresh |
| `db.drop_st(name)` | Drop ST |
| `db.alter_st(name, args)` | Alter ST (schedule, status, mode, query) |
| `db.assert_st_matches_query(st, query)` | DBSP correctness invariant (set equality via EXCEPT) |
| `db.pgt_status(name)` | Returns (status, refresh_mode, is_populated, consecutive_errors) |
| `db.count(table)` | Row count |
| `db.query_scalar::<T>(sql)` | Single scalar result |
| `db.try_execute(sql)` | Returns `Result<(), sqlx::Error>` for error tests |
| `db.wait_for_auto_refresh(name, timeout)` | Poll `data_timestamp` for bgworker tests |
| `db.table_exists(schema, table)` | Check table existence |
| `db.trigger_exists(trigger, table)` | Check trigger existence |
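Polling helpers like wait_for_auto_refresh generally reduce to a deadline loop. A synchronous sketch with hypothetical names (the real helper is async and polls data_timestamp in the catalog):

```rust
use std::time::{Duration, Instant};

/// Poll `probe` until it returns true or `timeout` elapses, sleeping
/// `interval` between attempts. Returns whether the condition was
/// observed in time.
fn wait_until(mut probe: impl FnMut() -> bool, timeout: Duration, interval: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if probe() {
            return true;
        }
        if Instant::now() >= deadline {
            return false; // timed out without observing the condition
        }
        std::thread::sleep(interval);
    }
}
```

Returning a bool rather than panicking lets callers assert with a descriptive message, as the auto-refresh pattern in Group 4 does.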
File Organization
New files
| File | Group | Tests | Priority |
|---|---|---|---|
| `tests/e2e_multi_cycle_dag_tests.rs` | 1 | 6 | P0 |
| `tests/e2e_mixed_mode_dag_tests.rs` | 2 | 5 | P1 |
| `tests/e2e_dag_autorefresh_tests.rs` | 4 | 5 | P1 |
| `tests/e2e_dag_topology_tests.rs` | 5 | 5 | P1 |
| `tests/e2e_dag_operations_tests.rs` | 3 | 7 | P2 |
| `tests/e2e_dag_error_tests.rs` | 6 | 4 | P2 |
| `tests/e2e_dag_concurrent_tests.rs` | 7 | 3 | P2 |
| `tests/e2e_dag_immediate_tests.rs` | 8 | 4 | P2 |
| Total | — | 39 | — |
Files NOT modified
The existing e2e_pipeline_dag_tests.rs (14 tests) remains as-is. It
covers single-mutation scenarios with realistic schemas (Nexmark,
E-commerce, IoT). The new test files focus on the behavioral gaps
(multi-cycle, mixed mode, operations, auto-refresh) rather than duplicating
schema coverage.
Naming convention
All new test files follow the pattern e2e_dag_<aspect>_tests.rs to
clearly group them as DAG-focused tests. The e2e_* glob in the justfile
will automatically pick them up.
Success Criteria
After implementing all 8 groups:
| Dimension | Before | After |
|---|---|---|
| Multi-cycle on multi-layer | 0 tests | 6 tests |
| Mixed refresh modes | 0 tests | 5 tests |
| Operational mid-pipeline | 0 tests | 7 tests |
| Auto-refresh 3+ layers | 0 tests | 5 tests |
| Wide topologies | 0 tests | 5 tests |
| Error resilience | 0 tests | 4 tests |
| Concurrent DML | 0 tests | 3 tests |
| IMMEDIATE cascades | 1 test | 5 tests |
| Total new DAG pipeline tests | — | 39 |
Combined with the existing 14 tests in e2e_pipeline_dag_tests.rs, 15 in
e2e_diamond_tests.rs, 5 in e2e_multi_cycle_tests.rs, and 8 in
e2e_cascade_regression_tests.rs, this brings total DAG-related E2E test
coverage to 81 tests.