Extensions
- pgmb 1.0.0
Documentation
- CHANGELOG
- Changelog
README
Contents
pgmb
A lightweight message broker system built inside PostgreSQL. pgmb enables asynchronous message processing with HTTP-based worker dispatch, automatic retries, and dead letter queue support.
Features
- Worker Management: Register HTTP endpoints as workers with configurable rate limits (RPS)
- Queue System: Create queues with pattern-based routing keys (supports wildcards)
- Message Routing: Automatic message routing based on routing keys matching binding patterns
- HTTP Dispatch: Automatic message delivery to worker endpoints via HTTP POST
- Retry Logic: Configurable retry attempts with exponential backoff support
- Dead Letter Queue: Failed messages after max retries are moved to DLQ
- Scheduled Dispatch: Uses
pg_cronfor automatic message dispatching - Delayed Messages: Support for delayed message delivery
Requirements
- PostgreSQL 12 or higher
pg_cronextensionhttpextension (for HTTP requests)
Installation
Using PGXN
pgxn install pgmb
Manual Installation
Clone the repository:
bash git clone https://github.com/fraruiz/pgmb.git cd pgmbBuild and install:
bash make sudo make installEnable the extension in your database:
sql CREATE EXTENSION pg_cron; CREATE EXTENSION http; CREATE EXTENSION pgmb;
Quick Start
1. Register a Worker
SELECT pgmb.worker(
'order_processor', -- worker name
'http://localhost:8080/process', -- endpoint URL
100 -- requests per second limit
);
-- Returns: worker UUID
2. Create a Queue
SELECT pgmb.create(
'order_queue', -- queue name
'order.*', -- binding key pattern (supports *)
'550e8400-e29b-41d4-a716-446655440000', -- worker UUID
5 -- max retries
);
-- Returns: queue UUID
3. Send Messages
-- Simple message
SELECT pgmb.send(
gen_random_uuid(),
'order.created',
'{"order_id": 123, "amount": 45.67}'::jsonb
);
-- With headers
SELECT pgmb.send(
gen_random_uuid(),
'order.created',
'{"order_id": 123, "amount": 45.67}'::jsonb,
'{"source": "web", "priority": "high"}'::jsonb
);
-- Delayed message (10 minutes)
SELECT pgmb.send(
gen_random_uuid(),
'order.created',
'{"order_id": 123, "amount": 45.67}'::jsonb,
'{"source": "web"}'::jsonb,
now() + interval '10 minutes'
);
-- Delayed message (600 seconds)
SELECT pgmb.send(
gen_random_uuid(),
'order.created',
'{"order_id": 123, "amount": 45.67}'::jsonb,
'{"source": "web"}'::jsonb,
600
);
API Reference
pgmb.worker(name, endpoint, rps)
Registers a new worker in the message broker.
Parameters:
- name (VARCHAR): The name of the worker
- endpoint (VARCHAR): The HTTP endpoint URL where messages will be sent
- rps (INT): Requests per second limit for rate limiting
Returns: UUID of the created worker
Example:
sql
SELECT pgmb.worker('email_sender', 'http://api.example.com/send-email', 50);
pgmb.create(name, binding_key, max_retries, worker_id)
Creates a new queue with a binding key pattern.
Parameters:
- name (VARCHAR): Unique name for the queue
- binding_key (VARCHAR): Pattern to match routing keys (supports * wildcard)
- max_retries (INT): Maximum number of retry attempts before moving to DLQ
- worker_id (UUID): The worker UUID that will process messages from this queue
Returns: UUID of the created queue
Example:
sql
SELECT pgmb.create('order_queue', 'order.*', 5, '550e8400-e29b-41d4-a716-446655440000');
pgmb.send(id, routing_key, body)
Sends a message to the broker.
Parameters:
- id (UUID): Unique identifier for the message
- routing_key (VARCHAR): Routing key for message routing
- body (JSONB): Message payload
Returns: VOID
Example:
sql
SELECT pgmb.send(
gen_random_uuid(),
'order.created',
'{"order_id": 123}'::jsonb
);
pgmb.send(id, routing_key, body, headers)
Sends a message with custom headers.
Parameters:
- id (UUID): Unique identifier for the message
- routing_key (VARCHAR): Routing key for message routing
- body (JSONB): Message payload
- headers (JSONB): Optional message headers
Returns: VOID
pgmb.send(id, routing_key, body, headers, delay)
Sends a delayed message. Delay can be a TIMESTAMP or INTEGER (seconds).
Parameters:
- id (UUID): Unique identifier for the message
- routing_key (VARCHAR): Routing key for message routing
- body (JSONB): Message payload
- headers (JSONB): Optional message headers
- delay (TIMESTAMPTZ or INT): When to enqueue the message
Returns: VOID
How It Works
Message Publishing: When you call
pgmb.send(), a message is inserted intopgmb.messagestable.Automatic Routing: A trigger (
enqueue_message_trigger) automatically routes messages to matching queues based on routing key patterns.Queue Processing: Each queue has its own table (
{queue_name}_queue) that stores message references.Scheduled Dispatch:
pg_cronrunspgmb.dispatch_messages()every second for each queue, which:- Locks messages for processing (using
FOR UPDATE SKIP LOCKED) - Sends HTTP POST requests to worker endpoints
- Handles acknowledgments and retries
- Moves failed messages to dead letter queues after max retries
- Locks messages for processing (using
Dead Letter Queue: Failed messages are moved to
{queue_name}_dead_letter_queueafter exceeding max retries.
Database Schema
Tables
pgmb.workers: Stores worker registrationspgmb.queues: Stores queue definitions and bindingspgmb.messages: Stores all messagespgmb.{queue_name}_queue: Per-queue message referencespgmb.{queue_name}_dead_letter_queue: Per-queue failed messages
Monitoring
Check Worker Status
SELECT * FROM pgmb.workers;
Check Queue Status
SELECT * FROM pgmb.queues;
Check Pending Messages
SELECT COUNT(*) FROM pgmb.order_queue WHERE acknoledge = false;
Check Dead Letter Queue
SELECT * FROM pgmb.order_dead_letter_queue;
Worker Endpoint Requirements
Your worker endpoints should:
- Accept HTTP POST requests
- Accept JSON body
- Return HTTP status codes:
2xx: Success (message will be acknowledged)4xx/5xx: Failure (message will be retried)
Example Worker Endpoint (Node.js):
app.post('/process', async (req, res) => {
try {
await processMessage(req.body);
res.status(200).json({ success: true });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
License
PostgreSQL License
Author
Francisco Ruiz - franciscoruizlezcano@gmail.com
Repository
https://github.com/fraruiz/pgmb