Architecture

This document describes the internal architecture of pg_trickle — a PostgreSQL 18 extension that implements stream tables with differential view maintenance. For a high-level description of what pg_trickle does and why, read ESSENCE.md. For release milestones and future plans, see ROADMAP.md.


High-Level Overview

┌─────────────────────────────────────────────────────────────────┐
│                     PostgreSQL 18 Backend                       │
│                                                                 │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌─────────────┐   │
│  │  Source  │   │  Source  │   │  Storage │   │  Storage    │   │
│  │  Table A │   │  Table B │   │  Table X │   │  Table Y    │   │
│  └────┬─────┘   └────┬─────┘   └────▲─────┘   └────▲────────┘   │
│       │              │              │              │            │
│  ═════╪══════════════╪══════════════╪══════════════╪════════    │
│       │              │              │              │            │
│  ┌────▼──────────────▼────┐   ┌────┴──────────────┴────┐        │
│  │  Hybrid CDC Layer      │   │  Delta Application     │        │
│  │  Triggers ──or── WAL   │   │  (INSERT/DELETE diffs) │        │
│  └────────────┬───────────┘   └────────────▲───────────┘        │
│               │                            │                    │
│  ┌────────────▼───────────┐   ┌────────────┴───────────┐        │
│  │   Change Buffer        │   │   DVM Engine           │        │
│  │   (pgtrickle_changes.*) │   │   (Operator Tree)      │        │
│  └────────────┬───────────┘   └────────────▲───────────┘        │
│               │                            │                    │
│               └────────────┬───────────────┘                    │
│                            │                                    │
│  ┌─────────────────────────▼─────────────────────────────┐      │
│  │              Refresh Engine                           │      │
│  │  ┌──────────┐  ┌──────────┐  ┌─────────────────────┐  │      │
│  │  │ Frontier │  │ DAG      │  │ Scheduler           │  │      │
│  │  │ Tracker  │  │ Resolver │  │ (canonical schedule)│  │      │
│  │  └──────────┘  └──────────┘  └─────────────────────┘  │      │
│  └───────────────────────────────────────────────────────┘      │
│                                                                 │
│  ┌────────────────────────────────────────────────────────┐     │
│  │                    Catalog (pgtrickle.*)                │     │
│  │  pgt_stream_tables │ pgt_dependencies │ pgt_refresh_history│  │
│  └────────────────────────────────────────────────────────┘     │
│                                                                 │
│  ┌──────────────────────────────────────────────────────┐       │
│  │                  Monitoring Layer                    │       │
│  │  st_refresh_stats │ slot_health │ check_cdc_health    │       │
│  │  explain_st │ views │ NOTIFY alerting               │       │
│  └──────────────────────────────────────────────────────┘       │
└─────────────────────────────────────────────────────────────────┘

Component Details

1. SQL API Layer (src/api.rs)

The public entry point for users. All operations are exposed as #[pg_extern] functions in the pgtrickle schema:

  • create_stream_table — Applies a chain of auto-rewrite passes (view inlining → DISTINCT ON → GROUPING SETS → scalar subquery in WHERE → correlated scalar subquery in SELECT → SubLinks in OR → multi-PARTITION BY windows), parses the defining query, builds an operator tree, creates the storage table, registers CDC slots, populates the catalog, and optionally performs an initial full refresh.
  • alter_stream_table — Modifies schedule, refresh mode, status (ACTIVE/SUSPENDED), or defining query. Query changes trigger schema migration, dependency updates, and a full refresh within a single transaction.
  • drop_stream_table — Removes the storage table, catalog entries, and cleans up CDC slots.
  • refresh_stream_table — Triggers a manual refresh (same path as automatic scheduling).
  • pgt_status — Returns a summary of all registered stream tables.

2. Catalog (src/catalog.rs)

The catalog manages persistent metadata stored in PostgreSQL tables within the pgtrickle schema:

Table Purpose
pgtrickle.pgt_stream_tables Core metadata: name, query, schedule, status, frontier, etc.
pgtrickle.pgt_dependencies DAG edges from ST to source tables
pgtrickle.pgt_refresh_history Audit log of every refresh operation
pgtrickle.pgt_change_tracking Per-source CDC slot metadata

Schema creation is handled by extension_sql!() macros that run at CREATE EXTENSION time.

Entity-Relationship Diagram

