Functions

Sending Messages

send

Send a single message to a queue.

```text pgmq.send( queue_name text, msg jsonb, delay integer DEFAULT 0 )

RETURNS SETOF bigint ```

Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msg | jsonb | The message to send to the queue | | delay | integer | Time in seconds before the message becomes visible. Defaults to 0. |

Example:

```sql select * from pgmq.send('my_queue', '{"hello": "world"}');

send

4

```


send_batch

Send 1 or more messages to a queue.

text pgmq.send_batch( queue_name text, msgs jsonb[], delay integer DEFAULT 0 ) RETURNS SETOF bigint Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msgs | jsonb[] | Array of messages to send to the queue | | delay | integer | Time in seconds before the messages becomes visible. Defaults to 0. |

```sql select * from pgmq.send_batch('my_queue', ARRAY[ '{"hello": "world_0"}'::jsonb, '{"hello": "world_1"}'::jsonb] );

send_batch

      1
      2

```


Reading Messages

read

Read 1 or more messages from a queue. The VT specifies the amount of time in seconds that the message will be invisible to other consumers after reading.

 
pgmq.read(
    queue_name text,
    vt integer,
    qty integer)

RETURNS SETOF pgmq.message_record
 

Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | vt | integer | Time in seconds that the message become invisible after reading | | qty | integer | The number of messages to read from the queue. Defaults to 1 |

Example:

sql select * from pgmq.read('my_queue', 10, 2); msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------------------------+-------------------------------+---------------------- 1 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608922-05 | {"hello": "world_0"} 2 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608974-05 | {"hello": "world_1"} (2 rows)


read_with_poll

Same as read(). Also provides convenient long-poll functionality. When there are no messages in the queue, the function call will wait for max_poll_seconds in duration before returning. If messages reach the queue during that duration, they will be read and returned immediately.

 
 pgmq.read_with_poll(
    queue_name text,
    vt integer,
    qty integer,
    max_poll_seconds integer DEFAULT 5,
    poll_interval_ms integer DEFAULT 100
)
RETURNS SETOF pgmq.message_record
 

Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | vt | integer | Time in seconds that the message become invisible after reading. | | qty | integer | The number of messages to read from the queue. Defaults to 1. | | max_poll_seconds | integer | Time in seconds to wait for new messages to reach the queue. Defaults to 5. | | poll_interval_ms | integer | Milliseconds between the internal poll operations. Defaults to 100. |

Example:

sql select * from pgmq.read_with_poll('my_queue', 1, 1, 5, 100); msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------------------------+-------------------------------+-------------------- 1 | 1 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"}


pop

Reads a single message from a queue and deletes it upon read.

Note: utilization of pop() results in at-most-once delivery semantics if the consuming application does not guarantee processing of the message.

 
pgmq.pop(queue_name text)
RETURNS SETOF pgmq.message_record
 

Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue |

Example:

sql pgmq=# select * from pgmq.pop('my_queue'); msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------------------------+-------------------------------+-------------------- 1 | 2 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"}


Deleting/Archiving Messages

delete (single)

Deletes a single message from a queue.

text pgmq.delete (queue_name text, msg_id: bigint) RETURNS boolean

Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msg_id | bigint | Message ID of the message to delete |

Example:

```sql select pgmq.delete('my_queue', 5);

delete

t ```


delete (batch)

Delete one or many messages from a queue.

text pgmq.delete (queue_name text, msg_ids: bigint[]) RETURNS SETOF bigint

Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msg_ids | bigint[] | Array of message IDs to delete |

Examples:

Delete two messages that exist.

```sql select * from pgmq.delete('my_queue', ARRAY[2, 3]);

delete

  2
  3

```

Delete two messages, one that exists and one that does not. Message 999 does not exist.

```sql select * from pgmq.delete('my_queue', ARRAY[6, 999]);

delete

  6

```


purge_queue

Permanently deletes all messages in a queue. Returns the number of messages that were deleted.

text purge_queue(queue_name text) RETURN bigint

Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue |

Example:

Purge the queue when it contains 8 messages;

```sql select * from pgmq.purge_queue('my_queue');

purge_queue

       8

```


archive (single)

Removes a single requested message from the specified queue and inserts it into the queue's archive.

text pgmq.archive(queue_name text, msg_id bigint) RETURNS boolean

Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msg_id | bigint | Message ID of the message to archive |

Returns Boolean value indicating success or failure of the operation.

Example; remove message with ID 1 from queue my_queue and archive it:

```sql SELECT * FROM pgmq.archive('my_queue', 1);

archive

   t

```


archive (batch)

Deletes a batch of requested messages from the specified queue and inserts them into the queue's archive. Returns an ARRAY of message ids that were successfully archived.

text pgmq.archive(queue_name text, msg_ids bigint[]) RETURNS SETOF bigint

Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msg_ids | bigint[] | Array of message IDs to archive |

Examples:

Delete messages with ID 1 and 2 from queue my_queue and move to the archive.

```sql SELECT * FROM pgmq.archive('my_queue', ARRAY[1, 2]);

archive

   1
   2

```

Delete messages 4, which exists and 999, which does not exist.

