Contents
- Plan: Allow alter_stream_table to Change the Defining Query
- Progress Summary
- 1. Motivation
- 2. Current State
- 3. Design
- 4. Edge Cases
- 4.1 Query changes source tables (dependency migration)
- 4.2 Query changes the GROUP BY / row identity
- 4.3 DIFFERENTIAL → FULL mode interaction
- 4.4 TopK ↔ non-TopK transitions
- 4.5 __pgt_count column transitions
- 4.6 Concurrent ALTER + scheduled refresh
- 4.7 View inlining & original_query
- 4.8 Diamond consistency groups
- 5. Implementation Steps
- Step 1: Refactor create_stream_table_impl into reusable pieces
- Step 2: Add schema comparison utility
- Step 3: Add storage table migration logic
- Step 4: Add dependency diffing utility
- Step 5: Implement alter_stream_table_query
- Step 6: Update SQL signature & upgrade script
- Step 7: Update dbt materialization
- Step 8: Update docs & FAQ
- Step 9: Tests
- 6. Alternatives Considered
- 7. Risks & Mitigations
- 8. Milestones
Plan: Allow alter_stream_table to Change the Defining Query
Status: Complete
Author: Copilot
Date: 2026-03-04
Last Updated: 2026-03-06
Progress Summary
Completed (Steps 1–9)
- [x] Step 1: Refactor `create_stream_table_impl` — Extracted reusable functions: `run_query_rewrite_pipeline()`, `validate_and_parse_query()`, `setup_storage_table()`, `insert_catalog_and_deps()`, `setup_trigger_infrastructure()`. `create_stream_table_impl` now calls these.
- [x] Step 2: Schema comparison — Added `SchemaChange` enum and `classify_schema_change()` (Same / Compatible / Incompatible with `pg_cast` implicit cast check).
- [x] Step 3: Storage table migration — Added `migrate_storage_table_compatible()` (ALTER TABLE ADD/DROP COLUMN), `migrate_aux_columns()` (`__pgt_count` transitions), and a full rebuild path for incompatible changes.
- [x] Step 4: Dependency diffing — Added `DependencyDiff` struct, `diff_dependencies()`, and `StDependency::delete_for_st()`.
- [x] Step 5: Core ALTER QUERY — Implemented `alter_stream_table_query()` with Phases 0–5 (validate, suspend, teardown, migrate, catalog update, repopulate).
- [x] Step 6: SQL signature — Added `query` parameter to `alter_stream_table` / `alter_stream_table_impl`.
- [x] Step 6b: Upgrade migration script — Created `sql/pg_trickle--0.2.1--0.2.2.sql`, which DROPs the old 6-parameter `alter_stream_table` signature and CREATEs the new 7-parameter version with the `query` parameter.
- [x] Step 7: dbt materialization — Updated `dbt-pgtrickle/macros/materializations/stream_table.sql` to use `ALTER ... query =>` instead of drop + recreate for query changes. Updated `dbt-pgtrickle/macros/adapters/alter_stream_table.sql` to accept and pass the `query` parameter.
- [x] Step 8: Docs — Updated `docs/SQL_REFERENCE.md` with documentation for the new `query` parameter, examples, and schema migration behavior notes. Updated `docs/FAQ.md` to replace “must drop and recreate” guidance with the new ALTER QUERY approach. Updated the `docs/ARCHITECTURE.md` description.
- [x] Step 9: E2E tests — Created `tests/e2e_alter_query_tests.rs` with 15 tests covering same-schema, compatible-schema (add/remove column, type change), incompatible-schema (type change with full rebuild), dependency changes (add/remove sources), aggregate transitions, query + mode change, error handling (invalid query, cycle detection), view inlining, OID stability, and catalog update verification.
Bugs Found & Fixed During E2E Testing
- Incompatible schema FK violation — `DROP TABLE ... CASCADE` on the storage table triggered the `sql_drop` event trigger, which detected the table as ST storage and deleted the `pgt_stream_tables` catalog row. Fix: set `pgt_relid = 0` before the DROP so `is_st_storage_table()` returns false and the event trigger skips cleanup.
- Cycle detection missed ALTER cycles — `check_for_cycles()` created a sentinel node (`pgt_id = MAX`) for CREATE, but for ALTER the existing ST already has edges in the DAG, so a sentinel can’t detect A → B → A cycles. Fix: added `check_for_cycles_alter()`, which reuses the existing ST node and calls `StDag::replace_incoming_edges()` to swap in the proposed sources before running cycle detection.
1. Motivation
Today, changing a stream table’s defining query requires a drop + recreate cycle:
SELECT pgtrickle.drop_stream_table('order_totals');
SELECT pgtrickle.create_stream_table('order_totals', 'SELECT ...', '1m', 'DIFFERENTIAL');
This has three user-facing problems:
- Data gap. Between drop and recreate, the stream table does not exist. Concurrent queries against it fail with “relation does not exist.”
- Lost metadata. Schedule, refresh mode, diamond settings, and refresh history are discarded. The user must re-specify them.
- dbt friction. The dbt materialization detects query changes and does a full drop/recreate, producing the same data gap and a `--full-refresh` equivalent even for minor query tweaks.
Adding a query parameter to alter_stream_table would allow in-place
query evolution with no downtime and minimal data disruption.
2. Current State
What `alter_stream_table` can change today
| Parameter | Behavior |
|---|---|
| `schedule` | UPDATE catalog row |
| `refresh_mode` | Migrate trigger infrastructure (CDC ↔ IVM), UPDATE catalog |
| `status` | UPDATE catalog row, reset `consecutive_errors` |
| `diamond_consistency` | UPDATE catalog row |
| `diamond_schedule_policy` | UPDATE catalog row |
What `create_stream_table` does that ALTER would need to replicate
- Query rewrite pipeline — view inlining, DISTINCT ON, GROUPING SETS, scalar subquery, SubLinks-in-OR rewrites.
- Query validation — `LIMIT 0` execution, unsupported construct checks, DVM parsing (for DIFFERENTIAL/IMMEDIATE), volatility checks.
- TopK detection — extract ORDER BY + LIMIT pattern.
- Storage table creation — `CREATE TABLE` with `__pgt_row_id`, optional `__pgt_count` / `__pgt_count_l` / `__pgt_count_r` columns, indexes.
- Catalog insert — `pgt_stream_tables` row, `pgt_dependencies` rows with column snapshots.
- CDC/IVM infrastructure — triggers + change buffer tables on source tables.
- DAG signal — notify scheduler to rebuild dependency graph.
- Cache invalidation — bump `CACHE_GENERATION` to flush delta/MERGE template caches.
- Initial full refresh — populate the storage table.
The `needs_reinit` (reinitialize) path
Today, `execute_reinitialize_refresh` is just `execute_full_refresh` plus
clearing the `needs_reinit` flag. It does not drop/recreate the storage
table or update the column schema. This means the current reinitialize only
works when the output schema is unchanged (e.g., a function redefinition
that changes behavior but not column types).
3. Design
3.1 New parameter
pgtrickle.alter_stream_table(
name text,
query text DEFAULT NULL, -- NEW
schedule text DEFAULT NULL,
refresh_mode text DEFAULT NULL,
status text DEFAULT NULL,
diamond_consistency text DEFAULT NULL,
diamond_schedule_policy text DEFAULT NULL
) → void
When query is non-NULL, the function performs a query migration in
addition to any other parameter changes.
3.2 Schema classification
The first step after validating the new query is to classify the schema change by comparing old vs. new output columns:
| Classification | Condition | Strategy |
|---|---|---|
| Same schema | Column names, types, and count are identical | Fast path: update catalog + full refresh |
| Compatible schema | Columns added, removed, or reordered; no type conflicts on surviving columns | Storage migration: ALTER TABLE the storage table + full refresh |
| Incompatible schema | Column type changed in a way that’s not implicitly castable | Full rebuild: drop + recreate storage table |
3.3 Execution phases
Phase 0 — Validate & classify
- Run the full rewrite pipeline on the new query (view inlining, DISTINCT ON, GROUPING SETS, etc.) — same code path as `create_stream_table_impl`.
- Run all validation checks (LIMIT 0, unsupported constructs, DVM parse for DIFFERENTIAL/IMMEDIATE, volatility, TopK detection).
- Extract new output columns via `validate_defining_query()`.
- Compare against current storage table columns to determine schema classification.
- Extract new source dependencies via `extract_source_relations()`.
- Run cycle detection on the new dependency set.
If any validation fails, the ALTER is rejected and the existing stream table is untouched (transactional safety).
Phase 1 — Suspend & drain
- Set `status = 'SUSPENDED'` to prevent the scheduler from refreshing during migration.
- Flush any pending deferred cleanup for this ST’s source OIDs.
Phase 2 — Tear down old infrastructure
- Diff old vs. new source dependencies.
  - Sources removed: drop CDC triggers + change buffer tables (or IVM triggers for IMMEDIATE mode).
  - Sources kept: leave CDC infra in place; update column snapshots if needed.
  - Sources added: will be set up in Phase 4.
- Invalidate MERGE template cache (`bump_cache_generation()`).
- Flush prepared-statement tracking for this `pgt_id`.
Phase 3 — Migrate storage table
Depends on schema classification:
Same schema (fast path):
- No DDL required. Storage table is reused as-is.
Compatible schema:
- ALTER TABLE ... ADD COLUMN for new columns.
- ALTER TABLE ... DROP COLUMN for removed columns.
- Update or recreate the __pgt_row_id unique index.
- Recreate any GROUP BY composite index if the group-by columns changed.
- Handle __pgt_count / __pgt_count_l / __pgt_count_r auxiliary columns:
add or drop as needed.
Incompatible schema (full rebuild):
- DROP TABLE ... CASCADE the existing storage table.
- CREATE TABLE with new schema (same code as create_stream_table_impl).
- The stream table OID changes — update pgt_relid in the catalog.
Phase 4 — Update catalog & set up new infrastructure
- Update `pgt_stream_tables`:
  - `defining_query` ← new rewritten query
  - `original_query` ← new user-supplied query (if view inlining applies)
  - `functions_used` ← re-extracted from new DVM parse
  - `topk_limit` / `topk_order_by` / `topk_offset` ← from new TopK detection
  - Clear `needs_reinit` flag
  - Reset `frontier` to NULL (force full refresh)
  - Reset `is_populated` to FALSE
  - Update `updated_at`
- Update `pgt_dependencies`:
  - Delete old dependency rows.
  - Insert new dependency rows with column snapshots/fingerprints.
- Set up CDC/IVM for new sources:
  - New source tables get CDC triggers + change buffer tables (or IVM triggers for IMMEDIATE mode).
  - Same logic as `create_stream_table_impl` Phase 2.
- Signal DAG rebuild (`signal_dag_rebuild()`).
- Bump cache generation (`bump_cache_generation()`).
Phase 5 — Repopulate
- Execute a full refresh to populate the storage table with the new query’s results.
- Compute and store the initial frontier.
- Set `status = 'ACTIVE'` and `is_populated = TRUE`.
All of Phases 1–5 execute within a single SPI transaction, so the stream table atomically transitions from old query → new query. Concurrent readers see either the old data (before commit) or the new data (after commit), never an empty table.
3.4 Transactional safety
Unlike drop + recreate, the ALTER approach keeps the storage table’s OID stable (for same-schema and compatible-schema cases). This means:
- Views and policies referencing the stream table remain valid.
- Logical replication publications continue working.
- `pg_depend` entries are preserved.
- No “relation does not exist” window for concurrent queries.
For the incompatible-schema case (full rebuild), the table OID changes.
This is unavoidable but is the same situation as today’s drop + recreate.
We should emit a WARNING when this happens.
3.5 Lock considerations
The ALTER will acquire an AccessExclusiveLock on the storage table for the
duration of the schema migration (Phase 3) and full refresh (Phase 5). This
blocks concurrent reads. For large tables this could be significant.
Future optimization: For same-schema cases, we could use a CONCURRENTLY-
style approach — build the new data into a shadow table, then swap via
ALTER TABLE ... RENAME within a short lock window. This is out of scope for
the initial implementation but is the natural evolution.
4. Edge Cases
4.1 Query changes source tables (dependency migration)
Example: old query reads from orders, new query reads from orders +
customers. The ALTER must:
- Keep CDC triggers on orders (already exists).
- Create CDC triggers + change buffer on customers (new source).
- If a source is removed, clean up only if no other ST depends on it (same
logic as cleanup_cdc_for_source).
4.2 Query changes the GROUP BY / row identity
When the grouping key changes, existing __pgt_row_id values are meaningless.
The full refresh in Phase 5 handles this correctly — it truncates and
re-inserts all rows. But the GROUP BY composite index needs to be rebuilt.
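Why the old ids are meaningless: row identity is a function of the grouping key. Purely as an illustration — this is not pg_trickle’s actual `__pgt_row_id` scheme — a row id derived by hashing the group-key values changes entirely once the key set changes:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative only: derive a row id from the grouping-key values.
/// The extension's real __pgt_row_id derivation may work differently.
fn row_id(group_key: &[&str]) -> u64 {
    let mut h = DefaultHasher::new();
    group_key.hash(&mut h); // hashing includes the slice length, so key arity matters
    h.finish()
}
```

Grouping by `(region)` and grouping by `(region, year)` produce disjoint id spaces, so the only safe move is the truncate-and-reinsert that the Phase 5 full refresh already performs.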
4.3 DIFFERENTIAL → FULL mode interaction
If the user changes both query and refresh_mode in the same ALTER call,
both are applied. The refresh mode migration (CDC ↔ IVM trigger swap)
happens in Phase 2/4 as part of the infrastructure teardown/setup.
4.4 TopK ↔ non-TopK transitions
- Non-TopK → TopK: The new query has ORDER BY + LIMIT. Store TopK metadata. The storage table schema is the same (just the base query columns), so this is typically a same-schema migration.
- TopK → non-TopK: Clear TopK metadata from catalog. Storage schema unchanged.
4.5 __pgt_count column transitions
- If the old query needed `__pgt_count` (aggregate) and the new query doesn’t (flat SELECT), the column must be dropped.
- If the new query needs `__pgt_count` but the old didn’t, the column must be added.
- Same logic for `__pgt_count_l` / `__pgt_count_r` (INTERSECT/EXCEPT).
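The transition reduces to a set difference between the old and new auxiliary-column layouts. A minimal sketch — the table name, the `bigint` type, and the `DEFAULT 1` are assumptions for illustration, not the extension’s actual DDL (the Phase 5 full refresh repopulates the values regardless):

```rust
/// Illustrative sketch: DDL to move a storage table between auxiliary-column
/// layouts (none, `__pgt_count`, or `__pgt_count_l`/`__pgt_count_r`).
fn aux_column_ddl(table: &str, old_cols: &[&str], new_cols: &[&str]) -> Vec<String> {
    let mut ddl = Vec::new();
    for col in old_cols {
        if !new_cols.contains(col) {
            ddl.push(format!("ALTER TABLE {table} DROP COLUMN {col}"));
        }
    }
    for col in new_cols {
        if !old_cols.contains(col) {
            // Assumed type/default; the real migration may backfill differently.
            ddl.push(format!(
                "ALTER TABLE {table} ADD COLUMN {col} bigint NOT NULL DEFAULT 1"
            ));
        }
    }
    ddl
}
```

An aggregate → INTERSECT transition, for example, emits one DROP (`__pgt_count`) followed by two ADDs (`__pgt_count_l`, `__pgt_count_r`).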
4.6 Concurrent ALTER + scheduled refresh
Phase 1 suspends the ST, preventing the scheduler from running a refresh in
parallel. The scheduler checks status before each refresh and skips
SUSPENDED tables.
4.7 View inlining & original_query
Both defining_query (rewritten) and original_query (user-supplied) are
updated. If the new query references a view, the full inlining pipeline runs
and both variants are stored.
4.8 Diamond consistency groups
Changing a query may alter the ST’s position in a diamond dependency group. The DAG rebuild in Phase 4 handles this — the scheduler will recompute diamond groups on next cycle.
5. Implementation Steps
Step 1: Refactor create_stream_table_impl into reusable pieces
File: src/api.rs
Effort: ~4 hours
Extract the following into standalone functions:
| Function | Purpose |
|---|---|
| `run_query_rewrite_pipeline(query) → String` | All 5 rewrites (view inlining, DISTINCT ON, GROUPING SETS, scalar subquery, SubLinks-in-OR) |
| `validate_and_parse_query(query, mode) → (Vec<ColumnDef>, Option<ParsedTree>, Option<TopkInfo>)` | Validation, DVM parse, TopK detection |
| `setup_storage_table(schema, name, columns, ...) → Oid` | CREATE TABLE + indexes |
| `insert_catalog_and_deps(st_meta, deps, ...) → i64` | Catalog + dependency insertion |
| `setup_cdc_infrastructure(deps, mode, ...) → ()` | CDC trigger + change buffer creation |
This refactoring benefits create_stream_table_impl too — it becomes a
straightforward pipeline of these functions.
Step 2: Add schema comparison utility
File: src/api.rs (or new src/schema_diff.rs)
Effort: ~2 hours
enum SchemaChange {
Same,
Compatible {
added: Vec<ColumnDef>,
removed: Vec<String>,
},
Incompatible {
reason: String,
},
}
fn classify_schema_change(
old_columns: &[ColumnDef],
new_columns: &[ColumnDef],
) -> SchemaChange
Compare by column name and type OID. A column whose name matches but whose
type changed is checked against pg_cast for an implicit cast — if castable,
it’s Compatible with an ALTER COLUMN TYPE; if not, Incompatible.
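A minimal sketch of this classification, using `(name, type-name)` pairs in place of the real `ColumnDef`/type-OID representation, and a hard-coded implicit-cast list standing in for the `pg_cast` lookup:

```rust
#[derive(Debug, PartialEq)]
enum SchemaChange {
    Same,
    Compatible { added: Vec<String>, removed: Vec<String> },
    Incompatible { reason: String },
}

/// Illustrative sketch of classify_schema_change(). A pure column reorder
/// falls through to Compatible with empty added/removed lists.
fn classify_schema_change(old: &[(&str, &str)], new: &[(&str, &str)]) -> SchemaChange {
    if old == new {
        return SchemaChange::Same;
    }
    // Stand-in for the pg_cast implicit-cast check.
    let implicit_casts = [("int4", "int8"), ("int4", "numeric"), ("float4", "float8")];
    let mut removed = Vec::new();
    for &(name, old_ty) in old {
        match new.iter().find(|&&(n, _)| n == name) {
            None => removed.push(name.to_string()),
            Some(&(_, new_ty)) if new_ty == old_ty => {}
            Some(&(_, new_ty)) => {
                if !implicit_casts.contains(&(old_ty, new_ty)) {
                    return SchemaChange::Incompatible {
                        reason: format!("column {name}: {old_ty} -> {new_ty} has no implicit cast"),
                    };
                }
                // Implicitly castable: handled via ALTER COLUMN TYPE.
            }
        }
    }
    let added: Vec<String> = new
        .iter()
        .filter(|&&(n, _)| !old.iter().any(|&(o, _)| o == n))
        .map(|&(n, _)| n.to_string())
        .collect();
    SchemaChange::Compatible { added, removed }
}
```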
Step 3: Add storage table migration logic
File: src/api.rs
Effort: ~3 hours
fn migrate_storage_table(
schema: &str,
name: &str,
change: &SchemaChange,
needs_pgt_count: bool,
needs_dual_count: bool,
needs_union_dedup: bool,
new_columns: &[ColumnDef],
group_by_cols: Option<&[String]>,
) -> Result<Oid, PgTrickleError>
For Same: no-op, return existing OID.
For Compatible: issue ALTER TABLE ADD/DROP COLUMN, rebuild indexes.
For Incompatible: DROP TABLE CASCADE + CREATE TABLE, return new OID.
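The Compatible branch boils down to turning the added/removed column lists into ALTER TABLE statements, applied before the Phase 5 full refresh repopulates the data. A sketch under assumptions — placeholder names, no identifier quoting, and the index rebuild elided:

```rust
/// Illustrative sketch of the Compatible migration path only.
fn compatible_migration_ddl(
    table: &str,
    added: &[(&str, &str)], // (column name, SQL type)
    removed: &[&str],
) -> Vec<String> {
    let mut ddl: Vec<String> = removed
        .iter()
        .map(|col| format!("ALTER TABLE {table} DROP COLUMN {col}"))
        .collect();
    for (name, ty) in added {
        // New columns start NULL; the full refresh fills them in afterwards.
        ddl.push(format!("ALTER TABLE {table} ADD COLUMN {name} {ty}"));
    }
    ddl
}
```

The real function additionally rebuilds the `__pgt_row_id` unique index and any GROUP BY composite index, as described in Phase 3.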
Step 4: Add dependency diffing utility
File: src/api.rs or src/catalog.rs
Effort: ~1 hour
struct DependencyDiff {
added: Vec<(Oid, String)>, // (source_oid, source_type)
removed: Vec<(Oid, String)>,
kept: Vec<(Oid, String)>,
}
fn diff_dependencies(
old: &[StDependency],
new: &[(Oid, String)],
) -> DependencyDiff
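The diff itself is a straightforward three-way set partition. The sketch below models source OIDs as plain `u32` and drops the `source_type` field for brevity; it is an illustration, not the catalog code:

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
struct DepDiff {
    added: Vec<u32>,
    removed: Vec<u32>,
    kept: Vec<u32>,
}

/// Illustrative sketch of diff_dependencies(): partition old vs. new
/// source OIDs into added / removed / kept buckets.
fn diff_deps(old: &[u32], new: &[u32]) -> DepDiff {
    let old_set: HashSet<u32> = old.iter().copied().collect();
    let new_set: HashSet<u32> = new.iter().copied().collect();
    DepDiff {
        added: new.iter().copied().filter(|o| !old_set.contains(o)).collect(),
        removed: old.iter().copied().filter(|o| !new_set.contains(o)).collect(),
        kept: old.iter().copied().filter(|o| new_set.contains(o)).collect(),
    }
}
```

Phase 2 tears down infrastructure for `removed`, Phase 4 sets it up for `added`, and `kept` sources only get their column snapshots refreshed.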
Step 5: Implement alter_stream_table_query
File: src/api.rs
Effort: ~6 hours
The core logic, called from alter_stream_table_impl when query is
Some(...). Orchestrates Phases 0–5, using the utilities from Steps 1–4.
Step 6: Update SQL signature & upgrade script
Files: src/api.rs (pgrx attribute), sql/pg_trickle--0.2.1--0.2.2.sql
Effort: ~1 hour
Add the query parameter to the #[pg_extern] function signature with
default!(Option<&str>, "NULL"). Write the upgrade migration that DROPs the
old function signature and CREATEs the new one.
Step 7: Update dbt materialization
File: dbt-pgtrickle/macros/materializations/stream_table.sql,
dbt-pgtrickle/macros/adapters/alter_stream_table.sql
Effort: ~2 hours
When the defining query changes, call alter_stream_table(name, query => ...)
instead of drop + recreate. Fall back to drop + recreate only if the
alter_stream_table call fails (e.g., older pg_trickle version without
query support).
Step 8: Update docs & FAQ
Files: docs/SQL_REFERENCE.md, docs/FAQ.md, docs/ARCHITECTURE.md
Effort: ~2 hours
- Update the `alter_stream_table` parameter table.
- Remove the “must drop and recreate” guidance throughout the FAQ.
- Document same-schema vs. compatible vs. incompatible behavior.
- Update the dbt FAQ section.
Step 9: Tests
Files: tests/e2e_alter_query_tests.rs (new), src/api.rs (unit tests)
Effort: ~6 hours
| Test | Scenario |
|---|---|
| `test_alter_query_same_schema` | Change WHERE clause, same output columns |
| `test_alter_query_add_column` | Add a column to SELECT |
| `test_alter_query_remove_column` | Remove a column from SELECT |
| `test_alter_query_type_change_compatible` | Change integer → bigint |
| `test_alter_query_type_change_incompatible` | Change integer → text (full rebuild) |
| `test_alter_query_change_sources` | Old query uses table A, new uses A+B |
| `test_alter_query_remove_source` | Remove a source table |
| `test_alter_query_topk_transition` | Non-TopK → TopK and back |
| `test_alter_query_pgt_count_transition` | Flat query → aggregate query |
| `test_alter_query_differential_mode` | Verify CDC infra updated, delta cache flushed |
| `test_alter_query_immediate_mode` | Verify IVM triggers migrated |
| `test_alter_query_with_mode_change` | Change query + refresh mode simultaneously |
| `test_alter_query_cycle_detection` | New query introduces cycle → rejected |
| `test_alter_query_invalid_query` | Bad SQL → rejected, old ST untouched |
| `test_alter_query_concurrent_reads` | Verify no “relation does not exist” during alter |
| `test_alter_query_dbt_integration` | dbt materialization uses ALTER instead of drop/recreate |
| `test_alter_query_view_inlining` | New query references a view |
6. Alternatives Considered
A. Deferred migration (async, via scheduler)
Instead of doing everything synchronously in alter_stream_table, just
update the catalog and set a new flag (needs_query_migration). The
scheduler picks it up on the next cycle and does the migration.
Pros: Faster ALTER call. User gets control back immediately.
Cons: Data gap between ALTER and next scheduler cycle. More complex
scheduler logic. Harder to report errors synchronously. The dbt
materialization can’t verify success inline.
Decision: Do it synchronously. The full refresh is already O(query
execution time) and that’s what users expect from ALTER ... query.
B. Shadow table swap (zero-lock approach)
Build new data into pgtrickle._shadow_{name}, then ALTER TABLE RENAME
in a short lock window.
Pros: Minimal read blocking time.
Cons: Doubles storage during migration. More complex OID management.
Publications, views, and policies reference the old OID and need updating.
Decision: Out of scope for v1. Revisit when users report lock-duration issues on large tables.
C. Do nothing (keep drop + recreate)
Pros: No new code.
Cons: All the problems listed in §1 remain. The dbt materialization
already works around this, but the data gap is inherent.
Decision: Rejected — this is the second most requested feature.
7. Risks & Mitigations
| Risk | Impact | Mitigation |
|---|---|---|
| Long lock hold during full refresh of large tables | Read queries blocked | Document clearly; plan shadow-swap as future optimization |
| Incompatible schema change invalidates dependent views/policies | Cascading failures | Emit WARNING; document in UPGRADING.md |
| Bug in dependency diffing leaves orphaned CDC triggers | Change buffer bloat | Add pgtrickle.cleanup_orphaned_cdc() utility function |
| Partial failure mid-migration | Corrupted state | Entire ALTER runs in one SPI transaction; rollback on error |
8. Milestones
| Milestone | Steps | Est. Effort |
|---|---|---|
| M1: Refactor `create_stream_table_impl` | Step 1 | 4h |
| M2: Core ALTER QUERY implementation | Steps 2–5 | 12h |
| M3: SQL upgrade + dbt | Steps 6–7 | 3h |
| M4: Docs + tests | Steps 8–9 | 8h |
| Total | Steps 1–9 | ~27h |