erDiagram
    pgt_stream_tables {
        bigserial pgt_id PK
        oid pgt_relid UK "OID of materialized storage table"
        text pgt_name
        text pgt_schema
        text defining_query
        text original_query "User's original SQL (pre-inlining)"
        text schedule "Duration or cron expression"
        text refresh_mode "FULL | DIFFERENTIAL | IMMEDIATE"
        text status "INITIALIZING | ACTIVE | SUSPENDED | ERROR"
        boolean is_populated
        timestamptz data_timestamp "Freshness watermark"
        jsonb frontier "DBSP-style version frontier"
        timestamptz last_refresh_at
        int consecutive_errors
        boolean needs_reinit
        float8 auto_threshold
        float8 last_full_ms
        timestamptz created_at
        timestamptz updated_at
    }

    pgt_dependencies {
        bigint pgt_id PK,FK "References pgt_stream_tables.pgt_id"
        oid source_relid PK "OID of source table"
        text source_type "TABLE | STREAM_TABLE | VIEW"
        text_arr columns_used "Column-level lineage"
        text cdc_mode "TRIGGER | TRANSITIONING | WAL"
        text slot_name "Replication slot (WAL mode)"
        pg_lsn decoder_confirmed_lsn "WAL decoder progress"
        timestamptz transition_started_at "Trigger→WAL transition start"
    }

    pgt_refresh_history {
        bigserial refresh_id PK
        bigint pgt_id FK "References pgt_stream_tables.pgt_id"
        timestamptz data_timestamp
        timestamptz start_time
        timestamptz end_time
        text action "NO_DATA | FULL | DIFFERENTIAL | REINITIALIZE | SKIP"
        bigint rows_inserted
        bigint rows_deleted
        text error_message
        text status "RUNNING | COMPLETED | FAILED | SKIPPED"
        text initiated_by "SCHEDULER | MANUAL | INITIAL"
        timestamptz freshness_deadline
    }

    pgt_change_tracking {
        oid source_relid PK "OID of tracked source table"
        text slot_name "Trigger function name"
        pg_lsn last_consumed_lsn
        bigint_arr tracked_by_pgt_ids "ST IDs sharing this source"
    }

    pgt_stream_tables ||--o{ pgt_dependencies : "has sources"
    pgt_stream_tables ||--o{ pgt_refresh_history : "has refresh history"
    pgt_stream_tables }o--o{ pgt_change_tracking : "tracks via pgt_ids array"

Note: Change buffer tables (pgtrickle_changes.changes_<oid>) are created dynamically per source table OID and live in the separate pgtrickle_changes schema.

3. CDC / Change Data Capture (src/cdc.rs, src/wal_decoder.rs)

pg_trickle uses a hybrid CDC architecture that starts with triggers and optionally transitions to WAL-based (logical replication) capture for lower write-side overhead.

Trigger Mode (default)

  1. Trigger Management — Creates AFTER INSERT OR UPDATE OR DELETE row-level triggers (pg_trickle_cdc_<oid>) on each tracked source table. Each trigger fires a PL/pgSQL function (pg_trickle_cdc_fn_<oid>()) that writes changes to the buffer table.
  2. Change Buffering — Decoded changes are written to per-source change buffer tables in the pgtrickle_changes schema. Each row captures the LSN (pg_current_wal_lsn()), transaction ID, action type (I/U/D), and the new/old row data as JSONB via to_jsonb().
  3. Cleanup — Consumed changes are deleted after each successful refresh via delete_consumed_changes(), bounded by the upper LSN to prevent unbounded scans.
  4. Lifecycle — Triggers and trigger functions are automatically created when a source table is first tracked and dropped when the last stream table referencing a source is removed.

The trigger approach was chosen as the default for transaction safety (triggers can be created in the same transaction as DDL), simplicity (no slot management, no wal_level = logical requirement), and immediate visibility (changes are visible in buffer tables as soon as the source transaction commits).

WAL Mode (optional, automatic transition)

When pg_trickle.cdc_mode is set to 'auto' or 'wal' and wal_level = logical is available, the system transitions from trigger-based to WAL-based CDC after the first successful refresh:

  1. WAL Availability Detection — At stream table creation, checks whether wal_level = logical is configured. If so, the source dependency is marked for WAL transition.
  2. WAL Decoder Background Worker — A dedicated background worker (src/wal_decoder.rs) polls logical replication slots and writes decoded changes into the same change buffer tables used by triggers, ensuring a uniform format for the DVM engine.
  3. Transition Orchestration — The transition is a three-step process: (a) create a replication slot, (b) wait for the decoder to catch up to the trigger’s last confirmed LSN, © drop the trigger and switch the dependency to WAL mode. If the decoder doesn’t catch up within pg_trickle.wal_transition_timeout (default 300s), the system falls back to triggers.
  4. CDC Mode Tracking — Each source dependency in pgt_dependencies carries a cdc_mode column (TRIGGER / TRANSITIONING / WAL) and WAL-specific metadata (slot_name, decoder_confirmed_lsn, transition_started_at).

See ADR-001 and ADR-002 in plans/adrs/PLAN_ADRS.md for the original design rationale and plans/sql/PLAN_HYBRID_CDC.md for the full implementation plan.

Immediate Mode / Transactional IVM (src/ivm.rs)

When refresh_mode = 'IMMEDIATE', pg_trickle uses statement-level AFTER triggers with transition tables instead of row-level CDC triggers. The stream table is maintained synchronously within the same transaction as the base table DML.

  1. BEFORE Triggers — Statement-level BEFORE triggers on each base table acquire an advisory lock on the stream table to prevent concurrent conflicting updates.
  2. AFTER Triggers — Statement-level AFTER triggers with REFERENCING NEW TABLE AS ... OLD TABLE AS ... copy the transition table data to temp tables, then call the Rust pgt_ivm_apply_delta() function.
  3. Delta Computation — The DVM engine’s Scan operator reads from the temp tables (via DeltaSource::TransitionTable) instead of change buffer tables. No LSN filtering or net-effect computation is needed — each trigger invocation represents a single atomic statement.
  4. Delta Application — The computed delta is applied via explicit DML (DELETE + INSERT ON CONFLICT) to the stream table.
  5. TRUNCATE — A separate AFTER TRUNCATE trigger calls pgt_ivm_handle_truncate(), which truncates the stream table and re-populates from the defining query.

No change buffer tables, no scheduler involvement, and no WAL infrastructure is needed for IMMEDIATE mode. See plans/sql/PLAN_TRANSACTIONAL_IVM.md for the design plan.

4. DVM Engine (src/dvm/)

The Differential View Maintenance engine is the core of the system. It transforms the defining SQL query into an executable operator tree that can compute deltas efficiently.

Auto-Rewrite Pipeline (src/dvm/parser.rs)

Before the defining query is parsed into an operator tree, it passes through a chain of auto-rewrite passes that normalize SQL constructs the DVM parser doesn’t handle directly:

Pass Function Purpose
#0 rewrite_views_inline() Replace view references with (view_definition) AS alias subqueries
#1 rewrite_distinct_on() Convert DISTINCT ON to ROW_NUMBER() OVER (…) = 1 window subquery
#2 rewrite_grouping_sets() Decompose GROUPING SETS / CUBE / ROLLUP into UNION ALL of GROUP BY
#3 rewrite_scalar_subquery_in_where() Convert WHERE col > (SELECT …) to CROSS JOIN
#4 rewrite_sublinks_in_or() Split WHERE a OR EXISTS (…) into UNION branches
#5 rewrite_multi_partition_windows() Split multiple PARTITION BY clauses into joined subqueries

The view inlining pass (#0) runs first so that view definitions containing DISTINCT ON, GROUPING SETS, etc. are further rewritten by downstream passes. Nested views are expanded via a fixpoint loop (max depth 10).

Query Parser (src/dvm/parser.rs)

Parses the defining query using PostgreSQL’s internal parser (via pgrx raw_parser) and extracts: - WITH clause — CTE definitions (non-recursive: inline expansion or shared delta; recursive: detected for mode gating) - Target list — output columns - FROM clause — source tables, joins, subqueries, and CTE references - WHERE clause — filters - GROUP BY / aggregate functions - DISTINCT / UNION ALL / INTERSECT / EXCEPT

The parser produces an OpTree — a tree of operator nodes. CTE handling follows a tiered approach:

  1. Tier 1 (Inline Expansion) — Non-recursive CTEs referenced once are expanded into Subquery nodes, equivalent to subqueries in FROM.
  2. Tier 2 (Shared Delta) — Non-recursive CTEs referenced multiple times produce CteScan nodes that share a single delta computation via a CTE registry and delta cache.
  3. Tier 3a/3b/3c (Recursive) — Recursive CTEs (WITH RECURSIVE) are detected via query_has_recursive_cte(). In FULL mode, the query executes as-is. In DIFFERENTIAL mode, the strategy is auto-selected: semi-naive evaluation for INSERT-only changes, Delete-and-Rederive (DRed) for mixed changes, or recomputation fallback when CTE columns don’t match ST storage or when the recursive term contains non-monotone operators (EXCEPT, Aggregate, Window, DISTINCT, AntiJoin, INTERSECT SET). In IMMEDIATE mode, the same semi-naive / DRed machinery runs against statement transition tables and is bounded by pg_trickle.ivm_recursive_max_depth to guard against unbounded recursion.

Operators (src/dvm/operators/)

Each operator knows how to generate a delta query — given a set of changes to its inputs, it produces the corresponding changes to its output:

Operator Delta Strategy
Scan Direct passthrough of CDC changes
Filter Apply WHERE predicate to deltas
Project Apply column projection to deltas
Join Join deltas against the other side’s current state
OuterJoin LEFT/RIGHT outer join with NULL padding
FullJoin FULL OUTER JOIN with 8-part delta (both sides may produce NULLs)
Aggregate Recompute group values where affected keys changed
Distinct COUNT-based duplicate tracking
UnionAll Merge deltas from both branches
Intersect Dual-count multiplicity with LEAST boundary crossing
Except Dual-count multiplicity with GREATEST(0, L-R) boundary crossing
Subquery Transparent delegation + optional column renaming (CTEs, subselects)
CteScan Shared delta lookup from CTE cache (multi-reference CTEs)
RecursiveCte Semi-naive / DRed / recomputation for WITH RECURSIVE
Window Partition-based recomputation for window functions
LateralFunction Row-scoped recomputation for SRFs in FROM (jsonb_array_elements, unnest, etc.)
LateralSubquery Row-scoped recomputation for correlated subqueries in LATERAL FROM
SemiJoin EXISTS / IN subquery delta via semi-join
AntiJoin NOT EXISTS / NOT IN subquery delta via anti-join
ScalarSubquery Correlated scalar subquery in SELECT list

See DVM_OPERATORS.md for detailed descriptions.

Diff Engine (src/dvm/diff.rs)

Generates the final diff SQL that: 1. Computes the delta from the operator tree 2. Produces ('+', row) for inserts and ('-', row) for deletes 3. Applies the diff via DELETE matching old rows and INSERT for new rows

5. DAG / Dependency Graph (src/dag.rs)

Stream tables can depend on other stream tables (cascading), forming a Directed Acyclic Graph:

  • Cycle detection — Detects circular dependencies at creation time using Kahn’s algorithm (BFS topological sort). When pg_trickle.allow_circular = true, monotone cycles (queries using only safe operators — joins, filters, UNION ALL, etc.) are allowed; non-monotone cycles (aggregates, EXCEPT, window functions, anti-joins) are rejected. SCC IDs are automatically assigned to cycle members and recomputed on drop/alter.
  • SCC decomposition — Tarjan’s algorithm decomposes the graph into strongly connected components. Singleton SCCs are acyclic; multi-node SCCs contain cycles that are handled by fixed-point iteration in the scheduler.
  • Monotonicity analysis — Static check (check_monotonicity() in src/dvm/parser.rs) determines whether a query’s operators are safe for cyclic fixed-point iteration. Non-monotone operators (Aggregate, EXCEPT, Window, NOT EXISTS) block cycle creation.
  • Topological ordering — Determines refresh order: upstream STs must be refreshed before downstream STs.
  • Condensation ordercondensation_order() returns SCCs in topological order, grouping cyclic STs for fixed-point iteration. The scheduler’s iterate_to_fixpoint() processes multi-node SCCs by refreshing all members repeatedly until convergence (zero net changes) or max_fixpoint_iterations is exceeded.
  • Cascade operations — When a source table changes, all transitive dependents are identified for refresh.

6. Version / Frontier Tracking (src/version.rs)

Implements a per-source frontier (JSONB map of source_oid → LSN) to track exactly how far each stream table has consumed changes:

  • Read frontier — Before refresh, read the frontier to know where to start consuming changes.
  • Advance frontier — After a successful refresh, the frontier is updated to the latest consumed LSN.
  • Consistent snapshots — The frontier ensures that each refresh processes a contiguous, non-overlapping window of changes.

Delayed View Semantics (DVS) Guarantee

The contents of every stream table are logically equivalent to evaluating its defining query at some past point in time — the data_timestamp. The scheduler refreshes STs in topological order so that when ST B references upstream ST A, A has already been refreshed to the target data_timestamp before B runs its delta query against A’s contents. The frontier lifecycle is:

  1. Created — on first full refresh; records the LSN of each source at that moment.
  2. Advanced — on each differential refresh; the old frontier becomes the lower bound and the new frontier (with fresh LSNs) the upper bound. The DVM engine reads changes in [old, new].
  3. Reset — on reinitialize; a fresh frontier is created from scratch.

7. Refresh Engine (src/refresh.rs)

Orchestrates the complete refresh cycle:

┌──────────────┐
│  Check State │ → Is ST active? Has it been populated?
└──────┬───────┘
       │
 ┌─────▼──────┐
 │ Drain CDC  │ → Read WAL changes into change buffer tables
 └─────┬──────┘
       │
 ┌─────▼──────────────┐
 │ Determine Action   │ → FULL, DIFFERENTIAL, NO_DATA, REINITIALIZE, or SKIP?
 │                    │   (adaptive: if change ratio > pg_trickle.differential_max_change_ratio,
 │                    │    downgrade DIFFERENTIAL → FULL automatically)
 └─────┬──────────────┘
       │
 ┌─────▼──────┐
 │ Execute    │ → Full: TRUNCATE + INSERT ... SELECT
 │            │   Differential: Generate & apply delta SQL
 └─────┬──────┘
       │
 ┌─────▼──────────────┐
 │ Record History     │ → Write to pgtrickle.pgt_refresh_history
 └─────┬──────────────┘
       │
 ┌─────▼──────────────┐
 │ Advance Frontier   │ → Update JSONB frontier in catalog
 └─────┬──────────────┘
       │
 ┌─────▼──────────────┐
 │ Reset Error Count  │ → On success, reset consecutive_errors to 0
 └──────────────────────┘

8. Background Worker & Scheduling (src/scheduler.rs)

Registration & Lifecycle

pg_trickle registers one PostgreSQL background worker — the scheduler — during _PG_init() (extension load). Because it is registered at startup, pg_trickle must appear in shared_preload_libraries, which requires a server restart.

┌──────────────────────────────────────────────────────────────────┐
│                  PostgreSQL postmaster                           │
│                                                                  │
│  shared_preload_libraries = 'pg_trickle'                          │
│       │                                                          │
│       ▼                                                          │
│  _PG_init()                                                      │
│    ├─ Register GUCs (pg_trickle.enabled, scheduler_interval_ms …) │
│    ├─ Register shared memory (PgTrickleSharedState, atomics)      │
│    └─ BackgroundWorkerBuilder::new("pg_trickle scheduler")        │
│         .set_start_time(RecoveryFinished)                        │
│         .set_restart_time(5s)       ← auto-restart on crash      │
│         .load()                                                  │
│                                                                  │
│  After recovery finishes:                                        │
│       │                                                          │
│       ▼                                                          │
│  pg_trickle_scheduler_main()         ← background worker starts   │
│    ├─ Attach SIGHUP + SIGTERM handlers                           │
│    ├─ Connect to SPI (database = "postgres")                     │
│    ├─ Crash recovery: mark stale RUNNING records as FAILED       │
│    └─ Enter main loop ─────────────────────────┐                 │
│         │                                      │                 │
│         ▼                                      │                 │
│     wait_latch(scheduler_interval_ms)          │                 │
│         │                                      │                 │
│     ┌───▼───────────────────────────────┐      │                 │
│     │ SIGTERM? → log + break            │      │                 │
│     │ pg_trickle.enabled = false? → skip │      │                 │
│     │ Otherwise → scheduler tick        │      │                 │
│     └───┬───────────────────────────────┘      │                 │
│         │                                      │                 │
│         └──────────── loop ────────────────────┘                 │
└──────────────────────────────────────────────────────────────────┘

Key lifecycle properties:

Property Behaviour
Start condition After PostgreSQL recovery finishes (RecoveryFinished)
Auto-restart 5-second delay after an unexpected crash
Graceful shutdown Handles SIGTERM — breaks the main loop and exits cleanly
Config reload Handles SIGHUP — re-reads GUC values on the next latch wake
Crash recovery On startup, any pgt_refresh_history rows stuck in RUNNING status are marked FAILED (the transaction that wrote them was rolled back by PostgreSQL, but the status row may have been committed in a prior transaction)
Database Connects to the postgres database via SPI
Standby / replica On standby servers (pg_is_in_recovery() = true), the worker enters a sleep loop and does not attempt refreshes. Stream tables are still readable on standbys — they are regular heap tables replicated via physical streaming replication. After promotion the scheduler resumes automatically. See the FAQ § Replication for details on logical replication and subscriber limitations.

Scheduler Tick

Each tick of the main loop performs the following steps inside a single transaction:

  1. DAG rebuild — Compare the shared-memory DAG_REBUILD_SIGNAL counter against the local copy. If it advanced (a CREATE, ALTER, or DROP stream table occurred), rebuild the in-memory dependency graph (StDag) from the catalog.
  2. Topological traversal — Walk stream tables in dependency order (upstream before downstream). This ensures that when ST B references ST A, A is refreshed first.
  3. Per-ST evaluation — For each active ST:
    • Skip if in retry backoff (exponential, per-ST).
    • Skip if schedule/cron says not yet due.
    • Skip if an advisory lock indicates a concurrent refresh.
    • Check upstream change buffers for pending rows.
  4. Execute refresh — Acquire an advisory lock → record RUNNING in history → run FULL / DIFFERENTIAL / REINITIALIZE → store new frontier → release lock → record completion.
  5. WAL transitions — Advance any trigger→WAL CDC mode transitions (src/wal_decoder.rs).
  6. Slot health — Check replication slot health and emit NOTIFY alerts.
  7. Prune retry state — Remove backoff entries for STs that no longer exist.

Sequential Processing (Default)

By default (parallel_refresh_mode = 'off), the scheduler processes stream tables sequentially within a single background worker. All STs are refreshed one at a time in topological order. pg_trickle.max_concurrent_refreshes (default 4) only prevents a manual pgtrickle.refresh_stream_table() call from overlapping with the scheduler on the same ST — it does not spawn additional workers.

The PostgreSQL GUC max_worker_processes (default 8) sets the server-wide budget for all background workers (autovacuum, parallel query, logical replication, extensions). In sequential mode pg_trickle consumes one slot from that budget.

Parallel Refresh (parallel_refresh_mode = 'on')

When enabled, the scheduler builds an execution-unit DAG from the stream-table dependency graph and dispatches independent units to dynamic background workers:

  1. Execution units — Each independent stream table becomes a singleton unit. Atomic consistency groups and IMMEDIATE-trigger closures are collapsed into composite units that run in a single worker for correctness.
  2. Ready queue — Units whose upstream dependencies have all completed enter the ready queue. The coordinator dispatches them subject to a per-database cap (max_concurrent_refreshes) and a cluster-wide cap (max_dynamic_refresh_workers).
  3. Dynamic workers — Each dispatched unit spawns a short-lived background worker via BackgroundWorkerBuilder::load_dynamic(). Workers claim a job from the pgtrickle.pgt_scheduler_jobs catalog table, execute the refresh, and exit.

The parallel path respects the same topological ordering as the sequential path — downstream units only become ready after all upstream units succeed. The worker-budget caps ensure pg_trickle does not exhaust max_worker_processes.

See PLAN_PARALLELISM.md for the full design and CONFIGURATION.md for tuning guidance.

Retry & Error Handling

Each ST maintains an in-memory RetryState (reset on scheduler restart):

  • Retryable errors (SPI failures, lock contention, slot issues) trigger exponential backoff.
  • Permanent errors (schema mismatch, user errors) skip backoff but increment consecutive_errors.
  • When consecutive_errors reaches pg_trickle.max_consecutive_errors (default 3), the ST is auto-suspended and a NOTIFY alert is emitted.
  • Schema errors additionally set needs_reinit, triggering a REINITIALIZE on the next successful cycle.

Scheduling Policy

Automatic refresh scheduling uses canonical periods (48·2ⁿ seconds, n = 0, 1, 2, …) snapped to the user’s schedule:

  • Picks the smallest canonical period ≤ schedule.
  • For DOWNSTREAM schedule (NULL schedule), the ST refreshes only when explicitly triggered or when a downstream ST needs it.
  • Advisory locks prevent concurrent refreshes of the same ST.
  • The scheduler is driven by the background worker polling at the pg_trickle.scheduler_interval_ms GUC interval.

Shared Memory (src/shmem.rs)

The scheduler background worker and user sessions share a PgTrickleSharedState structure protected by a PgLwLock. Key fields:

Field Type Purpose
dag_version u64 Incremented when the ST catalog changes; used by the scheduler to detect when the DAG needs rebuilding.
scheduler_pid i32 PID of the scheduler background worker (0 if not running).
scheduler_running bool Whether the scheduler is active.
last_scheduler_wake i64 Unix timestamp of the last scheduler wake cycle (for monitoring).

A separate PgAtomic<AtomicU64> named DAG_REBUILD_SIGNAL is incremented by API functions (create, alter, drop) after catalog mutations. The scheduler compares its local copy against the atomic counter to detect when to rebuild its in-memory DAG without holding a lock.

A second PgAtomic<AtomicU64> named CACHE_GENERATION tracks DDL events that may invalidate cached delta or MERGE templates across backends. When DDL hooks fire (view change, ALTER TABLE, function change) or API functions mutate the catalog, CACHE_GENERATION is bumped. Each backend maintains a thread-local generation counter; on the next refresh, if the shared generation has advanced, the backend flushes its delta template cache, MERGE template cache, and explicitly DEALLOCATEs tracked __pgt_merge_* prepared statements before rebuilding local state.

9. DDL Tracking (src/hooks.rs)

Event triggers monitor DDL changes to source tables and functions:

  • _on_ddl_end — Fires on ALTER TABLE to detect column adds/drops/type changes. If a source table used by a ST is altered, the ST’s needs_reinit flag is set. Also detects CREATE OR REPLACE FUNCTION / ALTER FUNCTION — if the function appears in a ST’s functions_used catalog column, the ST is marked for reinit.
  • _on_sql_drop — Fires on DROP TABLE to set needs_reinit for affected STs. Also detects DROP FUNCTION and marks affected STs for reinit.
  • Function name extractionobject_identity strings (e.g., public.my_func(integer, text)) are parsed to extract the bare function name, which is matched against the functions_used TEXT[] column in pgt_stream_tables.

Reinitialization is deferred until the next refresh cycle, which then performs a REINITIALIZE action (drop and recreate the storage table from the updated query).

10. Error Handling (src/error.rs)

Centralized error types using thiserror:

  • PgTrickleError variants cover catalog access, SQL execution, CDC, DVM, DAG, and config errors.
  • Each refresh failure increments consecutive_errors.
  • When consecutive_errors reaches pg_trickle.max_consecutive_errors (default 3), the ST is moved to ERROR status and suspended from automatic refresh.
  • Manual intervention (ALTER ... status => 'ACTIVE') resets the counter.

11. Monitoring (src/monitor.rs)

Provides observability functions:

  • st_refresh_stats — Aggregate statistics (total/successful/failed refreshes, avg duration, staleness status).
  • get_refresh_history — Per-ST audit trail.
  • get_staleness — Current staleness in seconds.
  • slot_health — Checks replication slot state and WAL retention.
  • check_cdc_health — Per-source CDC health status including mode, slot lag, confirmed LSN, and alerts.
  • explain_st — Describes the DVM plan for a given ST.
  • diamond_groups — Lists detected diamond dependency groups, their members, convergence points, and epoch counters.
  • Viewspgtrickle.stream_tables_info (computed staleness) and pgtrickle.pg_stat_stream_tables (combined stats).

NOTIFY Alerting

Operational events are broadcast via PostgreSQL NOTIFY on the pg_trickle_alert channel. Clients can subscribe with LISTEN pg_trickle_alert; and receive JSON-formatted events:

Event Condition
stale data staleness exceeds 2× schedule
auto_suspended ST suspended after pg_trickle.max_consecutive_errors failures
reinitialize_needed Upstream DDL change detected
slot_lag_warning Replication slot WAL retention exceeded pg_trickle.slot_lag_warning_threshold_mb
cdc_transition_complete Source transitioned from trigger to WAL-based CDC
cdc_transition_failed Trigger→WAL transition failed (fell back to triggers)
refresh_completed Refresh completed successfully
refresh_failed Refresh failed with an error

12. Row ID Hashing (src/hash.rs)

Provides deterministic 64-bit row identifiers using xxHash (xxh64) with a fixed seed. Two SQL functions are exposed:

  • pgtrickle.pg_trickle_hash(text) — Hash a single text value; used for simple single-column row IDs.
  • pgtrickle.pg_trickle_hash_multi(text[]) — Hash multiple values (separated by a record-separator byte \x1E) for composite keys (join row IDs, GROUP BY keys).

Row IDs are written into every stream table’s storage as an internal __pgt_row_id BIGINT column and are used by the delta application phase to match DELETE candidates precisely.

13. Diamond Dependency Consistency (src/dag.rs)

When stream tables form diamond-shaped dependency graphs, a convergence (fan-in) node may read from multiple upstream STs that share a common ancestor:

        A (source table)
       / \
      B   C   (intermediate STs)
       \ /
        D     (convergence / fan-in ST)

If B refreshes successfully but C fails, D would read a fresh version of B’s data alongside stale data from C — a split-version inconsistency.

Detection

StDag::detect_diamonds() walks all fan-in nodes (STs with multiple upstream ST dependencies) and computes transitive ancestor sets per branch. If two or more branches share ancestors, a diamond is detected. Overlapping diamonds are merged.

Consistency Groups

StDag::compute_consistency_groups() converts detected diamonds into consistency groups — topologically ordered sets of STs that must be refreshed atomically. Each group contains:

  • Members — All intermediate STs plus the convergence node, in refresh order.
  • Convergence points — The fan-in nodes where multiple paths meet.
  • Epoch counter — Advances on each successful atomic refresh.

STs not involved in any diamond are placed in singleton groups (no overhead).

Scheduler Wiring

When diamond_consistency = 'atomic' (per-ST or via the pg_trickle.diamond_consistency GUC):

  1. The scheduler wraps each multi-member group in a SAVEPOINT pgt_consistency_group.
  2. Each member is refreshed in topological order within the savepoint.
  3. If all succeedRELEASE SAVEPOINT and advance the group epoch.
  4. If any member failsROLLBACK TO SAVEPOINT undoes all members' changes. The failure is logged and the group retries on the next scheduler tick.

With diamond_consistency = 'none' (default), members refresh independently in topological order — matching pre-feature behavior.

Schedule Policy

The diamond_schedule_policy setting (per-convergence-node or via the pg_trickle.diamond_schedule_policy GUC) controls when an atomic group fires:

Policy Trigger condition Trade-off
'fastest' (default) Any member is due Higher freshness, more refreshes
'slowest' All members are due Lower resource cost, staler data

The policy is set on the convergence (fan-in) node. When multiple convergence nodes exist in the same group (nested diamonds), the strictest policy wins (slowest > fastest). The GUC serves as a cluster-wide fallback for nodes without an explicit per-node setting.

Monitoring

The pgtrickle.diamond_groups() SQL function exposes detected groups for operational visibility. See SQL_REFERENCE.md for details.

14. Configuration (src/config.rs)

Runtime behavior is controlled by a growing set of GUC (Grand Unified Configuration) variables. See CONFIGURATION.md for the complete, current list.

GUC Default Purpose
pg_trickle.enabled true Master on/off switch for the scheduler
pg_trickle.scheduler_interval_ms 1000 Scheduler background worker wake interval (ms)
pg_trickle.min_schedule_seconds 60 Minimum allowed schedule
pg_trickle.max_consecutive_errors 3 Errors before auto-suspending a ST
pg_trickle.change_buffer_schema pgtrickle_changes Schema for change buffer tables
pg_trickle.max_concurrent_refreshes 4 Maximum parallel refresh workers
pg_trickle.differential_max_change_ratio 0.15 Change-to-table-size ratio above which DIFFERENTIAL falls back to FULL
pg_trickle.cleanup_use_truncate true Use TRUNCATE instead of DELETE for change buffer cleanup when the entire buffer is consumed
pg_trickle.user_triggers 'auto' User-defined trigger handling: auto / off (on accepted as deprecated alias for auto)
pg_trickle.block_source_ddl false Block column-affecting DDL on tracked source tables instead of reinit
pg_trickle.cdc_mode 'auto' CDC mechanism: auto / trigger / wal
pg_trickle.wal_transition_timeout 300 Max seconds to wait for WAL decoder catch-up during transition
pg_trickle.slot_lag_warning_threshold_mb 100 Warning threshold for WAL slot retention used by slot_lag_warning and health_check()
pg_trickle.slot_lag_critical_threshold_mb 1024 Critical threshold for WAL slot retention used by check_cdc_health() alerts
pg_trickle.diamond_consistency 'none' Diamond dependency consistency mode: none or atomic
pg_trickle.diamond_schedule_policy 'fastest' Schedule policy for atomic diamond groups: fastest or slowest
pg_trickle.merge_planner_hints true Inject SET LOCAL planner hints (disable nestloop, raise work_mem) before MERGE
pg_trickle.merge_work_mem_mb 64 work_mem (MB) applied when delta exceeds 10 000 rows and planner hints enabled
pg_trickle.use_prepared_statements true Use SQL PREPARE/EXECUTE for cached MERGE templates

Data Flow: End-to-End Refresh

 Source Table INSERT/UPDATE/DELETE
           │
           ▼
 Hybrid CDC Layer:
   ┌─────────────────────────────────────────────┐
   │ TRIGGER mode: Row-Level AFTER Trigger        │
   │   pg_trickle_cdc_fn_<oid>() → buffer table    │
   │                                              │
   │ WAL mode: Logical Replication Slot           │
   │   wal_decoder bgworker → same buffer table   │
   └─────────────────────────────────────────────┘
           │
           ▼
 Change Buffer Table (pgtrickle_changes.changes_<oid>)
   Columns: change_id, lsn, xid, action (I/U/D), row_data (jsonb)
           │
           ▼
 DVM Engine: generate delta SQL from operator tree
   - Scan operator reads from change buffer
   - Filter/Project/Join transform the deltas
   - Aggregate recomputes affected groups
           │
           ▼
 Diff Engine: produce (+/-) diff rows
           │
           ▼
 Delta Application:
   DELETE FROM storage WHERE __pgt_row_id IN (removed)
   INSERT INTO storage SELECT ... FROM (added)
           │
           ▼
 Frontier Update: advance per-source LSN
           │
           ▼
 History Record: log to pgtrickle.pgt_refresh_history

Module Map

src/
├── lib.rs           # Extension entry, module declarations, _PG_init
├── bin/
│   └── pgrx_embed.rs# pgrx SQL entity embedding (generated)
├── api.rs           # SQL API functions (create/alter/drop/refresh/status)
├── catalog.rs       # Catalog CRUD operations
├── cdc.rs           # Change data capture (triggers + WAL transition)
├── config.rs        # GUC variable registration
├── dag.rs           # Dependency graph (cycle detection, SCC decomposition, topo sort)
├── error.rs         # Centralized error types
├── hash.rs          # xxHash row ID generation (pg_trickle_hash / pg_trickle_hash_multi)
├── hooks.rs         # DDL event trigger handlers (_on_ddl_end, _on_sql_drop)
├── ivm.rs           # Transactional IVM (IMMEDIATE mode: statement-level triggers)
├── shmem.rs         # Shared memory state (PgTrickleSharedState, DAG_REBUILD_SIGNAL, CACHE_GENERATION)
├── dvm/
│   ├── mod.rs       # DVM module root + recursive CTE orchestration
│   ├── parser.rs    # Query → OpTree converter (CTE extraction, subquery, window support)
│   ├── diff.rs      # Delta SQL generation (CTE delta cache)
│   ├── row_id.rs    # Row ID generation
│   └── operators/
│       ├── mod.rs           # Operator trait + registry
│       ├── scan.rs          # Table scan (CDC passthrough)
│       ├── filter.rs        # WHERE clause filtering
│       ├── project.rs       # Column projection
│       ├── join.rs          # Inner join
│       ├── join_common.rs   # Shared join utilities (snapshot subqueries, column disambiguation)
│       ├── outer_join.rs    # LEFT/RIGHT outer join
│       ├── full_join.rs     # FULL OUTER JOIN (8-part delta)
│       ├── aggregate.rs     # GROUP BY + aggregate functions (39 AggFunc variants)
│       ├── distinct.rs      # DISTINCT deduplication
│       ├── union_all.rs     # UNION ALL merging
│       ├── intersect.rs     # INTERSECT / INTERSECT ALL (dual-count LEAST)
│       ├── except.rs        # EXCEPT / EXCEPT ALL (dual-count GREATEST)
│       ├── subquery.rs      # Subquery / inlined CTE delegation
│       ├── cte_scan.rs      # Shared CTE delta (multi-reference)
│       ├── recursive_cte.rs # Recursive CTE (semi-naive + DRed + recomputation)
│       ├── window.rs        # Window function (partition recomputation)
│       ├── lateral_function.rs  # LATERAL SRF (row-scoped recomputation)
│       ├── lateral_subquery.rs  # LATERAL correlated subquery
│       ├── semi_join.rs     # EXISTS / IN subquery (semi-join delta)
│       ├── anti_join.rs     # NOT EXISTS / NOT IN subquery (anti-join delta)
│       └── scalar_subquery.rs   # Correlated scalar subquery in SELECT
├── monitor.rs       # Monitoring & observability functions
├── refresh.rs       # Refresh orchestration
├── scheduler.rs     # Automatic scheduling with canonical periods
├── version.rs       # Frontier / LSN tracking
└── wal_decoder.rs   # WAL-based CDC (logical replication slot polling, transitions)

Extension Control File (pg_trickle.control)

The pg_trickle.control file in the repository root is required by PostgreSQL’s extension infrastructure. It declares the extension’s description, default version, shared-library path, and privilege requirements. PostgreSQL reads this file when CREATE EXTENSION pg_trickle; is executed.

During packaging (cargo pgrx package), pgrx replaces the @CARGO_VERSION@ placeholder with the version from Cargo.toml and copies the file into the target’s share/extension/ directory alongside the SQL migration scripts.