ulak

Transactional Outbox Pattern for PostgreSQL — messages committed atomically with your business transactions, delivered reliably via background workers.

ulak writes messages to a PostgreSQL table inside your transaction. Background workers pick them up and deliver them asynchronously, with retries, circuit breaking, and dead letter queues. A message is enqueued exactly once, and only if your transaction commits; delivery is at-least-once, so receivers should tolerate an occasional duplicate.

Features

Quick Start

# Start PostgreSQL + all protocol services
git clone https://github.com/zeybek/ulak.git
cd ulak
docker compose up -d

# Build with all protocols and install
docker exec ulak-postgres-1 bash -c \
  "cd /src/ulak && make clean && make ENABLE_KAFKA=1 ENABLE_MQTT=1 ENABLE_REDIS=1 ENABLE_AMQP=1 ENABLE_NATS=1 && make install"

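# Configure and restart
# (one statement per -c: psql runs a multi-statement -c string in a single
#  transaction, and ALTER SYSTEM cannot run inside a transaction block)
docker exec ulak-postgres-1 psql -U postgres \
  -c "ALTER SYSTEM SET shared_preload_libraries = 'ulak';" \
  -c "ALTER SYSTEM SET ulak.database = 'ulak_test';"
docker restart ulak-postgres-1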

# Create extension
docker exec ulak-postgres-1 psql -U postgres -d ulak_test -c \
  "CREATE EXTENSION ulak;"
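
docker restart returns before PostgreSQL accepts connections again, so the CREATE EXTENSION step can fail if it runs immediately afterwards. A minimal sketch of a wait loop follows; retry_until is a hypothetical helper name, and the commented example assumes the container name used above and the pg_isready utility that ships with PostgreSQL.

```shell
# retry_until ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Runs COMMAND until it succeeds, giving up after ATTEMPTS tries.
retry_until() {
  attempts=$1
  delay=$2
  shift 2
  i=1
  while :; do
    "$@" && return 0                      # success: stop retrying
    [ "$i" -ge "$attempts" ] && return 1  # out of attempts
    i=$((i + 1))
    sleep "$delay"
  done
}

# Example (not executed here):
# retry_until 30 1 docker exec ulak-postgres-1 pg_isready -U postgres &&
#   docker exec ulak-postgres-1 psql -U postgres -d ulak_test -c "CREATE EXTENSION ulak;"
```

The helper only polls; it does not distinguish a container that is still starting from one that failed to start, so check the container logs if the loop gives up.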

Send your first message:

-- Create an HTTP endpoint
SELECT ulak.create_endpoint('my-webhook', 'http',
  '{"url": "https://httpbin.org/post", "method": "POST"}'::jsonb);

-- Send a message (inside your transaction)
BEGIN;
  INSERT INTO orders (id, total) VALUES (1, 99.99);
  SELECT ulak.send('my-webhook', '{"order_id": 1, "total": 99.99}'::jsonb);
COMMIT;
-- Message is now queued and will be delivered by a background worker
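
The other half of the atomicity guarantee is that a rolled-back transaction should leave nothing queued. The sketch below prints a small psql script that checks this; it assumes the queue table is ulak.queue (the name shown in the architecture diagram), that the connecting role may read it, and that no other messages are queued or delivered while it runs. rollback_demo_sql is a hypothetical helper name.

```shell
# Print a psql script: count queued rows, send inside a transaction that is
# rolled back, then count again. On an otherwise idle queue the two counts
# are expected to match.
rollback_demo_sql() {
  cat <<'SQL'
SELECT count(*) AS queued_before FROM ulak.queue;
BEGIN;
  SELECT ulak.send('my-webhook', '{"order_id": 2}'::jsonb);
ROLLBACK;
SELECT count(*) AS queued_after FROM ulak.queue;
SQL
}

# To run it against the Quick Start container (not executed here):
# rollback_demo_sql | docker exec -i ulak-postgres-1 psql -U postgres -d ulak_test
```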

Other protocols work the same way:

-- Kafka
SELECT ulak.create_endpoint('events', 'kafka',
  '{"broker": "kafka:9092", "topic": "order-events"}'::jsonb);

-- Redis Streams
SELECT ulak.create_endpoint('stream', 'redis',
  '{"host": "redis", "stream_key": "my-events"}'::jsonb);

-- MQTT
SELECT ulak.create_endpoint('sensor', 'mqtt',
  '{"broker": "mosquitto", "topic": "sensors/temp", "qos": 1}'::jsonb);

-- AMQP / RabbitMQ
SELECT ulak.create_endpoint('queue', 'amqp',
  '{"host": "rabbitmq", "exchange": "", "routing_key": "my-queue",
    "username": "guest", "password": "guest"}'::jsonb);

-- NATS
SELECT ulak.create_endpoint('bus', 'nats',
  '{"url": "nats://nats:4222", "subject": "orders.created"}'::jsonb);

Installation

Prerequisites

| Dependency | Required | Build Flag |
|---|---|---|
| PostgreSQL 14–18 | Yes | |
| libcurl | Yes | |
| librdkafka | Optional | ENABLE_KAFKA=1 |
| libmosquitto | Optional | ENABLE_MQTT=1 |
| hiredis | Optional | ENABLE_REDIS=1 |
| librabbitmq | Optional | ENABLE_AMQP=1 |
| libnats / cnats | Optional | ENABLE_NATS=1 |

Build from Source

# HTTP only (default)
make && make install

# With all protocols
make ENABLE_KAFKA=1 ENABLE_MQTT=1 ENABLE_REDIS=1 ENABLE_AMQP=1 ENABLE_NATS=1 && make install

Add to postgresql.conf and restart PostgreSQL:

shared_preload_libraries = 'ulak'

Then create the extension:

CREATE EXTENSION ulak;

Docker

The Dockerfile accepts a PG_MAJOR build argument (default: 18):

# Default (PostgreSQL 18)
docker compose up -d

# Specific version
PG_MAJOR=15 docker compose up -d

The compose file includes PostgreSQL, Kafka, Redis, Mosquitto (MQTT), RabbitMQ (AMQP), and NATS.

Usage

Sending Messages

-- Simple send
SELECT ulak.send('my-webhook', '{"event": "order.created"}'::jsonb);

-- With options (priority, idempotency, scheduling)
SELECT ulak.send_with_options(
  'my-webhook',
  '{"event": "order.created"}'::jsonb,
  5,                                    -- priority (0-10, higher = first)
  NOW() + INTERVAL '10 minutes',        -- scheduled delivery
  'order-123-created',                  -- idempotency key
  '550e8400-e29b-41d4-a716-446655440000'::uuid, -- correlation ID (UUID)
  NOW() + INTERVAL '1 hour',           -- TTL / expires at
  'order-123'                           -- ordering key (FIFO per key)
);

-- Batch send
SELECT ulak.send_batch('my-webhook', ARRAY[
  '{"id": 1}'::jsonb,
  '{"id": 2}'::jsonb,
  '{"id": 3}'::jsonb
]);

Pub/Sub

-- Create event types and subscribe endpoints
SELECT ulak.create_event_type('order.created', 'New order placed');
SELECT ulak.subscribe('order.created', 'my-webhook');
SELECT ulak.subscribe('order.created', 'events');  -- fan-out

-- Publish to all subscribers
SELECT ulak.publish('order.created', '{"order_id": 123}'::jsonb);

Monitoring

SELECT * FROM ulak.health_check();
SELECT * FROM ulak.get_worker_status();
SELECT * FROM ulak.get_endpoint_health();
SELECT * FROM ulak.dlq_summary();

DLQ Management

-- Redrive failed messages
SELECT ulak.redrive_message(42);           -- single message
SELECT ulak.redrive_endpoint('my-webhook'); -- all for endpoint
SELECT ulak.redrive_all();                  -- everything

-- Replay from archive
SELECT ulak.replay_message(100);
SELECT ulak.replay_range(
  1,
  date_trunc('month', now()) - interval '1 month',
  date_trunc('month', now())
);

Configuration

All parameters use the ulak. prefix. Key settings:

| Parameter | Default | Description |
|---|---|---|
| workers | 4 | Background workers (1–32) |
| poll_interval | 500ms | Queue polling frequency |
| batch_size | 200 | Messages per batch cycle |
| max_queue_size | 1,000,000 | Backpressure threshold |
| circuit_breaker_threshold | 10 | Failures before circuit opens |
| circuit_breaker_cooldown | 30s | Wait before half-open probe |
| http_timeout | 10s | HTTP request timeout |
| dlq_retention_days | 30 | DLQ message retention |
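
The two circuit breaker settings above describe the usual closed / open / half-open state machine. The following is a generic illustration of that pattern, not ulak's implementation: all names are hypothetical, time is passed in as plain seconds, and it assumes the threshold counts consecutive failures.

```shell
CB_THRESHOLD=10   # failures before the circuit opens (default from the table)
CB_COOLDOWN=30    # seconds to wait before a half-open probe (default from the table)

cb_state=closed
cb_failures=0
cb_opened_at=0

# cb_allow NOW: succeed if a delivery attempt may be made at time NOW.
cb_allow() {
  if [ "$cb_state" = open ]; then
    if [ $(( $1 - cb_opened_at )) -ge "$CB_COOLDOWN" ]; then
      cb_state=half_open   # cooldown elapsed: let a probe through
      return 0
    fi
    return 1               # still cooling down: skip this endpoint
  fi
  return 0
}

# cb_record RESULT NOW: RESULT is "ok" or "fail".
cb_record() {
  if [ "$1" = ok ]; then
    cb_state=closed
    cb_failures=0
    return 0
  fi
  cb_failures=$((cb_failures + 1))
  # A failed probe, or reaching the threshold, (re)opens the circuit.
  if [ "$cb_state" = half_open ] || [ "$cb_failures" -ge "$CB_THRESHOLD" ]; then
    cb_state=open
    cb_opened_at=$2
  fi
}
```

With these defaults, ten failures in a row open the circuit, attempts are refused for 30 seconds, and a single successful probe closes it again.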

See the Configuration Reference for all 57 parameters with types, ranges, and restart/reload semantics.
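
Since every parameter lives under the ulak. prefix, it can be set with ALTER SYSTEM, the same way the Quick Start sets ulak.database. The helper below is a small sketch that only builds the statement; ulak_set_sql is a hypothetical name, and whether a given parameter takes effect on reload or needs a restart is something to look up in the Configuration Reference.

```shell
# ulak_set_sql NAME VALUE: print an ALTER SYSTEM statement for ulak.NAME.
ulak_set_sql() {
  # Accept only lowercase letters and underscores in the parameter name.
  case $1 in
    '' | *[!a-z_]*)
      echo "invalid parameter name: $1" >&2
      return 1
      ;;
  esac
  # Double single quotes so VALUE stays inside the SQL string literal.
  value=$(printf '%s' "$2" | sed "s/'/''/g")
  printf "ALTER SYSTEM SET ulak.%s = '%s';\n" "$1" "$value"
}

# Example against the Quick Start container (not executed here):
# ulak_set_sql workers 8 | docker exec -i ulak-postgres-1 psql -U postgres
```

Piping one statement at a time through standard input keeps each ALTER SYSTEM outside a transaction block, which PostgreSQL requires.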

Architecture

┌────────────────────────────────────────────────────────────┐
│  Application Transaction                                   │
│  ┌──────────────────────────────────────────────────────┐  │
│  │ BEGIN;                                               │  │
│  │   INSERT INTO orders ...                             │  │
│  │   SELECT ulak.send('webhook', '{...}');              │  │
│  │ COMMIT;                   ▲                          │  │
│  └───────────────────────────│──────────────────────────┘  │
│                              │ SPI INSERT (atomic)         │
│  ┌───────────────────────────▼──────────────────────────┐  │
│  │ ulak.queue (pending messages)                        │  │
│  └───────┬─────────────┬─────────────┬──────────────────┘  │
│          │             │             │                     │
│      Worker 0      Worker 1     Worker N-1                 │
│      (id%N=0)      (id%N=1)     (id%N=N-1)                 │
│          │             │             │                     │
│          ▼             ▼             ▼                     │
│  ┌──────────────────────────────────────────────────────┐  │
│  │ Dispatcher Factory                                   │  │
│  │  HTTP │ Kafka │ MQTT │ Redis │ AMQP │ NATS           │  │
│  └───┬───────┬──────┬───────┬──────┬──────┬─────────────┘  │
└──────│───────│──────│───────│──────│──────│────────────────┘
       ▼       ▼      ▼       ▼      ▼      ▼
   Endpoints (HTTP APIs, Kafka topics, MQTT brokers, message buses, ...)

See the Architecture wiki page for the full technical deep-dive.
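
The id%N labels in the diagram can be read as modulo partitioning: with N workers, the worker whose index equals the message id modulo N owns that message, so indexes run from 0 to N-1. The lines below only illustrate that arithmetic; worker_for_id is a hypothetical name, and the extension's actual claiming logic may take more into account (ordering keys, for example).

```shell
# worker_for_id ID WORKERS: index of the worker that would own message ID.
worker_for_id() {
  echo $(( $1 % $2 ))
}

# With 4 workers, message ids 1..8 map to workers 1 2 3 0 1 2 3 0.
for id in 1 2 3 4 5 6 7 8; do
  printf 'message %s -> worker %s\n' "$id" "$(worker_for_id "$id" 4)"
done
```

Because each id maps to exactly one index, two workers never compete for the same row under this scheme.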

Testing

# Regression and isolation tests
docker exec ulak-postgres-1 bash -c \
  "cd /src/ulak && make installcheck"

# Code quality
make tools-install   # install clang-format, cppcheck, lefthook locally
make tools-versions  # print local tool versions
make format          # clang-format
make lint            # cppcheck
make hooks-install   # install local git hooks (auto-installs lefthook via Homebrew or Go when available)
make hooks-run       # run local pre-commit checks manually

With lefthook installed, the pre-commit hook auto-formats staged C/H files under src/ and include/, re-stages them, and then runs the remaining checks.

CI uses clang-format-22 and clang-tidy-22. For the closest local parity, install tools with make tools-install and verify with make tools-versions.

Conventional commit headers are enforced locally through the commit-msg hook:

feat(scope): subject
fix: subject
chore(ci)!: subject
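
To check a header before committing, a pattern along these lines can help. It is a sketch, not the project's actual hook: is_conventional_header is a hypothetical name, and the list of accepted types is an assumption based on commonly used Conventional Commits types, which may differ from what the repository enforces.

```shell
# is_conventional_header HEADER: succeed if HEADER has the shape
# type(scope)!: subject, where "(scope)" and "!" are optional.
is_conventional_header() {
  printf '%s\n' "$1" |
    grep -Eq '^(feat|fix|chore|docs|refactor|test|perf|build|ci|style|revert)(\([a-z0-9_-]+\))?!?: .+$'
}

# Example use inside a commit-msg hook, where git passes the message file as $1:
# is_conventional_header "$(head -n 1 "$1")" || { echo "bad commit header" >&2; exit 1; }
```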

Documentation

Full documentation is available in the Wiki.

| Category | Pages |
|---|---|
| Getting Started | Quick Start |
| Architecture | System Architecture |
| Protocols | HTTP · Kafka · MQTT · Redis · AMQP · NATS |
| Features | Pub/Sub Events · Message Features |
| Operations | Reliability · Monitoring · Security |
| Reference | Configuration (57 GUCs) · SQL API (40+ Functions) |
| Development | Building & Testing · Contributing · Changelog |

License

ulak is licensed under Apache License 2.0.

You may use, modify, and distribute the project in commercial and non-commercial settings under Apache 2.0.