pgmq 0.2.0

This Release
pgmq 0.2.0
Date
Status
Stable
Latest Stable
pgmq 1.1.1 —
Other Releases
Abstract
A message queue for PostgreSQL
Description
This library contains a two PostgreSQL extensions, pgmq for queues and pg_partman for managing partitions
Released By
adam
License
Apache 2.0
Resources
Special Files
Tags

Extensions

pgmq 0.2.0
A message queue for PostgreSQL

README

Postgres Message Queue (PGMQ)

A lightweight distributed message queue. Like AWS SQS and RSMQ but on Postgres.

Features

  • Lightweight - Rust and Postgres only
  • Guaranteed delivery of messages to exactly one consumer within a visibility timeout
  • API parity with AWS SQS and RSMQ
  • Messages stay in the queue until deleted
  • Messages can be archived, instead of deleted, for long-term retention and replayability
  • Completely asynchronous API

Table of Contents

Start CoreDB Postgres

CoreDB Postgres images come with the pgmq extension pre-installed.

bash docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/coredb/postgres:latest

Python Examples

See python examples in examples/python.py

SQL Examples

```bash

Connect to Postgres

psql postgres://postgres:postgres@0.0.0.0:5432/postgres ```

sql -- create the extension CREATE EXTENSION pgmq CASCADE;

Creating a queue

```sql -- creates the queue. SELECT pgmq_create('my_queue');

pgmq_create

```

Send two message

sql -- messages are sent as JSON pgmq=# SELECT * from pgmq_send('my_queue', '{"foo": "bar1"}'); SELECT * from pgmq_send('my_queue', '{"foo": "bar2"}');

```sql -- the message id is returned from the send function

pgmq_send

     1

(1 row)

pgmq_send

     2

(1 row) ```

Read messages

Read two message from the queue. Make them invisible for 30 seconds.

```sql -- parameters are queue name, visibility timeout, and number of messages to read pgmq=# SELECT * from pgmq_read('my_queue', 30, 2);

msg_id | read_ct | vt | enqueued_at | message --------+---------+-------------------------------+-------------------------------+--------------- 1 | 1 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"} 2 | 1 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"} ```

If the queue is empty, or if all messages are currently invisible, no rows will be returned.

sql pgmq=# SELECT * from pgmq_read('my_queue', 30, 1); msg_id | read_ct | vt | enqueued_at | message --------+---------+----+-------------+---------

Pop a message

``sql -- Read a message and immediately delete it from the queue. ReturnsNone` if the queue is empty. pgmq=# SELECT * from pgmq_pop('my_queue');

msg_id | read_ct | vt | enqueued_at | message --------+---------+-------------------------------+-------------------------------+--------------- 1 | 2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"} ```

Archive a message

```sql -- Archiving a message removes it from the queue, and inserts it to the archive table. -- TODO: implement this in the extension

```

Delete a message

``sql -- Delete a message id1from queue namedmy_queue`. pgmq=# select pgmq_delete('my_queue', 1);

pgmq_delete

t ```

Development

Setup pgx.

bash cargo install --locked cargo-pgx cargo pgx init

Then, clone this repo and change into this directory.

bash git clone git@github.com:CoreDB-io/coredb.git cd coredb/extensions/pgmq/

Setup dependencies

Install: - pg_partman, which is required for partitioned tables.

Update postgresql.conf in the development environment. ```

~/.pgx/data-14/postgresql.conf

shared_preload_libraries = 'pg_partman_bgw' ```

Run the dev environment

bash cargo pgx run pg14

Create the extension

pql CREATE EXTENSION pgmq cascade;

Packaging

Run this script to package into a .deb file, which can be installed on Ubuntu.

/bin/bash build-extension.sh

Configuration

Partitioned Queues

pgmq supports partitioned queues with create_partitioned() by msg_id and the default value is 10000 messages per partition. New partitions are automatically created by pg_partman, which is a dependency of this extension. New partitions are created, if necessary, by setting pg_partman_bgw.interval in postgresql.conf. Below are the default configuration values set in CoreDB docker images.