Contents
PLAN: Non-Deterministic Function Handling
Status: Completed
Problem Statement
pg_trickle does not check the volatility of functions used in defining queries. This is a correctness gap for DIFFERENTIAL mode: volatile functions (e.g., random(), gen_random_uuid(), clock_timestamp()) produce different values on each evaluation, which breaks delta computation because the DVM engine assumes expressions are deterministic across refreshes.
How it breaks
The DVM engine computes deltas by comparing “what changed in the source” against the stored state. When a volatile function is present:
- A row is inserted into the source table → delta query evaluates
random()→ stores0.42in the stream table - On the next refresh, the merge CTE re-evaluates the same expression → gets
0.73 - The engine sees a phantom change (new value ≠ stored value) or misses real changes
- The row hash (
__pgt_row_id) may also differ, breaking row identity entirely
PostgreSQL function volatility categories
| Category | Meaning | Safe for DIFFERENTIAL? |
|---|---|---|
| IMMUTABLE | Same result for same inputs, always | ✅ Yes |
| STABLE | Same result within a single SQL statement | ⚠️ Mostly safe (but value may shift between refreshes) |
| VOLATILE | Result can change at any time, even within a statement | ❌ No — breaks delta computation |
Current state
- Volatility lookup, expression scanning, and OpTree walking are implemented in
parser.rs, includingExpr::Rawre-parsing and custom-operator lookup viapg_operator→pg_proc. - SQL value functions such as
CURRENT_TIMESTAMPare classified directly so STABLE warnings are preserved even when they round-trip throughExpr::Raw. create_stream_table()enforces volatility rules for parsedDIFFERENTIALandIMMEDIATEqueries: VOLATILE expressions are rejected and STABLE expressions emit a warning.- E2E coverage exists for volatile rejection, immutable acceptance, nested
volatile expressions in
WHERE, volatile custom operators, and STABLE warning emission. - The core roadmap scope is complete.
Implementation Plan
Part 1 — Volatility Checking Infrastructure
Step 1: Add a volatility lookup function
Create a utility function that resolves a function name to its volatility class using pg_catalog.pg_proc:
/// Look up the volatility category of a PostgreSQL function.
///
/// Returns 'i' (immutable), 's' (stable), or 'v' (volatile).
/// Returns 'v' (volatile) if the function cannot be found (safe default).
fn lookup_function_volatility(func_name: &str) -> Result<char, PgTrickleError> {
Spi::connect(|client| {
let result = client.select(
"SELECT provolatile::text FROM pg_catalog.pg_proc \
WHERE proname = $1 LIMIT 1",
None,
&[&func_name],
)?;
if let Some(row) = result.first() {
let vol: String = row.get_by_name("provolatile")?.unwrap_or("v".into());
Ok(vol.chars().next().unwrap_or('v'))
} else {
Ok('v') // Unknown function → assume volatile
}
})
}
Note on overloaded functions: PostgreSQL allows multiple functions with the same name but different argument types. The lookup should ideally resolve using the full signature (function name + argument types). For the initial implementation, matching on proname alone is acceptable — if any overload is volatile, we treat the function as volatile. A future refinement can resolve argument types from the parse tree for exact matching.
Step 2: Add a recursive Expr volatility scanner
Walk an Expr tree and collect all FuncCall nodes, then check each one:
/// Scan an Expr tree and return the "worst" volatility found.
///
/// Returns 'i' if all functions are immutable,
/// 's' if the worst is stable,
/// 'v' if any function is volatile.
fn worst_volatility(expr: &Expr) -> Result<char, PgTrickleError> {
let mut worst = 'i'; // Start optimistic
collect_volatilities(expr, &mut worst)?;
Ok(worst)
}
fn collect_volatilities(expr: &Expr, worst: &mut char) -> Result<(), PgTrickleError> {
match expr {
Expr::FuncCall { func_name, args } => {
let vol = lookup_function_volatility(func_name)?;
*worst = max_volatility(*worst, vol);
for arg in args {
collect_volatilities(arg, worst)?;
}
}
Expr::BinaryOp { left, right, .. } => {
collect_volatilities(left, worst)?;
collect_volatilities(right, worst)?;
}
// ... recurse into all Expr variants that contain sub-expressions
_ => {}
}
Ok(())
}
fn max_volatility(a: char, b: char) -> char {
// v > s > i
match (a, b) {
('v', _) | (_, 'v') => 'v',
('s', _) | (_, 's') => 's',
_ => 'i',
}
}
Part 2 — Validation at Parse Time
Step 3: Scan the OpTree for volatility after parsing
After parse_query() produces an OpTree, walk the tree and collect the worst volatility from all expressions (target list, WHERE, JOIN conditions, HAVING, GROUP BY expressions, window function arguments):
/// Walk an OpTree and return the worst volatility found in any expression.
fn tree_worst_volatility(tree: &OpTree) -> Result<char, PgTrickleError> {
let mut worst = 'i';
match tree {
OpTree::Filter { predicate, child } => {
collect_volatilities(predicate, &mut worst)?;
let child_vol = tree_worst_volatility(child)?;
worst = max_volatility(worst, child_vol);
}
OpTree::Project { expressions, child, .. } => {
for expr in expressions {
collect_volatilities(expr, &mut worst)?;
}
let child_vol = tree_worst_volatility(child)?;
worst = max_volatility(worst, child_vol);
}
// ... all other OpTree variants
_ => {}
}
Ok(worst)
}
Step 4: Enforce volatility rules
Apply the following policy in create_stream_table():
| Mode | Volatile | Stable | Immutable |
|---|---|---|---|
| DIFFERENTIAL | ❌ Reject | ⚠️ Warn | ✅ Allow |
| FULL | ⚠️ Warn | ✅ Allow | ✅ Allow |
let vol = tree_worst_volatility(&op_tree)?;
match (refresh_mode, vol) {
(RefreshMode::Differential, 'v') => {
return Err(PgTrickleError::UnsupportedOperator(
"Defining query contains volatile functions (e.g., random(), \
clock_timestamp()). Volatile functions are not supported in \
DIFFERENTIAL mode because they produce different values on \
each evaluation, breaking delta computation. \
Use FULL refresh mode instead, or replace with a \
deterministic alternative.".into(),
));
}
(RefreshMode::Differential, 's') => {
pgrx::warning!(
"Defining query contains stable functions (e.g., now(), \
current_timestamp). These return the same value within a \
single refresh but may shift between refreshes. \
Delta computation is correct within each refresh cycle."
);
}
(RefreshMode::Full, 'v') => {
pgrx::warning!(
"Defining query contains volatile functions (e.g., random()). \
Each FULL refresh will re-evaluate them, potentially producing \
different results. This is expected behavior but may be surprising."
);
}
_ => {} // Immutable or stable-in-FULL: no action
}
Part 3 — Handling Specific Built-in Functions
Some built-in functions deserve special treatment:
| Function | Volatility | Recommendation |
|---|---|---|
now(), current_timestamp |
STABLE | Allow with warning — value is consistent within a refresh |
clock_timestamp() |
VOLATILE | Reject in DIFFERENTIAL |
random() |
VOLATILE | Reject in DIFFERENTIAL |
gen_random_uuid() |
VOLATILE | Reject in DIFFERENTIAL |
nextval() |
VOLATILE | Reject in DIFFERENTIAL |
txid_current() |
STABLE | Allow with warning |
pg_backend_pid() |
STABLE | Allow with warning |
COALESCE, NULLIF, GREATEST, LEAST |
IMMUTABLE | Allow — these are safe |
No special-casing is needed beyond the general volatility lookup — PostgreSQL already classifies these correctly in pg_proc.provolatile.
Part 4 — Operator-Level Volatility (Implicit Functions)
Some SQL operators implicitly call functions with specific volatility:
- Type casts —
Expr::Raw("CAST(x AS type)"): Casts use the type’s input/output functions, which are typically IMMUTABLE. No action needed. - Operators (
=,<,+, etc.) — These resolve topg_operator→pg_proc. The underlying functions are almost always IMMUTABLE. Can be checked in a future refinement. - Aggregate functions — These are already restricted to a known set (
COUNT,SUM,AVG,MIN,MAX). All are IMMUTABLE in their accumulation. No action needed.
Decision: For the initial implementation, only check explicit FuncCall nodes. Extend to operators and casts in a future iteration if needed.
Files to Change
| File | Change |
|---|---|
src/dvm/parser.rs |
Implemented: lookup helpers, recursive scanners, raw-expression walker, and tree-level volatility checks |
src/api.rs |
Implemented: volatility enforcement during create_stream_table() |
src/error.rs |
No changes needed — UnsupportedOperator already covers this |
docs/SQL_REFERENCE.md |
Implemented: volatility behavior documented |
docs/DVM_OPERATORS.md |
Implemented: deterministic-expression note added |
README.md |
Implemented: support matrix clarified |
Unit Tests
#[test]
fn test_max_volatility_ordering() {
assert_eq!(max_volatility('i', 'i'), 'i');
assert_eq!(max_volatility('i', 's'), 's');
assert_eq!(max_volatility('s', 'v'), 'v');
assert_eq!(max_volatility('v', 'i'), 'v');
}
#[test]
fn test_worst_volatility_immutable_only() {
let expr = Expr::FuncCall {
func_name: "lower".to_string(),
args: vec![Expr::ColumnRef { table_alias: None, column_name: "name".into() }],
};
// Would need pg backend or mock — see note below
}
Note: lookup_function_volatility requires SPI (live PostgreSQL). Unit tests for the volatility scanner should either:
- Test the max_volatility() and collect_volatilities() helpers with mock data
- Use e2e tests for the full integration path
E2E Tests
#[tokio::test]
async fn test_volatile_function_rejected_in_differential() {
let db = E2eDb::new().await.with_extension().await;
db.execute("CREATE TABLE vol_src (id INT PRIMARY KEY)").await;
let result = db.try_execute(
"SELECT pgtrickle.create_stream_table('vol_st', \
$$ SELECT id, random() AS r FROM vol_src $$, '1m', 'DIFFERENTIAL')"
).await;
assert!(result.is_err());
let msg = result.unwrap_err().to_string();
assert!(msg.contains("volatile") || msg.contains("random"));
}
#[tokio::test]
async fn test_volatile_function_allowed_in_full_mode() {
let db = E2eDb::new().await.with_extension().await;
db.execute("CREATE TABLE vol_full_src (id INT PRIMARY KEY)").await;
db.execute("INSERT INTO vol_full_src VALUES (1)").await;
// Should succeed (with warning) in FULL mode
db.create_st("vol_full_st", "SELECT id, random() AS r FROM vol_full_src", "1m", "FULL").await;
assert_eq!(db.count("public.vol_full_st").await, 1);
}
#[tokio::test]
async fn test_stable_function_allowed_in_differential_with_warning() {
let db = E2eDb::new().await.with_extension().await;
db.execute("CREATE TABLE stable_src (id INT PRIMARY KEY, ts TIMESTAMPTZ DEFAULT now())").await;
db.execute("INSERT INTO stable_src VALUES (1)").await;
// now() is STABLE — allowed in DIFFERENTIAL with warning
db.create_st("stable_st", "SELECT id, now() AS refresh_time FROM stable_src", "1m", "DIFFERENTIAL").await;
assert_eq!(db.count("public.stable_st").await, 1);
}
#[tokio::test]
async fn test_immutable_function_fully_allowed() {
let db = E2eDb::new().await.with_extension().await;
db.execute("CREATE TABLE imm_src (id INT PRIMARY KEY, name TEXT)").await;
db.execute("INSERT INTO imm_src VALUES (1, 'HELLO')").await;
db.create_st("imm_st", "SELECT id, lower(name) AS lname FROM imm_src", "1m", "DIFFERENTIAL").await;
assert_eq!(db.count("public.imm_st").await, 1);
}
#[tokio::test]
async fn test_nested_volatile_in_where_clause_rejected() {
let db = E2eDb::new().await.with_extension().await;
db.execute("CREATE TABLE nest_vol_src (id INT PRIMARY KEY, val FLOAT)").await;
let result = db.try_execute(
"SELECT pgtrickle.create_stream_table('nest_vol_st', \
$$ SELECT id, val FROM nest_vol_src WHERE val > random() $$, '1m', 'DIFFERENTIAL')"
).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_gen_random_uuid_rejected_in_differential() {
let db = E2eDb::new().await.with_extension().await;
db.execute("CREATE TABLE uuid_src (id INT PRIMARY KEY)").await;
let result = db.try_execute(
"SELECT pgtrickle.create_stream_table('uuid_st', \
$$ SELECT id, gen_random_uuid() AS uid FROM uuid_src $$, '1m', 'DIFFERENTIAL')"
).await;
assert!(result.is_err());
}
Implemented so far:
test_volatile_function_rejected_in_differential_modetest_stable_function_warned_in_differential_modetest_immutable_function_allowed_in_differential_modetest_nested_volatile_where_expression_rejected_in_differential_mode- Custom volatile operator coverage in
test_volatile_operator_rejected_in_differential
Estimated Effort
| Part | Effort | Description |
|---|---|---|
| 1 — Volatility infrastructure | Medium | ~80 lines: SPI lookup + Expr walker |
| 2 — Validation at parse time | Low | ~30 lines: OpTree walker + policy enforcement |
| 3 — Built-in functions | None | Covered by general pg_proc lookup |
| 4 — Operators/casts | Low (future) | Optional refinement |
| Tests | Medium | 3-4 unit tests + 6 e2e tests |
| Docs | Low | ~20 lines across 3 files |
Total: ~1-2 hours of implementation.
Edge Cases
User-defined functions — Handled automatically by
pg_proclookup. Users marking their functions with wrong volatility is their problem (same as PostgreSQL’s own stance).Schema-qualified function names —
myschema.my_func()needs the lookup to handle schema qualification. Join againstpg_namespaceor useregprocedureresolution.Overloaded functions — Multiple
pg_procrows for the sameproname. Conservative approach: if any overload is volatile, treat as volatile. Better approach: resolve argument types for exact match.Functions in DEFAULT expressions — Not relevant; DEFAULT is not part of the defining query.
Functions in CTE bodies — Already traversed because CTE bodies are parsed into OpTree nodes.
Recursive CTEs — In DIFFERENTIAL mode, recursive CTEs use recomputation diff (re-execute + anti-join). Volatile functions would produce different results on each re-execution. The volatility check should apply to recursive CTE bodies too.
Expr::Rawpassthrough — Some expressions are stored as raw SQL strings (Expr::Raw). These cannot be introspected for function calls. Consider either: (a) parsing them more thoroughly, or (b) flaggingExpr::Rawcontaining parentheses as potentially volatile (conservative warning).
Prior Art — How Other Systems Handle Volatile Functions
The challenge of non-deterministic functions in incremental view maintenance is well-known. Different systems address it based on their underlying architecture.
Compiled Dataflow Systems
Feldera, Flink SQL, and similar systems use a compiled dataflow model: the defining query is compiled into a dataflow graph, and expressions (including function calls) are evaluated once as data enters the pipeline. The result becomes part of the record flowing through operators. This means volatile functions are naturally “captured” at ingestion — random() would produce a value once and that value propagates deterministically through the rest of the graph. Feldera’s UDFs are compiled into the dataflow and are deterministic by construction.
Flink SQL distinguishes explicitly between deterministic and non-deterministic functions. Non-deterministic results are pinned — evaluated once and materialized into the changelog stream. It also provides dedicated temporal primitives (PROCTIME() for processing time, ROWTIME() for event time) rather than relying on now().
Materialize
Materialize takes a hybrid approach. Truly volatile functions like random() are rejected in materialized views. For temporal semantics, Materialize introduces mz_now(), a special temporal filter that has well-defined semantics for incremental maintenance. PostgreSQL’s now() is rewritten to mz_now() which can be efficiently maintained as a “temporal filter” (the system knows how to advance it). Stable functions are evaluated once per logical timestamp.
ksqlDB
ksqlDB rejects non-deterministic functions in persistent (continuously maintained) queries. Pull queries (point-in-time, equivalent to FULL refresh) allow them since they re-execute from scratch each time.
PipelineDB (discontinued)
PipelineDB evaluated expressions at trigger/insertion time, capturing volatile results once into the continuous view’s storage. This is conceptually similar to Feldera’s approach but implemented at the PostgreSQL trigger layer.
Architectural Insight
The fundamental distinction is between:
| Architecture | Volatility Handling | Examples |
|---|---|---|
| Compiled dataflow | Expressions evaluated once at ingestion; results flow deterministically | Feldera, Flink SQL |
| Temporal special-casing | Reject volatile, rewrite temporal functions to special operators | Materialize |
| Reject volatile | Block non-deterministic functions in incremental modes | ksqlDB |
| Capture at trigger | Evaluate volatile functions once in the CDC trigger | PipelineDB |
| SQL re-execution | Re-executes the defining query; volatile functions produce different results each time | pg_trickle (current) |
pg_trickle uses SQL re-execution — the delta and merge CTEs re-evaluate expressions on each refresh. This means pg_trickle cannot adopt the compiled-dataflow approach without a fundamental architecture change. The two viable options are:
- Reject volatile (chosen approach) — simplest, safest, matches ksqlDB and Materialize’s strategy
- Capture at trigger — evaluate volatile expressions inside the CDC trigger and store the results; technically possible but extremely complex (requires partial evaluation of the defining query inside PL/pgSQL triggers)
The reject-volatile approach is the right choice for pg_trickle’s current architecture. If a future version introduces a compiled dataflow engine (per DBSP’s formal model), volatile functions could be re-evaluated.
Alternative Approaches Considered
A: Blocklist of known volatile functions
Reject a hardcoded list (random, gen_random_uuid, clock_timestamp, nextval, etc.). Rejected because it wouldn’t catch user-defined volatile functions and requires ongoing maintenance.
B: Allow volatile everywhere, document the caveat
Simply document that volatile functions may produce incorrect results in DIFFERENTIAL mode. Rejected because silent data corruption is worse than a clear error.
C: Snapshot volatile values at CDC time
Capture the volatile value once (at insert/update time in the trigger) and store it in the change buffer. This would make random() deterministic per-row. Rejected because it requires evaluating the defining query’s expressions inside the trigger function, which is far too complex and brittle.
D: Use pg_stat_user_functions or expression analysis
Use PostgreSQL’s built-in expression analysis (contain_volatile_functions() in src/backend/optimizer/util/clauses.c). This is the most accurate approach but requires calling a C function via pgrx FFI. Considered for future refinement — the SPI-based approach is simpler and sufficient for the initial implementation.