Version 0.6.4, released Dec 11, 2025
OTLP Arrow Flight Library
A cross-platform Rust library for receiving OpenTelemetry Protocol (OTLP) messages via gRPC and writing them to local files in Arrow IPC Streaming format. The library supports both standalone service mode and embedded library usage with public API methods.
Features
- Dual Protocol Support: Simultaneous support for gRPC Protobuf (standard OTLP) and gRPC Arrow Flight (OTAP) on different ports
- Arrow IPC Storage: Writes telemetry data to Arrow IPC Streaming format files with automatic rotation
- Batch Writing: Configurable write intervals for efficient disk I/O
- File Cleanup: Automatic cleanup of old trace and metric files based on configurable retention intervals
- Public API: Embedded library mode with programmatic API for Rust and Python
- Configuration System: Support for YAML files, environment variables, and programmatic API
- Optional Forwarding: Forward messages to remote OTLP endpoints with automatic format conversion
- Format Conversion: Automatic conversion between Protobuf and Arrow Flight formats
- Authentication: Support for API key, bearer token, and basic authentication in forwarding
- Circuit Breaker: Automatic failure handling with circuit breaker pattern for forwarding
- Mock Service: Built-in mock service for end-to-end testing
- Python Bindings: PyO3 bindings for Python integration
- Health Check: HTTP health check endpoint for standalone service
- Metrics Collection: Library operation metrics (messages received, files written, errors, conversions)
- OpenTelemetry SDK Integration: Built-in PushMetricExporter and SpanExporter implementations for seamless integration with OpenTelemetry SDK
- Reference-Based Export: Efficient metric export via export_metrics_ref() method that accepts references instead of requiring ownership
- Configurable Temporality: Support for Cumulative or Delta temporality modes for metric exporters (defaults to Cumulative)
- Comprehensive Test Coverage: Extensive test suite covering concurrent access, circuit breaker state transitions, and edge cases
- Architecture Documentation: Complete system architecture documentation in docs/ARCHITECTURE.md
- Performance Optimizations: Optimized lock contention in circuit breaker and exporter implementations
- Security Features:
  - Secure credential storage using SecretString (credentials zeroed in memory)
  - Path traversal protection in dashboard server
  - HTTP security headers (CSP, X-Frame-Options, X-Content-Type-Options, X-XSS-Protection)
  - Configurable buffer size limits to prevent memory exhaustion
  - Comprehensive input validation (URL parsing, bounds checking)
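The circuit breaker named in the feature list is a general pattern. The following is a minimal, std-only sketch of how such a breaker typically moves between closed, open, and half-open states. The type names, the failure threshold, and the cool-down are hypothetical and are not taken from this library's source.

```rust
use std::time::{Duration, Instant};

/// Illustrative states of a generic circuit breaker (not this library's types).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BreakerState {
    Closed,
    Open,
    HalfOpen,
}

/// Hypothetical breaker: opens after `threshold` consecutive failures and
/// lets a probe request through once `cooldown` has elapsed.
pub struct CircuitBreaker {
    state: BreakerState,
    failures: u32,
    threshold: u32,
    cooldown: Duration,
    opened_at: Option<Instant>,
}

impl CircuitBreaker {
    pub fn new(threshold: u32, cooldown: Duration) -> Self {
        Self {
            state: BreakerState::Closed,
            failures: 0,
            threshold,
            cooldown,
            opened_at: None,
        }
    }

    /// Returns true if a forwarding attempt may proceed at time `now`.
    pub fn allow(&mut self, now: Instant) -> bool {
        if self.state == BreakerState::Open {
            match self.opened_at {
                // Cool-down elapsed: move to half-open and allow a probe.
                Some(t) if now.duration_since(t) >= self.cooldown => {
                    self.state = BreakerState::HalfOpen;
                }
                _ => return false,
            }
        }
        true
    }

    /// A success closes the breaker and resets the failure count.
    pub fn record_success(&mut self) {
        self.failures = 0;
        self.state = BreakerState::Closed;
        self.opened_at = None;
    }

    /// A failed probe, or too many consecutive failures, opens the breaker.
    pub fn record_failure(&mut self, now: Instant) {
        self.failures += 1;
        if self.state == BreakerState::HalfOpen || self.failures >= self.threshold {
            self.state = BreakerState::Open;
            self.opened_at = Some(now);
        }
    }

    pub fn state(&self) -> BreakerState {
        self.state
    }
}
```

Passing `now` explicitly keeps the state machine deterministic in tests; a caller would normally pass `Instant::now()`.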
Quick Start
Demo Application (Recommended for First-Time Users)
The demo application is the easiest way to get started and verify the service is working:
# Run the demo application
cargo run --example demo-app
Prerequisites: For dashboard visualization, build the dashboard first:
cd dashboard && npm install && npm run build && cd ..
If the dashboard is not built: The demo still runs and generates data, but without the web dashboard. Data is written to ./output_dir/otlp/, and you can view it later after building the dashboard.
The demo will:
- Start the dashboard on http://127.0.0.1:8080 (if dashboard is built)
- Generate real metrics using OpenTelemetry SDK Meter API and mock spans continuously
- Display data in real-time in the dashboard (if enabled)
To view the dashboard:
- Open your browser to http://127.0.0.1:8080
- Click "Choose Directory" and select the ./output_dir/otlp directory (the parent directory containing both traces and metrics subdirectories)
- The dashboard will automatically load and display trace and metric data
Important: Select the otlp directory (not otlp/traces or otlp/metrics), as the dashboard needs access to both subdirectories.
To stop the demo: Press Ctrl+C (the demo will flush data and shut down gracefully)
The demo application serves as both:
- Verification tool: Proves the service is working correctly
- Reference implementation: Shows how to use the SDK in your own applications
See examples/demo-app.rs for the complete source code with extensive comments.
As a Standalone Service
# Run with default configuration
cargo run --bin otlp-arrow-service
# Run with custom config
OTLP_OUTPUT_DIR=./my_output cargo run --bin otlp-arrow-service
As an Embedded Library (Rust)
Basic Usage
use otlp_arrow_library::{OtlpLibrary, Config};
use opentelemetry_sdk::trace::SpanData;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create configuration
    let config = Config::default();

    // Initialize library
    let library = OtlpLibrary::new(config).await?;

    // Export a trace span
    // library.export_trace(span).await?;

    // Export multiple traces
    // library.export_traces(spans).await?;

    // Export metrics (automatically converted to protobuf for storage)
    // library.export_metrics(metrics).await?;

    // Export metrics by reference (more efficient)
    // library.export_metrics_ref(&metrics).await?;

    // Force flush
    library.flush().await?;

    // Shutdown gracefully
    library.shutdown().await?;
    Ok(())
}
Integration with OpenTelemetry SDK
use otlp_arrow_library::OtlpLibrary;
use opentelemetry_sdk::metrics::{MeterProvider, PeriodicReader};
use opentelemetry_sdk::trace::TracerProvider;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create library instance
    let config = otlp_arrow_library::Config::default();
    let library = OtlpLibrary::new(config).await?;

    // Create exporters for OpenTelemetry SDK
    let metric_exporter = library.metric_exporter_adapter();
    let span_exporter = library.span_exporter_adapter();

    // Use with OpenTelemetry SDK
    let metric_reader = PeriodicReader::builder(metric_exporter)
        .with_interval(Duration::from_secs(10))
        .build();
    let meter_provider = MeterProvider::builder()
        .with_reader(metric_reader)
        .build();
    let tracer_provider = TracerProvider::builder()
        .with_batch_exporter(span_exporter, opentelemetry_sdk::runtime::Tokio)
        .build();

    // Use providers to create meters and tracers
    // Metrics and traces are automatically exported via exporters

    // Shutdown when done
    library.shutdown().await?;
    Ok(())
}
As an Embedded Library (Python)
Basic Usage
import otlp_arrow_library

# Initialize library
library = otlp_arrow_library.PyOtlpLibrary(
    output_dir="./output",
    write_interval_secs=5
)

# Export a trace
trace_id = bytes([1] * 16)
span_id = bytes([1] * 8)
span = {
    "trace_id": trace_id,
    "span_id": span_id,
    "name": "my-span",
    "kind": "server",
    "attributes": {"service.name": "my-service"}
}
library.export_trace(span)

# Export metrics by reference (more efficient)
metrics = {}  # Your metrics dictionary
library.export_metrics_ref(metrics)

# Flush and shutdown
library.flush()
library.shutdown()
Exporter Creation
import otlp_arrow_library

# Initialize library
library = otlp_arrow_library.PyOtlpLibrary(
    output_dir="./output",
    write_interval_secs=5
)

# Create exporters for OpenTelemetry SDK integration
metric_exporter = library.metric_exporter_adapter()
span_exporter = library.span_exporter_adapter()

# Note: Direct Python OpenTelemetry SDK integration requires
# adapter classes (see Issue #6). For now, use library methods directly.
Configuration
Configuration can be provided via:
- YAML file: config.yaml
- Environment variables: OTLP_* prefix (e.g., OTLP_OUTPUT_DIR, OTLP_WRITE_INTERVAL_SECS)
- Programmatic API: ConfigBuilder
Configuration Options
- output_dir: Output directory for Arrow IPC files (default: ./output_dir)
- write_interval_secs: How frequently to write batches to disk in seconds (default: 5)
- trace_cleanup_interval_secs: Trace file retention interval in seconds (default: 600)
- metric_cleanup_interval_secs: Metric file retention interval in seconds (default: 3600)
- max_trace_buffer_size: Maximum number of trace spans to buffer in memory (default: 10000, max: 1000000)
- max_metric_buffer_size: Maximum number of metric requests to buffer in memory (default: 10000, max: 1000000)
- protocols: Protocol configuration
  - protobuf_enabled: Enable gRPC Protobuf server (default: true)
  - protobuf_port: Port for Protobuf server (default: 4317)
  - arrow_flight_enabled: Enable gRPC Arrow Flight server (default: true)
  - arrow_flight_port: Port for Arrow Flight server (default: 4318)
- forwarding: Optional remote forwarding configuration
  - enabled: Enable forwarding (default: false)
  - endpoint_url: Remote endpoint URL (required if enabled, must be valid http/https URL)
  - protocol: Forwarding protocol (Protobuf or ArrowFlight, default: Protobuf)
  - authentication: Optional authentication configuration
    - auth_type: Authentication type (api_key, bearer_token, or basic)
    - credentials: Authentication credentials (stored securely using SecretString)
      - For api_key: requires key credential
      - For bearer_token: requires token credential
      - For basic: requires username and password credentials
- dashboard: Dashboard HTTP server configuration
  - enabled: Enable dashboard server (default: false)
  - port: Dashboard port (default: 8080)
  - bind_address: Bind address (default: 127.0.0.1 for localhost-only access)
  - x_frame_options: X-Frame-Options header value (DENY or SAMEORIGIN, default: DENY)
  - static_dir: Directory containing dashboard static files (default: ./dashboard/dist)
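To make the retention settings concrete, here is a small std-only sketch of how a cleanup pass could decide which files have outlived trace_cleanup_interval_secs or metric_cleanup_interval_secs. The function names and file names are hypothetical, and treating a future modification time as "not expired" is an assumption rather than documented behavior.

```rust
use std::time::{Duration, SystemTime};

/// Returns true when a file last modified at `modified` is older than
/// `retention` as of `now`.
pub fn is_expired(modified: SystemTime, now: SystemTime, retention: Duration) -> bool {
    match now.duration_since(modified) {
        Ok(age) => age > retention,
        // Modification time lies in the future (clock skew): keep the file.
        // This choice is an assumption made for the sketch.
        Err(_) => false,
    }
}

/// Selects the names of files whose age exceeds the retention interval.
pub fn select_expired<'a>(
    files: &[(&'a str, SystemTime)],
    now: SystemTime,
    retention: Duration,
) -> Vec<&'a str> {
    files
        .iter()
        .filter(|(_, modified)| is_expired(*modified, now, retention))
        .map(|(name, _)| *name)
        .collect()
}
```

With the default trace retention of 600 seconds, a file last written 1000 seconds ago would be selected, while one written 100 seconds ago would be kept.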
Example Configuration
YAML Configuration
output_dir: ./custom_output
write_interval_secs: 10
trace_cleanup_interval_secs: 1200
metric_cleanup_interval_secs: 7200
protocols:
  protobuf_enabled: true
  protobuf_port: 4317
  arrow_flight_enabled: true
  arrow_flight_port: 4318
forwarding:
  enabled: true
  endpoint_url: "https://collector.example.com:4317"
  protocol: protobuf
  authentication:
    auth_type: bearer_token
    credentials:
      token: "my-bearer-token"
Environment Variables
export OTLP_OUTPUT_DIR=./my_output
export OTLP_WRITE_INTERVAL_SECS=10
export OTLP_MAX_TRACE_BUFFER_SIZE=50000
export OTLP_MAX_METRIC_BUFFER_SIZE=50000
export OTLP_PROTOBUF_ENABLED=true
export OTLP_ARROW_FLIGHT_ENABLED=true
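As an illustration of how an OTLP_* variable might be resolved against its documented default, the sketch below reads OTLP_WRITE_INTERVAL_SECS through an injected lookup function. The function name is hypothetical, and returning an error on an unparsable value (rather than silently falling back) is an assumption; the library's actual behavior may differ.

```rust
/// Documented default for write_interval_secs.
pub const DEFAULT_WRITE_INTERVAL_SECS: u64 = 5;

/// Resolves `OTLP_WRITE_INTERVAL_SECS` via `lookup`, falling back to the
/// default when the variable is unset. Hypothetical helper, not library API.
pub fn write_interval_secs<F>(lookup: F) -> Result<u64, String>
where
    F: Fn(&str) -> Option<String>,
{
    match lookup("OTLP_WRITE_INTERVAL_SECS") {
        None => Ok(DEFAULT_WRITE_INTERVAL_SECS),
        Some(raw) => raw
            .trim()
            .parse::<u64>()
            // Assumption: an invalid value is reported instead of ignored.
            .map_err(|e| format!("invalid OTLP_WRITE_INTERVAL_SECS {raw:?}: {e}")),
    }
}
```

In a real program the lookup would be `|key| std::env::var(key).ok()`; injecting it keeps the function testable without mutating the process environment.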
Programmatic Configuration
use otlp_arrow_library::{ConfigBuilder, ForwardingConfig, ForwardingProtocol};

// Note: the trailing `?` requires an enclosing function that returns a Result.
let config = ConfigBuilder::new()
    .output_dir("./custom_output")
    .write_interval_secs(10)
    .protobuf_enabled(true)
    .arrow_flight_enabled(true)
    .enable_forwarding(ForwardingConfig {
        enabled: true,
        endpoint_url: Some("https://collector.example.com:4317".to_string()),
        protocol: ForwardingProtocol::Protobuf,
        authentication: None,
    })
    .build()?;
See quickstart guide for detailed examples.
Project Structure
src/
├── lib.rs # Library root
├── config/ # Configuration management
├── otlp/ # OTLP processing (server, exporter, forwarder)
├── api/ # Public API
├── mock/ # Mock service for testing
└── bin/ # Standalone service binary
tests/
├── unit/ # Unit tests
├── integration/ # Integration tests
└── contract/ # Contract tests
Requirements
- Rust 1.75+ (stable channel)
- Tokio async runtime
- Cross-platform: Windows, Linux, macOS
- Python 3.11+ (for Python bindings)
CI/CD
The project includes GitHub Actions workflows for:
- Cross-platform testing: Runs tests on Windows, Linux, and macOS
- Code coverage validation: Ensures 85% coverage per file requirement
- Linting and formatting: Checks code style and formatting
- Build verification: Validates builds on all platforms
- Python bindings: Builds and tests Python wheels
See .github/workflows/ for workflow definitions.
Building
Rust Library
# Build library
cargo build --release
# Run tests
cargo test
# Run benchmarks
cargo bench
Python Bindings
# Install maturin
pip install maturin
# Build Python wheel
maturin build --release
# Or install in development mode
maturin develop
API Documentation
Rust API
See API documentation for complete Rust API reference.
Python API
See Python API contract for Python API reference.
Python OpenTelemetry SDK Integration
The library provides adapter classes for seamless integration with Python OpenTelemetry SDK:
import otlp_arrow_library
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# Create library instance
library = otlp_arrow_library.PyOtlpLibrary(output_dir="./otlp_output")
# Create adapters for Python OpenTelemetry SDK
metric_exporter = library.metric_exporter_adapter()
span_exporter = library.span_exporter_adapter()
# Use directly with Python OpenTelemetry SDK
metric_reader = PeriodicExportingMetricReader(metric_exporter)
span_processor = BatchSpanProcessor(span_exporter)
See Python OpenTelemetry SDK Adapter Quickstart for complete examples.
Examples
Demo Application
The demo application (examples/demo-app.rs) is the recommended starting point for new users:
Run the demo:
cargo run --example demo-app
What it does:
- Enables the dashboard automatically (http://127.0.0.1:8080)
- Generates mock metrics and spans continuously
- Demonstrates all SDK usage patterns with extensive comments
- Shows parent-child span relationships and different span kinds
- Includes graceful shutdown on Ctrl+C
Use cases:
- Verify the service is working after installation
- See a complete reference implementation
- Learn SDK usage patterns by reading the well-commented code
Other Examples
- Standalone Service: examples/standalone.rs - Run as a standalone gRPC service
- Embedded Library: examples/embedded.rs - Use as an embedded component
- Python Usage: examples/python_example.py - Python integration example
Health Check
The standalone service includes a health check endpoint on port 8080:
curl http://localhost:8080
# Returns: OK
Metrics
The library collects operation metrics that can be accessed via OtlpFileExporter::get_metrics():
- Messages received (traces + metrics)
- Files written
- Errors encountered
- Format conversions performed
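The four counters above could be kept with lock-free atomics. The sketch below is one possible shape, assuming hypothetical type and method names; it is not the structure returned by OtlpFileExporter::get_metrics().

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical set of operation counters, safe to share across threads.
#[derive(Default)]
pub struct OperationMetrics {
    messages_received: AtomicU64,
    files_written: AtomicU64,
    errors: AtomicU64,
    conversions: AtomicU64,
}

/// Point-in-time copy of the counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MetricsSnapshot {
    pub messages_received: u64,
    pub files_written: u64,
    pub errors: u64,
    pub conversions: u64,
}

impl OperationMetrics {
    pub fn record_messages(&self, n: u64) {
        self.messages_received.fetch_add(n, Ordering::Relaxed);
    }

    pub fn record_file_written(&self) {
        self.files_written.fetch_add(1, Ordering::Relaxed);
    }

    pub fn record_error(&self) {
        self.errors.fetch_add(1, Ordering::Relaxed);
    }

    pub fn record_conversion(&self) {
        self.conversions.fetch_add(1, Ordering::Relaxed);
    }

    /// Reads each counter independently; values are not a single atomic view.
    pub fn snapshot(&self) -> MetricsSnapshot {
        MetricsSnapshot {
            messages_received: self.messages_received.load(Ordering::Relaxed),
            files_written: self.files_written.load(Ordering::Relaxed),
            errors: self.errors.load(Ordering::Relaxed),
            conversions: self.conversions.load(Ordering::Relaxed),
        }
    }
}
```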
Security
The library implements multiple security features to protect credentials, prevent attacks, and ensure safe operation:
Credential Security
- Secure Storage: All authentication credentials are stored using SecretString, which:
  - Zeros memory when credentials are dropped
  - Prevents credentials from appearing in logs or error messages
  - Never exposes credentials in Debug or Display implementations
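The redaction property can be illustrated with a small wrapper type. This is a std-only sketch with a hypothetical `Secret` type, not the SecretString used by the library, and it deliberately omits zeroing on drop, which needs volatile writes that plain std code does not guarantee.

```rust
use std::fmt;

/// Hypothetical wrapper that keeps a credential out of formatted output.
pub struct Secret(String);

impl Secret {
    pub fn new(value: impl Into<String>) -> Self {
        Secret(value.into())
    }

    /// Explicit accessor, so every read of the raw value is easy to audit.
    pub fn expose(&self) -> &str {
        &self.0
    }
}

// Debug prints a fixed placeholder instead of the wrapped value.
// Display is intentionally not implemented.
impl fmt::Debug for Secret {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Secret([REDACTED])")
    }
}
```

Because the wrapper has no Display implementation, passing it to `{}` fails to compile, which catches accidental logging at build time.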
Path Traversal Protection
- Comprehensive Validation: Dashboard server validates all file paths to prevent directory traversal attacks
- Symlink Handling: Safely resolves symlinks and verifies paths stay within allowed directories
- Platform Support: Works correctly on Windows, Linux, and macOS
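A common way to implement this kind of check is to canonicalize both the base directory and the requested path, then verify that the result still lies under the base. The sketch below shows that approach with a hypothetical `resolve_within` helper; it is not the dashboard server's actual code.

```rust
use std::io;
use std::path::{Path, PathBuf};

/// Resolves `requested` relative to `base` and rejects results that escape
/// `base`. Canonicalization resolves `..` segments and symlinks, so the
/// prefix check runs on the real filesystem location.
pub fn resolve_within(base: &Path, requested: &str) -> io::Result<PathBuf> {
    let base = base.canonicalize()?;
    // Strip leading slashes so an absolute request cannot replace the base.
    let candidate = base
        .join(requested.trim_start_matches('/'))
        .canonicalize()?;
    if candidate.starts_with(&base) {
        Ok(candidate)
    } else {
        Err(io::Error::new(
            io::ErrorKind::PermissionDenied,
            "path escapes base directory",
        ))
    }
}
```

Note that `canonicalize` fails for paths that do not exist, so a missing file is reported as an error as well.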
HTTP Security Headers
- Content-Security-Policy: Prevents XSS attacks
- X-Frame-Options: Prevents clickjacking (configurable)
- X-Content-Type-Options: Prevents MIME type sniffing
- X-XSS-Protection: Additional XSS protection
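For illustration, the header set could be assembled as below. Only the header names and the DENY / SAMEORIGIN options come from this document; the Content-Security-Policy and X-XSS-Protection values shown are common choices used here as assumptions, and the function name is hypothetical.

```rust
/// Builds the response headers for a given X-Frame-Options setting.
/// Hypothetical helper; header values other than X-Frame-Options are assumed.
pub fn security_headers(x_frame_options: &str) -> Result<Vec<(&'static str, String)>, String> {
    // Only the two documented values are accepted.
    let frame = match x_frame_options {
        "DENY" | "SAMEORIGIN" => x_frame_options.to_string(),
        other => return Err(format!("unsupported X-Frame-Options value: {other}")),
    };
    Ok(vec![
        // Assumed policy value; the library's actual CSP is not documented here.
        ("Content-Security-Policy", "default-src 'self'".to_string()),
        ("X-Frame-Options", frame),
        ("X-Content-Type-Options", "nosniff".to_string()),
        // Assumed value; a commonly used setting for this legacy header.
        ("X-XSS-Protection", "1; mode=block".to_string()),
    ])
}
```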
Memory Safety
- Buffer Limits: Configurable buffer size limits prevent unbounded memory growth
- Backpressure: Returns a BufferFull error when limits are reached instead of consuming unlimited memory
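The backpressure behavior can be sketched with a bounded queue that refuses new items once it reaches capacity. The `BoundedBuffer` type and the shape of its `BufferFull` error are hypothetical stand-ins, not the library's own definitions.

```rust
use std::collections::VecDeque;

/// Hypothetical error returned when the buffer is at capacity.
#[derive(Debug, PartialEq, Eq)]
pub struct BufferFull {
    pub capacity: usize,
}

/// Queue that rejects pushes beyond `capacity` instead of growing.
pub struct BoundedBuffer<T> {
    items: VecDeque<T>,
    capacity: usize,
}

impl<T> BoundedBuffer<T> {
    pub fn new(capacity: usize) -> Self {
        Self { items: VecDeque::new(), capacity }
    }

    /// Adds an item, or reports `BufferFull` so the caller can back off.
    pub fn push(&mut self, item: T) -> Result<(), BufferFull> {
        if self.items.len() >= self.capacity {
            return Err(BufferFull { capacity: self.capacity });
        }
        self.items.push_back(item);
        Ok(())
    }

    /// Removes and returns everything buffered, as a flush would.
    pub fn drain_all(&mut self) -> Vec<T> {
        self.items.drain(..).collect()
    }

    pub fn len(&self) -> usize {
        self.items.len()
    }

    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }
}
```

Returning an error leaves the decision (retry, drop, or flush first) with the caller, which is the point of surfacing backpressure rather than hiding it.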
Input Validation
- URL Validation: Comprehensive URL parsing and validation using the url crate
- Bounds Checking: All numeric configuration values are validated for reasonable bounds
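As a rough illustration of these checks, the sketch below validates a buffer size against the documented maximum of 1000000 and applies a coarse scheme check to an endpoint URL. The helper names are hypothetical, rejecting a size of zero is an assumption, and the prefix test is a simplified stand-in for the full parsing that the url crate performs.

```rust
/// Documented upper bound for max_trace_buffer_size and max_metric_buffer_size.
pub const MAX_BUFFER_SIZE: usize = 1_000_000;

/// Accepts sizes in 1..=MAX_BUFFER_SIZE. The lower bound is an assumption.
pub fn validate_buffer_size(size: usize) -> Result<usize, String> {
    if size == 0 || size > MAX_BUFFER_SIZE {
        Err(format!("buffer size {size} is outside 1..={MAX_BUFFER_SIZE}"))
    } else {
        Ok(size)
    }
}

/// Coarse scheme check only; it does not validate host, port, or path.
pub fn validate_endpoint_scheme(url: &str) -> Result<(), String> {
    if url.starts_with("http://") || url.starts_with("https://") {
        Ok(())
    } else {
        Err(format!("endpoint URL must use http or https: {url}"))
    }
}
```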
For detailed security information, see SECURITY.md.
Implementation Notes
ResourceMetrics Storage
The library uses protobuf format (ExportMetricsServiceRequest) for internal metric storage to solve the ResourceMetrics Clone limitation. When you call export_metrics() with a ResourceMetrics instance:
- The metrics are automatically converted to protobuf format for storage
- Protobuf format supports Clone, enabling proper buffering and forwarding
- When flushing, protobuf is converted back to ResourceMetrics for file export
- This ensures full data preservation and proper handling of metrics from all sources (gRPC Protobuf, Arrow Flight, and public API)
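The round trip described above follows a general pattern: convert a type that cannot be cloned into a cloneable storage form, duplicate that form as needed, and convert back on flush. The sketch below uses two hypothetical stand-in types, `LiveMetrics` for ResourceMetrics and `StoredMetrics` for the protobuf request; the real types carry far more structure.

```rust
/// Hypothetical stand-in for an SDK type that does not implement `Clone`.
pub struct LiveMetrics {
    pub name: String,
    pub values: Vec<f64>,
}

/// Hypothetical stand-in for the cloneable storage representation.
#[derive(Debug, Clone, PartialEq)]
pub struct StoredMetrics {
    pub name: String,
    pub values: Vec<f64>,
}

// Conversion into the storage form borrows the source, so the caller
// keeps ownership (comparable to a by-reference export).
impl From<&LiveMetrics> for StoredMetrics {
    fn from(live: &LiveMetrics) -> Self {
        StoredMetrics {
            name: live.name.clone(),
            values: live.values.clone(),
        }
    }
}

// Conversion back consumes the stored form when flushing to a file.
impl From<StoredMetrics> for LiveMetrics {
    fn from(stored: StoredMetrics) -> Self {
        LiveMetrics {
            name: stored.name,
            values: stored.values,
        }
    }
}
```

Once the data is in the storage form, one copy can stay in the write buffer while a clone goes to the forwarder.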
Documentation
- Architecture Documentation - System design, data flow, and component interactions
- Specification
- Implementation Plan
- Quickstart Guide
- API Contracts
License
MIT OR Apache-2.0