AGENTS.md — Development Guidelines for dbt-pg-trickle

Project Overview

dbt package providing a stream_table custom materialization for the pg-trickle PostgreSQL extension. Pure Jinja SQL macros — no Python adapter code. Works with stock dbt-postgres.

  • Language: Jinja2 SQL (dbt macros)
  • Framework: dbt Core ≥ 1.9, ≤ 1.10, dbt-postgres adapter
  • Target database: PostgreSQL 18 with pg_trickle extension
  • Package name: dbt_pg_trickle
  • License: Apache 2.0

Key docs: plans/PLAN_DBT_MACRO.md · README.md · CHANGELOG.md

Directory Layout

macros/
├── materializations/
│   └── stream_table.sql           # Core custom materialization — entry point for dbt run
├── adapters/
│   ├── create_stream_table.sql    # Wraps pgtrickle.create_stream_table()
│   ├── create_or_replace_stream_table.sql  # Wraps pgtrickle.create_or_replace_stream_table() (≥ 0.6.0)
│   ├── alter_stream_table.sql     # Wraps pgtrickle.alter_stream_table()
│   ├── drop_stream_table.sql      # Wraps pgtrickle.drop_stream_table()
│   └── refresh_stream_table.sql   # Wraps pgtrickle.refresh_stream_table()
├── hooks/
│   └── source_freshness.sql       # Source freshness via pg_stat_stream_tables
├── operations/
│   ├── refresh.sql                # Manual refresh run-operation (single table)
│   ├── refresh_all.sql            # Refresh all stream tables in dependency order
│   ├── drop_all.sql               # Drop all stream tables run-operation
│   └── check_cdc_health.sql       # CDC health check run-operation
└── utils/
    ├── stream_table_exists.sql    # Check catalog for stream table existence
    ├── get_stream_table_info.sql  # Read stream table metadata from catalog
    ├── stream_table_status.sql    # Health status (healthy/stale/erroring/paused)
    └── has_create_or_replace.sql  # Detect pg_trickle ≥ 0.6.0

tests/
└── generic/
    └── stream_table_healthy.sql   # Generic dbt test for stream table health

integration_tests/                 # Standalone dbt project for testing
├── dbt_project.yml
├── profiles.yml                   # Postgres → localhost:5432
├── packages.yml                   # Installs parent via local: ../
├── models/                        # Test stream table models
├── seeds/                         # CSV fixtures
└── tests/                         # Data tests / assertions

Workflow — Always Do This

After any macro change, run the integration test suite:

# The just command handles Docker, Python venv, and dbt installation:
just test-dbt-fast   # fast (reuses existing E2E Docker image)
just test-dbt        # full (rebuilds Docker image from source)

The test runner (integration_tests/scripts/run_dbt_tests.sh) automatically: - Creates a dedicated .venv-dbt virtual environment using Python 3.13 (dbt-core 1.10 requires Python ≤3.13; mashumaro dependency incompatible with 3.14) - Installs dbt-core~=1.10 and dbt-postgres~=1.10 into that venv - Adds the Homebrew-keg PostgreSQL bin/ to PATH so psql is available

To run dbt commands manually after just test-dbt-fast (while the container is still up), activate the venv first: bash source .venv-dbt/bin/activate cd dbt-pgtrickle/integration_tests dbt deps && dbt run && dbt test

After changes to the materialization logic, also test the full-refresh and alter paths:

dbt run --full-refresh    # Test drop/recreate
sleep 5
dbt test

Before committing, verify: 1. All integration tests pass 2. No Jinja compilation errors (dbt compile succeeds) 3. Macro naming is consistent with conventions below

Jinja SQL Conventions

Naming

  • Macros: pgtrickle_<action> prefix (e.g., pgtrickle_create_stream_table)
  • Materialization: stream_table (registered via {% materialization stream_table, adapter='postgres' %})
  • Config keys: snake_case (e.g., stream_table_name, refresh_mode)
  • Files: One macro per file, filename matches macro name (without pgtrickle_ prefix for adapters)

Quoting and Safety

  • Always use dbt.string_literal() to quote user-provided string values passed to SQL
  • Never concatenate raw user input into SQL strings
  • Guard run_query() calls with {% if execute %} to prevent parse-time execution

