# langfuse-ergonomic

Ergonomic Rust client for Langfuse, the open-source LLM observability platform.

## Features
- Builder pattern - intuitive API using the Bon builder library
- Async/await ready - full async support with Tokio
- Type safe - strongly typed with compile-time guarantees
- Easy setup - simple configuration from environment variables
- Comprehensive - support for traces, observations, scores, and more
- Batch processing - automatic batching with retry logic and chunking
- Production ready - built-in timeouts, connection pooling, and error handling
- Self-hosted support - full support for self-hosted Langfuse instances
## Installation

```toml
[dependencies]
langfuse-ergonomic = "*"
tokio = { version = "1", features = ["full"] }
serde_json = "1"
```
### Optional Features

```toml
[dependencies]
langfuse-ergonomic = { version = "*", features = ["compression"] }
```

- `compression` - Enables gzip, brotli, and deflate compression for requests (reduces bandwidth usage)
## Quick Start

```rust
use langfuse_ergonomic::{ClientBuilder, LangfuseClient, Traces};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create client from environment variables
    let client = ClientBuilder::from_env()?.build()?;

    // Create a trace
    let trace = client.trace()
        .name("my-application")
        .input(json!({"query": "Hello, world!"}))
        .output(json!({"response": "Hi there!"}))
        .user_id("user-123")
        .tags(["production", "chat"])
        .call()
        .await?;

    println!("Created trace: {}", trace.id);

    // Fetch and list traces - returns strongly-typed structs
    let fetched_trace = client.get_trace(&trace.id).await?;
    let traces: Traces = client.list_traces()
        .limit(10)
        .user_id("user-123")
        .call()
        .await?;

    // Access trace data with type safety
    for trace in &traces.data {
        println!("Trace ID: {}, Name: {:?}", trace.id, trace.name);
    }

    // Create a dataset
    let dataset = client.create_dataset()
        .name("my-dataset")
        .description("Example dataset")
        .call()
        .await?;

    Ok(())
}
```
## Type Safety

All API methods return strongly-typed structs instead of JSON values. Types are auto-generated from the Langfuse OpenAPI specification and re-exported for convenience:

```rust
use langfuse_ergonomic::{Traces, Dataset, Prompt, ObservationsView};

// You can also import directly from the base crate (both are equivalent):
// use langfuse_ergonomic::Traces;
// use langfuse_client_base::models::Traces;
```
Benefits:
- ✅ Compile-time type checking
- ✅ IDE autocomplete for all fields
- ✅ No runtime JSON parsing errors
- ✅ Full API documentation on types
## Configuration

Set these environment variables:

```bash
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=sk-lf-...
LANGFUSE_BASE_URL=https://cloud.langfuse.com  # Optional
```

Or configure explicitly with advanced options:

```rust
use langfuse_ergonomic::ClientBuilder;
use std::time::Duration;

let client = ClientBuilder::new()
    .public_key("pk-lf-...")
    .secret_key("sk-lf-...")
    .base_url("https://cloud.langfuse.com")
    .timeout(Duration::from_secs(30))        // Custom request timeout
    .connect_timeout(Duration::from_secs(5)) // Connection timeout
    .user_agent("my-app/1.0.0")              // Custom user agent
    .build()?;
```
## Examples

Check the examples/ directory for more usage examples:

```bash
# Trace examples
cargo run --example basic_trace
cargo run --example trace_with_metadata
cargo run --example multiple_traces

# Trace fetching and management
cargo run --example traces_fetch

# Observations (spans, generations, events)
cargo run --example observations

# Scoring and evaluation
cargo run --example scores

# Dataset management
cargo run --example datasets

# Prompt management
cargo run --example prompts

# Batch processing
cargo run --example batch_ingestion

# Self-hosted configuration
cargo run --example self_hosted

# Advanced features (prompts, dataset items, observation updates)
cargo run --example advanced_features
```
## Batch Processing

The client supports efficient batch processing with automatic chunking, retry logic, and comprehensive error handling.

### Default Configuration

- Max events per batch: 100 events
- Max batch size: 3.5 MB (a conservative margin under Langfuse Cloud's 5 MB limit)
- Auto-flush interval: 5 seconds
- Max retries: 3 attempts with exponential backoff
- Retry jitter: enabled by default (25% random jitter to avoid thundering herd)
- Backpressure policy: Block (waits when the queue is full)
- Max queue size: 10,000 events
```rust
use langfuse_ergonomic::{Batcher, BackpressurePolicy, ClientBuilder, LangfuseClient};
use std::time::Duration;

let client = ClientBuilder::from_env()?.build()?;

// Create a batcher with custom configuration
let batcher = Batcher::builder()
    .client(client)
    .max_events(50)                          // Events per batch (default: 100)
    .max_bytes(2_000_000)                    // Max batch size in bytes (default: 3.5 MB)
    .flush_interval(Duration::from_secs(10)) // Auto-flush interval (default: 5s)
    .max_retries(5)                          // Retry attempts (default: 3)
    .max_queue_size(5000)                    // Max events to queue (default: 10,000)
    .backpressure_policy(BackpressurePolicy::DropNew) // What to do when the queue is full
    .build()
    .await;

// Add events - they'll be batched automatically
for event in events {
    batcher.add(event).await?;
}

// Manual flush if needed
let response = batcher.flush().await?;
println!("Sent {} events", response.success_count);

// Monitor metrics
let metrics = batcher.metrics();
println!(
    "Queued: {}, Flushed: {}, Failed: {}, Dropped: {}",
    metrics.queued, metrics.flushed, metrics.failed, metrics.dropped
);

// Graceful shutdown (flushes remaining events)
let final_response = batcher.shutdown().await?;
```
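The chunking rule described above (a batch closes when either the event-count limit or the byte-size limit would be exceeded) can be sketched as a standalone function. This is an illustrative sketch of the greedy strategy, not the crate's internal implementation; `chunk_events` and its parameters are names invented here:

```rust
/// Greedily splits serialized events into batches that respect both the
/// event-count limit and the byte-size limit (the defaults above would be
/// max_events = 100, max_bytes = 3_500_000).
fn chunk_events(events: &[Vec<u8>], max_events: usize, max_bytes: usize) -> Vec<Vec<Vec<u8>>> {
    let mut batches = Vec::new();
    let mut current: Vec<Vec<u8>> = Vec::new();
    let mut current_bytes = 0usize;
    for event in events {
        let over_count = current.len() + 1 > max_events;
        let over_size = current_bytes + event.len() > max_bytes;
        // Close the current batch when adding this event would exceed a limit.
        if !current.is_empty() && (over_count || over_size) {
            batches.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current_bytes += event.len();
        current.push(event.clone());
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}
```

Note that an event larger than `max_bytes` still gets its own single-event batch here; a real client would reject or truncate such events before queuing them.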
## Advanced Features

**207 Multi-Status Handling:** partial failures, where some events succeed and others fail, are handled automatically.

**Backpressure Policies:**

- `Block`: wait when the queue is full (default)
- `DropNew`: drop new events when the queue is full
- `DropOldest`: remove the oldest events to make room
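The effect of each policy on a full queue can be illustrated with a minimal bounded-queue sketch. This is not the crate's internal code (`Policy` and `enqueue` are names invented here to mirror the variants above):

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy, Debug, PartialEq)]
enum Policy {
    Block,
    DropNew,
    DropOldest,
}

/// Tries to enqueue `event` into a bounded queue. Returns the event that was
/// not kept, if any:
/// - `Block`: returns the new event; the caller should wait and retry
///   (the real batcher awaits a flush instead of returning).
/// - `DropNew`: returns the new event, which is simply discarded.
/// - `DropOldest`: evicts and returns the oldest event to make room.
fn enqueue(queue: &mut VecDeque<u32>, capacity: usize, event: u32, policy: Policy) -> Option<u32> {
    if queue.len() < capacity {
        queue.push_back(event);
        return None; // Space available: nothing is rejected or evicted.
    }
    match policy {
        Policy::Block | Policy::DropNew => Some(event),
        Policy::DropOldest => {
            let oldest = queue.pop_front();
            queue.push_back(event);
            oldest
        }
    }
}
```

`DropOldest` keeps the queue biased toward recent events, which suits observability data where fresh traces usually matter more than stale ones.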
**Metrics & Monitoring:**

```rust
let metrics = batcher.metrics();

// Available metrics:
// - queued: current events waiting to be sent
// - flushed: total successfully sent
// - failed: total failed after all retries
// - dropped: total dropped due to backpressure
// - retries: total retry attempts
// - last_error_ts: Unix timestamp of the last error
```
**Error Handling:**

```rust
use langfuse_ergonomic::Error;

match batcher.flush().await {
    Ok(response) => {
        println!(
            "Success: {}, Failed: {}",
            response.success_count, response.failure_count
        );
    }
    Err(Error::PartialFailure { success_count, failure_count, errors, .. }) => {
        println!("Partial success: {} ok, {} failed", success_count, failure_count);
        for error in errors {
            if error.retryable {
                println!("Retryable error: {}", error.message);
            }
        }
    }
    Err(e) => eprintln!("Complete failure: {}", e),
}
```
## API Coverage

### Implemented Features

#### Traces
- Creation - Full trace creation with metadata support
- Fetching - Get individual traces by ID
- Listing - List traces with filtering and pagination
- Management - Delete single or multiple traces
- Session and user tracking
- Tags and custom timestamps
- Input/output data capture
#### Observations
- Spans - Track execution steps and nested operations
- Generations - Monitor LLM calls with token usage
- Events - Log important milestones and errors
- Nested observations with parent-child relationships
- Log levels (DEBUG, INFO, WARNING, ERROR)
#### Scoring
- Numeric scores - Evaluate with decimal values (0.0-1.0)
- Categorical scores - Text-based classifications
- Binary scores - Success/failure tracking
- Rating scores - Star ratings and scales
- Trace-level and observation-level scoring
- Score metadata and comments
- Annotation queue linkage for human-review workflows
Attach scores to annotation queues when triaging human review tasks:

```rust
let score_id = client
    .score()
    .trace_id(&trace.id)
    .name("manual_review_verdict")
    .queue_id("annotation-queue-123")
    .string_value("needs_follow_up")
    .comment("Flagged during human review")
    .call()
    .await?;
```
#### Dataset Management
- Creation - Create datasets with metadata
- Listing - List all datasets with pagination
- Fetching - Get dataset details by name
- Run Management - Get, list, and delete dataset runs
#### Prompt Management
- Fetching - Get prompts by name and version
- Listing - List prompts with filtering
- Creation - Basic prompt creation (placeholder implementation)
#### Batch Processing
- Automatic Batching - Events are automatically grouped into optimal batch sizes
- Size Limits - Respects Langfuse's 3.5MB batch size limit
- Retry Logic - Exponential backoff for failed requests
- Partial Failures - Handles 207 Multi-Status responses
- Background Processing - Non-blocking event submission
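The retry schedule described above (exponential backoff plus 25% jitter) can be sketched as a pure function. This is an illustrative sketch, not the crate's exact schedule; `backoff_delay` is a name invented here, and the jitter value is passed in so the function stays deterministic:

```rust
use std::time::Duration;

/// Delay before retry `attempt` (0-based): exponential backoff with up to
/// 25% jitter. `jitter` is a caller-supplied value in [0.0, 1.0), normally
/// drawn from an RNG.
fn backoff_delay(base: Duration, attempt: u32, jitter: f64) -> Duration {
    let exponential = base.as_millis() as f64 * 2f64.powi(attempt as i32);
    // Stretch the delay by up to 25% so many clients retrying at once do
    // not all hit the server at the same instant (thundering herd).
    Duration::from_millis((exponential * (1.0 + 0.25 * jitter)) as u64)
}
```

With a 100 ms base, retries land at roughly 100 ms, 200 ms, and 400 ms, each stretched by up to a quarter.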
#### Production Features

- Timeouts - Configurable request and connection timeouts
- Compression - Optional gzip, brotli, and deflate support (via the `compression` feature flag)
- HTTP/2 - Efficient connection multiplexing
- Connection Pooling - Reuses connections for better performance
- Error Handling - Structured error types with retry metadata
- Self-Hosted Support - Full compatibility with self-hosted instances
## License

Licensed under either of:

- Apache License, Version 2.0 (LICENSE-APACHE)
- MIT license (LICENSE-MIT)

## Contributing

See CONTRIBUTING.md for guidelines.