
bin+lib py_mcai_worker_sdk

AMQP worker that listens for messages and provides a trait to process them

15 releases (4 stable)

2.0.0-rc2 Sep 30, 2022
2.0.0-rc1 Sep 12, 2022
1.1.3 Jan 26, 2022
1.1.0-rc Sep 16, 2021
0.11.9 Oct 29, 2020


MIT license


Python binding for Rust Media Cloud AI Worker SDK

Based on mcai_worker_sdk.

Build

To build the Rust application:

cargo build

Test

To run the unit tests, you must build the provided worker example (see the Build section above).

cargo test

Usage

This worker uses the PyO3 crate to load and execute a Python file.

To implement a worker, a pyproject.toml file must be created with metadata describing the worker. It must at least contain both project and build-system sections.

Example: (minimal configuration)

[project]
name = "my_python_worker"
version = "1.2.3"
description = "My Python worker"
license = { text = "MIT" }

[build-system]
requires = []

The Python worker parameters must be declared in a Python class as static (class-level) attributes. The type of each parameter must be set explicitly with a type annotation.

Example:

import typing

class MyWorkerParameters:
    a_parameter: int
    another_parameter: typing.Optional[str] = None
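
To illustrate why explicit types matter, the sketch below inspects such a class with the standard typing.get_type_hints function. This only shows what the annotations expose at runtime; it is an assumption for illustration, not a description of how the SDK itself reads the class.

```python
import typing


class MyWorkerParameters:
    a_parameter: int
    another_parameter: typing.Optional[str] = None


# Annotated class attributes are discoverable at runtime.
hints = typing.get_type_hints(MyWorkerParameters)

# An attribute with an annotation but no default value has no class-level
# value, so it can be told apart from optional ones.
required = [name for name in hints if not hasattr(MyWorkerParameters, name)]

for name, annotation in hints.items():
    kind = "required" if name in required else "optional"
    print(f"{name}: {annotation} ({kind})")
```

An attribute without an annotation would not appear in hints at all, which is one reason to annotate every parameter.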

The Python worker itself must be defined as a Python class implementing some methods:

  • get_parameters_type() -> typing.Type (static):
    • To retrieve the worker parameters class
  • init() (static):
    • To initialize the worker process (optional)
  • process(handle_callback, parameters, job_id) -> dict (static), where parameters is an instance of the worker parameters class:
    • To execute the worker process and return the job result.
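
The methods listed above can be sketched as follows. This is a minimal outline under assumptions: the class name MyWorker and the keys of the returned dict are hypothetical, and handle_callback is left unused because its interface is not described here. The provided worker.py example remains the reference.

```python
import typing


class MyWorkerParameters:
    a_parameter: int
    another_parameter: typing.Optional[str] = None


class MyWorker:  # hypothetical class name
    @staticmethod
    def get_parameters_type() -> typing.Type:
        # Tells the SDK which class describes the worker parameters.
        return MyWorkerParameters

    @staticmethod
    def init() -> None:
        # Optional one-time setup before any job is processed.
        pass

    @staticmethod
    def process(handle_callback, parameters, job_id) -> dict:
        # `parameters` is an instance of MyWorkerParameters.
        # The result keys below are illustrative only.
        return {
            "job_id": job_id,
            "a_parameter": parameters.a_parameter,
            "another_parameter": parameters.another_parameter,
        }
```

All three methods are static, so the worker holds no per-instance state between calls.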

If the media feature is enabled, the following functions must be implemented:

  • init_process(stream_handler, format_context, parameters) -> list (static), where parameters is an instance of the worker parameters class:
    • To initialize the media worker process and return a list of GenericStreamDescriptors
  • process_frames(job_id, stream_index, frames) -> dict (static):
    • To process some input audio/video frames and return the job result.
  • process_ebu_ttml_live(job_id, stream_index, ttml_contents) -> dict (static):
    • To process some input EBU TTML frames and return the job result.
  • ending_process() (static):
    • To end the media worker process

NB: the process(handle_callback, parameters, job_id) -> dict function is not called when the media feature is enabled.
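
The shape of a media worker can be outlined as below. This is a sketch under several assumptions: the class name MyMediaWorker and the result keys are hypothetical, frames is assumed to be a sequence, and init_process returns an empty list only because the construction of GenericStreamDescriptors is not covered in this section. See media_worker.py for an actual implementation.

```python
class MyMediaWorker:  # hypothetical class name
    @staticmethod
    def init_process(stream_handler, format_context, parameters) -> list:
        # A real worker returns a list of GenericStreamDescriptors here;
        # their construction is not described above, so this stays empty.
        return []

    @staticmethod
    def process_frames(job_id, stream_index, frames) -> dict:
        # Called with a batch of audio/video frames for one stream.
        # Assumes `frames` supports len(); result keys are illustrative.
        return {
            "job_id": job_id,
            "stream_index": stream_index,
            "frame_count": len(frames),
        }

    @staticmethod
    def process_ebu_ttml_live(job_id, stream_index, ttml_contents) -> dict:
        # Called with EBU TTML frames instead of audio/video frames.
        return {"job_id": job_id, "stream_index": stream_index}

    @staticmethod
    def ending_process() -> None:
        # Release any resources acquired in init_process.
        pass
```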

For more details, see the provided worker.py and media_worker.py examples.

Set the PYTHON_WORKER_FILENAME environment variable to specify the path of your Python worker. Otherwise, the worker.py file will be loaded by default.
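
The fallback described above behaves like the following lookup. The SDK performs this resolution itself; the helper name resolve_worker_filename is hypothetical and only illustrates the rule.

```python
import os

DEFAULT_WORKER_FILENAME = "worker.py"


def resolve_worker_filename(environ=os.environ) -> str:
    # Use PYTHON_WORKER_FILENAME when it is set, else the default file.
    return environ.get("PYTHON_WORKER_FILENAME", DEFAULT_WORKER_FILENAME)


print(resolve_worker_filename())
```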

Running examples

Simple worker

RUST_LOG=debug \
SOURCE_ORDERS="examples/message.json" \
PYTHON_WORKER_FILENAME="worker.py" \
SOURCE_PATH="README.md" \
DESTINATION_PATH="README.md.out" \
cargo run

Media worker

First set the media filename:

export SOURCE_PATH="/folder/filename.ext"

Then run the SDK with these parameters:

RUST_LOG=debug \
SOURCE_ORDERS="examples/message.json" \
PYTHON_WORKER_FILENAME="media_worker.py" \
DESTINATION_PATH="results.json" \
cargo run --features media
