1 unstable release
new 0.1.0 | May 4, 2025 |
---|
#1718 in Network programming
130KB
2.5K
SLoC
PyWatt SDK
Standardized SDK for building PyWatt modules in Rust.
Overview
This crate provides the core building blocks for creating PyWatt modules that integrate seamlessly with the PyWatt orchestrator. It handles:
- IPC Handshake: Standardized startup communication (`read_init`, `send_announce`).
- Logging: Consistent logging to `stderr` with secret redaction (`init_module`).
- Secret Management: Secure retrieval and rotation handling via the integrated `secret_client` module (`get_secret`, `subscribe_secret_rotations`).
- Runtime IPC: Background task for processing orchestrator messages (`process_ipc_messages`).
- Core Types: Re-exports essential types from the integrated `ipc_types` module (`OrchestratorInit`, `ModuleAnnounce`, etc.).
- (Optional) Macros: Proc macros for simplifying module definition (requires the `proc_macros` feature).
- (Optional) JWT Auth: Middleware for Axum route protection (requires the `jwt_auth` feature).
Installation
Add this to your module's `Cargo.toml`:
[dependencies]
# Use the version from crates.io
pywatt_sdk = "0.1.0"
# Or use a path dependency during development
# pywatt_sdk = { path = "../pywatt_sdk" }
# Other dependencies like axum, tokio, etc.
tokio = { version = "1", features = ["full"] }
axum = "0.7"
tracing = "0.1"
Quickstart Example
Here's a minimal module using the SDK with Axum:
// Use the prelude for common types and functions
use pywatt_sdk::prelude::*;
use axum::{routing::get, Router, extract::State};
use tokio::net::{TcpListener, UnixListener};
use std::sync::Arc;
// Define your module-specific state if needed
/// Module-specific state that this example wraps in the SDK's `AppState`.
///
/// `Clone` lets the state be duplicated alongside the wrapper; `Debug` lets it
/// be included in diagnostics such as `tracing` fields.
#[derive(Debug, Clone)]
struct MyModuleState {
    /// Example payload; a real module might hold a database connection pool
    /// or configuration here instead.
    message: String,
}
// Use the SDK's AppState wrapper for shared state
/// Shared state type used by the handlers in this example: the SDK's
/// `AppState` parameterized with `MyModuleState`.
type SharedState = AppState<MyModuleState>;
/// Health-check handler: always answers with the static body `"OK"`.
async fn health_handler() -> &'static str {
    const HEALTHY: &str = "OK";
    HEALTHY
}
/// Handler for the custom route: reports the module id together with the
/// message stored in the module's user state.
async fn custom_handler(State(state): State<SharedState>) -> String {
    let module_id = state.module_id();
    let message = &state.user_state.message;
    format!("Module {} says: {}", module_id, message)
}
/// Example module entry point.
///
/// Runs the numbered steps below in order: logging setup, orchestrator
/// handshake, secret client creation, initial secret fetch, state
/// construction, secret-rotation subscription, router setup, listener
/// binding, endpoint announcement, IPC task spawn, and finally waiting until
/// the server future, Ctrl+C, or the IPC task completes.
///
/// # Errors
/// Propagates (via `?`) failures from the handshake, secret client creation,
/// listener binding / socket-file removal, and the announcement.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// 1. Initialize logging (must be first!)
init_module();
// 2. Perform handshake with orchestrator
let init: OrchestratorInit = read_init().await?;
tracing::info!(?init, "Received orchestrator initialization");
// 3. Initialize secret client (built-in)
let secret_client = get_module_secret_client(&init.orchestrator_api, &init.module_id).await?;
// 4. Fetch initial secrets (example)
// Any error from get_secret (not only "not found") falls back to the default message.
let initial_message = match get_secret(&secret_client, "INITIAL_MESSAGE").await {
Ok(secret) => secret.expose_secret().clone(),
Err(_) => {
tracing::warn!("INITIAL_MESSAGE secret not found, using default.");
"Default message".to_string()
}
};
// 5. Create shared state
let my_state = MyModuleState { message: initial_message };
let app_state = AppState::new(
init.module_id.clone(),
init.orchestrator_api.clone(),
secret_client.clone(),
my_state
);
// 6. Subscribe to secret rotations (example)
// NOTE(review): app_state_clone is only referenced from the commented-out
// example inside the closure below, so as written it is unused.
let app_state_clone = app_state.clone(); // Clone AppState for the task
let keys = vec!["INITIAL_MESSAGE".to_string()];
tokio::spawn(subscribe_secret_rotations(secret_client.clone(), keys, move |key, new_val| {
// Note: To update shared state *mutably* here, MyModuleState would
// typically contain Arcs/Mutexes, or you'd use a channel to communicate
// back to the main thread/state manager.
if key == "INITIAL_MESSAGE" {
let new_message = new_val.expose_secret().clone();
// NOTE(review): this logs the rotated value itself; confirm the SDK's log
// redaction covers rotated secrets before copying this pattern.
tracing::info!(%key, new_message = %new_message, "Secret rotated, state needs update (implementation detail)");
// Example: If app_state_clone.user_state held an Arc<Mutex<String>>:
// let mut message = app_state_clone.user_state.lock().unwrap();
// *message = new_message;
}
}));
// 7. Set up Axum router
let app = Router::new()
.route("/health", get(health_handler))
.route("/custom", get(custom_handler))
.with_state(app_state.clone());
// 8. Bind listener (TCP or UDS)
// Use the `.into_make_service()` for Axum 0.7+
// NOTE(review): both match arms must produce the same type for `serve_future`,
// and axum 0.7's `serve` appears to accept only a `tokio::net::TcpListener` —
// confirm the Unix arm compiles against the axum version actually in use.
let serve_future = match &init.listen {
ListenAddress::Tcp(addr) => {
tracing::info!(%addr, "Binding TCP listener");
let listener = TcpListener::bind(addr).await?;
axum::serve(listener, app.into_make_service())
}
ListenAddress::Unix(path) => {
tracing::info!(path = %path.display(), "Binding Unix listener");
// Ensure the socket file doesn't exist or clean it up
if path.exists() {
tokio::fs::remove_file(path).await?;
}
let listener = UnixListener::bind(path)?;
axum::serve(listener, app.into_make_service())
}
};
// 9. Announce endpoints to orchestrator
// The announced paths mirror the two routes registered on the router in step 7.
let announce = ModuleAnnounce {
listen: init.listen_address().to_string_lossy(), // Use helper from ext trait
endpoints: vec![
AnnouncedEndpoint { path: "/health".into(), methods: vec!["GET".into()], auth: None },
AnnouncedEndpoint { path: "/custom".into(), methods: vec!["GET".into()], auth: None },
],
};
send_announce(&announce)?;
tracing::info!(?announce, "Sent announcement to orchestrator");
// 10. Spawn runtime IPC message processor
let ipc_handle = tokio::spawn(process_ipc_messages());
// 11. Run the server
tracing::info!("Module server starting");
// Wait on the server, Ctrl+C, and the IPC task together; the first branch to
// complete ends the wait and execution continues to the shutdown log below.
tokio::select! {
result = serve_future => {
if let Err(e) = result {
tracing::error!(error = %e, "Server exited with error");
}
}
_ = tokio::signal::ctrl_c() => {
tracing::info!("Received Ctrl+C");
}
// ipc_res is the result of awaiting the handle of the task spawned in step 10.
ipc_res = ipc_handle => {
match ipc_res {
Ok(_) => tracing::info!("IPC handler finished cleanly"),
Err(e) => tracing::error!(error = %e, "IPC handler finished with error"),
}
}
}
tracing::info!("Module shutting down gracefully");
Ok(())
}
## Core Functions & Types
(See the `prelude` module for the most common items)
- `init_module()`: Configures logging to stderr with secret redaction.
- `read_init() -> Result<OrchestratorInit, InitError>`: Reads the initial handshake message from stdin.
- `send_announce(announce: &ModuleAnnounce) -> Result<(), AnnounceError>`: Sends the module's announcement message to stdout.
- `process_ipc_messages()`: An async function to spawn that listens for runtime IPC messages (like secret rotations, shutdown) from stdin.
- `get_module_secret_client(...)`: Creates a `SecretClient` instance.
- `get_secret(...)`: Retrieves a secret, ensuring it's registered for redaction.
- `subscribe_secret_rotations(...)`: Spawns a task to handle secret rotation notifications.
- `AppState<T>`: A generic state container holding SDK state and optional user state `T`.
- Types from `ipc_types`: `OrchestratorInit`, `ModuleAnnounce`, `AnnouncedEndpoint`, `ListenAddress`, etc.
- Types from `secret_client`: `SecretClient`, `SecretError`, `RequestMode`.
## JWT Authentication (Optional Feature)
The SDK provides an optional `jwt_auth` feature to easily protect Axum routes with JWT Bearer token validation.
**Enable Feature**:
```toml
[dependencies]
# Ensure the jwt_auth feature is enabled
pywatt_sdk = { version = "0.1.0", features = ["jwt_auth"] }
axum = { version = "0.7", features = ["json"] }
# ... other deps
```
**Usage**:
- Request the HMAC signing secret (e.g., `JWT_HMAC_SECRET`) in your module configuration (how secrets are requested depends on the orchestrator/module definition).
- Fetch the secret using `get_secret`:
  ```rust
  use pywatt_sdk::prelude::*;
  use axum::Router;

  async fn build_router_with_auth(state: AppState<()>) -> Result<Router, Box<dyn std::error::Error>> {
      let jwt_secret_value = get_secret(state.secret_client(), "JWT_HMAC_SECRET").await?;

      // Use the router extension trait for JWT
      use pywatt_sdk::ext::RouterJwtExt;

      let app = Router::new()
          .route("/protected/resource", axum::routing::get(protected_handler))
          // ... other routes ...
          .with_jwt(jwt_secret_value); // Pass the SecretString directly

      Ok(app)
  }
  # async fn protected_handler() -> &'static str { "Protected" }
  ```
- The middleware automatically:
  - Expects an `Authorization: Bearer <token>` header.
  - Validates the token using the provided HMAC secret (HS256 algorithm).
  - Returns `401 Unauthorized` if the header is missing, the token is invalid, expired, or has the wrong signature.
  - On success, inserts the decoded JWT claims (as `serde_json::Value`) into the request extensions.
- Access claims in your handler:
  ```rust
  use axum::extract::Extension;
  use serde_json::Value;

  async fn protected_handler(Extension(claims): Extension<Value>) -> String {
      let user_id = claims.get("sub").and_then(|v| v.as_str()).unwrap_or("anonymous");
      format!("Hello, protected user {}!", user_id)
  }
  ```
Contributing
Contributions are welcome! Please follow standard Rust practices and ensure tests pass.
License
This project is licensed under the MIT OR Apache-2.0 license. See LICENSE-MIT or LICENSE-APACHE for details.
Dependencies
~13–26MB
~381K SLoC