```sql select * from pgmq.archive('my_queue', ARRAY[4, 999]);

archive

   4

```


Queue Management

create

Create a new queue.

text pgmq.create(queue_name text) RETURNS VOID

Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue |

Example:

```sql select from pgmq.create('my_queue');

create

```


create_partitioned

Create a partitioned queue.

text pgmq.create_partitioned ( queue-ue_name text, partition_interval text DEFAULT '10000'::text, retention_interval text DEFAULT '100000'::text ) RETURNS void

Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | partition_interval | text | The name of the queue | | retention_interval | text | The name of the queue |

Example:

Create a queue with 100,000 messages per partition, and will retain 10,000,000 messages on old partitions. Partitions greater than this will be deleted.

```sql select from pgmq.create_partitioned( 'my_partitioned_queue', '100000', '10000000' );

create_partitioned

```


create_unlogged

Creates an unlogged table. This is useful when write throughput is more important that durability. See Postgres documentation for unlogged tables for more information.

text pgmq.create_unlogged(queue_name text) RETURNS void

Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue |

Example:

```sql select pgmq.create_unlogged('my_unlogged');

create_unlogged

```


detach_archive

Drop the queue's archive table as a member of the PGMQ extension. Useful for preventing the queue's archive table from being drop when DROP EXTENSION pgmq is executed. This does not prevent the further archives() from appending to the archive table.

text pgmq.detach_archive(queue_name text)

Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue |

Example:

```sql select * from pgmq.detach_archive('my_queue');

detach_archive

```


drop_queue

Deletes a queue and its archive table.

text pgmq.drop_queue(queue_name text) RETURNS boolean

Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue |

Example:

```sql select * from pgmq.drop_queue('my_unlogged');

drop_queue

t ```

Utilities

set_vt

Sets the visibility timeout of a message to a specified time duration in the future. Returns the record of the message that was updated.

text pgmq.set_vt( queue_name text, msg_id bigint, vt_offset integer ) RETURNS pgmq.message_record

Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msg_id | bigint | ID of the message to set visibility time | | vt_offset | integer | Duration from now, in seconds, that the message's VT should be set to |

Example:

Set the visibility timeout of message 1 to 30 seconds from now.

sql select * from pgmq.set_vt('my_queue', 11, 30); msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------------------------+-------------------------------+---------------------- 1 | 0 | 2023-10-28 19:42:21.778741-05 | 2023-10-28 19:59:34.286462-05 | {"hello": "world_0"}


list_queues

List all the queues that currently exist.

sql list_queues() RETURNS TABLE( queue_name text, created_at timestamp with time zone, is_partitioned boolean, is_unlogged boolean )

Example:

sql select * from pgmq.list_queues(); queue_name | created_at | is_partitioned | is_unlogged ----------------------+-------------------------------+----------------+------------- my_queue | 2023-10-28 14:13:17.092576-05 | f | f my_partitioned_queue | 2023-10-28 19:47:37.098692-05 | t | f my_unlogged | 2023-10-28 20:02:30.976109-05 | f | t


metrics

Get metrics for a specific queue.

text pgmq.metrics(queue_name: text) RETURNS TABLE( queue_name text, queue_length bigint, newest_msg_age_sec integer, oldest_msg_age_sec integer, total_messages bigint, scrape_time timestamp with time zone )

Parameters:

| Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue |

Returns:

| Attribute | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | queue_length | bigint | Number of messages currently in the queue | | newest_msg_age_sec | integer \| null | Age of the newest message in the queue, in seconds | | oldest_msg_age_sec | integer \| null | Age of the oldest message in the queue, in seconds | | total_messages | bigint | Total number of messages that have passed through the queue over all time | | scrape_time | timestamp with time zone | The current timestamp |

Example:

sql select * from pgmq.metrics('my_queue'); queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time
------------+--------------+--------------------+--------------------+----------------+------------------------------- my_queue | 16 | 2445 | 2447 | 35 | 2023-10-28 20:23:08.406259-05


metrics_all

Get metrics for all existing queues.

text pgmq.metrics_all() RETURNS TABLE( queue_name text, queue_length bigint, newest_msg_age_sec integer, oldest_msg_age_sec integer, total_messages bigint, scrape_time timestamp with time zone )

Returns:

| Attribute | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | queue_length | bigint | Number of messages currently in the queue | | newest_msg_age_sec | integer \| null | Age of the newest message in the queue, in seconds | | oldest_msg_age_sec | integer \| null | Age of the oldest message in the queue, in seconds | | total_messages | bigint | Total number of messages that have passed through the queue over all time | | scrape_time | timestamp with time zone | The current timestamp |

sql select * from pgmq.metrics_all(); queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time
----------------------+--------------+--------------------+--------------------+----------------+------------------------------- my_queue | 16 | 2563 | 2565 | 35 | 2023-10-28 20:25:07.016413-05 my_partitioned_queue | 1 | 11 | 11 | 1 | 2023-10-28 20:25:07.016413-05 my_unlogged | 1 | 3 | 3 | 1 | 2023-10-28 20:25:07.016413-05