pgmb 1.0.0

Status: Stable
Abstract: A lightweight message broker inside PostgreSQL
Released By: fraruiz
License: PostgreSQL

pgmb

A lightweight message broker system built inside PostgreSQL. pgmb enables asynchronous message processing with HTTP-based worker dispatch, automatic retries, and dead letter queue support.

Features

  • Worker Management: Register HTTP endpoints as workers with configurable rate limits (RPS)
  • Queue System: Create queues with pattern-based routing keys (supports wildcards)
  • Message Routing: Automatic message routing based on routing keys matching binding patterns
  • HTTP Dispatch: Automatic message delivery to worker endpoints via HTTP POST
  • Retry Logic: Configurable retry attempts with exponential backoff support
  • Dead Letter Queue: Failed messages after max retries are moved to DLQ
  • Scheduled Dispatch: Uses pg_cron for automatic message dispatching
  • Delayed Messages: Support for delayed message delivery

Requirements

  • PostgreSQL 12 or higher
  • pg_cron extension
  • http extension (for HTTP requests)

Installation

Using PGXN

pgxn install pgmb

Manual Installation

  1. Clone the repository:

     git clone https://github.com/fraruiz/pgmb.git
     cd pgmb

  2. Build and install:

     make
     sudo make install

  3. Enable the extension in your database:

     CREATE EXTENSION pg_cron;
     CREATE EXTENSION http;
     CREATE EXTENSION pgmb;

Quick Start

1. Register a Worker

SELECT pgmb.worker(
    'order_processor',                    -- worker name
    'http://localhost:8080/process',      -- endpoint URL
    100                                   -- requests per second limit
);
-- Returns: worker UUID

2. Create a Queue

SELECT pgmb.create(
    'order_queue',                          -- queue name
    'order.*',                              -- binding key pattern (supports *)
    5,                                      -- max retries
    '550e8400-e29b-41d4-a716-446655440000'  -- worker UUID returned in step 1
);
-- Returns: queue UUID

3. Send Messages

-- Simple message
SELECT pgmb.send(
    gen_random_uuid(),
    'order.created',
    '{"order_id": 123, "amount": 45.67}'::jsonb
);

-- With headers
SELECT pgmb.send(
    gen_random_uuid(),
    'order.created',
    '{"order_id": 123, "amount": 45.67}'::jsonb,
    '{"source": "web", "priority": "high"}'::jsonb
);

-- Delayed message (10 minutes)
SELECT pgmb.send(
    gen_random_uuid(),
    'order.created',
    '{"order_id": 123, "amount": 45.67}'::jsonb,
    '{"source": "web"}'::jsonb,
    now() + interval '10 minutes'
);

-- Delayed message (600 seconds)
SELECT pgmb.send(
    gen_random_uuid(),
    'order.created',
    '{"order_id": 123, "amount": 45.67}'::jsonb,
    '{"source": "web"}'::jsonb,
    600
);

API Reference

pgmb.worker(name, endpoint, rps)

Registers a new worker in the message broker.

Parameters:

  • name (VARCHAR): The name of the worker
  • endpoint (VARCHAR): The HTTP endpoint URL where messages will be sent
  • rps (INT): Requests per second limit for rate limiting

Returns: UUID of the created worker

Example:

SELECT pgmb.worker('email_sender', 'http://api.example.com/send-email', 50);
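
To illustrate what an RPS limit can mean for a dispatcher that runs once per second, the sketch below caps each one-second batch at the worker's rps value. It is a hypothetical model only, not pgmb's actual code; takeBatch and its arguments are names invented for this example.

```javascript
// Hypothetical sketch: cap a one-second dispatch batch at the worker's RPS limit.
// `pending` is an array of message ids, oldest first; `rps` is the worker's limit.
function takeBatch(pending, rps) {
  if (!Number.isInteger(rps) || rps < 0) {
    throw new RangeError('rps must be a non-negative integer');
  }
  return {
    batch: pending.slice(0, rps), // dispatched during this tick
    deferred: pending.slice(rps), // left for later ticks
  };
}

const result = takeBatch(['m1', 'm2', 'm3'], 2);
console.log(result.batch, result.deferred);
```

Under this model, a worker registered with rps = 50 would receive at most 50 messages per tick, and any backlog drains over the following ticks.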

pgmb.create(name, binding_key, max_retries, worker_id)

Creates a new queue with a binding key pattern.

Parameters:

  • name (VARCHAR): Unique name for the queue
  • binding_key (VARCHAR): Pattern to match routing keys (supports * wildcard)
  • max_retries (INT): Maximum number of retry attempts before moving to DLQ
  • worker_id (UUID): The worker UUID that will process messages from this queue

Returns: UUID of the created queue

Example:

SELECT pgmb.create('order_queue', 'order.*', 5, '550e8400-e29b-41d4-a716-446655440000');
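
The following sketch shows one way a binding key with a * wildcard could be matched against a routing key. It assumes * matches any run of characters; pgmb's real matching rules are not specified here and may differ (for example, AMQP-style brokers match one dot-separated word per *). The function names are hypothetical.

```javascript
// Hypothetical sketch of binding-key matching; pgmb's real rules may differ.
// Assumption: `*` matches any run of characters (including none).
function bindingToRegExp(bindingKey) {
  const escaped = bindingKey
    .split('*')
    // Escape regex metacharacters so that e.g. "." only matches a literal dot.
    .map((part) => part.replace(/[.+?^${}()|[\]\\]/g, '\\$&'))
    .join('.*');
  return new RegExp(`^${escaped}$`);
}

function matchesBinding(bindingKey, routingKey) {
  return bindingToRegExp(bindingKey).test(routingKey);
}

console.log(matchesBinding('order.*', 'order.created')); // true
console.log(matchesBinding('order.*', 'payment.created')); // false
```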

pgmb.send(id, routing_key, body)

Sends a message to the broker.

Parameters:

  • id (UUID): Unique identifier for the message
  • routing_key (VARCHAR): Routing key for message routing
  • body (JSONB): Message payload

Returns: VOID

Example:

SELECT pgmb.send(
    gen_random_uuid(),
    'order.created',
    '{"order_id": 123}'::jsonb
);

pgmb.send(id, routing_key, body, headers)

Sends a message with custom headers.

Parameters:

  • id (UUID): Unique identifier for the message
  • routing_key (VARCHAR): Routing key for message routing
  • body (JSONB): Message payload
  • headers (JSONB): Optional message headers

Returns: VOID

pgmb.send(id, routing_key, body, headers, delay)

Sends a delayed message. The delay is either a TIMESTAMPTZ (an absolute time) or an INT (a number of seconds).

Parameters:

  • id (UUID): Unique identifier for the message
  • routing_key (VARCHAR): Routing key for message routing
  • body (JSONB): Message payload
  • headers (JSONB): Optional message headers
  • delay (TIMESTAMPTZ or INT): When to enqueue the message, given as a timestamp or as a delay in seconds

Returns: VOID
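
The two delay forms can be thought of as resolving to a single enqueue time. The sketch below illustrates that idea on the client side; resolveEnqueueTime is a hypothetical helper, and treating the integer form as seconds counted from the current time is an assumption.

```javascript
// Hypothetical helper mirroring the two delay forms described above:
// a number is treated as seconds from `now`; a Date is used as-is.
function resolveEnqueueTime(delay, now = new Date()) {
  if (delay instanceof Date) {
    return delay;
  }
  if (typeof delay === 'number' && Number.isFinite(delay) && delay >= 0) {
    return new Date(now.getTime() + delay * 1000);
  }
  throw new TypeError('delay must be a Date or a non-negative number of seconds');
}

const now = new Date('2024-01-01T00:00:00Z');
console.log(resolveEnqueueTime(600, now).toISOString());
// 2024-01-01T00:10:00.000Z
```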

How It Works

  1. Message Publishing: When you call pgmb.send(), a message is inserted into the pgmb.messages table.

  2. Automatic Routing: A trigger (enqueue_message_trigger) automatically routes messages to matching queues based on routing key patterns.

  3. Queue Processing: Each queue has its own table ({queue_name}_queue) that stores message references.

  4. Scheduled Dispatch: pg_cron runs pgmb.dispatch_messages() every second for each queue, which:

    • Locks messages for processing (using FOR UPDATE SKIP LOCKED)
    • Sends HTTP POST requests to worker endpoints
    • Handles acknowledgments and retries
    • Moves failed messages to dead letter queues after max retries

  5. Dead Letter Queue: Failed messages are moved to {queue_name}_dead_letter_queue after exceeding max retries.
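
The per-message outcome of a dispatch, as described in the steps above, can be sketched as a small decision function. This is an illustration under stated assumptions, not pgmb's implementation: nextAction and backoffSeconds are hypothetical names, retriesUsed counts retries already made before the current attempt, and the backoff doubles from an assumed base of 1 second.

```javascript
// Hypothetical sketch of the decision made after each HTTP dispatch.
// Assumption: `retriesUsed` is the number of retries made before this attempt.
function nextAction(statusCode, retriesUsed, maxRetries) {
  if (statusCode >= 200 && statusCode < 300) {
    return 'acknowledge'; // 2xx: the worker accepted the message
  }
  // Any other status is a failure: retry until the retry budget is spent.
  return retriesUsed >= maxRetries ? 'dead_letter' : 'retry';
}

// Assumption: exponential backoff that doubles per retry from `baseSeconds`.
function backoffSeconds(retriesUsed, baseSeconds = 1) {
  return baseSeconds * 2 ** retriesUsed;
}

console.log(nextAction(200, 0, 5)); // acknowledge
console.log(nextAction(500, 2, 5)); // retry
console.log(nextAction(500, 5, 5)); // dead_letter
console.log(backoffSeconds(3)); // 8
```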

Database Schema

Tables

  • pgmb.workers: Stores worker registrations
  • pgmb.queues: Stores queue definitions and bindings
  • pgmb.messages: Stores all messages
  • pgmb.{queue_name}_queue: Per-queue message references
  • pgmb.{queue_name}_dead_letter_queue: Per-queue failed messages
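
As a small illustration of the naming scheme in the list above, the sketch below derives the two per-queue table names from a queue name. queueTables is a hypothetical helper, and the identifier check is an added safety assumption rather than a documented pgmb rule.

```javascript
// Derive the per-queue table names from the naming scheme listed above.
// The identifier check is an assumption of this sketch, not a pgmb rule.
function queueTables(queueName) {
  if (!/^[a-z_][a-z0-9_]*$/.test(queueName)) {
    throw new Error(`unsafe queue name: ${queueName}`);
  }
  return {
    queue: `pgmb.${queueName}_queue`,
    deadLetter: `pgmb.${queueName}_dead_letter_queue`,
  };
}

console.log(queueTables('order'));
```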

Monitoring

Check Worker Status

SELECT * FROM pgmb.workers;

Check Queue Status

SELECT * FROM pgmb.queues;

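Check Pending Messages

-- Table name follows pgmb.{queue_name}_queue; this example assumes a queue named "order"
SELECT COUNT(*) FROM pgmb.order_queue WHERE acknoledge = false;

Check Dead Letter Queue

-- Same assumption: the dead letter table for a queue named "order"
SELECT * FROM pgmb.order_dead_letter_queue;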

Worker Endpoint Requirements

Your worker endpoints should:

  • Accept HTTP POST requests
  • Accept JSON body
  • Return HTTP status codes:
    • 2xx: Success (message will be acknowledged)
    • 4xx/5xx: Failure (message will be retried)

Example Worker Endpoint (Node.js):

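const express = require('express');

const app = express();
app.use(express.json()); // parse the JSON body of each POST

// processMessage is a placeholder for your own application logic.
app.post('/process', async (req, res) => {
  try {
    await processMessage(req.body);
    res.status(200).json({ success: true }); // 2xx: message is acknowledged
  } catch (error) {
    res.status(500).json({ error: error.message }); // 5xx: message is retried
  }
});

app.listen(8080); // port of the endpoint registered in Quick Start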

License

PostgreSQL License

Author

Francisco Ruiz - franciscoruizlezcano@gmail.com

Repository

https://github.com/fraruiz/pgmb