Whitespace Control

  • Use {%- ... -%} (trim) on variable assignments: {%- set foo = config.get('bar') -%}
  • Use {{ ... }} (no trim) on output expressions that produce SQL
  • Put {# comments #} on separate lines for readability

Error Handling

  • Use exceptions.raise_compiler_error() for fatal errors (missing config, invalid values)
  • Use log("pg-trickle: ...", info=true) for informational messages
  • Check run_query() results before accessing .rows: sql {% set result = run_query(query) %} {% if result and result.rows | length > 0 %} {{ return(result.rows[0]) }} {% else %} {{ return(none) }} {% endif %}
  • Return none (not empty dict) when a stream table is not found

Materialization Structure

The stream_table materialization must handle exactly four cases: 1. Full refresh + exists: Drop, then create 2. Does not exist: Create 3. Exists + query changed: Drop, then create 4. Exists + query unchanged: Alter (schedule/mode only) or no-op

Always call adapter.cache_new() after creating a new relation. Always call run_hooks(pre_hooks) / run_hooks(post_hooks). Always return {'relations': [target_relation]}.

DDL Transaction Safety (CRITICAL)

Never use run_query() for DDL calls to pgtrickle.* functions.

dbt wraps each model’s main statement in an implicit BEGIN … ROLLBACK on the model’s connection. run_query() reuses the same connection, so the DDL is silently rolled back when the main transaction ends — even when the model reports OK.

Instead, use {% call statement(..., auto_begin=False, fetch_result=False) %} with explicit BEGIN; … COMMIT; embedded in the SQL:

{% call statement('pgtrickle_create', auto_begin=False, fetch_result=False) %}
  BEGIN;
  SELECT pgtrickle.create_stream_table(...);
  COMMIT;
{% endcall %}

This guarantees the DDL commits unconditionally regardless of what dbt’s connection state is.

Rule: Every dbt-pgtrickle/macros/adapters/*.sql macro that mutates the database (create, drop, alter, refresh) must use this pattern.

Config Defaults

Key Default Valid Values
schedule '1m' pg-trickle duration: 30s, 1m, 5m, 1h, etc.
refresh_mode 'DIFFERENTIAL' 'DIFFERENTIAL', 'FULL'
initialize true true, false

pg-trickle SQL API

All functions live in the pgtrickle schema. The extension must be loaded via shared_preload_libraries = 'pg_trickle'.

Function Purpose
pgtrickle.create_stream_table(name, query, schedule, refresh_mode, initialize) Create a new stream table
pgtrickle.alter_stream_table(name, schedule, refresh_mode, status) Alter config (cannot change query)
pgtrickle.drop_stream_table(name) Drop stream table and all resources
pgtrickle.refresh_stream_table(name) Synchronous manual refresh

Catalog: pgtrickle.pgt_stream_tables (metadata), pgtrickle.pg_stat_stream_tables (monitoring view)

Testing

Test tiers

Tier What Command
Compile check Jinja syntax, macro resolution dbt compile
Integration tests Full lifecycle against real PG + pg-trickle cd integration_tests && dbt seed && dbt run && dbt test
Full-refresh path Drop/recreate behavior dbt run --full-refresh && dbt test
Operations test Manual refresh, drop-all dbt run-operation refresh --args '{model_name: ...}'

Test infrastructure

  • Integration tests require PostgreSQL 18 with pg-trickle extension
  • Build the Docker image: docker build -t pg-trickle-e2e:latest -f tests/Dockerfile.e2e . (from the pg-trickle repo)
  • Start: docker run -d -e POSTGRES_PASSWORD=postgres -p 5432:5432 pg-trickle-e2e:latest
  • Create extension: psql -U postgres -c "CREATE EXTENSION pg_trickle;"

Test naming

  • Data tests: assert_<what_is_tested>.sql (e.g., assert_totals_correct.sql)
  • Schema tests: defined in schema.yml alongside model definitions

Do’s and Don'ts

Do

  • Prefix all macros with pgtrickle_ (except the materialization itself)
  • Use dbt.string_literal() for all string parameters to pg-trickle functions
  • Guard all run_query() calls with {% if execute %}
  • Log actions with {{ log("pg-trickle: ...", info=true) }}
  • Handle the “stream table not found” case gracefully (return none)
  • Test both create and full-refresh paths
  • Keep macros focused — one pg-trickle function per adapter macro

Don’t

  • Don’t use adapter.dispatch() — this package is PostgreSQL-only
  • Don’t call pg-trickle functions outside of macro wrappers
  • Don’t use run_query() without {% if execute %} guard
  • Don’t modify the __pgt_row_id column — it’s managed by pg-trickle
  • Don’t assume ALTER can change the defining query — it can’t; must drop/recreate
  • Don’t use println or print — use log() for dbt-compatible output
  • Don’t add Python code — this is a macro-only package (Option A)

Commit Messages

Follow conventional commits: feat: add stream_table materialization fix: handle schema-qualified names in existence check test: add full-refresh integration test docs: update config reference in README chore: update dbt version constraint

Code Review Checklist

  • [ ] Macro names follow pgtrickle_ convention
  • [ ] All string params use dbt.string_literal()
  • [ ] All run_query() calls guarded by {% if execute %}
  • [ ] Materialization handles all four lifecycle cases
  • [ ] Integration tests pass (create, test, full-refresh, test)
  • [ ] Log messages are informative and prefixed with pg-trickle:
  • [ ] No raw SQL concatenation of user input
  • [ ] CHANGELOG.md updated