1 unstable release

0.1.0 May 9, 2024

#1466 in Network programming

Download history 114/week @ 2024-05-05 9/week @ 2024-05-12 13/week @ 2024-05-19

136 downloads per month

MPL-2.0 license

21KB
232 lines

Gozo

Message scheduler for NATS.

Gozo connects to the NATS server (with JetStream enabled) and listens for a message with a subject gozo, a reply subject, payload and relevant headers. When one is received, it schedules to send back a message with the reply subject as the message subject, the same payload and the headers described in the usage section. Due to all communication is via NATS, Gozo can be used in any programming language that has client libraries.

Options

Options can be set in two ways: from the command line or from an environment variable. Command line options are listed below. All environment variables are written in upper case with a NATS_ prefix, e.g. --address option becomes NATS_ADDRESS. On startup gozo reads environment variables from /etc/gozo.env and ./gozo.env files if it exits. If both the command line option and the environment variable are set, the command line option is used.

  • -a, --address: set address to NATS server, default: nats://localhost:4222
  • -s, --secure: enforce secure (TLS) connection
  • -t, --token: set token authentication
  • -u, --user: set username
  • -p, --password: set password
  • -c, --cert: set client TLS certificate
  • -k, --key: set client TLS key
  • -n, --nkey: set NKey authentication
  • -j, --jwt: set path to credentials file

Usage

For readability, all examples below are written in Python.

Schedule a message with absolute time

Gozo-When header contains a string with the Unix epoch timestamp when the message will be sent back.

Gozo-Id header is optional, but if present it must be unique. Schedules with Gozo-Id header can be overwritten or canceled. In addition, schedules with Gozo-Id are written to the NATS key/value store, so they won't be discarded when gozo is restarted.

import asyncio
import nats
import time
import uuid

async def message_cb(msg):
	print(msg.subject) # 'reply.subject'
	print(msg.data) # b'Hello Gozo!'
	print(msg.headers) # {'Gozo-Reply': 'Yes', 'Gozo-Id': 'c8d816dd-578b-47ff-84c1-031f3ee7ade3'}

async def main():
	nc = await nats.connect('nats://localhost:4222')
	sub = await nc.subscribe('reply.subject', cb=message_cb)
	headers = {
		'Gozo-When': str(int(time.time()) + 10),
		'Gozo-Id': str(uuid.uuid4()),
	}
	await nc.publish('gozo', b'Hello Gozo!', 'reply.subject', headers)
	await asyncio.sleep(11)

if __name__ == '__main__':
	asyncio.run(main())

Schedule a message with relative time

This time Gozo-When header contains a string with a "+" sign followed by the number of seconds after which the message will be sent back.

import asyncio
import nats
import uuid

async def message_cb(msg):
	print(msg.subject) # 'reply.subject'
	print(msg.data) # b'Hello Gozo!'
	print(msg.headers) # {'Gozo-Reply': 'Yes', 'Gozo-Id': 'c8d816dd-578b-47ff-84c1-031f3ee7ade3'}

async def main():
	nc = await nats.connect('nats://localhost:4222')
	sub = await nc.subscribe('reply.subject', cb=message_cb)
	headers = {
		'Gozo-When': '+10',
		'Gozo-Id': str(uuid.uuid4()),
	}
	await nc.publish('gozo', b'Hello Gozo!', 'reply.subject', headers)
	await asyncio.sleep(11)

if __name__ == '__main__':
	asyncio.run(main())

Cancel a schedule

To cancel the schedule, send a message with the header Gozo-Del-Id and set the corresponding ID as its value.

import asyncio
import nats

async def main():
	nc = await nats.connect('nats://localhost:4222')
	headers = {
		'Gozo-Del-Id': 'c8d816dd-578b-47ff-84c1-031f3ee7ade3',
	}
	await nc.publish('gozo', b'', None, headers)

if __name__ == '__main__':
	asyncio.run(main())

Dependencies

~21–36MB
~648K SLoC