Contents
- Plan: Diamond Dependency Consistency (Multi-Path Refresh)
- 1. Problem Statement
- 2. Theoretical Background
- 3. Current pg_trickle Behavior
- 4. Proposed Solutions
- 5. Recommendation
- 6. Implementation Plan
- Step 1 — Define data structures in dag.rs (no DB) ✅ DONE
- Step 2 — Implement diamond detection in dag.rs (no DB) ✅ DONE
- Step 3 — Implement consistency group computation in dag.rs (no DB) ✅ DONE
- Step 4 — Add catalog column and GUC in catalog.rs / config.rs
- Step 5 — Wire consistency groups into the scheduler (scheduler.rs)
- Step 6 — Add monitoring SQL function (api.rs)
- Step 7 — E2E tests
- Step 8 — Documentation
- Recommended Implementation Order Summary
- What Was Implemented (2026-03-02)
- What Was Implemented (Steps 4–7)
- What Was Implemented (Step 8 — Documentation)
- Prioritized Remaining Work
- 7. Alternatives Considered but Not Recommended
- 8. Impact on Existing Features
- 9. Prior Art & Related Work
- 10. Open Questions (Resolved)
- 11. Implementation Plan: Diamond Schedule Policy (Q4)
Plan: Diamond Dependency Consistency (Multi-Path Refresh)
Date: 2026-02-28 Status: IN PROGRESS — Option 1 (Epoch-Based Atomic Groups) Last Updated: 2026-03-02
1. Problem Statement
The Diamond Dependency Pattern
Consider four stream tables with the following dependency graph:
```
     ┌─────┐
     │  A  │   (base table or stream table)
     └──┬──┘
        │
   ┌────┴────┐
   ▼         ▼
┌─────┐   ┌─────┐
│  B  │   │  C  │
└──┬──┘   └──┬──┘
   │         │
   └────┬────┘
        ▼
     ┌─────┐
     │  D  │
     └─────┘
```
- A is a source (base table or stream table).
- B depends on A: `B = f(A)`
- C depends on A: `C = g(A)`
- D depends on both B and C: `D = h(B, C)`

When A changes, the change must propagate through two paths to reach D:
- Path 1: A → B → D
- Path 2: A → C → D
The Inconsistency Window
With the current scheduler design, stream tables are refreshed sequentially in
topological order. A typical topological ordering might be: [A, B, C, D] or
[A, C, B, D]. In either case, B and C are refreshed separately, each in its
own SPI transaction within the scheduler tick.
The problem arises when B’s refresh succeeds but C’s refresh fails (or is skipped due to advisory lock contention, retry backoff, etc.). In this scenario:
- B has been refreshed to include A’s latest changes (B reflects A@v2).
- C is still stale and reflects A’s older state (C reflects A@v1).
- When D is refreshed, it joins B@v2 with C@v1 — two different versions of A’s data reach D through different paths.
This produces an inconsistent result in D: it sees a “split view” of A where some effects of A’s changes are visible (via B) and some are not (via C).
Concrete Example
-- Base table
CREATE TABLE orders (id INT, amount NUMERIC, region TEXT);
-- Stream table B: orders by region
SELECT pgtrickle.create_stream_table('order_totals_by_region',
'SELECT region, SUM(amount) AS total FROM orders GROUP BY region');
-- Stream table C: order count by region
SELECT pgtrickle.create_stream_table('order_counts_by_region',
'SELECT region, COUNT(*) AS cnt FROM orders GROUP BY region');
-- Stream table D: average order by region (joins B and C)
SELECT pgtrickle.create_stream_table('order_avg_by_region',
'SELECT b.region, b.total / c.cnt AS avg_amount
FROM order_totals_by_region b
JOIN order_counts_by_region c ON b.region = c.region');
If order_totals_by_region (B) is refreshed with new orders but
order_counts_by_region (C) is not yet refreshed, D computes an incorrect
average — the numerator includes new orders but the denominator doesn’t.
Severity
This is not a data corruption issue — it’s a transient inconsistency. On the next successful refresh cycle where both B and C are refreshed, D will self-correct. However, depending on refresh schedules and failure patterns, the inconsistency window can persist for multiple cycles.
The severity depends on the use case:
- Dashboards / analytics: Usually acceptable (eventual consistency).
- Financial reporting: Potentially unacceptable if users rely on within-snapshot consistency across related stream tables.
- Downstream triggers / alerts: Could fire spurious alerts based on inconsistent intermediate states.
2. Theoretical Background
2.1 DBSP: Timestamps and Logical Time
In the DBSP model (Budiu et al., 2023), all operators in a dataflow circuit process changes at the same logical timestamp $t$. The integration operator $I$ and delay operator $z^{-1}$ ensure that:
$$Q[t] = Q^{\Delta}(\Delta I[t-1], \Delta S[t])$$
Where all operators in the circuit see a consistent snapshot at time $t$. There is no diamond inconsistency because the dataflow runtime processes all operators synchronously within a single logical step.
Key insight: DBSP avoids this problem by design — the entire circuit advances atomically from $t$ to $t+1$. pg_trickle, operating as a PostgreSQL extension with sequential per-ST refreshes, does not have this property.
2.2 Differential Dataflow: Frontiers and Capabilities
McSherry et al.’s Differential Dataflow uses frontiers (Antichain of timestamps) to track progress. A frontier represents a promise: “I will never produce data at timestamps earlier than my frontier.” Operators advance their output frontier only when all their inputs have advanced.
The critical property:
- An operator at a join point (like D) does not advance until all upstream paths have delivered all data for the current logical time.
- This means D never sees a partial update — it waits until both B and C have fully propagated A's changes.
This is the mathematical foundation for correct multi-path incremental computation.
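To make the frontier rule concrete, here is a minimal sketch (not pg_trickle or Differential Dataflow code — the scalar timestamps and the function name are illustrative):

```rust
/// Illustrative only: treat each input's frontier as the earliest logical
/// time it might still deliver. A join operator may emit results for time
/// `t` only once every input frontier has moved strictly past `t`.
fn join_may_emit_at(t: u64, input_frontiers: &[u64]) -> bool {
    input_frontiers.iter().all(|&frontier| frontier > t)
}

fn main() {
    // The path through B has advanced to 43, the path through C is still
    // at 42: results for t = 42 are not yet complete, so the join must wait.
    assert!(!join_may_emit_at(42, &[43, 42]));
    // Once both paths have passed 42, the join may emit its result for t = 42.
    assert!(join_may_emit_at(42, &[43, 43]));
}
```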
2.3 Materialize / Feldera: Timestamps as Versions
Materialize (now using the Timely/Differential Dataflow engine) extends this with multi-dimensional timestamps where each dimension represents a different source. A timestamp like $(lsn_A: 42, lsn_B: 17)$ identifies the exact version of each input. Operators advance their capabilities on each dimension independently, and downstream operators only see consistent combinations.
2.4 Noria: Dataflow with Partial State
Noria (Gjengset et al., 2018) takes a different approach — it propagates changes eagerly through a dataflow graph but uses upqueries to reconstruct state when needed. For diamond dependencies, if an operator receives a partial update, it can issue an upquery to fetch the missing data. However, this introduces complexity and is specific to Noria’s memory-resident model.
3. Current pg_trickle Behavior
3.1 Topological Ordering is Necessary but Not Sufficient
The scheduler refreshes STs in topological order (see scheduler.rs), guaranteeing that upstream STs are refreshed before downstream STs. This prevents D from seeing B@v2 while A is still at v1 — but it does not prevent D from seeing B@v2 and C@v1 when both B and C are upstream but only one successfully refreshed.
3.2 Frontier Per-Source LSN Tracking
Each stream table maintains a Frontier (see version.rs)
that maps source OIDs to LSN values. The frontier records exactly “up to which
WAL position have I consumed changes from each source.” However, the frontier
is per-ST and per-source-OID — it does not encode cross-ST consistency.
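For orientation, a minimal sketch of the shape described above (illustrative only — the actual `Frontier` type lives in version.rs, and its field names and LSN representation may differ):

```rust
use std::collections::HashMap;

/// Illustrative only: one frontier per stream table, mapping each source
/// relation OID to the WAL position consumed so far from that source.
struct Frontier {
    lsn_by_source: HashMap<u32, u64>,
}

impl Frontier {
    /// "Up to which WAL position have I consumed changes from this source?"
    fn consumed_lsn(&self, source_oid: u32) -> Option<u64> {
        self.lsn_by_source.get(&source_oid).copied()
    }
}
```

Because each stream table owns its own map, nothing ties B's entry for A to C's entry for A — which is exactly the gap the diamond problem exploits.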
3.3 The DVS Guarantee
The architecture promises Delayed View Semantics — that every stream
table’s contents are logically equivalent to evaluating its defining query at
some past time (data_timestamp). But this guarantee is per-ST. There is
currently no guarantee that two stream tables (B and C) reflect the same
point in time for a shared source (A).
4. Proposed Solutions
4.1 Option 1 — Epoch-Based Atomic Refresh Groups
Core idea: Identify sets of stream tables that must be refreshed atomically (or not at all) to maintain cross-path consistency. Assign each refresh cycle a monotonically increasing epoch number. Within an epoch, either all members of a consistency group succeed or all are rolled back.
4.1.1 Consistency Group Detection
A consistency group is a set of STs that share a common upstream ancestor and converge at a common downstream ST. Algorithmically:
For each ST D in the DAG:
For each pair of upstream paths P1, P2 reaching D:
If P1 and P2 share a common ancestor A:
GroupTogether(intermediates on P1, intermediates on P2)
More precisely, find all ST nodes where |reverse_edges| > 1 (multiple
upstream ST dependencies). For each such fan-in node D, trace all paths
backward to find shared ancestors. The union of all intermediate STs on
those paths forms a consistency group.
For the A→B→D, A→C→D diamond, the consistency group is {B, C} (and
implicitly D depends on this group completing).
4.1.2 Implementation Sketch
```rust
/// A consistency group: STs that must refresh atomically.
struct ConsistencyGroup {
    /// The STs in this group (must all succeed or all rollback).
    members: Vec<NodeId>,
    /// The fan-in node(s) that require this group's consistency.
    convergence_points: Vec<NodeId>,
    /// Epoch counter for this group.
    epoch: u64,
}

impl StDag {
    /// Detect diamond dependencies and return consistency groups.
    fn detect_consistency_groups(&self) -> Vec<ConsistencyGroup> {
        let mut groups = Vec::new();
        // Find all fan-in nodes (STs with multiple upstream ST dependencies)
        for node_id in self.topological_order().unwrap_or_default() {
            let upstream = self.get_upstream(node_id);
            let upstream_sts: Vec<_> = upstream.iter()
                .filter(|n| matches!(n, NodeId::StreamTable(_)))
                .collect();
            if upstream_sts.len() > 1 {
                // Check if any pair shares a common ancestor
                // ... trace paths and find shared ancestors ...
                // If found, create a consistency group
            }
        }
        groups
    }
}
```
4.1.3 Refresh Execution Change
Currently in scheduler.rs, each ST is refreshed independently in Step C.
With consistency groups, the scheduler would:
- Identify groups during DAG rebuild.
- Batch group members — when the first member of a group is due for refresh, mark all group members for refresh.
- Execute in a SAVEPOINT wrapper — wrap the group's refreshes in a savepoint. If any member fails, `ROLLBACK TO SAVEPOINT` and skip the entire group (including the convergence point D).
- Only refresh D after all members of its consistency group have succeeded in the same epoch.
```rust
// Pseudocode for group-aware scheduling
for group in consistency_groups {
    Spi::run("SAVEPOINT consistency_group")?;
    let mut all_ok = true;
    for st in &group.members {
        if execute_single_refresh(st).is_err() {
            all_ok = false;
            break;
        }
    }
    if all_ok {
        Spi::run("RELEASE SAVEPOINT consistency_group")?;
        // Now safe to refresh convergence points
        for cp in &group.convergence_points {
            execute_single_refresh(cp)?;
        }
    } else {
        Spi::run("ROLLBACK TO SAVEPOINT consistency_group")?;
        // Skip the entire group this cycle — D stays at its previous
        // consistent state
    }
}
```
4.1.4 Pros & Cons
| Pros | Cons |
|---|---|
| Strong consistency — D never sees split versions | Reduced availability: one failing ST blocks the entire group |
| Simple mental model for users | Increased refresh latency (batch wait) |
| No schema changes needed | Complexity in group detection algorithm |
| Works within existing transaction model | Groups can become large in deep DAGs |
| Backward compatible (groups of size 1 = current behavior) | May require SAVEPOINT support in the scheduler’s SPI context |
4.2 Option 2 — Frontier Alignment (Shared Epoch Frontier)
Core idea: Instead of each ST independently tracking its per-source frontier, introduce a shared epoch frontier for related STs. Before refreshing D, verify that B and C have both advanced their frontier for shared source A to at least the same LSN.
4.2.1 Implementation
Add a pre-check before refreshing fan-in nodes:
```rust
fn can_refresh_consistently(
    dag: &StDag,
    node: NodeId,
    frontiers: &HashMap<NodeId, Frontier>,
) -> bool {
    // Upstream dependencies of `node` that are themselves stream tables.
    let upstream_sts: Vec<NodeId> = dag.get_upstream(node)
        .into_iter()
        .filter(|n| matches!(n, NodeId::StreamTable(_)))
        .collect();

    // Find all transitively shared sources.
    let shared_sources = find_shared_transitive_sources(dag, &upstream_sts);

    // For each shared source, check that all upstream STs report the same
    // frontier LSN.
    for source_oid in shared_sources {
        let lsns: Vec<_> = upstream_sts.iter()
            .filter_map(|st| frontiers.get(st).map(|f| f.get_lsn(source_oid)))
            .collect();
        if !all_equal(&lsns) {
            return false; // Frontier misalignment — skip D this cycle
        }
    }
    true
}
```
If the frontiers don’t align (because B succeeded but C failed), D is skipped until the next cycle when both B and C have caught up.
4.2.2 Frontier Alignment Modes
We could offer different strictness levels via a GUC or per-ST option:
- `strict` — D is only refreshed when all upstream paths reflect the exact same LSN for every shared source. Maximum consistency.
- `bounded` — D is refreshed if all upstream paths are within N LSN positions of each other. Trades some consistency for availability.
- `none` — Current behavior. D is refreshed whenever its own schedule says so, regardless of upstream frontier alignment.
-- Per-ST configuration
SELECT pgtrickle.create_stream_table(
'order_avg_by_region',
'SELECT ...',
consistency_mode => 'strict' -- or 'bounded', 'none'
);
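A minimal sketch of the alignment check behind these modes (illustrative only — the numeric LSN representation, the `max_lag` unit, and the mode strings are assumptions layered on the frontier idea above, not existing pg_trickle APIs):

```rust
/// Illustrative only: given the LSN each upstream ST (e.g. B and C) has
/// consumed for a shared source A, decide whether D may refresh.
fn frontiers_aligned(upstream_lsns: &[u64], mode: &str, max_lag: u64) -> bool {
    let min = upstream_lsns.iter().copied().min().unwrap_or(0);
    let max = upstream_lsns.iter().copied().max().unwrap_or(0);
    match mode {
        "strict" => min == max,            // every path at exactly the same LSN
        "bounded" => max - min <= max_lag, // small divergence tolerated
        _ => true,                         // 'none': never hold D back
    }
}
```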
4.2.3 Pros & Cons
| Pros | Cons |
|---|---|
| No rollback needed — just skip | D may be delayed indefinitely if one path keeps failing |
| Fine-grained per-ST control | Requires frontier comparison across STs |
| Low implementation complexity | Does not prevent the inconsistency — only prevents D from observing it |
| Works with existing frontier infrastructure | “Bounded” mode is hard to reason about |
4.3 Option 3 — Unified Transaction for Diamond Subgraphs
Core idea: Execute the entire diamond (B, C, and D) in a single PostgreSQL transaction. If any step fails, the entire transaction rolls back — which means all storage tables revert to their pre-refresh state.
4.3.1 Implementation
The scheduler already runs inside BackgroundWorker::transaction(). Today
it refreshes each ST in a separate transaction. The change:
```rust
// Current: each ST gets its own implicit transaction boundary
for node_id in &ordered {
    execute_scheduled_refresh(&st, action);
    // Implicit commit after each refresh
}

// Proposed: group diamond-related STs in one transaction
for group in &diamond_groups {
    BackgroundWorker::transaction(AssertUnwindSafe(|| {
        for st in &group.members_in_topo_order {
            execute_single_refresh(st)?;
        }
        // All succeed → commit
        // Any failure → entire transaction rolls back
    }));
}
```
4.3.2 Pros & Cons
| Pros | Cons |
|---|---|
| Atomic — D, B, C are all-or-nothing | Longer transactions hold locks longer |
| Simplest correctness argument | A failure in C rolls back B's already-successful work |
| No new catalog columns or GUCs needed | Could cause contention with manual refreshes |
| | Large diamond groups could produce very long transactions |
| | Change buffer cleanup becomes more complex (deferred across group) |
4.4 Option 4 — Version-Stamped Refresh with Deferred Convergence
Core idea: Let B and C refresh independently (current behavior), but stamp each ST’s storage with the version of each transitive source. When D is refreshed, construct its delta query so that it only reads from B and C the rows that correspond to the same source version.
4.4.1 Implementation
Add a __pgt_source_versions JSONB column to each stream table’s storage:
ALTER TABLE order_totals_by_region
ADD COLUMN __pgt_source_versions JSONB;
Each row in B carries metadata like:
```json
{"source_a_lsn": "0/1A2B3C4"}
```

When D joins B and C, the delta query includes a predicate:

```sql
WHERE b.__pgt_source_versions->>'source_a_lsn'
    = c.__pgt_source_versions->>'source_a_lsn'
```
This ensures D only combines rows from B and C that originate from the same version of A.
4.4.2 Pros & Cons
| Pros | Cons |
|---|---|
| B and C refresh independently (no blocking) | Significant storage overhead (JSONB per row) |
| D is always consistent (predicate-guaranteed) | Complex query rewriting — every delta must propagate version columns |
| No coordinator / epoch logic | JSONB comparison in joins is expensive |
| Naturally extends to deeper diamond chains | Fundamentally changes the storage schema of every ST |
| | Difficult to retrofit into existing installations |
4.5 Option 5 — Logical Clock / Lamport Timestamps
Core idea: Assign each change event a Lamport timestamp (logical clock) and propagate it through the DAG. Each ST’s refresh advances its logical clock. D only reads from B and C when their logical clocks are “compatible” (both reflect the same causal set from A).
4.5.1 Implementation
```rust
struct LogicalClock {
    /// Per-source logical timestamps (monotonically increasing).
    clocks: HashMap<u32, u64>,
}

impl LogicalClock {
    /// Advance the clock for a given source.
    fn advance(&mut self, source_oid: u32) {
        let entry = self.clocks.entry(source_oid).or_insert(0);
        *entry += 1;
    }

    /// Check if this clock dominates another (all components >=).
    fn dominates(&self, other: &LogicalClock) -> bool {
        other.clocks.iter().all(|(k, v)| {
            self.clocks.get(k).map_or(false, |mine| mine >= v)
        })
    }
}
```
Each ST maintains a LogicalClock alongside its Frontier. On refresh:
1. B’s clock advances to {A: 42} after consuming A’s changes.
2. C’s clock advances to {A: 42} after consuming A’s changes.
3. D checks: do B's and C's clocks both have A >= 42? If yes, refresh; if not, defer (see the usage sketch below).
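A small usage sketch of the clock check, reusing the `LogicalClock` type from the snippet above (the OID and clock values are illustrative):

```rust
use std::collections::HashMap;

fn main() {
    let a_oid: u32 = 16384; // illustrative OID for source A

    // After the refreshes in steps 1–2, but with C one cycle behind:
    let b_clock = LogicalClock { clocks: HashMap::from([(a_oid, 42)]) };
    let c_clock = LogicalClock { clocks: HashMap::from([(a_oid, 41)]) };

    // The clock D requires before it may combine B and C.
    let required = LogicalClock { clocks: HashMap::from([(a_oid, 42)]) };

    // B is ready, C is not — D defers to a later cycle.
    assert!(b_clock.dominates(&required));
    assert!(!c_clock.dominates(&required));
}
```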
This is essentially a formalization of Option 4.2 (Frontier Alignment) using vector clock theory.
4.5.2 Pros & Cons
| Pros | Cons |
|---|---|
| Mathematically clean — vector clock theory is well-understood | Adds complexity to frontier tracking |
| Generalizes to arbitrary DAG shapes | Essentially equivalent to Option 4.2 with more formalism |
| Allows partial-order reasoning (bounded staleness) | Still requires skipping D on misalignment |
| Composable with other distributed systems concepts | May be over-engineered for single-node PostgreSQL |
5. Recommendation
Decision: Option 1 — Epoch-Based Atomic Refresh Groups
Rationale:
Strongest guarantee. By grouping related STs and executing them atomically, D never observes an inconsistent state. This aligns with the DVS principle that each ST should be equivalent to evaluating its query at a single point in time.
Leverages existing infrastructure. PostgreSQL’s SAVEPOINT mechanism provides exactly the rollback semantics needed. The scheduler already processes STs in topological order within a transaction.
Backward compatible. STs not involved in diamond dependencies continue to refresh independently (each forms a group of size 1).
Operationally sound. If one member of the group fails, the group is skipped and retried next cycle — matching existing retry/backoff behavior.
Option 2 not needed. Frontier alignment is fully subsumed: within an atomic group B, C, and D are committed together, so misaligned frontiers are structurally impossible. Option 2 is not implemented as a user-facing mode.
Configuration
Only two modes are exposed:
-- Global default
SET pg_trickle.diamond_consistency = 'atomic'; -- or 'none'
-- Per-ST override
SELECT pgtrickle.alter_stream_table(
'order_avg_by_region',
diamond_consistency => 'atomic'
);
| Mode | Behavior |
|---|---|
| `'none'` | Current behavior. No cross-path consistency. (Default in v0.x for backward compat.) |
| `'atomic'` | Epoch-based atomic groups. Related STs refresh atomically or not at all. (Recommended.) |
6. Implementation Plan
Implementation order follows strict dependency: each step builds on the
previous one. Steps 1–3 have no database dependency and can be validated
with just test-unit. Steps 4–6 require an integration/E2E environment.
Step 1 — Define data structures in dag.rs (no DB) ✅ DONE
Goal: Represent diamonds and consistency groups as first-class types.
- Add `Diamond` struct:

  ```rust
  /// A detected diamond in the ST dependency graph.
  pub struct Diamond {
      /// The fan-in ST that joins two or more upstream paths.
      pub convergence: NodeId,
      /// Base tables / STs that are the shared root(s) of the paths.
      pub shared_sources: Vec<NodeId>,
      /// Intermediate STs on all paths from shared_sources to convergence.
      pub intermediates: Vec<NodeId>,
  }
  ```

- Add `ConsistencyGroup` struct:

  ```rust
  /// A set of STs that must refresh atomically.
  pub struct ConsistencyGroup {
      /// All members, in topological order, including the convergence ST.
      pub members: Vec<NodeId>,
      /// The fan-in ST(s) that required this group.
      pub convergence_points: Vec<NodeId>,
      /// Monotonically increasing counter; advances on every successful
      /// group refresh.
      pub epoch: u64,
  }
  ```

- Add an `is_singleton(&self) -> bool` helper on `ConsistencyGroup` that returns `true` when `members.len() == 1` — used to short-circuit the SAVEPOINT overhead for non-diamond STs (a one-line sketch follows).
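A one-line sketch of that helper, assuming the `ConsistencyGroup` definition above:

```rust
impl ConsistencyGroup {
    /// Size-1 groups are ordinary, non-diamond STs: the scheduler refreshes
    /// them directly, without SAVEPOINT bookkeeping.
    pub fn is_singleton(&self) -> bool {
        self.members.len() == 1
    }
}
```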
Validation: just fmt && just lint must pass with zero warnings.
Step 2 — Implement diamond detection in dag.rs (no DB) ✅ DONE
Goal: A pure function that detects all diamonds in a given StDag and
returns a list of Diamond values.
- Add `pub fn detect_diamonds(dag: &StDag) -> Vec<Diamond>` to `StDag`:
```
detect_diamonds(dag):
    diamonds = []
    for each ST node D where |ST-only upstream deps| >= 2:
        paths = all_paths_to_roots(D, ST-only edges)
        for each pair (P1, P2) in paths:
            shared = ancestors(P1) ∩ ancestors(P2)
            if shared ≠ ∅:
                intermediates = (nodes(P1) ∪ nodes(P2)) \ {D} \ shared
                diamonds.push(Diamond {
                    convergence: D,
                    shared_sources: shared,
                    intermediates: intermediates,
                })
    return merge_overlapping(diamonds)
```
- Add `merge_overlapping(diamonds: Vec<Diamond>) -> Vec<Diamond>`: combine any two `Diamond` values whose `intermediates` sets overlap into a single `Diamond`. This handles nested and multi-diamond DAGs correctly.
- Complexity: $O(V \cdot E)$ worst case; acceptable because DAGs are shallow. Cache the result alongside the DAG (rebuild only when `DAG_REBUILD_SIGNAL` advances — see Step 5).
- Write unit tests (all in `src/dag.rs` under `#[cfg(test)]`):

| Test name | Scenario |
|---|---|
| `test_detect_diamonds_simple` | A→B→D, A→C→D — one diamond detected |
| `test_detect_diamonds_deep` | A→B→E→D, A→C→D — 3-level path |
| `test_detect_diamonds_none` | Linear A→B→C — no diamond |
| `test_detect_diamonds_overlapping` | Two diamonds sharing an intermediate |
| `test_detect_diamonds_multiple_roots` | B and C have different root tables — not a diamond |
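The `ancestors(P)` step in the pseudocode above relies on an upstream walk per branch; a minimal sketch of such a helper (illustrative — the shipped `collect_ancestors` in dag.rs is described only as a recursive upstream walk, and `NodeId`'s traits plus `get_upstream`'s exact signature are assumptions here):

```rust
use std::collections::HashSet;

/// Illustrative only: gather every transitive upstream node of `start`,
/// assuming NodeId: Clone + Eq + Hash and get_upstream(NodeId) -> Vec<NodeId>.
fn collect_ancestors(dag: &StDag, start: NodeId, out: &mut HashSet<NodeId>) {
    for parent in dag.get_upstream(start) {
        // Recurse only on first sight, so shared ancestors are visited once.
        if out.insert(parent.clone()) {
            collect_ancestors(dag, parent, out);
        }
    }
}
```

With one ancestor set per upstream branch of a fan-in node, the `shared = ancestors(P1) ∩ ancestors(P2)` step becomes a plain set intersection.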
Validation: just test-unit must pass.
Step 3 — Implement consistency group computation in dag.rs (no DB) ✅ DONE
Goal: Convert the list of Diamond values into a list of
ConsistencyGroup values that the scheduler can iterate.
- Add `pub fn compute_consistency_groups(dag: &StDag) -> Vec<ConsistencyGroup>`:
  - Call `detect_diamonds(dag)` to get raw diamonds.
  - For each diamond, create a group with `members = intermediates ∪ {convergence}` sorted in topological order (so the convergence ST is always last).
  - Merge groups whose member sets overlap (transitive closure — handles nested diamonds and multi-convergence DAGs; a sketch follows the test table below).
  - For every ST not in any group, emit a singleton `ConsistencyGroup` (members = [st]) so the scheduler can use a uniform loop.
- Expose `pub fn consistency_groups(&self) -> &[ConsistencyGroup]` on `StDag` (cached after DAG rebuild; invalidated on signal).
- Write unit tests:

| Test name | Scenario |
|---|---|
| `test_groups_simple_diamond` | B and C in one group, D as convergence |
| `test_groups_singleton_non_diamond` | Linear ST gets a size-1 group |
| `test_groups_nested_merge` | Two overlapping diamonds merged into one group |
| `test_groups_independent_diamonds` | Two independent diamonds → two separate groups |
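The transitive merge can be sketched as a fixed-point pass over member overlap (illustrative only — the shipped `merge_overlapping_diamonds` is described as union-find-style, and a real version would re-sort `members` topologically after merging):

```rust
/// Illustrative only: merge groups whose member sets overlap, looping until
/// no merge applies so that chains of overlaps collapse transitively.
fn merge_overlapping_groups(mut groups: Vec<ConsistencyGroup>) -> Vec<ConsistencyGroup> {
    loop {
        let mut changed = false;
        let mut merged: Vec<ConsistencyGroup> = Vec::new();
        for g in groups.drain(..) {
            let hit = merged.iter().position(|m| {
                m.members.iter().any(|n| g.members.contains(n))
            });
            match hit {
                Some(i) => {
                    for n in g.members {
                        if !merged[i].members.contains(&n) {
                            merged[i].members.push(n);
                        }
                    }
                    for c in g.convergence_points {
                        if !merged[i].convergence_points.contains(&c) {
                            merged[i].convergence_points.push(c);
                        }
                    }
                    changed = true;
                }
                None => merged.push(g),
            }
        }
        groups = merged;
        if !changed {
            return groups;
        }
    }
}
```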
Validation: just test-unit must pass.
Step 4 — Add catalog column and GUC in catalog.rs / config.rs
Goal: Persist the per-ST diamond_consistency setting and expose the
global GUC. No scheduler logic yet.
- In `config.rs`: add `pg_trickle.diamond_consistency` GUC of type `enum` with values `'none'` (default, backward-compatible) and `'atomic'`. Define a `DiamondConsistency` Rust enum mirroring these values (sketched after the test table below).
- In `catalog.rs`: add a `diamond_consistency TEXT NOT NULL DEFAULT 'none'` column to `pgtrickle.pgt_stream_tables` via a migration SQL block. Add `get_diamond_consistency(oid) -> DiamondConsistency` and `set_diamond_consistency(oid, DiamondConsistency)` helpers.
- Update `create_stream_table()` in `api.rs` to accept an optional `diamond_consistency => 'atomic'` parameter; store it via the catalog helper. Default to the GUC value when not supplied.
- Update `alter_stream_table()` in `api.rs` similarly.
- Write a catalog integration test:

| Test name | Scenario |
|---|---|
| `test_catalog_diamond_consistency_default` | Newly created ST defaults to GUC value |
| `test_catalog_diamond_consistency_override` | Per-ST override round-trips correctly |
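A minimal sketch of the enum this step introduces (the shipped version in dag.rs is later described as also implementing `Display`; returning `Option` from `from_sql_str` is an assumption):

```rust
/// Per-ST diamond consistency setting, mirroring the 'none' / 'atomic'
/// values stored in the catalog and in the GUC.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DiamondConsistency {
    #[default]
    None,
    Atomic,
}

impl DiamondConsistency {
    pub fn as_str(self) -> &'static str {
        match self {
            DiamondConsistency::None => "none",
            DiamondConsistency::Atomic => "atomic",
        }
    }

    pub fn from_sql_str(s: &str) -> Option<Self> {
        match s {
            "none" => Some(DiamondConsistency::None),
            "atomic" => Some(DiamondConsistency::Atomic),
            _ => None,
        }
    }
}
```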
Validation: just fmt && just lint && just test-integration.
Step 5 — Wire consistency groups into the scheduler (scheduler.rs)
Goal: Replace the flat for node in topological_order loop with a
group-aware loop that uses SAVEPOINT for atomic groups.
- After the DAG rebuild step, call `dag.consistency_groups()` to obtain the current group list. Store it in the scheduler's tick state.
- Replace the existing per-ST refresh loop with:
```rust
for group in dag.consistency_groups() {
    if group.is_singleton() {
        // Fast path: no SAVEPOINT overhead for non-diamond STs.
        execute_single_refresh(&group.members[0], &mut ctx)?;
        continue;
    }

    // Check that every member has diamond_consistency = 'atomic'.
    // If any member opts out, fall back to independent refreshes.
    let all_atomic = group.members.iter()
        .all(|m| catalog::get_diamond_consistency(m) == DiamondConsistency::Atomic);
    if !all_atomic {
        for member in &group.members {
            execute_single_refresh(member, &mut ctx)?;
        }
        continue;
    }

    Spi::run("SAVEPOINT pgt_consistency_group")?;
    let mut group_ok = true;
    for member in &group.members { // already in topo order
        if let Err(e) = execute_single_refresh(member, &mut ctx) {
            log!("diamond group rollback: member {:?} failed: {}", member, e);
            group_ok = false;
            break;
        }
    }
    if group_ok {
        Spi::run("RELEASE SAVEPOINT pgt_consistency_group")?;
        group.advance_epoch(); // see Step 3
    } else {
        Spi::run("ROLLBACK TO SAVEPOINT pgt_consistency_group")?;
        // All members retain their pre-tick state; retry next cycle.
        record_group_skip(&group, &mut ctx);
    }
}
```
- Update `record_refresh_outcome()` (or equivalent) to log the group epoch into `pgt_refresh_history` so operators can trace which tick a group successfully committed.
- Confirm that the `BackgroundWorker` transaction wrapping in the scheduler tick is outside the group loop — each tick is already one outer transaction; SAVEPOINTs nest inside it correctly.
Validation: just fmt && just lint. Do not run E2E yet (needs Step 6).
Step 6 — Add monitoring SQL function (api.rs)
Goal: Give operators visibility into detected diamond groups.
- Add `pgtrickle.diamond_groups()` returning `TABLE(group_id INT, member_name TEXT, member_schema TEXT, is_convergence BOOL, epoch BIGINT)`. Reads the in-memory `consistency_groups()` list via SPI or a lightweight shared-state slot. (A pgrx-shaped sketch follows this list.)
- Add `pgtrickle.explain_st(name TEXT)` output to include a `diamond_group_id` column (or extend the existing function if it exists).
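A minimal pgrx-shaped sketch of the set-returning function (illustrative only — the row-building logic is elided, and the exact pgrx attributes and column types used by the project are assumptions):

```rust
use pgrx::prelude::*;

/// Illustrative only: one row per member of each non-singleton group.
#[pg_extern(schema = "pgtrickle")]
fn diamond_groups() -> TableIterator<
    'static,
    (
        name!(group_id, i32),
        name!(member_name, String),
        name!(member_schema, String),
        name!(is_convergence, bool),
        name!(epoch, i64),
    ),
> {
    // Build the DAG, compute consistency groups, skip singletons, and
    // resolve member names from the catalog (all elided in this sketch).
    let rows: Vec<(i32, String, String, bool, i64)> = Vec::new();
    TableIterator::new(rows.into_iter())
}
```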
Validation: just fmt && just lint.
Step 7 — E2E tests
Add to tests/e2e_diamond_tests.rs:
| Test name | Type | What it proves |
|---|---|---|
| `test_diamond_atomic_all_succeed` | E2E | B and C both succeed → D refreshed, epoch advances |
| `test_diamond_atomic_partial_fail` | E2E | B succeeds, C fails → ROLLBACK TO SAVEPOINT, D unchanged, B rolled back |
| `test_diamond_atomic_convergence` | E2E | After retry, B+C succeed → D correct and consistent |
| `test_diamond_none_mode_unblocked` | E2E | With diamond_consistency='none', C failure does not roll back B |
| `test_diamond_linear_unaffected` | E2E | Linear chain A→B→C refreshes independently (no SAVEPOINT overhead) |
| `test_diamond_nested_merge` | E2E | Nested diamond: all intermediates grouped, D only refreshes when all succeed |
| `test_diamond_groups_sql_function` | E2E | pgtrickle.diamond_groups() returns correct membership |
Validation: just test-e2e.
Step 8 — Documentation
- Add `diamond_consistency` parameter to `create_stream_table()` and `alter_stream_table()` in `docs/SQL_REFERENCE.md`.
- Add `pg_trickle.diamond_consistency` GUC entry to `docs/CONFIGURATION.md`.
- Add a "Diamond Dependency Consistency" section to `docs/ARCHITECTURE.md` explaining the problem, the atomic group solution, and the interaction with topological ordering.
- Update `CHANGELOG.md`.
Validation: just fmt && just lint && just test-all.
Recommended Implementation Order Summary
| Step | File(s) | Depends on | Testable with | Status |
|---|---|---|---|---|
| 1 — Data structures | `dag.rs` | — | `just lint` | ✅ Done |
| 2 — Diamond detection | `dag.rs` | Step 1 | `just test-unit` | ✅ Done |
| 3 — Consistency groups | `dag.rs` | Step 2 | `just test-unit` | ✅ Done |
| 4 — Catalog + GUC | `catalog.rs`, `config.rs`, `api.rs`, `lib.rs` | Step 3 | `just test-integration` | ✅ Done |
| 5 — Scheduler wiring | `scheduler.rs` | Steps 3, 4 | `just lint` | ✅ Done |
| 6 — Monitoring function | `api.rs` | Steps 3, 5 | `just lint` | ✅ Done |
| 7 — E2E tests | `tests/e2e_diamond_tests.rs` | Steps 5, 6 | `just test-e2e` | ✅ Done |
| 8 — Documentation | `docs/`, `CHANGELOG.md` | Steps 1–7 | `just test-all` | ✅ Done |
What Was Implemented (2026-03-02)
Steps 1–3 completed in src/dag.rs:
- `Diamond` struct — represents a detected diamond with convergence node, shared sources, and intermediate STs.
- `ConsistencyGroup` struct — represents an atomic refresh group with topologically-ordered members, convergence points, and an epoch counter. Includes `is_singleton()` and `advance_epoch()` helpers.
- `StDag::detect_diamonds()` — walks all fan-in ST nodes, computes transitive ancestor sets per upstream branch, finds shared ancestors, and merges overlapping diamonds.
- `StDag::compute_consistency_groups()` — converts diamonds into scheduler-ready groups in topological order, merges overlapping groups transitively, and emits singleton groups for non-diamond STs.
- Private helpers: `collect_ancestors()` (recursive upstream walk), `merge_overlapping_diamonds()` (union-find-style diamond merging).
12 new unit tests added and passing:
test_detect_diamonds_simple, test_detect_diamonds_deep,
test_detect_diamonds_none_linear, test_detect_diamonds_multiple_roots_no_diamond,
test_detect_diamonds_overlapping, test_groups_simple_diamond,
test_groups_singleton_non_diamond, test_groups_independent_diamonds,
test_groups_nested_merge, test_consistency_group_epoch_advance,
test_consistency_group_is_singleton.
All 965 unit tests pass. just fmt && just lint clean.
What Was Implemented (Steps 4–7)
Steps 4–7 completed across multiple files:
Step 4 — Catalog + GUC:
- DiamondConsistency enum in dag.rs — None / Atomic variants
with as_str(), from_sql_str(), Display.
- PGS_DIAMOND_CONSISTENCY GUC in config.rs — string GUC defaulting
to "none", registered as pg_trickle.diamond_consistency.
- diamond_consistency column added to pgtrickle.pgt_stream_tables
DDL in lib.rs — TEXT NOT NULL DEFAULT 'none' CHECK (... IN ('none', 'atomic')).
- StreamTableMeta.diamond_consistency field in catalog.rs — added to
struct, all SELECT queries, from_spi_table, from_spi_heap_tuple,
insert(). New helpers: get_diamond_consistency(), set_diamond_consistency().
- create_stream_table() now accepts optional diamond_consistency param
(defaults to GUC value).
- alter_stream_table() now accepts optional diamond_consistency param.
Step 5 — Scheduler wiring:
- Replaced flat for node in ordered loop with group-aware loop using
dag.compute_consistency_groups().
- Singleton groups use fast path via refresh_single_st() (no SAVEPOINT).
- Multi-member groups check that all members have diamond_consistency = 'atomic'.
If not, fall back to independent refreshes.
- Atomic groups: SAVEPOINT pgt_consistency_group → refresh each member →
RELEASE SAVEPOINT on success, ROLLBACK TO SAVEPOINT on any failure.
- Added refresh_single_st() helper to avoid code duplication.
Step 6 — Monitoring function:
- pgtrickle.diamond_groups() SQL function returning TABLE(group_id,
member_name, member_schema, is_convergence, epoch). Builds DAG on-demand,
computes groups, skips singletons, and resolves ST names from catalog.
Step 7 — E2E tests:
- Created tests/e2e_diamond_tests.rs with 8 test cases:
test_diamond_consistency_default, test_diamond_consistency_create_atomic,
test_diamond_consistency_alter, test_diamond_groups_sql_function,
test_diamond_linear_unaffected, test_diamond_none_mode_no_groups,
test_diamond_atomic_all_succeed, test_diamond_consistency_in_catalog_view.
4 new unit tests for DiamondConsistency enum:
test_diamond_consistency_as_str, test_diamond_consistency_from_sql_str,
test_diamond_consistency_display, test_diamond_consistency_roundtrip.
All 969 unit tests pass. just fmt && just lint clean.
What Was Implemented (Step 8 — Documentation)
SQL_REFERENCE.md:
- Added diamond_consistency (6th) parameter to create_stream_table() signature, parameter table, and description.
- Added diamond_consistency parameter to alter_stream_table() signature and parameter table.
- Added full pgtrickle.diamond_groups() function documentation with return columns, example output, and usage notes.
CONFIGURATION.md:
- Added pg_trickle.diamond_consistency GUC section with value table, description, SQL examples, and usage notes.
- Added pg_trickle.diamond_consistency = 'none' to the Complete postgresql.conf Example.
ARCHITECTURE.md:
- Added section 13 “Diamond Dependency Consistency” covering the problem, detection algorithm, consistency groups, scheduler SAVEPOINT wiring, and monitoring.
- Added diamond_groups to Monitoring section’s function list.
- Added pg_trickle.diamond_consistency row to the GUC quick reference table.
CHANGELOG.md:
- Added diamond dependency consistency feature under [Unreleased] → ### Added.
Prioritized Remaining Work
All 8 steps are complete. No remaining work for the initial implementation.
Potential future enhancements (not prioritized):
- Parallel refresh of diamond group members (see §7.1).
- Expose diamond_group_id in pgtrickle.explain_st() output.
- Cache consistency groups in shared memory for large deployments (1000+ STs).
All 13 implementation steps are now complete (Steps 1–8: core diamond detection
and atomic refresh; Steps 9–13: diamond_schedule_policy per-convergence-node
with GUC fallback, scheduler wiring, documentation, unit tests, and E2E tests).
7. Alternatives Considered but Not Recommended
7.1 Parallel Refresh of Diamond Members
Instead of sequential processing within a group, refresh B and C in
parallel (separate background workers). This would reduce latency but:
- PostgreSQL’s max_worker_processes budget is limited.
- Parallel transactional coordination within one extension is complex.
- The rollback semantics become much harder (two-phase commit within one PG
instance is unnecessary complexity).
Verdict: Defer to a future “parallel refresh” feature if scaling demands justify it.
7.2 Event Sourcing / Change Log with Global Ordering
Maintain a global ordered log of all changes across all sources, and have each ST consumer track a global sequence number. This is the Kafka/event sourcing approach.
Verdict: Over-engineered for embedded PostgreSQL. We already have WAL LSN ordering per source; adding a global ordering layer would duplicate what PostgreSQL already provides and add significant complexity.
7.3 Optimistic Execution with Rollback Detection
Let D refresh optimistically. After D refreshes, check if its inputs were consistent. If not, mark D as “dirty” and re-refresh on the next cycle.
Verdict: This means D temporarily holds incorrect data, which violates the DVS guarantee and could trigger downstream cascading inconsistencies.
8. Impact on Existing Features
8.1 Cascading Stream Tables
Cascading ST-on-ST dependencies already exist and work via topological ordering. The diamond consistency feature extends this by adding fan-in awareness. Non-diamond cascades (linear chains) are unaffected.
8.2 Immediate IVM (Transactional Mode)
The planned immediate IVM mode (PLAN_TRANSACTIONAL_IVM.md) inherently avoids this problem because changes propagate within a single transaction. Diamond consistency is a deferred-mode-only concern.
8.3 Circular References (SCCs)
The SCC-based refresh logic in PLAN_CIRCULAR_REFERENCES.md handles cycles via fixed-point iteration. A diamond that is also part of a cycle would need the SCC logic to account for consistency groups. The recommended approach: treat the entire SCC as one consistency group (which the SCC logic already effectively does by iterating all members together).
8.4 Manual Refresh
pgtrickle.refresh_stream_table('D') already refreshes upstream STs in
topological order. Adding frontier alignment to the manual refresh path
ensures consistency for diamonds even in manual mode.
9. Prior Art & Related Work
The diamond dependency consistency problem — ensuring that a downstream operator that joins two or more independently computed intermediate results sees a coherent snapshot rather than a “split view” — is a well-known challenge in dataflow and streaming systems. This section surveys how the problem manifests and is addressed across the landscape.
9.1 The Problem in Context
Jamie Brandon’s “An opinionated map of incremental and streaming systems” (2021) provides the clearest framing. He distinguishes internally consistent systems (which always return an output that is the correct result for some present or past input) from internally inconsistent ones:
“An internally inconsistent system might return outputs that are not the correct result for any of the past inputs. Perhaps they … combined part of one input with part of another, or finished processed some update down one path in the graph but not yet down another.” — scattered-thoughts.net, 2021
The last clause is precisely the diamond problem. Brandon observes that all high-temporal-locality systems (Flink, Kafka Streams, ksqlDB, Spark Structured Streaming) permit internal inconsistency under various scenarios, whereas low-temporal-locality systems built on explicit logical time (Differential Dataflow, Materialize) are internally consistent by construction. pg_trickle occupies a middle ground — it is a periodic-refresh IVM system (not a continuous streaming system), so it uses per-refresh epoch boundaries rather than continuous logical timestamps to enforce consistency within diamond groups.
9.2 Foundational Theory
Chandy-Lamport Snapshots (1985) — The seminal algorithm for consistent global snapshots in distributed systems via marker messages. Inspiration for our epoch-based approach where the “epoch” acts as a marker that flows through the dependency graph. When all paths within a consistency group complete under the same SAVEPOINT, the system has effectively taken a Chandy-Lamport snapshot of the group.
Vector Clocks (Mattern, 1989; Fidge, 1988) — Logical timestamps for causality tracking in distributed systems. Directly applicable to Option 4.5 (per-path version vectors) in our options analysis. Our chosen Option 1 trades the fine-grained per-event tracking of vector clocks for a simpler epoch-based grouping that is sufficient for periodic refresh.
9.3 Streaming Dataflow Systems
DBSP (Budiu, McSherry, Ryzhyk & Tannen, 2022) — Formalized as synchronous circuits where all operators process the same logical timestamp in lock-step. The circuit advances one discrete step at a time, computing the entire fixed-point before producing output. This eliminates the diamond problem by construction — there is no window in which one path has advanced while another has not. Feldera is the commercial implementation. Our epoch-based atomic groups achieve a similar effect within PostgreSQL’s transactional model: the SAVEPOINT boundary corresponds to a DBSP “step”, ensuring all group members advance atomically. Ref: arXiv:2203.16684; VLDB 2023
Differential Dataflow (McSherry et al., 2013) — Frontier-based progress tracking with capabilities. An operator only advances its output frontier when all input frontiers have advanced past a timestamp. This provides automatic diamond consistency: a join operator at the confluence of two paths will not emit results at time t until both paths have delivered all updates through time t. Direct inspiration for Options 4.2 and 4.5 in our analysis.
Timely Dataflow (Murray et al., SOSP 2013) — Pointstamp-based progress tracking in a distributed dataflow system. The `can_advance` predicate at each operator answers "could I still receive data at or before timestamp t?" — analogous to our frontier alignment check. Timely's progress protocol is the mechanism that enables Differential Dataflow's consistency guarantees.

Apache Flink — Uses checkpoint barriers (inspired by Chandy-Lamport) that flow through the dataflow graph. At operators with multiple inputs, barriers must be aligned: the operator buffers data from the faster input until the barrier arrives on all inputs, ensuring that the checkpoint captures a consistent cut across all paths. This is structurally identical to the diamond problem — Flink's barrier alignment ensures that a join operator's snapshot does not mix pre-barrier data from one path with post-barrier data from another. Flink also offers unaligned checkpoints (since 1.11) which trade off consistent cuts for lower latency by including in-flight data in the snapshot. Our approach is analogous to Flink's aligned barriers, but at the granularity of per-group refresh rather than per-record processing. Ref: Carbone et al., "Lightweight Asynchronous Snapshots for Distributed Dataflows", arXiv:1506.08603
Noria / Readyset (Gjengset et al., OSDI 2018) — A partially-stateful dataflow system that takes an interesting different approach. Noria uses upqueries — on-demand backward traversal of the dataflow graph to fill in missing state. For multi-path (diamond) topologies, upqueries from a join node must reconcile state from multiple parents that may be at different points in time. Noria addresses this by processing updates through all paths before making results visible at downstream operators. The Readyset commercial system (née Noria) describes itself as using “partially-stateful streaming dataflow” for incrementally maintaining SQL query result sets. Ref: “Noria: Dynamic, Partially-Stateful Data-Flow for High-Performance Web Applications”, OSDI 2018
9.4 Streaming Databases
Materialize (materialize.com) — The most relevant commercial system. Built on Differential Dataflow and Timely Dataflow, Materialize enforces internal consistency as a core guarantee. Frank McSherry and Natacha Crooks’s 2020 blog post defines internal consistency formally:
“A view is internally consistent at time t if it reflects all events that have a timestamp smaller or equal to t, and no events that have a timestamp greater than t.”
For hierarchies of views (exactly the diamond pattern), Materialize ensures that a downstream view's output at time t incorporates all upstream changes through time t across every path. The key mechanisms are: (a) logical timestamps assigned at ingestion, (b) frontier tracking that prevents a view from advancing until all inputs have caught up, and (c) timestamp closing — the ability to determine when no more data will arrive at a given timestamp, even for idle sources.

Materialize also distinguishes causal dependencies (event A caused event B) from atomic dependencies (events A and B should appear simultaneously, e.g. within a transaction). Our `diamond_consistency = 'atomic'` mode enforces atomic dependencies within consistency groups. Ref: "Consistency in Streaming Systems", materialize.com/blog/consistency/

RisingWave — A cloud-native streaming database that maintains materialized views incrementally. RisingWave uses a barrier-based checkpoint mechanism similar to Flink's for consistency across its dataflow operators. Barriers propagate through the operator graph and must align at join points, ensuring multi-path consistency. Ref: docs.risingwave.com
9.5 How pg_trickle Compares
| System | Consistency mechanism | Granularity | Diamond handling |
|---|---|---|---|
| DBSP / Feldera | Synchronous stepping | Per-circuit-step | By construction |
| Differential Dataflow | Frontier tracking | Per-timestamp | By construction |
| Timely Dataflow | Pointstamp progress | Per-timestamp | By construction |
| Flink | Barrier alignment | Per-checkpoint | Barrier buffering |
| Noria / Readyset | Upquery reconciliation | Per-query | On-demand |
| Materialize | Timestamp frontiers | Per-timestamp | By construction |
| RisingWave | Barrier checkpoints | Per-barrier | Barrier alignment |
| pg_trickle | Epoch + SAVEPOINT | Per-refresh-group | Atomic rollback |
pg_trickle’s approach is pragmatic and fits naturally within PostgreSQL’s transactional model. Rather than implementing a full logical-timestamp system (which would require fundamental changes to how PostgreSQL processes queries), we:
- Detect diamond topologies statically via DAG analysis (`detect_diamonds`, `compute_consistency_groups`)
- Group affected stream tables into consistency groups
- Execute each group's refresh within a single SAVEPOINT, so either all members advance or all roll back
- Configure the trade-off via `diamond_consistency` (none/atomic) and the planned `diamond_schedule_policy` (fastest/slowest)
This is most similar to Flink’s barrier alignment approach, adapted from a continuous-streaming context to a periodic-refresh context, and implemented using PostgreSQL savepoints rather than in-memory barrier buffers.
10. Open Questions (Resolved)
1. Granularity of groups: Should the group include D itself, or only the intermediate STs (B, C)? Including D makes the atomic guarantee stronger (D's old state is preserved on failure) but increases the group size and lock duration. Decision: Include D in the group. ✅ Implemented in Steps 1–3.

2. Nested diamonds: If D is itself part of another diamond (D→E, D→F→G, with G joining E and F), should groups be merged transitively? Decision: Yes — compute the transitive closure of shared-ancestor relationships and merge overlapping groups. ✅ Implemented in Step 3 (`merge_overlapping_diamonds`).

3. Performance impact of group detection: The algorithm runs during DAG rebuild. For typical deployments (10–50 STs), this is negligible. For large deployments (1000+ STs), we may need to cache the result. Decision: Cache consistency groups alongside the DAG; rebuild only when `DAG_REBUILD_SIGNAL` advances. ✅ Already the case — groups are computed from the cached DAG, which is rebuilt only on signal.

4. Interaction with cron schedules: If B has a `*/5 * * * *` schedule and C has a `*/10 * * * *` schedule, how should the group decide when to refresh? Decision: Per-convergence-node `diamond_schedule_policy` column (Option A), with a cluster-wide `pg_trickle.diamond_schedule_policy` GUC as the default. The convergence node (the fan-in ST, i.e. D) owns the policy because the group exists to serve D's freshness requirements; B and C are intermediate nodes and don't set the policy. Two values: `'fastest'` (default) — the group refreshes when any member is due; over-delivers on freshness for slower members, and is the correct default because users who opt into atomic mode prioritize consistency over efficiency. `'slowest'` — the group refreshes when all members are due; conserves resources but under-delivers freshness for faster members. The policy only applies when `diamond_consistency = 'atomic'`; in `'none'` mode each ST uses its own schedule independently. The SQL API mirrors `diamond_consistency` — set it on the convergence node at create or alter time:

   ```sql
   SELECT pgtrickle.create_stream_table('order_avg_by_region', '...',
       diamond_consistency => 'atomic',
       diamond_schedule_policy => 'slowest');
   ```

   The scheduler reads the policy from `ConsistencyGroup.convergence_points` (already computed by `compute_consistency_groups`). When multiple convergence points exist, the strictest policy (slowest > fastest) wins. ⬜ Implementation plan in §11 below.

5. Monitoring: How do users know their STs are in a diamond group? Decision: Expose via `pgtrickle.diamond_groups()` SQL function and show in `pgtrickle.explain_st()` output. ✅ `diamond_groups()` implemented in Step 6. `explain_st()` enhancement deferred to future work.
11. Implementation Plan: Diamond Schedule Policy (Q4)
A new per-convergence-node catalog column and matching GUC default to control when a consistency group’s refresh fires relative to its members' individual schedules.
Design: Option A — Per-convergence-node
The convergence node (D — the fan-in ST) owns the policy because the group exists to serve D’s freshness requirements. Intermediate nodes (B, C) do not set the policy. The GUC provides the cluster-wide default used when the catalog column is not set explicitly.
-- Intermediate nodes: set consistency only
SELECT pgtrickle.create_stream_table('order_totals_by_region', '...',
diamond_consistency => 'atomic');
SELECT pgtrickle.create_stream_table('order_counts_by_region', '...',
diamond_consistency => 'atomic');
-- Convergence node: also sets schedule policy
SELECT pgtrickle.create_stream_table('order_avg_by_region', '...',
diamond_consistency => 'atomic',
diamond_schedule_policy => 'slowest');
When multiple convergence points exist (nested diamonds), the strictest policy
(slowest > fastest) wins.
| Policy | Trigger condition | Trade-off |
|---|---|---|
| `'fastest'` (default) | `any(member, is_due)` | Higher freshness, more refreshes |
| `'slowest'` | `all(member, is_due)` | Lower resource cost, staler data |
Only takes effect when diamond_consistency = 'atomic'. In 'none' mode,
each ST uses its own schedule independently — the policy is irrelevant.
Step 9 — Add enum, GUC fallback, and catalog column
Files: src/dag.rs, src/config.rs, src/lib.rs, src/catalog.rs, src/api.rs
- Add `DiamondSchedulePolicy` enum in `dag.rs`:

  ```rust
  #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
  pub enum DiamondSchedulePolicy {
      #[default]
      Fastest,
      Slowest,
  }

  impl DiamondSchedulePolicy {
      pub fn from_str(s: &str) -> Option<Self> { /* ... */ }
  }
  ```

- In `src/config.rs` add GUC fallback `PGS_DIAMOND_SCHEDULE_POLICY` (default `"fastest"`) and register in `_PG_init()` as `pg_trickle.diamond_schedule_policy`. Add accessor:

  ```rust
  pub fn pg_trickle_diamond_schedule_policy() -> DiamondSchedulePolicy {
      PGS_DIAMOND_SCHEDULE_POLICY
          .get()
          .and_then(|s| DiamondSchedulePolicy::from_str(s.to_str().ok()?))
          .unwrap_or_default()
  }
  ```

- In `src/lib.rs` DDL, add to `pgtrickle.pgt_stream_tables`:

  ```sql
  diamond_schedule_policy TEXT NOT NULL DEFAULT 'fastest'
      CHECK (diamond_schedule_policy IN ('fastest', 'slowest')),
  ```

- In `src/catalog.rs`, add `diamond_schedule_policy: DiamondSchedulePolicy` to `StreamTableRecord`, update all SELECT/INSERT/UPDATE queries, and add:

  ```rust
  pub fn set_diamond_schedule_policy(
      pgt_id: i64,
      policy: DiamondSchedulePolicy,
  ) -> Result<(), PgTrickleError>
  ```

- In `src/api.rs`, add a `diamond_schedule_policy: Option<String>` parameter to `create_stream_table` and `alter_stream_table`, validated and persisted via `catalog::set_diamond_schedule_policy`. Only the convergence node needs it; setting it on B or C is allowed but has no effect.
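Step 10 below calls a `stricter()` helper on the policy; a minimal sketch consistent with the strictest-wins rule (the method body is an assumption — only its behavior, slowest > fastest, is fixed by the tests in Step 12):

```rust
impl DiamondSchedulePolicy {
    /// 'slowest' wins whenever either side asks for it (strictest-wins rule
    /// for nested diamonds with multiple convergence points).
    pub fn stricter(self, other: DiamondSchedulePolicy) -> DiamondSchedulePolicy {
        if self == DiamondSchedulePolicy::Slowest || other == DiamondSchedulePolicy::Slowest {
            DiamondSchedulePolicy::Slowest
        } else {
            DiamondSchedulePolicy::Fastest
        }
    }
}
```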
Validation: just fmt && just lint.
Step 10 — Wire group schedule evaluation into the scheduler
Files: src/scheduler.rs
Add a helper that resolves the effective policy for a group by reading the catalog value from each convergence node and taking the strictest:
```rust
fn group_schedule_policy(group: &ConsistencyGroup) -> DiamondSchedulePolicy {
    let guc_default = config::pg_trickle_diamond_schedule_policy();
    group.convergence_points.iter().fold(guc_default, |acc, node| {
        if let NodeId::StreamTable(id) = node {
            load_st_by_id(*id)
                .map(|st| acc.stricter(st.diamond_schedule_policy))
                .unwrap_or(acc)
        } else {
            acc
        }
    })
}
```

Add `is_group_due` helper:

```rust
fn is_group_due(
    group: &ConsistencyGroup,
    policy: DiamondSchedulePolicy,
    now_ms: i64,
) -> bool {
    let member_due: Vec<bool> = group.members.iter().filter_map(|m| {
        if let NodeId::StreamTable(id) = m {
            load_st_by_id(*id).map(|st| check_schedule(&st, now_ms) || st.needs_reinit)
        } else {
            None
        }
    }).collect();
    match policy {
        DiamondSchedulePolicy::Fastest => member_due.iter().any(|&d| d),
        DiamondSchedulePolicy::Slowest => member_due.iter().all(|&d| d),
    }
}
```

In the group-aware refresh loop, replace per-member schedule checks with:

```rust
let policy = group_schedule_policy(group);
if !is_group_due(group, policy, now_ms) {
    continue;
}
// proceed with SAVEPOINT refresh of all members
```

Singleton groups continue using the existing per-ST `check_schedule()` directly — no policy read needed.

When the group is due, refresh all members regardless of their individual schedule status — the atomic guarantee requires all-or-nothing.
Validation: just fmt && just lint.
Step 11 — Documentation
Files: docs/CONFIGURATION.md, docs/ARCHITECTURE.md, docs/SQL_REFERENCE.md, CHANGELOG.md
- CONFIGURATION.md — Add a `### pg_trickle.diamond_schedule_policy` section: values, default, interaction with `diamond_consistency`, and a note that the per-convergence-node value overrides the GUC.
- ARCHITECTURE.md — Update §13 "Diamond Dependency Consistency" to describe the schedule policy, convergence-node ownership, and the strictest-wins rule for nested diamonds.
- SQL_REFERENCE.md — Add the `diamond_schedule_policy` parameter to the `create_stream_table` and `alter_stream_table` tables; update `diamond_groups()` output to show the effective policy per group.
- CHANGELOG.md — Add an entry under `[Unreleased]`.
Validation: just fmt && just lint.
Step 12 — Unit tests
Files: src/dag.rs (#[cfg(test)]), src/scheduler.rs (#[cfg(test)])
| Test name | What it proves |
|---|---|
| `test_diamond_schedule_policy_from_str` | Parses "fastest", "slowest", rejects invalid |
| `test_diamond_schedule_policy_default` | Default is Fastest |
| `test_diamond_schedule_policy_stricter` | slowest.stricter(fastest) → slowest |
| `test_is_group_due_fastest_any_triggers` | With Fastest, group fires when any member is due |
| `test_is_group_due_slowest_all_required` | With Slowest, group fires only when all are due |
| `test_is_group_due_singleton_bypass` | Singleton groups bypass policy check |
| `test_group_policy_convergence_node_overrides_guc` | Convergence node slowest overrides GUC fastest |
| `test_group_policy_nested_strictest_wins` | Two convergence nodes: one fastest, one slowest → slowest |
Validation: just test-unit.
Step 13 — E2E tests
Files: tests/e2e_diamond_tests.rs
| Test name | What it proves |
|---|---|
| `test_diamond_schedule_policy_fastest_default` | Default GUC fastest fires when any member due |
| `test_diamond_schedule_policy_slowest_per_node` | Convergence node slowest waits until all due |
| `test_diamond_schedule_policy_overrides_guc` | Per-node value overrides cluster GUC |
| `test_diamond_schedule_policy_none_mode_ignored` | Policy has no effect when diamond_consistency = 'none' |
| `test_diamond_schedule_policy_nested_strictest_wins` | Nested diamond uses strictest convergence policy |
Validation: just test-e2e.
Implementation Order Summary
| Step | File(s) | Depends on | Testable with | Status |
|---|---|---|---|---|
| 9 — Enum + GUC + catalog + API | `dag.rs`, `config.rs`, `lib.rs`, `catalog.rs`, `api.rs` | Steps 1–8 | `just lint` | ✅ Done |
| 10 — Scheduler wiring | `scheduler.rs` | Step 9 | `just lint` | ✅ Done |
| 11 — Documentation | `docs/`, `CHANGELOG.md` | Step 10 | `just lint` | ✅ Done |
| 12 — Unit tests | `dag.rs`, `scheduler.rs` | Step 10 | `just test-unit` | ✅ Done |
| 13 — E2E tests | `tests/e2e_diamond_tests.rs` | Step 10 | `just test-e2e` | ✅ Done |