Plan: dbt Integration via Full Custom Adapter
Option B — Dedicated dbt-pgtrickle Adapter
Date: 2026-02-24 Status: PROPOSED
Note (2026-02-24): Option A — the macro-only package (PLAN_DBT_MACRO.md) — has been implemented in dbt-pgtrickle/ (Phases 1–8, 10). The CI pipeline (Phase 9) is live in .github/workflows/ci.yml. This adapter plan (Option B) is the upgrade path for when first-class relation types, column filtering, and native source-freshness support are needed.
Overview
Implement pg_trickle integration with dbt Core
as a full custom adapter (dbt-pgtrickle) that extends dbt-postgres. This approach
gives complete control over how dbt interacts with pg_trickle: custom relation types,
native catalog introspection, column filtering (__pgt_row_id hidden), and first-class
support for stream-table-specific operations (manual refresh, CDC health checks, staleness
monitoring).
Option A (macro-only package) is already implemented and provides the core
stream_table materialization, SQL API wrappers, lifecycle operations, freshness
monitoring, and integration tests. This adapter plan (Option B) builds on that
foundation when pg_trickle becomes a central part of the data platform and users
need a richer dbt experience — hidden __pgt_row_id columns, stream_table
relation type in dbt docs, and native dbt source freshness support.
Table of Contents
- Architecture
- Prerequisites
- Phase 1 — Adapter Scaffolding
- Phase 2 — Connection & Credentials
- Phase 3 — Relation Types & Catalog
- Phase 4 — Custom Materialization
- Phase 5 — Column Introspection
- Phase 6 — Source Freshness
- Phase 7 — Custom Operations
- Phase 8 — Monitoring Integration
- Phase 9 — Testing
- Phase 10 — Packaging & Distribution
- Comparison with Option A
- File Layout
- Appendix: Adapter API Surface
- Effort Estimate
Architecture
┌──────────────────────────────────────────────────────────────────────┐
│ dbt Core (CLI) │
│ │
│ Adapter plugin system: imports dbt.adapters.pgtrickle → Plugin │
│ │
│ dbt run ──────► PgTrickleAdapter.execute_model() │
│ dbt test ─────► Standard test runner (heap table) │
│ dbt source freshness ──► PgTrickleAdapter.calculate_freshness() │
│ dbt docs generate ──────► PgTrickleAdapter.get_columns_in_relation() │
└──────────────────────┬───────────────────────────────────────────────┘
│ Python adapter (extends dbt-postgres)
▼
┌──────────────────────────────────────────────────────────────────────┐
│ PgTrickleAdapter (Python) │
│ │
│ class PgTrickleAdapter(PostgresAdapter): │
│ - Overrides: get_columns_in_relation() → hides __pgt_row_id │
│ - Overrides: list_relations_without_caching() → includes STs │
│ - New: create_stream_table(), alter_stream_table(), etc. │
│ - New: get_cdc_health(), get_stream_table_stats() │
│ │
│ class PgTrickleRelation(PostgresRelation): │
│ - type: 'stream_table' | 'table' | 'view' | ... │
│ │
│ class PgTrickleColumn(PostgresColumn): │
│ - Filters __pgt_row_id from introspection results │
└──────────────────────┬───────────────────────────────────────────────┘
│ psycopg2 / asyncpg
▼
┌──────────────────────────────────────────────────────────────────────┐
│ PostgreSQL 18 │
│ │
│ pgtrickle.create_stream_table() pgtrickle.pgt_stream_tables │
│ pgtrickle.alter_stream_table() pgtrickle.pgt_dependencies │
│ pgtrickle.drop_stream_table() pgtrickle.pg_stat_stream_tables │
│ pgtrickle.refresh_stream_table() pgtrickle.check_cdc_health() │
│ pgtrickle.explain_st() pgtrickle.get_refresh_history() │
└──────────────────────────────────────────────────────────────────────┘
Prerequisites
- dbt Core ≥ 1.7
- Python ≥ 3.9
- dbt-postgres ≥ 1.7 (base adapter dependency)
- PostgreSQL 18 with pg_trickle extension installed
- psycopg2-binary or psycopg2 (inherited from dbt-postgres)
Phase 1 — Adapter Scaffolding
1.1 Python package structure
dbt-pgtrickle/
├── pyproject.toml
├── setup.py                          # Or just pyproject.toml with setuptools
├── README.md
├── LICENSE
├── dbt/
│   ├── __init__.py
│   ├── adapters/
│   │   ├── __init__.py
│   │   └── pgtrickle/
│   │       ├── __init__.py           # Plugin registration
│   │       ├── connections.py        # Connection manager
│   │       ├── impl.py               # PgTrickleAdapter class
│   │       ├── relation.py           # PgTrickleRelation class
│   │       ├── column.py             # PgTrickleColumn class
│   │       └── pgtrickle_credentials.py
│   └── include/
│       └── pgtrickle/
│           ├── dbt_project.yml
│           ├── macros/
│           │   ├── materializations/
│           │   │   └── stream_table.sql
│           │   ├── adapters/
│           │   │   └── pgtrickle_api.sql
│           │   ├── catalog.sql
│           │   └── relations.sql
│           └── profile_template.yml
└── tests/
    ├── conftest.py
    ├── unit/
    │   ├── test_adapter.py
    │   └── test_relation.py
    └── functional/
        ├── test_stream_table_materialization.py
        ├── test_column_filtering.py
        └── test_freshness.py
1.2 Adapter registration
# pyproject.toml
[project]
name = "dbt-pgtrickle"
version = "0.1.0"
description = "dbt adapter for pg_trickle (PostgreSQL streaming tables)"
requires-python = ">=3.9"
dependencies = [
"dbt-core>=1.7,<2.0",
"dbt-postgres>=1.7,<2.0",
]
dbt Core does not load adapters through Python entry points: it imports the dbt.adapters.<type> namespace package named by the profile's type field and reads its Plugin attribute. Registration therefore lives in dbt/adapters/pgtrickle/__init__.py.
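A minimal sketch of the registration module, following the dbt-postgres pattern (the class names are the ones this plan defines in Phases 2–4; PACKAGE_PATH is assumed to be set in dbt/include/pgtrickle/__init__.py to that directory's path):
# dbt/adapters/pgtrickle/__init__.py
from dbt.adapters.base import AdapterPlugin

from dbt.adapters.pgtrickle.connections import PgTrickleConnectionManager  # noqa: F401
from dbt.adapters.pgtrickle.impl import PgTrickleAdapter
from dbt.adapters.pgtrickle.pgtrickle_credentials import PgTrickleCredentials
from dbt.include import pgtrickle

Plugin = AdapterPlugin(
    adapter=PgTrickleAdapter,
    credentials=PgTrickleCredentials,
    include_path=pgtrickle.PACKAGE_PATH,  # ships dbt_project.yml + macros
    dependencies=["postgres"],            # fall back to dbt-postgres macros
)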
1.3 Profile type
# dbt/include/pgtrickle/profile_template.yml
fixed:
type: pgtrickle
prompts:
host:
hint: 'Hostname for the PostgreSQL instance'
default: 'localhost'
port:
hint: 'Port number'
default: 5432
user:
hint: 'Database user'
pass:
type: 'password'
hint: 'Database password'
dbname:
hint: 'Database name'
schema:
hint: 'Default schema'
default: 'public'
User profile:
# ~/.dbt/profiles.yml
my_project:
target: dev
outputs:
dev:
type: pgtrickle # ← uses the custom adapter
host: localhost
port: 5432
user: postgres
password: postgres
dbname: mydb
schema: public
threads: 4
Phase 2 — Connection & Credentials
2.1 Credentials class
File: dbt/adapters/pgtrickle/pgtrickle_credentials.py
from dbt.adapters.postgres import PostgresCredentials
from dataclasses import dataclass
@dataclass
class PgTrickleCredentials(PostgresCredentials):
"""Extends PostgresCredentials with pg_trickle-specific options."""
@property
def type(self) -> str:
return "pgtrickle"
No additional credentials fields are needed — pg_trickle uses the same PostgreSQL connection. If future features require stream-table-specific config (e.g., default schedule), add them here.
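For illustration only, a hypothetical default_schedule option (the field name is invented, not part of this plan) would look like:
# Hypothetical extension — not required today
from dataclasses import dataclass
from typing import Optional

from dbt.adapters.postgres import PostgresCredentials

@dataclass
class PgTrickleCredentials(PostgresCredentials):
    default_schedule: Optional[str] = None  # fallback when a model sets no schedule

    @property
    def type(self) -> str:
        return "pgtrickle"

    def _connection_keys(self):
        # Surface the extra field in `dbt debug` output
        return super()._connection_keys() + ("default_schedule",)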
2.2 Connection manager
File: dbt/adapters/pgtrickle/connections.py
from dbt.adapters.postgres import PostgresConnectionManager
from dbt.adapters.pgtrickle.pgtrickle_credentials import PgTrickleCredentials
class PgTrickleConnectionManager(PostgresConnectionManager):
TYPE = "pgtrickle"
@classmethod
def open(cls, connection):
"""Open a connection and verify pg_trickle extension is available."""
connection = super().open(connection)
# Optionally verify extension is installed
cls._verify_pgtrickle_extension(connection)
return connection
@classmethod
def _verify_pgtrickle_extension(cls, connection):
"""Check that pg_trickle extension exists in the database."""
cursor = connection.handle.cursor()
cursor.execute(
"SELECT 1 FROM pg_extension WHERE extname = 'pg_trickle'"
)
if cursor.fetchone() is None:
raise RuntimeError(
"pg_trickle extension is not installed. "
"Run: CREATE EXTENSION pg_trickle;"
)
Phase 3 — Relation Types & Catalog
3.1 Custom relation type
File: dbt/adapters/pgtrickle/relation.py
from dbt.adapters.postgres.relation import PostgresRelation
from dbt.adapters.contracts.relation import RelationType
from dataclasses import dataclass
from enum import Enum

# RelationType is an Enum with members, and Python forbids subclassing such
# an enum, so the stream_table value is added by rebuilding the enum instead.
PgTrickleRelationType = Enum(
    "PgTrickleRelationType",
    [(m.name, m.value) for m in RelationType] + [("StreamTable", "stream_table")],
    type=str,
)

@dataclass(frozen=True, eq=False, repr=False)
class PgTrickleRelation(PostgresRelation):
    """Extends PostgresRelation to recognize stream tables."""

    @classmethod
    def get_relation_type(cls) -> type:
        return PgTrickleRelationType

    def is_stream_table(self) -> bool:
        return self.type == PgTrickleRelationType.StreamTable
3.2 Catalog integration
Override list_relations_without_caching() to detect stream tables:
# In PgTrickleAdapter (impl.py)
def list_relations_without_caching(self, schema_relation):
    """
    List all relations, marking pg_trickle-managed tables as 'stream_table' type.
    """
    # Get standard PostgreSQL relations
    relations = super().list_relations_without_caching(schema_relation)

    # Query the pgtrickle catalog for stream table names in this schema
    st_names = self._get_stream_table_names(schema_relation.schema)

    # Reclassify matching relations
    result = []
    for rel in relations:
        if rel.type == RelationType.Table and rel.identifier in st_names:
            result.append(rel.incorporate(type=PgTrickleRelationType.StreamTable))
        else:
            result.append(rel)
    return result

def _get_stream_table_names(self, schema: str) -> set:
    """Get the names of all stream tables in a schema."""
    sql = f"""
        SELECT pgt_name
        FROM pgtrickle.pgt_stream_tables
        WHERE pgt_schema = {self._quote_string(schema)}
    """
    _, result = self.execute(sql, fetch=True)
    return {row[0] for row in result}
3.3 Catalog macro
File: dbt/include/pgtrickle/macros/catalog.sql
{% macro pgtrickle__get_catalog(information_schema, schemas) -%}
{# Standard postgres catalog query #}
{{ postgres__get_catalog(information_schema, schemas) }}
{%- endmacro %}
The catalog query can be extended to add stream_table as a table_type:
{% macro pgtrickle__get_catalog_relations(information_schema, relations) -%}
{%- call statement('catalog', fetch_result=True) -%}
WITH stream_tables AS (
SELECT pgt_relid::bigint AS relid,
pgt_name,
pgt_schema,
schedule,
refresh_mode,
status
FROM pgtrickle.pgt_stream_tables
)
SELECT
t.table_catalog AS "table_database",
t.table_schema AS "table_schema",
t.table_name AS "table_name",
CASE
WHEN st.relid IS NOT NULL THEN 'stream_table'
ELSE t.table_type
END AS "table_type",
-- information_schema has no comment columns; use pg_catalog helpers
obj_description(format('%I.%I', t.table_schema, t.table_name)::regclass,
                'pg_class') AS "table_comment",
c.column_name AS "column_name",
c.ordinal_position AS "column_index",
c.data_type AS "column_type",
col_description(format('%I.%I', c.table_schema, c.table_name)::regclass,
                c.ordinal_position) AS "column_comment",
-- pg_trickle metadata
st.schedule AS "pgtrickle_schedule",
st.refresh_mode AS "pgtrickle_refresh_mode",
st.status AS "pgtrickle_status"
FROM information_schema.tables t
JOIN information_schema.columns c
ON t.table_schema = c.table_schema
AND t.table_name = c.table_name
LEFT JOIN stream_tables st
ON t.table_schema = st.pgt_schema
AND t.table_name = st.pgt_name
WHERE (t.table_schema, t.table_name) IN (
{% for relation in relations %}
('{{ relation.schema }}', '{{ relation.identifier }}')
{% if not loop.last %},{% endif %}
{% endfor %}
)
-- Hide __pgt_row_id from catalog
AND c.column_name != '__pgt_row_id'
ORDER BY t.table_schema, t.table_name, c.ordinal_position
{%- endcall -%}
{{ return(load_result('catalog').table) }}
{%- endmacro %}
Phase 4 — Custom Materialization
4.1 Materialization macro
File: dbt/include/pgtrickle/macros/materializations/stream_table.sql
The materialization is similar to the macro-only version (Option A) but leverages adapter-level methods for cleaner integration:
{% materialization stream_table, adapter='pgtrickle' %}
{%- set target_relation = this.incorporate(type='stream_table') -%}
{%- set existing_relation = load_cached_relation(this) -%}
{%- set schedule = config.get('schedule', '1m') -%}
{%- set refresh_mode = config.get('refresh_mode', 'DIFFERENTIAL') -%}
{%- set initialize = config.get('initialize', true) -%}
{%- set st_name = target_relation.identifier -%}
{%- set st_schema = target_relation.schema -%}
{%- set qualified_name = st_schema ~ '.' ~ st_name
if st_schema != 'public'
else st_name -%}
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
{{ run_hooks(pre_hooks) }}
{# -- Full refresh: drop and recreate -- #}
{% if full_refresh_mode and existing_relation is not none %}
{% do adapter.pgtrickle_drop_stream_table(qualified_name) %}
{% set existing_relation = none %}
{% endif %}
{%- set defining_query = sql -%}
{% if existing_relation is none %}
{# -- CREATE -- #}
{% do adapter.pgtrickle_create_stream_table(
qualified_name, defining_query, schedule, refresh_mode, initialize
) %}
{% do adapter.cache_added(target_relation) %}
{% elif existing_relation.is_stream_table() %}
{# -- UPDATE: compare query, schedule, mode -- #}
{% set current = adapter.pgtrickle_get_stream_table_info(qualified_name) %}
{% if current.defining_query != defining_query %}
{% do adapter.pgtrickle_drop_stream_table(qualified_name) %}
{% do adapter.pgtrickle_create_stream_table(
qualified_name, defining_query, schedule, refresh_mode, initialize
) %}
{% else %}
{% do adapter.pgtrickle_alter_if_changed(
qualified_name, schedule, refresh_mode, current
) %}
{% endif %}
{% else %}
{# -- Relation exists but is a regular table/view — error -- #}
{{ exceptions.raise_compiler_error(
"Cannot create stream table '" ~ qualified_name ~
"': a " ~ existing_relation.type ~ " with that name already exists."
) }}
{% endif %}
{{ run_hooks(post_hooks) }}
{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
4.2 Adapter-level methods
These Python methods on PgTrickleAdapter are called from the materialization:
# In PgTrickleAdapter (impl.py)
from typing import Optional

from dbt.adapters.base import available  # marks methods as callable from Jinja


@available
def pgtrickle_create_stream_table(
self, name: str, query: str, schedule: str,
refresh_mode: str, initialize: bool
):
sql = f"""
SELECT pgtrickle.create_stream_table(
{self._quote_string(name)},
{self._quote_string(query)},
{self._quote_string(schedule)},
{self._quote_string(refresh_mode)},
{initialize}
)
"""
self.execute(sql, auto_begin=True)
self.connections.get_thread_connection().handle.commit()
@available
def pgtrickle_drop_stream_table(self, name: str):
sql = f"SELECT pgtrickle.drop_stream_table({self._quote_string(name)})"
self.execute(sql, auto_begin=True)
self.connections.get_thread_connection().handle.commit()
@available
def pgtrickle_alter_if_changed(
self, name: str, schedule: str, refresh_mode: str, current: dict
):
if current['schedule'] != schedule:
sql = f"""
SELECT pgtrickle.alter_stream_table(
{self._quote_string(name)},
schedule => {self._quote_string(schedule)}
)
"""
self.execute(sql, auto_begin=True)
if current['refresh_mode'] != refresh_mode:
sql = f"""
SELECT pgtrickle.alter_stream_table(
{self._quote_string(name)},
refresh_mode => {self._quote_string(refresh_mode)}
)
"""
self.execute(sql, auto_begin=True)
self.connections.get_thread_connection().handle.commit()
@available
def pgtrickle_refresh_stream_table(self, name: str):
sql = f"SELECT pgtrickle.refresh_stream_table({self._quote_string(name)})"
self.execute(sql, auto_begin=True)
self.connections.get_thread_connection().handle.commit()
@available
def pgtrickle_get_stream_table_info(self, name: str) -> Optional[dict]:
sql = f"""
SELECT pgt_name, defining_query, schedule, refresh_mode, status
FROM pgtrickle.pgt_stream_tables
WHERE pgt_name = {self._quote_string(name)}
"""
_, result = self.execute(sql, fetch=True)
if result and len(result) > 0:
row = result[0]
return {
'pgt_name': row[0],
'defining_query': row[1],
'schedule': row[2],
'refresh_mode': row[3],
'status': row[4],
}
return None
def _quote_string(self, value: str) -> str:
"""Safely quote a string for SQL interpolation."""
escaped = value.replace("'", "''")
return f"'{escaped}'"
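For example, with the order_totals fixture from Phase 9 (schedule '30s', DIFFERENTIAL refresh), pgtrickle_create_stream_table would render roughly:
SELECT pgtrickle.create_stream_table(
    'order_totals',
    'SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id',
    '30s',
    'DIFFERENTIAL',
    True
);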
Phase 5 — Column Introspection
5.1 Hiding __pgt_row_id
Override get_columns_in_relation() to filter out the internal row ID column:
# In PgTrickleAdapter (impl.py)
def get_columns_in_relation(self, relation):
columns = super().get_columns_in_relation(relation)
if hasattr(relation, 'is_stream_table') and relation.is_stream_table():
columns = [c for c in columns if c.name != '__pgt_row_id']
return columns
This ensures:
- dbt docs generate does not show __pgt_row_id
- SELECT * expansion in dbt does not include the internal column
- Schema tests (e.g., not_null, unique) are not applied to __pgt_row_id
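As a sketch of the effect, a downstream model that enumerates columns through the adapter compiles without the internal column (order_totals is this plan's running example; the model name is illustrative):
-- models/order_totals_wide.sql
{%- set cols = adapter.get_columns_in_relation(ref('order_totals')) -%}
SELECT
    {%- for col in cols %}
    {{ col.name }}{{ "," if not loop.last }}
    {%- endfor %}
FROM {{ ref('order_totals') }}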
5.2 Column type mapping
Stream tables use standard PostgreSQL types. No custom type mapping is needed beyond
what dbt-postgres provides.
Phase 6 — Source Freshness
6.1 Custom freshness calculation
Override calculate_freshness() for stream table sources:
# In PgTrickleAdapter (impl.py)
def calculate_freshness(
self, source, loaded_at_field, filter_
):
"""
For pg_trickle-managed sources, use the monitoring view
instead of querying loaded_at_field on the table.
"""
# Check if this source is a stream table
st_info = self.pgtrickle_get_stream_table_info(
f"{source.schema}.{source.identifier}"
if source.schema != 'public'
else source.identifier
)
if st_info is not None:
# Use pg_trickle's native staleness tracking
sql = f"""
SELECT
last_refresh_at AS max_loaded_at,
now() AS snapshotted_at,
staleness AS max_loaded_at_time_ago_in_s
FROM pgtrickle.pg_stat_stream_tables
WHERE pgt_name = {self._quote_string(st_info['pgt_name'])}
"""
_, result = self.execute(sql, fetch=True)
if result and len(result) > 0:
return {
'max_loaded_at': result[0][0],
'snapshotted_at': result[0][1],
'age': result[0][2],
}
# Fall back to standard freshness
return super().calculate_freshness(source, loaded_at_field, filter_)
6.2 User experience
# sources.yml
sources:
- name: streaming
schema: public
freshness:
warn_after: {count: 10, period: minute}
error_after: {count: 30, period: minute}
tables:
- name: order_totals
# No loaded_at_field needed — adapter queries pg_trickle monitoring
$ dbt source freshness --select source:streaming.order_totals
Running with dbt=1.7.0
Found 1 source, 1 exposure
Freshness test for source streaming.order_totals:
max_loaded_at: 2026-02-24 12:34:56+00
snapshotted_at: 2026-02-24 12:35:02+00
age: 6.0 seconds
status: pass
Phase 7 — Custom Operations
7.1 Manual refresh operation
-- macros/operations/refresh.sql
{% macro refresh(model_name) %}
{% do adapter.pgtrickle_refresh_stream_table(model_name) %}
{{ log("Refreshed stream table: " ~ model_name, info=true) }}
{% endmacro %}
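Usage:
dbt run-operation refresh --args '{"model_name": "order_totals"}'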
7.2 Explain operation
-- macros/operations/explain.sql
{% macro explain(model_name) %}
{% set query %}
SELECT property, value FROM pgtrickle.explain_st({{ dbt.string_literal(model_name) }})
{% endset %}
{% set result = run_query(query) %}
{% for row in result.rows %}
{{ log(row['property'] ~ ": " ~ row['value'], info=true) }}
{% endfor %}
{% endmacro %}
Usage:
dbt run-operation explain --args '{"model_name": "order_totals"}'
7.3 CDC health check operation
-- macros/operations/cdc_health.sql
{% macro cdc_health() %}
{% set query %}
SELECT source_table, cdc_mode, slot_name,
lag_bytes, confirmed_lsn, alert
FROM pgtrickle.check_cdc_health()
{% endset %}
{% set result = run_query(query) %}
{% for row in result.rows %}
{{ log(row['source_table'] ~ " | " ~ row['cdc_mode'] ~
" | lag=" ~ (row['lag_bytes'] or 'n/a') ~
" | alert=" ~ (row['alert'] or 'none'), info=true) }}
{% endfor %}
{% endmacro %}
Usage:
dbt run-operation cdc_health
7.4 Refresh history operation
-- macros/operations/refresh_history.sql
{% macro refresh_history(model_name, limit=10) %}
{% set query %}
SELECT start_time, action, status,
rows_inserted, rows_deleted,
EXTRACT(EPOCH FROM (end_time - start_time))::numeric(10,2) AS duration_s,
error_message
FROM pgtrickle.get_refresh_history(
{{ dbt.string_literal(model_name) }}, {{ limit }}
)
ORDER BY start_time DESC
{% endset %}
{% set result = run_query(query) %}
{% for row in result.rows %}
{{ log(row['start_time'] ~ " | " ~ row['action'] ~ " | " ~
row['status'] ~ " | " ~ row['duration_s'] ~ "s | +" ~
row['rows_inserted'] ~ "/-" ~ row['rows_deleted'] ~
(" | ERR: " ~ row['error_message'] if row['error_message'] else ""),
info=true) }}
{% endfor %}
{% endmacro %}
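Usage:
dbt run-operation refresh_history --args '{"model_name": "order_totals", "limit": 5}'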
Phase 8 — Monitoring Integration
8.1 Exposures for stream table metadata
Users can define dbt exposures that reference stream tables for documentation:
# models/exposures.yml
exposures:
- name: real_time_dashboard
type: dashboard
depends_on:
- ref('order_totals')
description: "Customer order totals, refreshed every 5 minutes via pg_trickle"
meta:
pgtrickle_schedule: '5m'
pgtrickle_refresh_mode: DIFFERENTIAL
8.2 Custom dbt tests for stream table health
-- tests/generic/test_pgtrickle_not_stale.sql
{% test pgtrickle_not_stale(model) %}
SELECT pgt_name
FROM pgtrickle.pg_stat_stream_tables
WHERE pgt_name = '{{ model.identifier }}'
AND stale = true
{% endtest %}
-- tests/generic/test_pgtrickle_no_errors.sql
{% test pgtrickle_no_errors(model) %}
SELECT pgt_name
FROM pgtrickle.pg_stat_stream_tables
WHERE pgt_name = '{{ model.identifier }}'
AND consecutive_errors > 0
{% endtest %}
Usage in schema YAML:
models:
- name: order_totals
tests:
- pgtrickle_not_stale
- pgtrickle_no_errors
Phase 9 — Testing
9.1 Unit tests
Test adapter methods in isolation with mocked database connections:
# tests/unit/test_adapter.py
class TestPgTrickleAdapter:
def test_get_columns_filters_pgt_row_id(self):
"""__pgt_row_id should be excluded from stream table columns."""
...
def test_list_relations_classifies_stream_tables(self):
"""Stream tables should have type 'stream_table'."""
...
def test_pgtrickle_get_stream_table_info_returns_none(self):
"""Returns None for non-existent stream tables."""
...
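A fleshed-out version of the first test, as a sketch: it monkeypatches the parent adapter method so no database connection is needed, and the class paths follow this plan's layout.
# tests/unit/test_adapter.py (sketch)
from types import SimpleNamespace

from dbt.adapters.postgres import PostgresAdapter
from dbt.adapters.pgtrickle.impl import PgTrickleAdapter


def test_get_columns_filters_pgt_row_id(monkeypatch):
    cols = [SimpleNamespace(name="customer_id"), SimpleNamespace(name="__pgt_row_id")]
    # Stub out parent introspection; only the filtering logic is under test.
    monkeypatch.setattr(
        PostgresAdapter, "get_columns_in_relation", lambda self, relation: list(cols)
    )
    adapter = PgTrickleAdapter.__new__(PgTrickleAdapter)  # skip __init__ (no config needed)
    relation = SimpleNamespace(is_stream_table=lambda: True)

    result = adapter.get_columns_in_relation(relation)

    assert [c.name for c in result] == ["customer_id"]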
9.2 Functional tests
Use dbt’s standard functional test framework with a PostgreSQL 18 + pg_trickle instance:
# tests/functional/test_stream_table_materialization.py
import pytest
from dbt.tests.util import run_dbt, write_file
class TestStreamTableMaterialization:
"""End-to-end tests for the stream_table materialization."""
@pytest.fixture(scope="class")
def models(self):
return {
"order_totals.sql": """
{{
config(
materialized='stream_table',
schedule='30s',
refresh_mode='DIFFERENTIAL'
)
}}
SELECT customer_id, SUM(amount) AS total
FROM {{ source('raw', 'orders') }}
GROUP BY customer_id
""",
"schema.yml": """
sources:
- name: raw
tables:
- name: orders
""",
}
def test_first_run_creates_stream_table(self, project):
"""First dbt run should create the stream table."""
results = run_dbt(["run"])
assert len(results) == 1
assert results[0].status == "success"
# Verify ST exists in catalog
result = project.run_sql(
"SELECT 1 FROM pgtrickle.pgt_stream_tables "
"WHERE pgt_name = 'order_totals'",
fetch="one",
)
assert result is not None
def test_second_run_is_noop(self, project):
"""Second run with same query should be a no-op."""
run_dbt(["run"])
results = run_dbt(["run"])
assert len(results) == 1
# Verify only one ST entry (not duplicated)
def test_full_refresh_recreates(self, project):
"""--full-refresh should drop and recreate."""
run_dbt(["run"])
results = run_dbt(["run", "--full-refresh"])
assert len(results) == 1
assert results[0].status == "success"
def test_query_change_triggers_recreate(self, project):
"""Changing the model SQL should drop and recreate."""
run_dbt(["run"])
# Modify model query (add a column)
write_file(
"""
{{ config(materialized='stream_table', schedule='30s') }}
SELECT customer_id, SUM(amount) AS total, COUNT(*) AS cnt
FROM {{ source('raw', 'orders') }}
GROUP BY customer_id
""",
project.project_root,
"models",
"order_totals.sql",
)
results = run_dbt(["run"])
assert results[0].status == "success"
def test_columns_exclude_pgt_row_id(self, project):
"""__pgt_row_id should not appear in column introspection."""
run_dbt(["run"])
columns = project.run_sql(
"SELECT column_name FROM information_schema.columns "
"WHERE table_name = 'order_totals'",
fetch="all",
)
        col_names = [c[0] for c in columns]
        # information_schema still exposes the internal column; the adapter
        # filters it at the Python level (get_columns_in_relation), so the
        # raw catalog is expected to contain it.
        assert "__pgt_row_id" in col_names
9.3 Test infrastructure
Tests use the project’s existing tests/Dockerfile.e2e to get a PostgreSQL 18 instance
with pg_trickle. The CI pipeline builds the image and exposes it for pytest.
Phase 10 — Packaging & Distribution
10.1 PyPI publication
# Build
python -m build
# Publish
twine upload dist/*
10.2 Installation
pip install dbt-pgtrickle
10.3 Versioning
Follow semantic versioning aligned with dbt Core major versions:
- dbt-pgtrickle 0.1.x → dbt Core 1.7+
- dbt-pgtrickle 1.0.x → first stable release
Comparison with Option A
| Aspect | Option A (Macro Package) ✅ Implemented | Option B (Full Adapter) |
|---|---|---|
| Effort | ~15 hours (done) | ~54 hours |
| Dependencies | dbt-postgres only | Custom Python package |
| Installation | dbt deps (git/Hub) | pip install dbt-pgtrickle |
| __pgt_row_id hidden | No (visible in docs) | Yes (filtered in adapter) |
| Relation type | Shows as table | Shows as stream_table |
| Source freshness | pgtrickle_check_freshness run-operation | Native dbt source freshness |
| Custom operations | refresh, drop, CDC health, freshness | + explain, refresh history |
| Catalog integration | Standard postgres | Enhanced with ST metadata |
| Profile type | postgres | pgtrickle (verifies extension) |
| Testing | dbt project tests + CI matrix (1.6–1.9) | Python unit + functional tests |
| Maintenance | Low (Jinja only) | Higher (Python + Jinja) |
| CI | ✅ dbt-integration job in ci.yml | Extends existing CI |
Current status: Option A is implemented and running in CI. Migrate to Option B when
stream tables are a central part of the data platform and users need first-class IDE
support, hidden __pgt_row_id columns in docs, and native dbt source freshness.
File Layout
Migration note: The dbt-pgtrickle/ directory currently contains the Option A macro package. Implementing this adapter plan would restructure the directory into a Python package, moving existing macros under dbt/include/pgtrickle/macros/ and adding the Python adapter code. The integration tests would move into tests/functional/.
dbt-pgtrickle/
├── pyproject.toml # Package metadata, entry points
├── setup.py
├── README.md
├── LICENSE
├── CHANGELOG.md
├── dbt/
│ ├── __init__.py
│ ├── adapters/
│ │ ├── __init__.py
│ │ └── pgtrickle/
│ │ ├── __init__.py # Plugin registration
│ │ ├── connections.py # ~40 lines
│ │ ├── impl.py # ~250 lines (core adapter)
│ │ ├── relation.py # ~30 lines
│ │ ├── column.py # ~15 lines
│ │ └── pgtrickle_credentials.py # ~15 lines
│ └── include/
│ └── pgtrickle/
│ ├── dbt_project.yml
│ ├── profile_template.yml
│ └── macros/
│ ├── materializations/
│ │ └── stream_table.sql # ~80 lines
│ ├── adapters/
│ │ └── pgtrickle_api.sql # ~50 lines
│ ├── catalog.sql # ~60 lines
│ ├── relations.sql # ~20 lines
│ ├── operations/
│ │ ├── refresh.sql # ~5 lines
│ │ ├── explain.sql # ~15 lines
│ │ ├── cdc_health.sql # ~15 lines
│ │ └── refresh_history.sql # ~20 lines
│ └── tests/
│ ├── pgtrickle_not_stale.sql # ~8 lines
│ └── pgtrickle_no_errors.sql # ~8 lines
└── tests/
├── conftest.py
├── unit/
│ ├── test_adapter.py # ~100 lines
│ └── test_relation.py # ~50 lines
└── functional/
├── test_stream_table_materialization.py # ~150 lines
├── test_column_filtering.py # ~50 lines
└── test_freshness.py # ~80 lines
Estimated total code: ~350 lines Python + ~280 lines Jinja SQL + ~430 lines tests.
Appendix: Adapter API Surface
Python methods added to PgTrickleAdapter
| Method | Purpose |
|---|---|
| pgtrickle_create_stream_table(name, query, schedule, mode, init) | Create a stream table |
| pgtrickle_drop_stream_table(name) | Drop a stream table |
| pgtrickle_alter_if_changed(name, schedule, mode, current) | Alter schedule/mode if changed |
| pgtrickle_refresh_stream_table(name) | Trigger manual refresh |
| pgtrickle_get_stream_table_info(name) → dict | Read ST metadata from catalog |
| get_columns_in_relation(relation) | Override: filters __pgt_row_id |
| list_relations_without_caching(schema) | Override: classifies stream tables |
| calculate_freshness(source, ...) | Override: uses pg_trickle monitoring |
Jinja macros
| Macro | Purpose |
|---|---|
| materialization stream_table | Core materialization |
| pgtrickle__get_catalog_relations | Catalog with ST metadata |
| refresh(model_name) | Run-operation: manual refresh |
| explain(model_name) | Run-operation: explain DVM plan |
| cdc_health() | Run-operation: CDC health check |
| refresh_history(model_name, limit) | Run-operation: refresh audit log |
| test_pgtrickle_not_stale | Generic test: staleness |
| test_pgtrickle_no_errors | Generic test: error streak |
Effort Estimate
| Phase | Effort |
|---|---|
| Phase 1 — Scaffolding | 4 hours |
| Phase 2 — Connection & Credentials | 2 hours |
| Phase 3 — Relation Types & Catalog | 8 hours |
| Phase 4 — Custom Materialization | 8 hours |
| Phase 5 — Column Introspection | 4 hours |
| Phase 6 — Source Freshness | 4 hours |
| Phase 7 — Custom Operations | 4 hours |
| Phase 8 — Monitoring Integration | 4 hours |
| Phase 9 — Testing | 12 hours |
| Phase 10 — Packaging & Distribution | 4 hours |
| Total | ~54 hours |