2 releases
| 0.1.0-beta.1 | Nov 16, 2025 |
|---|---|
| 0.1.0-alpha.3 | Oct 12, 2025 |
| 0.1.0-alpha.2 |
|
| 0.1.0-alpha.1 |
|
#2789 in Database interfaces
12MB
267K
SLoC
OxiRS Federate - Federated Query Processing
Status: Beta Release (v0.1.0-beta.1) - Released November 16, 2025
✨ Beta Features Complete! All Beta Release Targets implemented. APIs stabilizing. Ready for beta promotion.
Federated SPARQL query processing across multiple RDF endpoints. Execute queries spanning distributed knowledge graphs with intelligent optimization and result integration.
Features
Federated Query Execution
- Multi-endpoint Queries - Query across multiple SPARQL endpoints
- Intelligent Source Selection - Automatically choose relevant endpoints
- Query Decomposition - Split queries for parallel execution
- Result Integration - Efficiently merge results from multiple sources
Optimization
- Cost-based Planning - Optimize query execution plans
- Join Ordering - Minimize data transfer between endpoints
- Parallel Execution - Execute independent sub-queries concurrently
- Result Caching - Cache frequent sub-query results
Reliability
- Failure Handling - Graceful degradation when endpoints fail
- Retry Logic - Automatic retry with backoff
- Timeout Management - Configure timeouts per endpoint
- Health Monitoring - Track endpoint availability
Installation
Add to your Cargo.toml:
# Beta features complete - APIs stabilizing
[dependencies]
oxirs-federate = "0.1.0-beta.1"
Quick Start
Basic Federated Query
use oxirs_federate::{FederatedEngine, Endpoint};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure endpoints
let endpoints = vec![
Endpoint::new("DBpedia", "https://dbpedia.org/sparql"),
Endpoint::new("Wikidata", "https://query.wikidata.org/sparql"),
];
// Create federated engine
let engine = FederatedEngine::new(endpoints)?;
// Execute federated query
let query = r#"
PREFIX dbo: <http://dbpedia.org/ontology/>
PREFIX wd: <http://www.wikidata.org/entity/>
PREFIX wdt: <http://www.wikidata.org/prop/direct/>
SELECT ?person ?dbpName ?wikidataId WHERE {
# From DBpedia
SERVICE <https://dbpedia.org/sparql> {
?person a dbo:Person .
?person dbo:name ?dbpName .
}
# From Wikidata
SERVICE <https://query.wikidata.org/sparql> {
?wikidataId wdt:P31 wd:Q5 .
?wikidataId rdfs:label ?dbpName .
}
}
LIMIT 10
"#;
let results = engine.execute(query).await?;
for result in results {
println!("{:?}", result);
}
Ok(())
}
Automatic Federation
Let the engine automatically determine which endpoints to query:
use oxirs_federate::{FederatedEngine, EndpointRegistry};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Discover available endpoints
let registry = EndpointRegistry::discover().await?;
let engine = FederatedEngine::builder()
.registry(registry)
.enable_auto_discovery(true)
.build()?;
// Query without explicit SERVICE clauses
let query = r#"
SELECT ?person ?name WHERE {
?person a foaf:Person .
?person foaf:name ?name .
}
LIMIT 100
"#;
// Engine automatically selects relevant endpoints
let results = engine.execute(query).await?;
Ok(())
}
Endpoint Configuration
Basic Endpoint
use oxirs_federate::Endpoint;
let endpoint = Endpoint::builder()
.name("DBpedia")
.url("https://dbpedia.org/sparql")
.timeout(Duration::from_secs(30))
.build()?;
Authenticated Endpoint
use oxirs_federate::{Endpoint, Authentication};
let endpoint = Endpoint::builder()
.name("Private SPARQL")
.url("https://private.example.org/sparql")
.authentication(Authentication::Basic {
username: "user".to_string(),
password: "pass".to_string(),
})
.build()?;
Endpoint with Capabilities
use oxirs_federate::{Endpoint, EndpointCapabilities};
let capabilities = EndpointCapabilities {
supports_aggregation: true,
supports_property_paths: true,
supports_update: false,
max_results: Some(10000),
estimated_triples: 1_000_000_000,
};
let endpoint = Endpoint::builder()
.name("DBpedia")
.url("https://dbpedia.org/sparql")
.capabilities(capabilities)
.build()?;
Query Optimization
Source Selection
use oxirs_federate::{FederatedEngine, SourceSelector};
let selector = SourceSelector::builder()
.strategy(SelectionStrategy::CostBased)
.prefer_local(true)
.max_endpoints_per_query(5)
.build();
let engine = FederatedEngine::builder()
.source_selector(selector)
.build()?;
Query Decomposition
use oxirs_federate::QueryDecomposer;
let decomposer = QueryDecomposer::new();
// Automatically decompose query
let subqueries = decomposer.decompose(query)?;
for (endpoint, subquery) in subqueries {
println!("Send to {}: {}", endpoint, subquery);
}
Join Optimization
use oxirs_federate::JoinOptimizer;
let optimizer = JoinOptimizer::builder()
.strategy(JoinStrategy::BindJoin) // or HashJoin, NestedLoop
.max_bind_size(1000)
.enable_selectivity_estimation(true)
.build();
Advanced Features
Result Caching
use oxirs_federate::{FederatedEngine, CacheConfig};
let cache_config = CacheConfig {
enabled: true,
ttl: Duration::from_secs(3600),
max_size: 1000,
cache_dir: Some("./federation_cache".into()),
};
let engine = FederatedEngine::builder()
.cache_config(cache_config)
.build()?;
Parallel Execution
let engine = FederatedEngine::builder()
.max_parallel_requests(10)
.connection_pool_size(20)
.build()?;
// Executes sub-queries in parallel
let results = engine.execute_parallel(query).await?;
Failure Handling
use oxirs_federate::{FederatedEngine, FailurePolicy};
let policy = FailurePolicy {
retry_attempts: 3,
retry_delay: Duration::from_secs(1),
retry_backoff: 2.0, // Exponential backoff
continue_on_endpoint_failure: true,
partial_results: true,
};
let engine = FederatedEngine::builder()
.failure_policy(policy)
.build()?;
Monitoring
Query Statistics
let results = engine.execute_with_stats(query).await?;
println!("Endpoints queried: {}", results.stats.endpoints_used);
println!("Total execution time: {:?}", results.stats.total_time);
println!("Data transferred: {} bytes", results.stats.bytes_transferred);
println!("Results returned: {}", results.data.len());
for endpoint_stat in results.stats.endpoint_stats {
println!("{}: {:?}", endpoint_stat.name, endpoint_stat.duration);
}
Health Monitoring
use oxirs_federate::HealthMonitor;
let monitor = HealthMonitor::new(&engine);
// Check endpoint health
let health = monitor.check_health().await?;
for (endpoint, status) in health {
match status {
EndpointStatus::Healthy => println!("{}: OK", endpoint),
EndpointStatus::Degraded => println!("{}: Degraded", endpoint),
EndpointStatus::Unavailable => println!("{}: Down", endpoint),
}
}
Integration
With oxirs-arq
use oxirs_arq::QueryEngine;
use oxirs_federate::FederatedExtension;
// Extend query engine with federation
let mut engine = QueryEngine::new();
engine.add_extension(FederatedExtension::new(endpoints));
// Use SERVICE clauses in queries
let results = engine.execute(federated_query).await?;
With oxirs-gql (GraphQL Federation)
use oxirs_gql::GraphQLServer;
use oxirs_federate::FederatedEngine;
let graphql_server = GraphQLServer::builder()
.federated_engine(federated_engine)
.enable_federation(true)
.build()?;
// GraphQL queries can span multiple RDF sources
Performance
Benchmarks
| Endpoints | Query Complexity | Execution Time | Data Transfer |
|---|---|---|---|
| 2 | Simple | 150ms | 50KB |
| 5 | Moderate | 800ms | 500KB |
| 10 | Complex | 2.5s | 2MB |
With caching and parallel execution enabled
Optimization Tips
// Use bind joins for large intermediate results
let engine = FederatedEngine::builder()
.join_strategy(JoinStrategy::BindJoin)
.max_bind_size(1000)
.build()?;
// Enable aggressive caching
let cache_config = CacheConfig {
enabled: true,
ttl: Duration::from_secs(7200),
max_size: 10000,
..Default::default()
};
// Limit data transfer
let engine = FederatedEngine::builder()
.max_result_size(100_000)
.compression(true)
.build()?;
Service Discovery
Automatic Discovery
use oxirs_federate::ServiceDiscovery;
let discovery = ServiceDiscovery::new();
// Discover SPARQL endpoints
let endpoints = discovery.discover().await?;
for endpoint in endpoints {
println!("Found: {} at {}", endpoint.name, endpoint.url);
println!(" Description: {}", endpoint.description);
println!(" Capabilities: {:?}", endpoint.capabilities);
}
VoID Descriptions
use oxirs_federate::VoidParser;
// Parse VoID (Vocabulary of Interlinked Datasets) descriptions
let void_url = "https://dbpedia.org/void";
let description = VoidParser::parse(void_url).await?;
println!("Dataset: {}", description.title);
println!("Triples: {}", description.triples);
println!("SPARQL endpoint: {}", description.sparql_endpoint);
Status
Beta Release (v0.1.0-beta.1) - Beta Features Complete!
- ✅ Distributed Transactions - 2PC and Saga patterns with automatic compensation
- ✅ Advanced Authentication - OAuth2, SAML, JWT, API keys, Basic, Service-to-Service
- ✅ ML-Driven Optimization - Intelligent source selection and query planning
- ✅ Adaptive Join Strategies - Bind join, hash join, nested loop with cost-based selection
- ✅ GraphQL Federation - Schema stitching, entity resolution, query translation
- ✅ Production Monitoring - OpenTelemetry, circuit breakers, auto-healing
- ✅ Streaming Support - Real-time processing with NATS/Kafka and backpressure handling
- ✅ Load Balancing - Adaptive algorithms with health-aware routing
- ✅ 285 Passing Tests - Comprehensive test coverage with zero warnings
Contributing
This is an experimental module. Feedback welcome!
License
MIT OR Apache-2.0
See Also
- oxirs-arq - SPARQL query engine
- oxirs-stream - Stream processing
- oxirs-gql - GraphQL interface
Dependencies
~211MB
~3.5M SLoC