pgmq 0.2.0

A message queue for PostgreSQL
This library contains two PostgreSQL extensions: pgmq for queues and pg_partman for managing partitions.
License: Apache 2.0




Postgres Message Queue (PGMQ)

A lightweight distributed message queue. Like AWS SQS and RSMQ but on Postgres.


  • Lightweight - Rust and Postgres only
  • Guaranteed delivery of messages to exactly one consumer within a visibility timeout
  • API parity with AWS SQS and RSMQ
  • Messages stay in the queue until deleted
  • Messages can be archived, instead of deleted, for long-term retention and replayability
  • Completely asynchronous API


Start CoreDB Postgres

CoreDB Postgres images come with the pgmq extension pre-installed.

```bash
docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432
```

Python Examples

See the Python examples in examples/.
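As a rough illustration of calling the queue from Python (this is not taken from examples/), the sketch below wraps two of the SQL functions used later in this README in small helpers. It assumes a DB-API cursor that uses the `%s` placeholder style, as psycopg does. The helper names `send_message` and `read_messages` are hypothetical.

```python
import json


def send_message(cur, queue_name, payload):
    """Send `payload` (a dict) as JSON and return the new message id."""
    cur.execute(
        "SELECT * FROM pgmq_send(%s, %s::jsonb)",
        (queue_name, json.dumps(payload)),
    )
    return cur.fetchone()[0]


def read_messages(cur, queue_name, vt_seconds, limit):
    """Read up to `limit` messages, hiding them for `vt_seconds` seconds."""
    cur.execute(
        "SELECT * FROM pgmq_read(%s, %s, %s)",
        (queue_name, vt_seconds, limit),
    )
    return cur.fetchall()
```

With psycopg, `cur` would come from `conn.cursor()`. Commit the transaction after sending so the message becomes visible to other sessions.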

SQL Examples


Connect to Postgres

```bash
psql postgres://postgres:postgres@
```

```sql
-- create the extension
CREATE EXTENSION pgmq CASCADE;
```

Creating a queue

```sql
-- creates the queue
SELECT pgmq_create('my_queue');
```



Send two messages

```sql
-- messages are sent as JSON
pgmq=#
SELECT * from pgmq_send('my_queue', '{"foo": "bar1"}');
SELECT * from pgmq_send('my_queue', '{"foo": "bar2"}');
```

```sql
-- the message id is returned from the send function
(1 row)

(1 row)
```

Read messages

Read two messages from the queue and make them invisible for 30 seconds.

```sql
-- parameters are queue name, visibility timeout, and number of messages to read
pgmq=# SELECT * from pgmq_read('my_queue', 30, 2);

 msg_id | read_ct |              vt               |          enqueued_at          |    message
--------+---------+-------------------------------+-------------------------------+---------------
      1 |       1 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"}
      2 |       1 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"}
```

If the queue is empty, or if all messages are currently invisible, no rows will be returned.

```sql
pgmq=# SELECT * from pgmq_read('my_queue', 30, 1);
 msg_id | read_ct | vt | enqueued_at | message
--------+---------+----+-------------+---------
```
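The visibility-timeout behaviour described above can be modelled in a few lines of Python. This is a toy in-memory sketch, not how the extension is implemented. The class `ToyQueue` and its method names are hypothetical and only mirror the send, read, and delete calls shown in this README. Time is passed in explicitly so the behaviour is easy to follow.

```python
from dataclasses import dataclass


@dataclass
class Message:
    msg_id: int
    body: dict
    vt: float = 0.0   # time at which the message becomes visible again
    read_ct: int = 0  # how many times the message has been read


class ToyQueue:
    """Hypothetical in-memory stand-in for a pgmq queue (illustration only)."""

    def __init__(self):
        self._messages = []
        self._next_id = 1

    def send(self, body):
        """Append a message and return its id."""
        msg = Message(self._next_id, body)
        self._messages.append(msg)
        self._next_id += 1
        return msg.msg_id

    def read(self, vt_seconds, limit, now):
        """Return up to `limit` visible messages and hide them until now + vt_seconds."""
        visible = [m for m in self._messages if m.vt <= now][:limit]
        for m in visible:
            m.vt = now + vt_seconds
            m.read_ct += 1
        return visible

    def delete(self, msg_id):
        """Remove a message for good; returns True if it existed."""
        before = len(self._messages)
        self._messages = [m for m in self._messages if m.msg_id != msg_id]
        return len(self._messages) < before


q = ToyQueue()
q.send({"foo": "bar1"})
q.send({"foo": "bar2"})

first = q.read(vt_seconds=30, limit=2, now=0)    # both messages, hidden until t=30
second = q.read(vt_seconds=30, limit=1, now=10)  # nothing visible yet: empty result
third = q.read(vt_seconds=30, limit=1, now=31)   # timeout expired: message 1 is visible again
print(len(first), len(second), third[0].msg_id, third[0].read_ct)  # prints: 2 0 1 2
```

A message that is read but never deleted reappears once its timeout expires, which is why a consumer should delete (or archive) each message after processing it.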

Pop a message

```sql
-- Read a message and immediately delete it from the queue. Returns `None` if the queue is empty.
pgmq=# SELECT * from pgmq_pop('my_queue');

 msg_id | read_ct |              vt               |          enqueued_at          |    message
--------+---------+-------------------------------+-------------------------------+---------------
      1 |       2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"}
```

Archive a message

```sql
-- Archiving a message removes it from the queue and inserts it into the archive table.
-- TODO: implement this in the extension
```


Delete a message

```sql
-- Delete the message with id `1` from the queue named `my_queue`.
pgmq=# select pgmq_delete('my_queue', 1);

 t
```


Set up pgx.

```bash
cargo install --locked cargo-pgx
cargo pgx init
```

Then, clone this repo and change into this directory.

```bash
git clone
cd coredb/extensions/pgmq/
```

Set up dependencies

Install:

  • pg_partman, which is required for partitioned tables.

Update postgresql.conf in the development environment.

```
shared_preload_libraries = 'pg_partman_bgw'
```

Run the dev environment

```bash
cargo pgx run pg14
```

Create the extension

```sql
CREATE EXTENSION pgmq cascade;
```


Run this script to package into a .deb file, which can be installed on Ubuntu.



Partitioned Queues

pgmq supports partitioned queues through create_partitioned(). Queues are partitioned by msg_id, with a default of 10000 messages per partition. New partitions are created automatically by pg_partman, which is a dependency of this extension. The pg_partman background worker checks whether new partitions are needed at the interval set by pg_partman_bgw.interval in postgresql.conf. Below are the default configuration values set in CoreDB docker images.