6 releases

0.0.4 Feb 8, 2023
0.0.3-alpha.1 Feb 8, 2023
0.0.2 Feb 7, 2023
0.0.1-alpha Feb 6, 2023

MIT license

Postgres Message Queue

A lightweight message queue extension for Postgres. It provides an experience similar to AWS SQS and Redis Simple Message Queue, but on Postgres.

Installation

TODO docker run ...

Python Examples

Connect to postgres

import json
import pprint

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://postgres:postgres@localhost:28814/pgx_pgmq")

Create and list queues

# engine.begin() commits when the block exits, so the new queue persists
with engine.begin() as con:
    # create a queue
    created = con.execute(text("select * from pgmq_create('myqueue');"))
    # list queues
    list_queues = con.execute(text("select * from pgmq_list_queues()"))
    column_names = list_queues.keys()
    rows = list_queues.fetchall()
    print("### Queues ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
'### Queues ###'
{'created_at': datetime.datetime(2023, 2, 7, 2, 5, 39, 946356, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))),
 'queue_name': 'myqueue'}

Send a message to the queue

# engine.begin() commits when the block exits, so the message stays enqueued
with engine.begin() as con:
    # send a message to the queue created above
    msg = json.dumps({"yolo": 42})
    msg_id = con.execute(text(f"select * from pgmq_send('myqueue', '{msg}') as msg_id;"))
    column_names = msg_id.keys()
    rows = msg_id.fetchall()
    print("### Message ID ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
'### Message ID ###'
{'msg_id': 1}
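
The f-string above breaks as soon as the JSON payload contains a single quote. A minimal sketch of an alternative is to pass the queue name and payload as bind parameters. `build_send` is a hypothetical helper, not part of pgmq, and whether the driver needs an explicit cast for the message argument is an assumption to verify against your setup.

```python
import json


def build_send(queue_name, payload):
    """Return (sql, params) for pgmq_send using named bind parameters.

    Hypothetical helper: the payload travels as a parameter instead of
    being spliced into the SQL string.
    """
    sql = "select * from pgmq_send(:queue, :msg) as msg_id;"
    params = {"queue": queue_name, "msg": json.dumps(payload)}
    return sql, params


# A payload with a single quote, which would break naive string interpolation
sql, params = build_send("myqueue", {"note": "it's quoted"})
```

With SQLAlchemy, the pair can then be executed as `con.execute(text(sql), params)`.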

Read a message from the queue

# engine.begin() commits when the block exits, so the updated visibility timeout persists
with engine.begin() as con:
    # read a message, make it unavailable to be read again for 5 seconds
    read = con.execute(text("select * from pgmq_read('myqueue', 5);"))
    column_names = read.keys()
    rows = read.fetchall()
    print("### Read Message ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
'### Read Message ###'
{'enqueued_at': datetime.datetime(2023, 2, 7, 2, 51, 50, 468837, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))),
 'message': {'yolo': 42},
 'msg_id': 1,
 'read_ct': 1,
 'vt': datetime.datetime(2023, 2, 7, 16, 9, 4, 826669, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800)))}

Delete a message from the queue

# engine.begin() commits when the block exits, so the delete persists
with engine.begin() as con:
    # delete the message with id 1
    deleted = con.execute(text("select pgmq_delete('myqueue', 1);"))
    column_names = deleted.keys()
    rows = deleted.fetchall()
    print("### Message Deleted ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
'### Message Deleted ###'
{'pgmq_delete': True}
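
The read and delete calls above are typically combined into a consumer step: read a message, process it, and delete it only once processing succeeds. The following is a rough sketch of that pattern, not something shipped with pgmq. `process_next`, `execute`, and `handler` are hypothetical names; `execute(sql, params)` is assumed to run a statement and return a list of dict rows.

```python
def process_next(execute, queue_name, vt_seconds, handler):
    """Read one message, handle it, and delete it if the handler succeeds.

    Returns the msg_id that was processed, or None if nothing was readable.
    """
    rows = execute(
        "select * from pgmq_read(:queue, :vt);",
        {"queue": queue_name, "vt": vt_seconds},
    )
    if not rows:
        # Queue is empty, or every message is currently invisible
        return None
    row = rows[0]
    # If the handler raises, the delete below never runs and the
    # message becomes readable again once vt_seconds have passed
    handler(row["message"])
    execute(
        "select pgmq_delete(:queue, :msg_id);",
        {"queue": queue_name, "msg_id": row["msg_id"]},
    )
    return row["msg_id"]
```

With SQLAlchemy, `execute` could wrap `con.execute(text(sql), params)` and convert each returned row to a dict. Choose a visibility timeout longer than the handler's expected run time, otherwise another consumer may read the same message while it is still being processed.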

SQL Examples

CREATE EXTENSION pgmq;

Creating a queue

SELECT pgmq_create('my_queue');

 pgmq_create
-------------

Send a message

pgmq=# SELECT * from pgmq_send('my_queue', '{"foo": "bar"}');
 pgmq_send
--------------
            1

Read a message

Read a single message from the queue and make it invisible to other readers for 30 seconds.

pgmq=# SELECT * from pgmq_read('my_queue', 30);

 msg_id | read_ct |              vt               |          enqueued_at          |    message
--------+---------+-------------------------------+-------------------------------+---------------
      1 |       2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"}

If the queue is empty, or if all messages are currently invisible, no rows will be returned.

pgx_pgmq=# SELECT * from pgmq_read('my_queue', 30);
 msg_id | read_ct | vt | enqueued_at | message
--------+---------+----+-------------+---------
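
To make the visibility behaviour concrete, here is a toy in-memory model in Python. It is only an illustration of the semantics described above, not how pgmq is implemented. `ToyQueue` and its injectable `clock` are hypothetical names introduced for this sketch.

```python
import itertools


class ToyQueue:
    """In-memory model of visibility-timeout semantics (illustration only)."""

    def __init__(self, clock):
        self._clock = clock            # callable returning the current time in seconds
        self._ids = itertools.count(1)
        self._messages = {}            # msg_id -> {"message", "vt", "read_ct"}

    def send(self, message):
        msg_id = next(self._ids)
        # A new message is visible immediately: its vt is "now"
        self._messages[msg_id] = {"message": message, "vt": self._clock(), "read_ct": 0}
        return msg_id

    def read(self, vt_seconds):
        now = self._clock()
        for msg_id in sorted(self._messages):
            record = self._messages[msg_id]
            if record["vt"] <= now:
                # Hide the message until now + vt_seconds and count the read
                record["vt"] = now + vt_seconds
                record["read_ct"] += 1
                return {"msg_id": msg_id, **record}
        return None  # empty queue, or all messages currently invisible

    def delete(self, msg_id):
        return self._messages.pop(msg_id, None) is not None


now = [0.0]                            # fake clock so the demo is deterministic
queue = ToyQueue(clock=lambda: now[0])
queue.send({"foo": "bar"})
first = queue.read(30)                 # returns the message and hides it
hidden = queue.read(30)                # nothing visible yet
now[0] = 31.0                          # the visibility timeout has expired
again = queue.read(30)                 # the same message is returned again
```

A message that is read but never deleted therefore reappears after its timeout, with `read_ct` incremented on each read.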

Pop a message

Read a message and immediately delete it from the queue. Returns no rows if the queue is empty.

pgmq=# SELECT * from pgmq_pop('my_queue');

 msg_id | read_ct |              vt               |          enqueued_at          |    message
--------+---------+-------------------------------+-------------------------------+---------------
      1 |       2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"}

Archive a message

Archiving a message removes it from the queue and inserts it into the archive table. TODO:

Delete a message

Delete a message with id 1 from queue named my_queue.

pgmq=# select pgmq_delete('my_queue', 1);
 pgmq_delete
-------------
 t

Development

Set up pgx.

cargo install --locked cargo-pgx
cargo pgx init

Then, clone this repo and change into this directory.

git clone git@github.com:CoreDB-io/coredb.git
cd coredb/extensions/pgmq/

Run the dev environment

cargo pgx run pg14

Packaging

Run this script to package into a .deb file, which can be installed on Ubuntu.

/bin/bash build-extension.sh
