6 releases
Version | Released
---|---
0.0.4 | Feb 8, 2023
0.0.3-alpha.1 | Feb 8, 2023
0.0.2 | Feb 7, 2023
0.0.1-alpha | Feb 6, 2023
Postgres Message Queue
A lightweight message queue extension for Postgres. It provides an experience similar to AWS SQS and Redis Simple Message Queue, but on Postgres.
Installation
TODO
docker run ...
Python Examples
Connect to postgres
import json
import pprint
from sqlalchemy import create_engine, text
engine = create_engine("postgresql://postgres:postgres@localhost:28814/pgx_pgmq")
Create and list queues
with engine.begin() as con:  # begin() commits the transaction on exit
    # create a queue
    created = con.execute(text("select * from pgmq_create('myqueue');"))
    # list queues
    list_queues = con.execute(text("select * from pgmq_list_queues()"))
    column_names = list_queues.keys()
    rows = list_queues.fetchall()
    print("### Queues ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
### Queues ###
{'created_at': datetime.datetime(2023, 2, 7, 2, 5, 39, 946356, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))),
 'queue_name': 'myqueue'}
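The row-to-dict conversion used in these examples can be factored into a small helper. The sketch below is only an illustration: `rows_to_dicts` is a hypothetical name, not part of pgmq or SQLAlchemy, and the sample data stands in for a real query result so the snippet runs without a database.

```python
from typing import Any, Iterable, Sequence


def rows_to_dicts(
    column_names: Iterable[str], rows: Iterable[Sequence[Any]]
) -> list[dict[str, Any]]:
    """Pair each row's values with the column names, one dict per row."""
    names = list(column_names)  # materialize once so the names can be reused per row
    return [dict(zip(names, row)) for row in rows]


# Stand-in data shaped like a single-column result set.
example = rows_to_dicts(["queue_name"], [("myqueue",)])
print(example)
```

With a live connection, the same helper could be called as `rows_to_dicts(list_queues.keys(), list_queues.fetchall())`.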
Send a message to the queue
with engine.begin() as con:  # begin() commits the transaction on exit
    # send a message to the queue created above
    msg = json.dumps({"yolo": 42})
    msg_id = con.execute(text(f"select * from pgmq_send('myqueue', '{msg}') as msg_id;"))
    column_names = msg_id.keys()
    rows = msg_id.fetchall()
    print("### Message ID ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
### Message ID ###
{'msg_id': 1}
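Splicing the JSON payload into the SQL string breaks as soon as the payload contains a single quote. A possible alternative is to pass the queue name and payload as bound parameters. The sketch below only builds the statement and its parameters, so it runs without a database; `build_send` is a hypothetical helper, and it assumes the database driver passes the JSON string in a form that `pgmq_send` accepts (an explicit cast may be needed otherwise).

```python
import json
from typing import Any


def build_send(queue_name: str, payload: Any) -> tuple[str, dict[str, str]]:
    """Return (sql, params) for sending `payload` to `queue_name`.

    The payload is serialized to JSON and passed as a bound parameter
    instead of being spliced into the SQL string.
    """
    sql = "select * from pgmq_send(:queue, :msg) as msg_id;"
    params = {"queue": queue_name, "msg": json.dumps(payload)}
    return sql, params


# A payload containing a single quote, which would break string interpolation.
sql, params = build_send("myqueue", {"note": "it's quoted"})
print(sql)
print(params)
```

With SQLAlchemy, the result could be executed as `con.execute(text(sql), params)`.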
Read a message from the queue
with engine.begin() as con:  # begin() commits the transaction on exit
    # read a message and make it invisible to other reads for 5 seconds
    read = con.execute(text("select * from pgmq_read('myqueue', 5);"))
    column_names = read.keys()
    rows = read.fetchall()
    print("### Read Message ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
### Read Message ###
{'enqueued_at': datetime.datetime(2023, 2, 7, 2, 51, 50, 468837, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))),
 'message': {'yolo': 42},
 'msg_id': 1,
 'read_ct': 1,
 'vt': datetime.datetime(2023, 2, 7, 16, 9, 4, 826669, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800)))}
Delete a message from the queue
with engine.begin() as con:  # begin() commits the transaction on exit
    # delete the message with id 1
    deleted = con.execute(text("select pgmq_delete('myqueue', 1);"))
    column_names = deleted.keys()
    rows = deleted.fetchall()
    print("### Message Deleted ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
### Message Deleted ###
{'pgmq_delete': True}
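The read and delete steps shown above are commonly combined in a worker that deletes a message only after it has been handled. The sketch below illustrates that pattern with plain callables, so it runs without a database. `process_one` and its parameters are hypothetical names, not part of pgmq; in practice `read` and `delete` would wrap the `pgmq_read` and `pgmq_delete` calls.

```python
from typing import Any, Callable, Optional

Message = dict[str, Any]


def process_one(
    read: Callable[[], Optional[Message]],
    delete: Callable[[int], bool],
    handler: Callable[[Message], None],
) -> bool:
    """Read one message, handle it, and delete it only if handling succeeded.

    Returns True if a message was processed, False if none was available.
    If `handler` raises, the message is not deleted, so it can be read
    again once its visibility timeout expires.
    """
    msg = read()
    if msg is None:
        return False
    handler(msg)  # may raise; the delete below is then skipped
    delete(msg["msg_id"])
    return True


# In-memory stand-ins for the database calls.
pending = [{"msg_id": 1, "message": {"yolo": 42}}]
done: list[int] = []


def fake_read() -> Optional[Message]:
    return pending[0] if pending else None


def fake_delete(msg_id: int) -> bool:
    pending.pop(0)
    done.append(msg_id)
    return True


print(process_one(fake_read, fake_delete, print))
```

Deleting after handling trades possible duplicate processing for not losing a message when the handler fails.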
SQL Examples
CREATE EXTENSION pgmq;
Creating a queue
SELECT pgmq_create('my_queue');
pgmq_create
-------------
Send a message
pgmq=# SELECT * from pgmq_send('my_queue', '{"foo": "bar"}');
pgmq_send
--------------
1
Read a message
Read a single message from the queue and make it invisible for 30 seconds.
pgmq=# SELECT * from pgmq_read('my_queue', 30);
msg_id | read_ct | vt | enqueued_at | message
--------+---------+-------------------------------+-------------------------------+---------------
1 | 2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"}
If the queue is empty, or if all messages are currently invisible, no rows will be returned.
pgx_pgmq=# SELECT * from pgmq_read('my_queue', 30);
msg_id | read_ct | vt | enqueued_at | message
--------+---------+----+-------------+---------
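The visibility behaviour described above can be illustrated with a toy in-memory model. This is a sketch of the semantics only and makes no claim about how pgmq stores or selects messages. `InMemoryQueue`, its methods, and the explicit `now` argument are hypothetical names chosen for the illustration.

```python
import itertools
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Message:
    msg_id: int
    message: Any
    vt: float = 0.0   # time at which the message becomes visible again
    read_ct: int = 0  # how many times the message has been read


class InMemoryQueue:
    """Toy model of read/visibility semantics; not how pgmq is implemented."""

    def __init__(self) -> None:
        self._messages: dict[int, Message] = {}
        self._ids = itertools.count(1)

    def send(self, message: Any) -> int:
        msg_id = next(self._ids)
        self._messages[msg_id] = Message(msg_id, message)
        return msg_id

    def read(self, vt_seconds: float, now: float) -> Optional[Message]:
        """Return the first visible message and hide it for vt_seconds."""
        for msg in self._messages.values():  # dicts preserve insertion order
            if msg.vt <= now:
                msg.vt = now + vt_seconds
                msg.read_ct += 1
                return msg
        return None  # empty queue, or every message is currently invisible

    def delete(self, msg_id: int) -> bool:
        return self._messages.pop(msg_id, None) is not None


q = InMemoryQueue()
q.send({"foo": "bar"})
first = q.read(vt_seconds=30, now=0)    # returned, then hidden until t=30
second = q.read(vt_seconds=30, now=10)  # still invisible, so nothing comes back
third = q.read(vt_seconds=30, now=31)   # timeout elapsed, visible again
print(first is not None, second, third.read_ct)
```

Passing `now` explicitly keeps the model deterministic; a real consumer would rely on the database clock instead.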
Pop a message
Read a message and immediately delete it from the queue. Returns no rows if the queue is empty.
pgmq=# SELECT * from pgmq_pop('my_queue');
msg_id | read_ct | vt | enqueued_at | message
--------+---------+-------------------------------+-------------------------------+---------------
1 | 2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"}
Archive a message
Archiving a message removes it from the queue and inserts it into the archive table. TODO:
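As a rough illustration of the semantics described above, archiving can be modelled as a move between two containers, whereas deleting discards the message. The sketch below uses plain dictionaries; the `archive` function and both dictionaries are hypothetical and say nothing about the extension's actual function signature or table layout.

```python
from typing import Any


def archive(queue: dict[int, Any], archived: dict[int, Any], msg_id: int) -> bool:
    """Move a message from `queue` to `archived`; return False if it is not queued."""
    if msg_id not in queue:
        return False
    archived[msg_id] = queue.pop(msg_id)
    return True


queue = {1: {"foo": "bar"}}
archived: dict[int, Any] = {}
archive(queue, archived, 1)
print(queue, archived)
```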
Delete a message
Delete the message with id 1 from the queue named my_queue.
pgmq=# select pgmq_delete('my_queue', 1);
pgmq_delete
-------------
t
Development
Set up pgx.
cargo install --locked cargo-pgx
cargo pgx init
Then, clone this repo and change into this directory.
git clone git@github.com:CoreDB-io/coredb.git
cd coredb/extensions/pgmq/
Run the dev environment
cargo pgx run pg14
Packaging
Run this script to package the extension into a .deb file, which can be installed on Ubuntu.
/bin/bash build-extension.sh