Extensions
- ulak 0.0.2
- Transactional Outbox Pattern for PostgreSQL
Documentation
- CONTRIBUTING
- Contributing to ulak
- LICENSE
- Apache License 2.0
- CHANGELOG
- Changelog
- version
- version
README
Contents
ulak
Transactional Outbox Pattern for PostgreSQL — messages committed atomically with your business transactions, delivered reliably via background workers.
ulak writes messages to a PostgreSQL table inside your transaction. Background workers pick them up and deliver asynchronously with retries, circuit breaking, and dead letter queues. Your application gets exactly-once semantics for writes and at-least-once delivery.
Features
- 6 Protocols — HTTP (built-in), Kafka, MQTT, Redis Streams, AMQP, NATS
- Multi-Worker — 1–32 parallel workers with
FOR UPDATE SKIP LOCKEDand modulo partitioning - Reliability — Circuit breaker, exponential backoff retry, DLQ with redrive
- Security — RBAC, SSRF protection, TLS/mTLS, OAuth2, AWS SigV4, webhook HMAC signing
- Pub/Sub — Event types with JSONB containment filters and multi-endpoint fan-out
- Message Control — Priority, ordering keys, idempotency, TTL, scheduled delivery
- CloudEvents — Binary and structured mode support
- Operational — Backpressure, monthly archive partitions, health checks, response capture
Quick Start
# Start PostgreSQL + all protocol services
git clone https://github.com/zeybek/ulak.git
cd ulak
docker compose up -d
# Build with all protocols and install
docker exec ulak-postgres-1 bash -c \
"cd /src/ulak && make clean && make ENABLE_KAFKA=1 ENABLE_MQTT=1 ENABLE_REDIS=1 ENABLE_AMQP=1 ENABLE_NATS=1 && make install"
# Configure and restart
docker exec ulak-postgres-1 psql -U postgres -c \
"ALTER SYSTEM SET shared_preload_libraries = 'ulak';
ALTER SYSTEM SET ulak.database = 'ulak_test';"
docker restart ulak-postgres-1
# Create extension
docker exec ulak-postgres-1 psql -U postgres -d ulak_test -c \
"CREATE EXTENSION ulak;"
Send your first message:
-- Create an HTTP endpoint
SELECT ulak.create_endpoint('my-webhook', 'http',
'{"url": "https://httpbin.org/post", "method": "POST"}'::jsonb);
-- Send a message (inside your transaction)
BEGIN;
INSERT INTO orders (id, total) VALUES (1, 99.99);
SELECT ulak.send('my-webhook', '{"order_id": 1, "total": 99.99}'::jsonb);
COMMIT;
-- Message is now queued and will be delivered by a background worker
Other protocols work the same way:
-- Kafka
SELECT ulak.create_endpoint('events', 'kafka',
'{"broker": "kafka:9092", "topic": "order-events"}'::jsonb);
-- Redis Streams
SELECT ulak.create_endpoint('stream', 'redis',
'{"host": "redis", "stream_key": "my-events"}'::jsonb);
-- MQTT
SELECT ulak.create_endpoint('sensor', 'mqtt',
'{"broker": "mosquitto", "topic": "sensors/temp", "qos": 1}'::jsonb);
-- AMQP / RabbitMQ
SELECT ulak.create_endpoint('queue', 'amqp',
'{"host": "rabbitmq", "exchange": "", "routing_key": "my-queue",
"username": "guest", "password": "guest"}'::jsonb);
-- NATS
SELECT ulak.create_endpoint('bus', 'nats',
'{"url": "nats://nats:4222", "subject": "orders.created"}'::jsonb);
Installation
Prerequisites
| Dependency | Required | Build Flag |
|---|---|---|
| PostgreSQL 14–18 | Yes | — |
| libcurl | Yes | — |
| librdkafka | Optional | ENABLE_KAFKA=1 |
| libmosquitto | Optional | ENABLE_MQTT=1 |
| hiredis | Optional | ENABLE_REDIS=1 |
| librabbitmq | Optional | ENABLE_AMQP=1 |
| libnats / cnats | Optional | ENABLE_NATS=1 |
Build from Source
# HTTP only (default)
make && make install
# With all protocols
make ENABLE_KAFKA=1 ENABLE_MQTT=1 ENABLE_REDIS=1 ENABLE_AMQP=1 ENABLE_NATS=1 && make install
Add to postgresql.conf and restart:
shared_preload_libraries = 'ulak'
CREATE EXTENSION ulak;
Docker
The Dockerfile accepts a PG_MAJOR build argument (default: 18):
# Default (PostgreSQL 18)
docker compose up -d
# Specific version
PG_MAJOR=15 docker compose up -d
The compose file includes PostgreSQL, Kafka, Redis, Mosquitto (MQTT), RabbitMQ (AMQP), and NATS.
Usage
Sending Messages
-- Simple send
SELECT ulak.send('my-webhook', '{"event": "order.created"}'::jsonb);
-- With options (priority, idempotency, scheduling)
SELECT ulak.send_with_options(
'my-webhook',
'{"event": "order.created"}'::jsonb,
5, -- priority (0-10, higher = first)
NOW() + INTERVAL '10 minutes', -- scheduled delivery
'order-123-created', -- idempotency key
'550e8400-e29b-41d4-a716-446655440000'::uuid, -- correlation ID (UUID)
NOW() + INTERVAL '1 hour', -- TTL / expires at
'order-123' -- ordering key (FIFO per key)
);
-- Batch send
SELECT ulak.send_batch('my-webhook', ARRAY[
'{"id": 1}'::jsonb,
'{"id": 2}'::jsonb,
'{"id": 3}'::jsonb
]);
Pub/Sub
-- Create event types and subscribe endpoints
SELECT ulak.create_event_type('order.created', 'New order placed');
SELECT ulak.subscribe('order.created', 'my-webhook');
SELECT ulak.subscribe('order.created', 'events'); -- fan-out
-- Publish to all subscribers
SELECT ulak.publish('order.created', '{"order_id": 123}'::jsonb);
Monitoring
SELECT * FROM ulak.health_check();
SELECT * FROM ulak.get_worker_status();
SELECT * FROM ulak.get_endpoint_health();
SELECT * FROM ulak.dlq_summary();
DLQ Management
-- Redrive failed messages
SELECT ulak.redrive_message(42); -- single message
SELECT ulak.redrive_endpoint('my-webhook'); -- all for endpoint
SELECT ulak.redrive_all(); -- everything
-- Replay from archive
SELECT ulak.replay_message(100);
SELECT ulak.replay_range(
1,
date_trunc('month', now()) - interval '1 month',
date_trunc('month', now())
);
Configuration
All parameters use the ulak. prefix. Key settings:
| Parameter | Default | Description |
|---|---|---|
workers |
4 | Background workers (1–32) |
poll_interval |
500ms | Queue polling frequency |
batch_size |
200 | Messages per batch cycle |
max_queue_size |
1,000,000 | Backpressure threshold |
circuit_breaker_threshold |
10 | Failures before circuit opens |
circuit_breaker_cooldown |
30s | Wait before half-open probe |
http_timeout |
10s | HTTP request timeout |
dlq_retention_days |
30 | DLQ message retention |
See the Configuration Reference for all 57 parameters with types, ranges, and restart/reload semantics.
Architecture
┌──────────────────────────────────────────────────────────┐
│ Application Transaction │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ BEGIN; │ │
│ │ INSERT INTO orders ... │ │
│ │ SELECT ulak.send('webhook', '{...}'); │ │
│ │ COMMIT; ▲ │ │
│ └────────────────────────────│──────────────────────────┘ │
│ │ SPI INSERT (atomic) │
│ ┌────────────────────────────▼──────────────────────────┐ │
│ │ ulak.queue (pending messages) │ │
│ └───────┬───────────┬───────────┬───────────────────────┘ │
│ │ │ │ │
│ Worker 0 Worker 1 Worker N │
│ (id%N=0) (id%N=1) (id%N=N) │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Dispatcher Factory │ │
│ │ HTTP │ Kafka │ MQTT │ Redis │ AMQP │ NATS │ │
│ └──┬──────┬──────┬──────┬──────┬──────┬───────────────┘ │
└─────│──────│──────│──────│──────│───────────────────────────┘
▼ ▼ ▼ ▼ ▼ ▼
Endpoints (HTTP APIs, Kafka topics, MQTT brokers, message buses, ...)
See the Architecture wiki page for the full technical deep-dive.
Testing
# Regression and isolation tests
docker exec ulak-postgres-1 bash -c \
"cd /src/ulak && make installcheck"
# Code quality
make tools-install # install clang-format, cppcheck, lefthook locally
make tools-versions # print local tool versions
make format # clang-format
make lint # cppcheck
make hooks-install # install local git hooks (auto-installs lefthook via Homebrew or Go when available)
make hooks-run # run local pre-commit checks manually
With lefthook installed, the pre-commit hook auto-formats staged C/H files under src/ and include/, re-stages them, and then runs the remaining checks.
CI uses clang-format-22 and clang-tidy-22. For the closest local parity, install tools with make tools-install and verify with make tools-versions.
Conventional commit headers are enforced locally through the commit-msg hook:
feat(scope): subject
fix: subject
chore(ci)!: subject
Documentation
Full documentation is available in the Wiki.
| Category | Pages |
|---|---|
| Getting Started | Quick Start |
| Architecture | System Architecture |
| Protocols | HTTP · Kafka · MQTT · Redis · AMQP · NATS |
| Features | Pub/Sub Events · Message Features |
| Operations | Reliability · Monitoring · Security |
| Reference | Configuration (57 GUCs) · SQL API (40+ Functions) |
| Development | Building & Testing · Contributing · Changelog |
License
ulak is licensed under Apache License 2.0.
You may use, modify, and distribute the project in commercial and non-commercial settings under Apache 2.0.