Contents
- Tembo’s Python Client for PGMQ
- Installation
- Usage
- Start a Postgres Instance with the Tembo extension installed
- Using Environment Variables
- Initialize a connection to Postgres without environment variables
- Create a queue
- or a partitioned queue
- Send a message
- Send a batch of messages
- Read a message, set it invisible for 30 seconds
- Read a batch of messages
- Archive the message after we’re done with it. Archived messages are moved to an archive table
- Delete a message completely
- Pop a message, deleting it and reading it in one transaction
- Purge all messages from a queue
- Get queue metrics
- Access individual metrics
- Get metrics for all queues
Tembo’s Python Client for PGMQ
Installation
Install with pip
from pypi.org:
pip install tembo-pgmq-python
Dependencies:
Postgres running the Tembo PGMQ extension.
Usage
Start a Postgres Instance with the Tembo extension installed
docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pgmq-pg:latest
Using Environment Variables
Set environment variables:
export PG_HOST=127.0.0.1
export PG_PORT=5432
export PG_USERNAME=postgres
export PG_PASSWORD=postgres
export PG_DATABASE=test_db
Initialize a connection to Postgres using environment variables:
from tembo_pgmq_python import PGMQueue, Message
queue = PGMQueue()
Initialize a connection to Postgres without environment variables
from tembo_pgmq_python import PGMQueue, Message
queue = PGMQueue(
host="0.0.0.0",
port="5432",
username="postgres",
password="postgres",
database="postgres"
)
Create a queue
queue.create_queue("my_queue")
or a partitioned queue
queue.create_partitioned_queue("my_partitioned_queue", partition_size=10000)
Send a message
msg_id: int = queue.send("my_queue", {"hello": "world"})
Send a batch of messages
msg_ids: list[int] = queue.send_batch("my_queue", [{"hello": "world"}, {"foo": "bar"}])
Read a message, set it invisible for 30 seconds
read_message: Message = queue.read("my_queue", vt=30)
print(read_message)
Read a batch of messages
read_messages: list[Message] = queue.read_batch("my_queue", vt=30, batch_size=5)
for message in read_messages:
print(message)
Archive the message after we’re done with it. Archived messages are moved to an archive table
archived: bool = queue.archive("my_queue", read_message.msg_id)
Delete a message completely
msg_id: int = queue.send("my_queue", {"hello": "world"})
read_message: Message = queue.read("my_queue")
deleted: bool = queue.delete("my_queue", read_message.msg_id)
Pop a message, deleting it and reading it in one transaction
popped_message: Message = queue.pop("my_queue")
print(popped_message)
Purge all messages from a queue
purged_count: int = queue.purge("my_queue")
print(f"Purged {purged_count} messages from the queue.")
Get queue metrics
metrics = queue.metrics("my_queue")
print(f"Queue name: {metrics.queue_name}")
print(f"Queue length: {metrics.queue_length}")
print(f"Newest message age (seconds): {metrics.newest_msg_age_sec}")
print(f"Oldest message age (seconds): {metrics.oldest_msg_age_sec}")
print(f"Total messages: {metrics.total_messages}")
print(f"Scrape time: {metrics.scrape_time}")
Access individual metrics
print(f"Queue length: {queue.metrics('my_queue').queue_length}")
print(f"Total messages: {queue.metrics('my_queue').total_messages}")
Get metrics for all queues
all_metrics = queue.metrics_all()
for metrics in all_metrics:
print(f"Queue name: {metrics.queue_name}")
print(f"Queue length: {metrics.queue_length}")
print(f"Newest message age (seconds): {metrics.newest_msg_age_sec}")
print(f"Oldest message age (seconds): {metrics.oldest_msg_age_sec}")
print(f"Total messages: {metrics.total_messages}")
print(f"Scrape time: {metrics.scrape_time}")