# Floxide 🦀: The Power of Workflows in Rust
Tired of Ad-Hoc Process Management? Build Resilient Systems with Workflows!
Building reliable applications often involves complex sequences of operations, background jobs, or distributed tasks. Handling failures gracefully, managing state consistently, ensuring steps execute in order, and retrying transient errors can lead to brittle, hard-to-maintain code scattered across your system.
Floxide solves this. It provides a robust framework in Rust for explicitly modeling your complex processes as workflows. Define your steps, connect them declaratively, and let Floxide handle the orchestration, fault tolerance, state management, and distribution. Focus on your core logic and build more reliable systems faster.
Floxide is an extensible framework for building distributed, parallel, and event-driven workflows in Rust. Model complex processes as type-safe, directed graphs with robust support for:
- Distributed Execution: Run workflows across multiple workers.
- Checkpointing & Recovery: Fault tolerance and resumability.
- Declarative Definition: Use the `workflow!` and `node!` macros for clear definitions.
- Async Native: Built for modern async Rust.
## Core Concepts
Floxide models workflows using these key components:
- `Node`: A single, reusable step in a workflow. Defined using the `node!` macro.
- `Workflow`: The overall directed graph structure, connecting Nodes. Defined using the `workflow!` macro.
- `Transition`: The result of a Node's execution, indicating what happens next (`Next`, `NextAll`, `Hold`, `Abort`).
- `WorkflowCtx` / `Context`: Shared data or state accessible by all Nodes within a specific workflow run. Often structured using event sourcing (`EventLog`) and requires implementing the `Merge` trait for distributed consistency.
- `WorkQueue`: (For distributed workflows) A queue holding tasks (`WorkItem`s) ready to be processed by Workers.
- `CheckpointStore` / `Checkpoint`: (For local persistence/resume) Saved state of a workflow run, including context and pending tasks, enabling recovery.
- `ContextStore`: (For distributed workflows) Manages the persistent storage and merging of the shared `Context` data for a run.
- Distributed Stores: (For distributed workflows) A collection of specialized stores (`RunInfoStore`, `MetricsStore`, `ErrorStore`, `LivenessStore`, `WorkItemStateStore`) for tracking run metadata, errors, worker health, etc.
- `DistributedWorker`: A process that dequeues tasks from the `WorkQueue`, interacts with distributed stores (`ContextStore`, etc.) for state, and executes Nodes.
- `DistributedOrchestrator`: Manages the lifecycle of distributed workflow runs, interacting with all distributed stores.
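To build intuition for how a driver reacts to each `Transition` variant, here is a deliberately simplified, standalone sketch. The enum and loop below are illustrative only; floxide's real types are generic, async, and macro-generated:

```rust
// Conceptual model of a workflow driver interpreting a node's Transition.
// These types are illustrative, NOT floxide's actual API.

#[derive(Debug, PartialEq)]
enum Transition {
    Next(i32),         // pass one value on to the successor node(s)
    NextAll(Vec<i32>), // fan out: each value becomes its own work item
    Hold,              // wait (e.g. a merge node still missing inputs)
    Abort(String),     // stop the run with an error
}

// A toy "node": doubles positive numbers, holds on zero, aborts on negatives.
fn demo_node(input: i32) -> Transition {
    if input < 0 {
        Transition::Abort(format!("negative input: {}", input))
    } else if input == 0 {
        Transition::Hold
    } else {
        Transition::Next(input * 2)
    }
}

// A minimal driver loop: dispatch on the transition of each work item.
fn drive(start: i32) -> Result<Vec<i32>, String> {
    let mut queue = vec![start];
    let mut outputs = Vec::new();
    while let Some(item) = queue.pop() {
        match demo_node(item) {
            Transition::Next(v) => outputs.push(v),
            Transition::NextAll(vs) => outputs.extend(vs),
            Transition::Hold => { /* a real engine re-enqueues this later */ }
            Transition::Abort(e) => return Err(e),
        }
    }
    Ok(outputs)
}

fn main() {
    assert_eq!(drive(3), Ok(vec![6]));
    assert!(drive(-1).is_err());
    println!("transition demo ok");
}
```

In floxide itself, the equivalent dispatch happens inside the generated workflow runner, with `Next` routed along the edges declared in the `workflow!` macro.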
## Features
- Type-Safe Workflow Definition: Compile-time checks for node inputs/outputs and graph structure.
- Declarative Macros: `workflow!` and `node!` for clear, concise workflow logic.
- State Management & Recovery: Fault tolerance via `CheckpointStore` (local) or `ContextStore` (distributed), enabling resumability.
- Distributed Execution: Scale workflows across multiple workers using shared stores.
- Event-Driven & Async: Built on `tokio` for efficient asynchronous execution.
- Extensible Stores: Pluggable storage backends for queues and state (in-memory provided, Redis available).
- Retry Policies: Built-in support for automatic retries on node failures.
- Observability: Stores for tracking run status, metrics, errors, and worker liveness.
- Redis Backend: Optional `floxide-redis` crate provides implementations of all distributed stores using Redis.
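As a rough illustration of what a retry policy automates, the standalone sketch below retries a fallible step a bounded number of times. This is only the underlying idea, not floxide's API; see `retry_example.rs` for real `with_retry` usage:

```rust
// Standalone sketch of a retry policy: run a fallible step up to
// `max_attempts` times, returning the last error if all attempts fail.
// Floxide's real retry support wraps async nodes; this is sync and toy-sized.

fn retry<T, E>(
    max_attempts: u32,
    mut step: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match step() {
            Ok(v) => return Ok(v),
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => attempt += 1, // a real policy would sleep with backoff here
        }
    }
}

fn main() {
    // A step that fails twice with a transient error, then succeeds.
    let mut calls = 0;
    let result = retry(5, || {
        calls += 1;
        if calls < 3 { Err("transient") } else { Ok(42) }
    });
    assert_eq!(result, Ok(42));
    assert_eq!(calls, 3);
    println!("retry demo ok");
}
```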
## Quick Example
Here's a conceptual workflow simulating article generation using LLM-like steps:
```rust
use floxide::{node, workflow, FloxideError, Node, Transition, Workflow, WorkflowCtx};
use rllm::{
    builder::{LLMBackend, LLMBuilder},
    chat::{ChatMessage, ChatRole, MessageType},
    LLMProvider,
};
use std::sync::Arc;
use std::{
    env,
    fmt::{self, Debug},
};
use tracing::Level;

#[derive(Clone)]
pub struct LLMProviderWrapper {
    inner: Arc<Box<dyn LLMProvider>>,
}

impl LLMProviderWrapper {
    pub fn new(inner: Arc<Box<dyn LLMProvider>>) -> Self {
        Self { inner }
    }
}

impl Debug for LLMProviderWrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "LLMProviderWrapper")
    }
}

// --- Node 1: Generate Outline ---
node! {
    pub struct OutlineNode {
        llm: LLMProviderWrapper,
    };
    context = ();
    input = String;  // Input: article topic
    output = String; // Output: outline text
    |_ctx, topic| {
        println!("OutlineNode: Generating outline for topic: '{}'", topic);
        // Call the LLM to generate an outline
        let prompt = format!("Generate an outline for an article about: {}", topic);
        let messages = vec![ChatMessage { role: ChatRole::User, content: prompt, message_type: MessageType::Text }];
        let outline: String = self.llm.inner.chat(&messages).await
            .map_err(|e| FloxideError::Generic(e.to_string()))?
            .text()
            .unwrap_or_default();
        // Pass the outline to the next step
        Ok(Transition::Next(outline))
    }
}

// --- Node 2: Draft Article ---
node! {
    pub struct DraftNode {
        llm: LLMProviderWrapper,
    };
    context = ();
    input = String;  // Input: outline text
    output = String; // Output: draft article text
    |_ctx, outline| {
        println!("DraftNode: Drafting article based on outline...");
        // Call the LLM to draft the article based on the outline
        let prompt = format!("Write a detailed draft article based on the following outline:\n{}", outline);
        let messages = vec![ChatMessage { role: ChatRole::User, content: prompt, message_type: MessageType::Text }];
        let draft: String = self.llm.inner.chat(&messages).await
            .map_err(|e| FloxideError::Generic(e.to_string()))?
            .text()
            .unwrap_or_default();
        // Pass the draft to the next step
        Ok(Transition::Next(draft))
    }
}

// --- Node 3: Review Article ---
node! {
    pub struct ReviewNode {
        llm: LLMProviderWrapper,
    };
    context = ();
    input = String;  // Input: draft article text
    output = String; // Output: final article text
    |_ctx, draft| {
        println!("ReviewNode: Reviewing and finalizing draft...");
        // Call the LLM to review and finalize the draft
        let prompt = format!("Review and finalize the following article draft. Provide the final polished version without any additional text:\n{}", draft);
        let messages = vec![ChatMessage { role: ChatRole::User, content: prompt, message_type: MessageType::Text }];
        let final_article: String = self.llm.inner.chat(&messages).await
            .map_err(|e| FloxideError::Generic(e.to_string()))?
            .text()
            .unwrap_or_default();
        // Return the final article as the workflow result
        Ok(Transition::Next(final_article))
    }
}

// --- Workflow Definition: Connecting the nodes ---
workflow! {
    pub struct ArticleWriterWorkflow {
        outline: OutlineNode,
        draft: DraftNode,
        review: ReviewNode,
    }
    start = outline; // Start with the outline node
    context = ();
    edges {
        // Define the sequence: outline -> draft -> review
        outline => { [draft] };
        draft => { [review] };
        review => {}; // review is the terminal node
    }
}

#[tokio::main]
async fn main() -> Result<(), FloxideError> {
    tracing_subscriber::fmt()
        .with_max_level(Level::DEBUG)
        .init();

    // Initialize the LLM client for chat completion
    let llm = LLMBuilder::new()
        .backend(LLMBackend::OpenAI)
        .api_key(env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY not set"))
        .model("gpt-4o")
        .temperature(0.7)
        .build()
        .expect("Failed to build LLM");
    let llm = LLMProviderWrapper::new(Arc::new(llm));

    let workflow = ArticleWriterWorkflow {
        outline: OutlineNode { llm: llm.clone() },
        draft: DraftNode { llm: llm.clone() },
        review: ReviewNode { llm: llm.clone() },
    };

    let ctx = WorkflowCtx::new(());
    let result = workflow
        .run(&ctx, "Rust Programming Language".to_string())
        .await?;
    println!("Generated article: {}", result);
    Ok(())
}
```
## Examples
The `examples/` directory contains various demonstrations of Floxide's features:

- `node_macro_example.rs`: Basic usage of the `node!` macro to define a node with internal state and custom context.
- `branching_example.rs`: Demonstrates a workflow with a shared context (`MyCtx`) and composite nodes that branch based on an enum output (`FooAction`).
- `split_example.rs`: Shows how to use `SplitNode` to fan out a single input into multiple items for parallel processing.
- `merge_example.rs`: Complements `split_example` by using a custom `MergeNode` to collect results from parallel branches, holding until all expected inputs arrive.
- `batch_example.rs`: Demonstrates `BatchNode` for processing items in groups, followed by routing based on batch results.
- `retry_example.rs`: Uses `RetryNode` (via `with_retry`) to wrap a node that might fail transiently, applying a retry policy.
- `retry_macro_example.rs`: Defines retry policies directly within the `workflow!` macro using the `#[retry = ...]` attribute.
- `error_fallback_macro.rs`: Handles node failures at the workflow level using the `on_failure` clause in the `edges` block.
- `checkpoint_example.rs`: Shows how to use `run_with_checkpoint` and `resume` with an `InMemoryCheckpointStore` for fault tolerance.
- `cancellation_example.rs`: Demonstrates graceful workflow cancellation using the `cancel_token` from `WorkflowCtx`.
- `timeout_example.rs`: Sets a timeout on the `WorkflowCtx` to automatically abort long-running workflows.
- `nested_workflow_example.rs`: Embeds one workflow within another using `CompositeNode`.
- `generics_example.rs`: Defines workflows with nodes that have generic type parameters.
- `timer_example.rs`: Uses a `source` node (backed by a channel) to drive a workflow with external events (like timer ticks).
- `distributed_example.rs`: Simulates a distributed workflow run using in-memory components (`InMemoryWorkQueue`, `InMemoryContextStore`) and multiple worker tasks, demonstrating event-sourced context.
- `distributed_orchestrated_merge_example.rs`: A more complex distributed example showcasing `OrchestratorBuilder`, `WorkerBuilder`, `WorkerPool`, and various in-memory distributed stores (`InMemoryContextStore`, `RunInfoStore`, etc.), using the `Merge` trait for a split/merge workflow with potential failures.
- `workflow_dot.rs`: Demonstrates generating a Graphviz DOT representation of a workflow's structure using the `to_dot()` method.
- `terminal_node_example.rs`: A minimal workflow where the starting node is also the terminal node, directly returning the final result.
- `order_example.rs`: A workflow that simulates an order-processing system, including validation, payment processing, and stock allocation.
- `llm_example.rs`: A simple linear workflow that shows how to use LLM-like steps to generate an article.
- `floxide-redis` crate tests: The integration tests within the `crates/floxide-redis/tests` directory serve as examples for configuring and using the Redis-backed implementations of all distributed stores (`RedisContextStore`, `RedisWorkQueue`, etc.).
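The checkpointing idea behind `checkpoint_example.rs` can be pictured with a standalone toy model: persist the next step and accumulated state after every step, so a crashed run resumes where it left off. The `Checkpoint` struct, store, and `run` function below are hypothetical illustrations, not floxide's `CheckpointStore` API:

```rust
use std::collections::HashMap;

// Toy model of checkpoint/resume. This mirrors the idea behind floxide's
// CheckpointStore, not its actual trait or types.

#[derive(Clone, Debug, PartialEq)]
struct Checkpoint {
    next_step: usize, // index of the first step that has NOT run yet
    state: i32,       // accumulated workflow state
}

#[derive(Default)]
struct InMemoryStore {
    checkpoints: HashMap<String, Checkpoint>,
}

impl InMemoryStore {
    fn save(&mut self, run_id: &str, cp: Checkpoint) {
        self.checkpoints.insert(run_id.to_string(), cp);
    }
    fn load(&self, run_id: &str) -> Option<Checkpoint> {
        self.checkpoints.get(run_id).cloned()
    }
}

// Run the steps in order, checkpointing after each; a saved checkpoint
// makes the run resume from where it stopped instead of starting over.
fn run(store: &mut InMemoryStore, run_id: &str, steps: &[fn(i32) -> i32]) -> i32 {
    let cp = store
        .load(run_id)
        .unwrap_or(Checkpoint { next_step: 0, state: 0 });
    let mut state = cp.state;
    for (i, step) in steps.iter().enumerate().skip(cp.next_step) {
        state = step(state);
        store.save(run_id, Checkpoint { next_step: i + 1, state });
    }
    state
}

fn main() {
    let steps: Vec<fn(i32) -> i32> = vec![|x| x + 1, |x| x * 10, |x| x - 3];
    let mut store = InMemoryStore::default();
    // Simulate a crash after step 1 by pre-seeding a checkpoint...
    store.save("run-1", Checkpoint { next_step: 1, state: 1 });
    // ...then resume: only the remaining two steps execute.
    assert_eq!(run(&mut store, "run-1", &steps), 7); // (1 * 10) - 3
    println!("checkpoint demo ok");
}
```

In floxide, `run_with_checkpoint` and `resume` play the role of the `run` function here, with the pending work items and context saved instead of a bare integer.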
## Installation

Add Floxide to your `Cargo.toml` dependencies:

```toml
[dependencies]
floxide = "*" # Check crates.io for the latest version

# Optional: For Redis-backed distributed stores
# floxide-redis = "*"
```
## Getting Started
The best way to learn Floxide is through the documentation and tutorials:
- Floxide Documentation & Tutorials: Start here for comprehensive guides and examples.
- Core Concepts Tutorial: Learn the fundamentals of Workflows, Nodes, Transitions, and Context.
- Distributed Tutorial: Dive into distributed execution with Workers, Orchestrators, Queues, and Checkpointing.
## Contributing
Contributions are welcome! Please see CONTRIBUTING.md for guidelines.
## License
Floxide is licensed under the MIT License. See the LICENSE file for details.