Contents
- Plan: Bootstrap Gating for External Source Population
Plan: Bootstrap Gating for External Source Population
Date: 2026-03-04 Status: IMPLEMENTED Last Updated: 2026-03-14
1. Problem Statement
Initial Population Creates Unnecessary Churn
When external ETL processes initially populate source tables, the pg_trickle scheduler sees changes arriving and eagerly refreshes downstream stream tables. If two (or more) source tables feed into a shared downstream ST and their ETL processes run independently, the scheduler will:
- Detect changes in the first source and refresh the downstream ST — joining partially-loaded data from source A with an empty source B.
- Repeat on every scheduler tick as batches land — producing many intermediate results, all incomplete.
- Only converge to the correct result after both ETL processes finish.
This wastes compute and produces misleading intermediate states that may be visible to downstream consumers.
Motivating Example
An analytics platform bootstraps two tables from an external data warehouse:
ETL-A: COPY orders FROM 's3://…' (takes 10 min)
ETL-B: COPY order_lines FROM 's3://…' (takes 20 min)
The DAG:
orders ──→ order_summary ──┐
├──→ order_report
order_lines ──→ line_summary ──┘
Without bootstrap gating:
| Time | Event | Result |
|---|---|---|
| T+0 | ETL-A starts loading orders |
— |
| T+1m | Scheduler tick — orders has changes, order_lines empty |
order_summary refreshed (partial), line_summary empty, order_report has orders without lines |
| T+2m | More orders batches land |
Another churn cycle |
| T+10m | ETL-A finishes | order_summary nearly correct, line_summary still empty |
| T+11m | ETL-B first batches | order_report joins full orders with partial lines |
| T+20m | ETL-B finishes | Finally correct |
Over 20 minutes the scheduler has executed ~20 unnecessary refresh cycles, each producing incomplete results.
Why Existing Mechanisms Don’t Help
| Mechanism | Why insufficient |
|---|---|
initialize => false |
Prevents initial population of the ST itself, but doesn’t prevent the scheduler from refreshing it once changes appear in sources. Also requires the user to manually trigger initialization later. |
| Diamond consistency | No shared PostgreSQL ancestor — the sources are independent base tables. |
| Cross-source snapshot | Ensures PG-level snapshot coherence, but external temporal skew is already baked into the base table contents. |
| Watermark gating (PLAN_WATERMARK_GATING.md) | Addresses ongoing temporal alignment of external sources. Bootstrap is a one-time readiness problem — simpler, but related. See §7 for how they compose. |
Core Insight
The scheduler needs a gate that prevents downstream processing until all required sources have completed their initial population. Only the external process knows when initial load is complete. The gate should:
- Be configurable — not all source tables need bootstrap gating.
- Be temporary — once all sources are ready, it lifts permanently.
- Be simple — binary ready/not-ready per source, no temporal algebra.
- Compose naturally with watermark gating for the steady-state case.
2. Design Options
2.1 Option A — Explicit mark_source_ready() Function
The external ETL process calls a SQL function after the initial load:
-- ETL-A finishes initial population
BEGIN;
COPY orders FROM 's3://warehouse/orders.parquet';
SELECT pgtrickle.mark_source_ready('orders');
COMMIT;
The scheduler checks: for each ST, if any transitive source has a bootstrap gate configured but has not been marked ready, skip the ST.
Configuration — at the source level:
-- Before ETL starts, declare that these sources need bootstrap gating
SELECT pgtrickle.set_bootstrap_gate('orders');
SELECT pgtrickle.set_bootstrap_gate('order_lines');
-- Or at stream table creation time
SELECT pgtrickle.create_stream_table('order_report', '...', '1m',
await_sources => ARRAY['orders', 'order_lines']
);
Readiness signal semantics:
- Transactional:
mark_source_ready()is part of the caller’s transaction. The readiness signal becomes visible only when the data does — preventing the scheduler from seeing “ready” before the data commits. - Idempotent: Calling
mark_source_ready()on an already-ready source is a no-op. - Permanent: Once marked ready, the gate is lifted for the lifetime of
the source’s participation in the DAG. The gate can be re-enabled via
set_bootstrap_gate()for a re-bootstrap scenario. - Signal: Notifies the scheduler (via
pg_notifyor shared-memory signal) so gating is re-evaluated on the next tick.
2.2 Option B — Compose with Watermark Gating
Treat bootstrap as a special case of watermarks: a source in a watermark
group whose watermark is NULL (never advanced) is considered “not ready.”
The current watermark gating plan (§9, question #9) proposes that NULL
watermarks mean “ungated.” This option inverts that default for sources
that are members of a watermark group: NULL = gated, first
advance_watermark() = ready.
-- Create watermark group — gating is active immediately
SELECT pgtrickle.create_watermark_group('order_pipeline',
sources => ARRAY['orders', 'order_lines'],
tolerance => '0 seconds'
);
-- ETL-A finishes, advances watermark for the first time
SELECT pgtrickle.advance_watermark('orders', '2026-03-01');
-- ETL-B finishes, advances watermark for the first time
SELECT pgtrickle.advance_watermark('order_lines', '2026-03-01');
-- Both sources now have non-NULL watermarks → alignment evaluates → STs refresh
Bootstrap naturally transitions to steady-state temporal alignment — no separate API needed.
2.3 Option C — initialize => false + Deferred Initialization
Extend the existing initialize parameter with an await_sources list.
The ST is created but not scheduled until all listed sources signal readiness:
SELECT pgtrickle.create_stream_table('order_report', '...', '1m',
initialize => false,
await_sources => ARRAY['orders', 'order_lines']
);
When all awaited sources are marked ready, the scheduler auto-initializes the ST (runs the initial full refresh) and begins normal scheduling.
2.4 Option D — Source-Level DAG Gating
Model readiness as a property of source nodes in the DAG. A gated source blocks all transitive downstream STs from refreshing.
-- Register sources as gated before ETL starts
SELECT pgtrickle.gate_source('orders');
SELECT pgtrickle.gate_source('order_lines');
-- ETL runs...
-- Signal completion
SELECT pgtrickle.ungate_source('orders');
SELECT pgtrickle.ungate_source('order_lines');
Any ST with a transitive dependency on a gated source is automatically skipped. No per-ST configuration needed — gating flows through the DAG.
3. Comparison
| A — mark_source_ready() | B — Watermark composition | C — Deferred init | D — DAG source gating | |
|---|---|---|---|---|
| API surface | New: set_bootstrap_gate(), mark_source_ready() |
Reuses watermark API | Extends existing create_stream_table |
New: gate_source(), ungate_source() |
| Configuration granularity | Per-source or per-ST (await_sources) |
Per watermark group | Per-ST | Per-source (DAG propagation) |
| Ongoing overhead | None after bootstrap | Full watermark evaluation on every tick | None after init | None after ungating |
| Composes with watermarks | Orthogonal — both can apply | Native composition | Orthogonal | Orthogonal — subsumable by watermarks |
| Requires watermark infra | No | Yes | No | No |
| Handles re-bootstrap | Yes (set_bootstrap_gate() re-arms) |
Yes (reset watermark to NULL — needs new API) | Partially (needs_reinit + re-create) |
Yes (gate_source() re-arms) |
| Intermediate ST behavior | Configurable (per-ST await_sources) |
Depends on watermark gating mode (§5 of watermark plan) | Only the ST with await_sources is gated |
All transitive descendants blocked |
| Implementation complexity | Low | Medium (depends on watermark plan) | Low | Low-Medium |
4. Recommendation
Phased Approach
Phase 1 — Option D (Source-Level DAG Gating): Implement as a standalone, low-overhead feature. It’s the simplest model with the strongest automatic propagation — gating a source automatically gates everything downstream without per-ST configuration.
The API is two functions:
-- Gate a source (blocks all downstream STs)
SELECT pgtrickle.gate_source('orders');
-- Ungate a source (allows downstream processing)
SELECT pgtrickle.ungate_source('orders');
Plus an introspection function:
-- Show current gate status for all tracked sources
SELECT * FROM pgtrickle.source_gates();
Phase 2 — Watermark Subsumption (Option B): When the watermark gating
plan (PLAN_WATERMARK_GATING.md) is implemented,
bootstrap gating becomes a natural subset: a source with no watermark
(NULL) in a watermark group is effectively gated. The gate_source() /
ungate_source() API can be retained as a convenience for users who don’t
need ongoing temporal alignment — internally mapped to a lightweight boolean
flag rather than the full watermark machinery.
Why Option D First
DAG-native propagation. Gating a source automatically blocks all transitive descendants. The user doesn’t need to know the DAG structure — they just declare which sources aren’t ready yet.
Minimal API surface. Two functions plus one introspection view. No new parameters on
create_stream_table.Zero ongoing overhead. The gate is a boolean check at the start of each scheduler tick. Once all sources are ungated, the check is trivial.
Clean composition path. When watermarks land, the bootstrap gate becomes “has this source ever had a watermark advanced?” — no API breakage, just a richer underlying mechanism.
Independent of watermark timeline. Works equally well whether the external data has a meaningful timestamp dimension or not (e.g., a one- time CSV import has no temporal axis).
5. Detailed Design (Phase 1)
5.1 Catalog
Add a gated column to pgtrickle.pgt_change_tracking:
ALTER TABLE pgtrickle.pgt_change_tracking
ADD COLUMN gated BOOLEAN NOT NULL DEFAULT false;
Alternatively, a standalone table:
CREATE TABLE pgtrickle.pgt_source_gates (
source_relid OID PRIMARY KEY,
gated BOOLEAN NOT NULL DEFAULT true,
gated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
ungated_at TIMESTAMPTZ,
gated_by TEXT -- session_user / application_name
);
The standalone table is cleaner — it tracks gating history and doesn’t pollute the CDC tracking table with unrelated state.
5.2 SQL Functions
-- Gate a source table. All downstream STs are blocked from refreshing.
-- Idempotent: gating an already-gated source is a no-op.
pgtrickle.gate_source(source_table TEXT) → void
-- Ungate a source table. Downstream STs are unblocked if all their
-- transitive sources are ungated. Transactional: the ungate becomes
-- visible when the caller's transaction commits.
-- Idempotent: ungating an already-ungated source is a no-op.
pgtrickle.ungate_source(source_table TEXT) → void
-- Introspection: current gate state for all tracked sources.
pgtrickle.source_gates() → TABLE (
source_table TEXT,
schema_name TEXT,
gated BOOLEAN,
gated_at TIMESTAMPTZ,
ungated_at TIMESTAMPTZ,
gated_by TEXT
)
5.3 Scheduler Integration
In run_scheduler_tick(), before processing each ST in topological order:
fn is_source_gated(dag: &StDag, node_id: NodeId) -> bool {
// Collect transitive source set for this ST
let sources = dag.transitive_sources(node_id);
// Check if any source has an active gate
sources.iter().any(|src| {
// Look up gate status from catalog or cached state
source_gate_status(src.relid) == Gated
})
}
If any transitive source is gated:
- Skip the ST for this tick.
- Log: "pg_trickle: skipping {schema}.{name} — source {source} is gated (bootstrap)".
- Record action = 'SKIP' with status = 'SKIPPED' in pgt_refresh_history,
with a descriptive skip reason.
5.4 Signal Mechanism
ungate_source() should signal the scheduler so the gating predicate is
re-evaluated promptly rather than waiting for the next natural tick:
- Option 1:
pg_notify('pgtrickle_gate_change', source_relid::text)— lightweight, the scheduler already listens for notifications. - Option 2: Bump the shared-memory DAG version counter — triggers a re-evaluation on the next tick. Heavier than needed since the DAG structure hasn’t changed, only gating state.
Recommendation: Use pg_notify for a lightweight signal. The
scheduler can subscribe to a pgtrickle_source_gate channel and
re-evaluate gating predicates without a full DAG rebuild.
5.5 Caching
The scheduler should cache gate status at the start of each tick rather than querying the catalog per-ST. A single query fetches all gated sources:
SELECT source_relid FROM pgtrickle.pgt_source_gates WHERE gated = true
This set is intersected with each ST’s transitive source set during the
tick. The cache is invalidated on the next tick (or immediately if a
pg_notify signal is received).
5.6 Interaction with initialize => false
The initialize parameter on create_stream_table controls whether the
ST is populated at creation time. Bootstrap gating is orthogonal:
initialize |
Source gated | Behavior |
|---|---|---|
true |
No | Normal: ST is created and immediately populated |
true |
Yes | ST creation succeeds, initial populate runs (with partial data). Subsequent refreshes are blocked until sources are ungated. Recommendation: warn at creation time that gated sources may produce incomplete initial results. |
false |
No | ST created empty. First scheduler tick triggers initial refresh. |
false |
Yes | ST created empty. Scheduler skips until all sources ungated. First refresh after ungating is the initial population. This is the recommended bootstrap pattern. |
Recommended pattern for bootstrap:
-- 1. Gate sources before ETL starts
SELECT pgtrickle.gate_source('orders');
SELECT pgtrickle.gate_source('order_lines');
-- 2. Create stream tables (initialize => false avoids premature population)
SELECT pgtrickle.create_stream_table('order_summary',
'SELECT region, SUM(amount) FROM orders GROUP BY region',
'1m', initialize => false);
SELECT pgtrickle.create_stream_table('order_report',
'SELECT ... FROM order_summary JOIN line_summary ...',
'1m', initialize => false);
-- 3. Run ETL processes (can be parallel, any order)
BEGIN;
COPY orders FROM 's3://…';
SELECT pgtrickle.ungate_source('orders');
COMMIT;
BEGIN;
COPY order_lines FROM 's3://…';
SELECT pgtrickle.ungate_source('order_lines');
COMMIT;
-- 4. Next scheduler tick: all sources ungated → STs initialize and refresh
6. Extended Use Cases
A. Re-Bootstrap After Schema Change
An external system changes its schema, requiring a full re-import:
-- Gate the source, truncate, re-import
SELECT pgtrickle.gate_source('orders');
TRUNCATE orders;
-- Full re-import
COPY orders FROM 's3://warehouse/orders_v2.parquet';
SELECT pgtrickle.ungate_source('orders');
During the truncate + re-import window, downstream STs are not refreshed, avoiding the churn of processing intermediate states during re-population.
B. Coordinated Multi-Source Bootstrap
Multiple sources with different load times:
-- Gate all sources upfront
SELECT pgtrickle.gate_source('customers');
SELECT pgtrickle.gate_source('orders');
SELECT pgtrickle.gate_source('products');
-- ETL processes run independently, potentially in parallel
-- Each ungates its source upon completion
-- Downstream STs that depend on all three only start refreshing
-- when the last source is ungated
C. Partial DAG Gating
Only gate sources that feed into a specific part of the DAG:
customers ──→ customer_stats ──┐
├──→ customer_report
orders ──→ order_summary ──────┘
│
└──→ order_dashboard (only depends on orders)
If only customers is gated:
- customer_stats and customer_report are blocked (transitive dependency).
- order_summary and order_dashboard refresh normally (no dependency on
gated source).
D. Bootstrap with Existing Stream Tables
Adding a new source to an existing pipeline:
-- New source table, needs initial population
CREATE TABLE inventory (sku TEXT, qty INT);
SELECT pgtrickle.gate_source('inventory');
-- Create a new ST that joins existing and new sources
SELECT pgtrickle.create_stream_table('inventory_report',
'SELECT o.*, i.qty FROM order_summary o JOIN inventory i ON ...',
'1m', initialize => false);
-- ETL populates inventory
COPY inventory FROM 's3://…';
SELECT pgtrickle.ungate_source('inventory');
Existing STs (order_summary, etc.) continue refreshing normally since they
don’t depend on the gated source.
7. Composability with Other Mechanisms
Bootstrap Gating + Watermark Gating
Bootstrap gating and watermark gating serve different lifecycle phases:
┌─────────────────┐ ┌──────────────────────────────────┐
│ BOOTSTRAP PHASE │ │ STEADY-STATE PHASE │
│ │ │ │
│ gate_source() │────▶│ advance_watermark(T) on each │
│ ETL loads data │ │ load cycle; watermark group │
│ ungate_source() │ │ ensures temporal alignment │
└─────────────────┘ └──────────────────────────────────┘
Both can be active simultaneously. The scheduler evaluates gates in order:
- Bootstrap gate: Is any transitive source gated? → Skip.
- Watermark gate: Are watermark groups aligned? → Skip if misaligned.
- Normal scheduling: Check schedule, changes, etc. → Refresh.
When watermarks land, the bootstrap gate for a source can optionally be auto-lifted when the first watermark is advanced (configurable):
SELECT pgtrickle.gate_source('orders',
auto_ungate_on_watermark => true -- lift gate on first advance_watermark()
);
Bootstrap Gating + Diamond Consistency
Orthogonal. Diamond consistency governs how structurally related STs are refreshed atomically. Bootstrap gating governs whether they refresh at all. A diamond group where any source is gated simply doesn’t refresh as a unit.
Bootstrap Gating + refresh_mode = 'IMMEDIATE'
IMMEDIATE mode STs are refreshed synchronously within the source DML transaction. Bootstrap gating doesn’t apply to IMMEDIATE STs — they are maintained on every write by definition. If the user doesn’t want IMMEDIATE STs to see partial bootstrap data, they should use DIFFERENTIAL mode during bootstrap and switch to IMMEDIATE after ungating.
8. Open Questions
Should
gate_source()work on tables not yet tracked by pg_trickle? If the user gates a table before creating any ST that depends on it, should pg_trickle record the gate inpgt_source_gatesspeculatively? This is useful for the “gate first, create STs second” pattern (§5.6). Proposed: yes — the gate table is independent ofpgt_change_tracking.Should ungating be automatic after the first successful refresh? Once all sources are ungated and the downstream ST completes its first full refresh, the gate has served its purpose. Should the gate rows be cleaned up automatically, or kept for audit? Proposed: keep the rows with
ungated_attimestamp, but the scheduler ignores rows wheregated = false.Should there be a timeout / stale gate warning? If a source has been gated for longer than a configurable duration (e.g., 1 hour), pg_trickle could emit a warning: “source X has been gated for 1h — is the ETL still running?” This helps detect stuck ETL processes.
Should
create_stream_tableauto-detect gated sources and setinitialize => false? If the user creates a ST whose sources include a gated source, should pg_trickle automatically skip initial population (since it would produce incomplete results)? Proposed: yes, with aNOTICEexplaining why.Per-ST override? Should a ST be able to opt out of bootstrap gating even if one of its sources is gated? Use case: a monitoring ST that intentionally shows partial progress during bootstrap. Proposed: add
bootstrap_gating => 'ignore'oncreate_stream_tablefor this case.Bulk gate/ungate? For pipelines with many sources, a convenience function:
sql SELECT pgtrickle.gate_sources(ARRAY['orders', 'order_lines', 'products']); SELECT pgtrickle.ungate_sources(ARRAY['orders', 'order_lines', 'products']);
9. Implementation Steps
Step 1 — Catalog: pgt_source_gates
Create table (§5.1), add to upgrade migration SQL. Structs in catalog.rs.
Step 2 — gate_source() / ungate_source() SQL Functions
#[pg_extern(schema = "pgtrickle")] in api.rs. Validation (source must
be a valid relation), idempotency, transactional semantics, pg_notify
signal.
Step 3 — Scheduler Gate Check
In scheduler.rs, at the start of each ST evaluation in topological order:
1. Load gated source set (cached per tick).
2. Compute transitive source set for the ST.
3. Intersect. If non-empty, skip with reason.
Step 4 — source_gates() Introspection Function
Return current gate status for all tracked sources.
Step 5 — create_stream_table Integration
When a new ST is created, check if any of its sources are gated. If so:
- Set initialize => false automatically (skip initial population).
- Emit NOTICE: “Source table X is currently gated; initial population
deferred until all sources are ungated.”
Step 6 — Tests
| Test | Type | What it proves |
|---|---|---|
test_gate_source_blocks_downstream |
E2E | Gated source prevents all downstream ST refresh |
test_ungate_source_allows_refresh |
E2E | Ungating triggers refresh on next tick |
test_gate_multiple_sources_all_must_ungate |
E2E | ST with two gated sources waits for both |
test_gate_partial_dag |
E2E | Only STs depending on gated source are blocked |
test_gate_idempotent |
Unit | Double gate / double ungate are no-ops |
test_gate_transactional |
E2E | Ungate not visible until caller commits |
test_gate_create_st_auto_skip_init |
E2E | ST created with gated source skips initialization |
test_gate_with_immediate_mode |
E2E | IMMEDIATE STs unaffected by source gates |
Step 7 — Documentation
docs/SQL_REFERENCE.md:gate_source(),ungate_source(),source_gates().docs/tutorials/: “Bootstrapping External Data Sources” tutorial.CHANGELOG.md.
10. Prior Art
Apache Airflow Sensors — Airflow’s
ExternalTaskSensorandS3KeySensorgate downstream tasks until upstream dependencies are satisfied. The bootstrap gate is analogous: a sensor that waits for “source data is fully loaded” before allowing downstream processing.dbt
ref()Dependencies +--defer— dbt’s--deferflag allows models to reference artifacts from a prior run when upstream models haven’t been rebuilt yet. The bootstrap gate is a stronger guarantee: don’t run the model at all until inputs are ready, rather than using stale data.Kafka Consumer Group Readiness — Kafka Streams applications wait until all partitions are assigned and initial offsets are committed before processing begins. The bootstrap gate applies the same principle to pg_trickle’s scheduler.
Flink Checkpoint Barriers — Flink uses barriers in the data stream to coordinate exactly-once processing across operators. While more granular than bootstrap gating, the principle of “don’t process until all inputs are at a known state” is the same.
Materialize
SINCEFrontier — Materialize tracks aSINCEtimestamp frontier per source. A source whose frontier is at the initial epoch hasn’t produced any meaningful output yet. Downstream operators wait until all input frontiers advance past the epoch before producing results.
11. Relationship to Other Plans
| Plan | Relationship |
|---|---|
| PLAN_WATERMARK_GATING.md | Complementary. Bootstrap gating handles one-time initial-readiness. Watermark gating handles ongoing temporal alignment. Phase 2 subsumes bootstrap into watermarks via “NULL watermark = gated.” |
| PLAN_DIAMOND_DEPENDENCY_CONSISTENCY.md | Orthogonal. Diamond consistency governs structural refresh atomicity. Bootstrap gating governs whether refresh occurs at all. |
| PLAN_CROSS_SOURCE_SNAPSHOT_CONSISTENCY.md | Orthogonal. Snapshot consistency ensures PG-level coherence. Bootstrap gating ensures external data completeness. |
| PLAN_FUSE.md | Complementary. Fuse halts refresh on anomalous change volume. Bootstrap gating halts refresh on incomplete source population. Both are pre-conditions evaluated before refresh. |
| PLAN_TRANSACTIONAL_IVM.md | Bootstrap gating does not apply to IMMEDIATE mode STs (maintained synchronously in the DML transaction). Users should defer IMMEDIATE mode activation until after bootstrap. |