#sdk #ipc #pywatt

pywatt_sdk

Standardized SDK for building PyWatt modules in Rust

1 unstable release

new 0.1.0 May 4, 2025

#1718 in Network programming

MIT/Apache

130KB
2.5K SLoC

PyWatt SDK

Latest Version Docs

Standardized SDK for building PyWatt modules in Rust.

Overview

This crate provides the core building blocks for creating PyWatt modules that integrate seamlessly with the PyWatt orchestrator. It handles:

  • IPC Handshake: Standardized startup communication (read_init, send_announce).
  • Logging: Consistent logging to stderr with secret redaction (init_module).
  • Secret Management: Secure retrieval and rotation handling via the integrated secret_client module (get_secret, subscribe_secret_rotations).
  • Runtime IPC: Background task for processing orchestrator messages (process_ipc_messages).
  • Core Types: Re-exports essential types from the integrated ipc_types module (OrchestratorInit, ModuleAnnounce, etc.).
  • (Optional) Macros: Proc macros for simplifying module definition (requires proc_macros feature).
  • (Optional) JWT Auth: Middleware for Axum route protection (requires jwt_auth feature).

Installation

Add this to your module's Cargo.toml:

[dependencies]
# Use the version from crates.io
pywatt_sdk = "0.1.0"
# Or use a path dependency during development
# pywatt_sdk = { path = "../pywatt_sdk" } 

# Other dependencies like axum, tokio, etc.
tokio = { version = "1", features = ["full"] }
axum = "0.7"
tracing = "0.1"

Quickstart Example

Here's a minimal module using the SDK with Axum:

// Use the prelude for common types and functions
use pywatt_sdk::prelude::*;
use axum::{routing::get, Router, extract::State};
use tokio::net::{TcpListener, UnixListener};
use std::sync::Arc;

/// Module-specific state carried inside the SDK's `AppState` wrapper.
///
/// Define your own fields here if needed (for example a database connection
/// pool or configuration); this example only stores a message string.
#[derive(Clone)]
struct MyModuleState {
    /// Text echoed back by `custom_handler`; `main` fills it from the
    /// `INITIAL_MESSAGE` secret or falls back to a default string.
    message: String,
}

/// Shared state type handed to the Axum handlers: the SDK's `AppState`
/// wrapper parameterized with this module's `MyModuleState`.
type SharedState = AppState<MyModuleState>;

/// Health-check handler mounted at `/health`; always responds with the
/// static body `"OK"` and reads no state.
async fn health_handler() -> &'static str {
    "OK"
}

/// Handler mounted at `/custom`.
///
/// Pulls the shared state out of the request and reports which module is
/// answering together with the message stored in the user state.
async fn custom_handler(State(state): State<SharedState>) -> String {
    let module_id = state.module_id();
    let message = &state.user_state.message;
    format!("Module {} says: {}", module_id, message)
}

/// Entry point of the example module.
///
/// Walks through the module lifecycle in the numbered steps below: set up
/// logging, read the orchestrator handshake, create the secret client, build
/// the shared state, subscribe to secret rotations, build the Axum router,
/// bind the listener requested by the orchestrator, announce the endpoints,
/// start the IPC message processor, and finally wait until the server, the
/// IPC task, or Ctrl+C ends the process.
///
/// # Errors
///
/// Failures propagated with `?` are returned to the caller as a boxed error:
/// reading the handshake, creating the secret client, binding or cleaning up
/// the listener, and sending the announcement. A server error observed inside
/// the final `select!` is only logged; `main` still returns `Ok(())` then.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // 1. Initialize logging (must be first!)
    init_module();

    // 2. Perform handshake with orchestrator
    let init: OrchestratorInit = read_init().await?;
    tracing::info!(?init, "Received orchestrator initialization");

    // 3. Initialize secret client (built-in)
    let secret_client = get_module_secret_client(&init.orchestrator_api, &init.module_id).await?;

    // 4. Fetch initial secrets (example)
    // Any error from `get_secret` (not only "not found") takes the fallback
    // branch and is reported with the same warning.
    let initial_message = match get_secret(&secret_client, "INITIAL_MESSAGE").await {
        Ok(secret) => secret.expose_secret().clone(),
        Err(_) => {
            tracing::warn!("INITIAL_MESSAGE secret not found, using default.");
            "Default message".to_string()
        }
    };

    // 5. Create shared state
    let my_state = MyModuleState { message: initial_message };
    let app_state = AppState::new(
        init.module_id.clone(),
        init.orchestrator_api.clone(),
        secret_client.clone(),
        my_state
    );

    // 6. Subscribe to secret rotations (example)
    // NOTE(review): `app_state_clone` is moved into the closure below but is
    // only referenced by the commented-out example, so as written it is
    // captured without being used.
    let app_state_clone = app_state.clone(); // Clone AppState for the task
    let keys = vec!["INITIAL_MESSAGE".to_string()];
    tokio::spawn(subscribe_secret_rotations(secret_client.clone(), keys, move |key, new_val| {
        // Note: To update shared state *mutably* here, MyModuleState would
        // typically contain Arcs/Mutexes, or you'd use a channel to communicate
        // back to the main thread/state manager.
        if key == "INITIAL_MESSAGE" {
            let new_message = new_val.expose_secret().clone();
            // NOTE(review): this logs the rotated secret's value; confirm that
            // is intended for anything beyond this demo message.
            tracing::info!(%key, new_message = %new_message, "Secret rotated, state needs update (implementation detail)");
            // Example: If app_state_clone.user_state held an Arc<Mutex<String>>:
            // let mut message = app_state_clone.user_state.lock().unwrap();
            // *message = new_message;
        }
    }));

    // 7. Set up Axum router
    let app = Router::new()
        .route("/health", get(health_handler))
        .route("/custom", get(custom_handler))
        .with_state(app_state.clone());

    // 8. Bind listener (TCP or UDS)
    // Use the `.into_make_service()` for Axum 0.7+
    // NOTE(review): both arms must evaluate to the same type for this `match`
    // to type-check, yet one hands `axum::serve` a `TcpListener` and the other
    // a `UnixListener`. Confirm this compiles against the axum version pinned
    // in the Installation snippet (0.7) before copying it.
    let serve_future = match &init.listen {
        ListenAddress::Tcp(addr) => {
            tracing::info!(%addr, "Binding TCP listener");
            let listener = TcpListener::bind(addr).await?;
            axum::serve(listener, app.into_make_service())
        }
        ListenAddress::Unix(path) => {
            tracing::info!(path = %path.display(), "Binding Unix listener");
            // Ensure the socket file doesn't exist or clean it up
            if path.exists() {
                tokio::fs::remove_file(path).await?;
            }
            let listener = UnixListener::bind(path)?;
            axum::serve(listener, app.into_make_service())
        }
    };

    // 9. Announce endpoints to orchestrator
    // The announced paths mirror the routes registered in step 7; keep the
    // two lists in sync when adding routes.
    let announce = ModuleAnnounce {
        listen: init.listen_address().to_string_lossy(), // Use helper from ext trait
        endpoints: vec![
            AnnouncedEndpoint { path: "/health".into(), methods: vec!["GET".into()], auth: None },
            AnnouncedEndpoint { path: "/custom".into(), methods: vec!["GET".into()], auth: None },
        ],
    };
    send_announce(&announce)?;
    tracing::info!(?announce, "Sent announcement to orchestrator");

    // 10. Spawn runtime IPC message processor
    let ipc_handle = tokio::spawn(process_ipc_messages());

    // 11. Run the server
    // `select!` returns as soon as the first branch completes: the server
    // finishing, Ctrl+C, or the IPC task ending. The other branches are
    // dropped at that point.
    // NOTE(review): `serve_future` is polled here directly; confirm the value
    // returned by `axum::serve` can be used as a future without an explicit
    // conversion in the axum version in use.
    tracing::info!("Module server starting");
    tokio::select! {
        result = serve_future => {
            if let Err(e) = result {
                tracing::error!(error = %e, "Server exited with error");
            }
        }
        _ = tokio::signal::ctrl_c() => {
            tracing::info!("Received Ctrl+C");
        }
        ipc_res = ipc_handle => {
            // `ipc_res` is the join result of the spawned task, so `Err` here
            // means the task itself failed to complete (e.g. it panicked).
            match ipc_res {
                Ok(_) => tracing::info!("IPC handler finished cleanly"),
                Err(e) => tracing::error!(error = %e, "IPC handler finished with error"),
            }
        }
    }

    // NOTE(review): this message is logged on every exit path above,
    // including a server error, and no explicit shutdown step is visible
    // here; confirm "gracefully" is accurate.
    tracing::info!("Module shutting down gracefully");
    Ok(())
}

## Core Functions & Types

(See the `prelude` module for the most common items)

-   `init_module()`: Configures logging to stderr with secret redaction.
-   `read_init() -> Result<OrchestratorInit, InitError>`: Reads the initial handshake message from stdin.
-   `send_announce(announce: &ModuleAnnounce) -> Result<(), AnnounceError>`: Sends the module's announcement message to stdout.
-   `process_ipc_messages()`: An async function, meant to be run as a background task (e.g. via `tokio::spawn`), that listens on stdin for runtime IPC messages such as secret rotations and shutdown requests.
-   `get_module_secret_client(...)`: Creates a `SecretClient` instance.
-   `get_secret(...)`: Retrieves a secret, ensuring it's registered for redaction.
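-   `subscribe_secret_rotations(...)`: Handles secret rotation notifications for the given keys by invoking your callback with each rotated key and its new value; run it as a background task with `tokio::spawn`, as shown in the Quickstart.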
-   `AppState<T>`: A generic state container holding SDK state and optional user state `T`.
-   Types from `ipc_types`: `OrchestratorInit`, `ModuleAnnounce`, `AnnouncedEndpoint`, `ListenAddress`, etc.
-   Types from `secret_client`: `SecretClient`, `SecretError`, `RequestMode`.

## JWT Authentication (Optional Feature)

The SDK provides an optional `jwt_auth` feature to easily protect Axum routes with JWT Bearer token validation.

**Enable Feature**:
```toml
[dependencies]
# Ensure the jwt_auth feature is enabled
pywatt_sdk = { version = "0.1.0", features = ["jwt_auth"] }
axum = { version = "0.7", features = ["json"] }
# ... other deps
```

Usage:

  1. Request the HMAC signing secret (e.g., JWT_HMAC_SECRET) in your module configuration (how secrets are requested depends on the orchestrator/module definition).

  2. Fetch the secret using get_secret and attach the JWT middleware to your router with with_jwt:

    use pywatt_sdk::prelude::*;
    use axum::Router;
    
    /// Builds a router whose routes are guarded by the SDK's JWT middleware.
    ///
    /// Fetches the `JWT_HMAC_SECRET` secret through the state's secret client
    /// and passes it to `with_jwt`. A failed secret lookup is returned to the
    /// caller as a boxed error.
    async fn build_router_with_auth(state: AppState<()>) -> Result<Router, Box<dyn std::error::Error>> {
        // Router extension trait that provides `.with_jwt(...)`
        use pywatt_sdk::ext::RouterJwtExt;

        let hmac_secret = get_secret(state.secret_client(), "JWT_HMAC_SECRET").await?;

        Ok(Router::new()
            .route("/protected/resource", axum::routing::get(protected_handler))
            // ... other routes ...
            .with_jwt(hmac_secret)) // The SecretString is handed over as-is
    }
    # async fn protected_handler() -> &'static str { "Protected" }
    
  3. The middleware automatically:

    • Expects an Authorization: Bearer <token> header.
    • Validates the token using the provided HMAC secret (HS256 algorithm).
    • Returns 401 Unauthorized if the header is missing, or if the token is invalid, expired, or has the wrong signature.
    • On success, inserts the decoded JWT claims (as serde_json::Value) into the request extensions.
  4. Access claims in your handler:

    use axum::extract::Extension;
    use serde_json::Value;
    
    async fn protected_handler(Extension(claims): Extension<Value>) -> String {
        let user_id = claims.get("sub").and_then(|v| v.as_str()).unwrap_or("anonymous");
        format!("Hello, protected user {}!", user_id)
    }
    

Contributing

Contributions are welcome! Please follow standard Rust practices and ensure tests pass.

License

This project is licensed under the MIT OR Apache-2.0 license. See LICENSE-MIT or LICENSE-APACHE for details.

Dependencies

~13–26MB
~381K SLoC