oxirs-stream

Real-time streaming support with Kafka/NATS I/O, RDF Patch, and SPARQL Update delta

OxiRS Stream - Real-time RDF Streaming


Status: Alpha Release (v0.1.0-alpha.3) - Released October 12, 2025

⚠️ Alpha Software: This is an early alpha release with experimental features. APIs may change without notice, and it is not recommended for production use.

Real-time RDF data streaming with support for Kafka, NATS, and other message brokers. Process RDF streams with windowing, aggregation, and pattern matching.

Features

Message Brokers

  • Apache Kafka - Distributed streaming platform
  • NATS - Lightweight, high-performance messaging
  • RabbitMQ - Reliable message queuing
  • Custom Adapters - Bring your own message broker
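
The custom-adapter hook means any broker can be wired in. The crate's actual adapter trait isn't shown here, so the sketch below uses a hypothetical, synchronous `StreamAdapter` trait (the real API would be async and richer) just to illustrate the poll/ack shape a custom backend would implement:

```rust
use std::collections::VecDeque;

// Hypothetical adapter trait (illustration only, not the oxirs-stream API):
// a custom backend pulls serialized triples in and acknowledges them out.
trait StreamAdapter {
    fn poll(&mut self) -> Option<String>; // next serialized triple, if any
    fn ack(&mut self, message: &str);     // confirm processing (at-least-once)
}

// In-memory adapter: useful in tests, or as a template for a real broker.
struct InMemoryAdapter {
    queue: VecDeque<String>,
    acked: Vec<String>,
}

impl InMemoryAdapter {
    fn new(messages: Vec<String>) -> Self {
        Self { queue: messages.into(), acked: Vec::new() }
    }
}

impl StreamAdapter for InMemoryAdapter {
    fn poll(&mut self) -> Option<String> {
        self.queue.pop_front()
    }
    fn ack(&mut self, message: &str) {
        self.acked.push(message.to_string());
    }
}

fn main() {
    let mut adapter = InMemoryAdapter::new(vec!["<s> <p> <o> .".to_string()]);
    // Drain the queue, acknowledging each message after it is processed.
    while let Some(msg) = adapter.poll() {
        println!("processing: {msg}");
        adapter.ack(&msg);
    }
    println!("acked: {}", adapter.acked.len());
}
```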

Stream Processing

  • Windowing - Tumbling, sliding, and session windows
  • Aggregation - Count, sum, average over windows
  • Pattern Matching - Detect patterns in RDF streams
  • Filtering - Stream-based SPARQL filters

Reliability & Monitoring

  • At-Least-Once Delivery - Reliable message processing
  • Backpressure - Handle fast producers
  • Checkpointing - Resume from failures
  • Metrics - Monitor stream performance

Installation

Add to your Cargo.toml:

# Experimental crate
[dependencies]
oxirs-stream = "0.1.0-alpha.3"

# Or, to enable specific broker backends instead:
# oxirs-stream = { version = "0.1.0-alpha.3", features = ["kafka", "nats"] }

Quick Start

Basic Streaming

use oxirs_stream::{StreamSource, KafkaConfig};
use oxirs_core::Triple;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure Kafka source
    let config = KafkaConfig {
        bootstrap_servers: vec!["localhost:9092".to_string()],
        topic: "rdf-triples".to_string(),
        group_id: "oxirs-consumer".to_string(),
        ..Default::default()
    };

    // Create stream
    let mut stream = StreamSource::kafka(config).await?;

    // Process triples
    while let Some(triple) = stream.next().await {
        let triple = triple?;
        println!("{} {} {}", triple.subject, triple.predicate, triple.object);

        // Process triple...
    }

    Ok(())
}

Stream Processing with Windows

use oxirs_stream::{StreamProcessor, WindowConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let processor = StreamProcessor::builder()
        // `kafka_source` is a StreamSource, e.g. built as in the previous example
        .source(kafka_source)
        .window(WindowConfig::tumbling(Duration::from_secs(60)))
        .build()?;

    // Process windowed batches
    let mut windows = processor.process().await?;

    while let Some(window) = windows.next().await {
        let triples = window?;
        println!("Window received {} triples", triples.len());

        // Aggregate, validate, or process batch
        process_window(triples)?;
    }

    Ok(())
}

Message Broker Configuration

Kafka

use oxirs_stream::KafkaConfig;

let config = KafkaConfig {
    bootstrap_servers: vec!["kafka1:9092".to_string(), "kafka2:9092".to_string()],
    topic: "rdf-events".to_string(),
    group_id: "my-consumer-group".to_string(),

    // Performance tuning
    fetch_min_bytes: 1024,
    fetch_max_wait_ms: 500,
    max_partition_fetch_bytes: 1048576,

    // Reliability
    enable_auto_commit: false,         // commit offsets manually
    auto_commit_interval_ms: 5000,     // ignored while auto-commit is disabled

    // Security
    security_protocol: Some("SASL_SSL".to_string()),
    sasl_mechanism: Some("PLAIN".to_string()),
    sasl_username: Some(std::env::var("KAFKA_USERNAME")?),
    sasl_password: Some(std::env::var("KAFKA_PASSWORD")?),
};

NATS

use oxirs_stream::NatsConfig;

let config = NatsConfig {
    servers: vec!["nats://localhost:4222".to_string()],
    subject: "rdf.>".to_string(),  // Wildcard subscription
    queue_group: Some("oxirs-processors".to_string()),

    // Credentials
    credentials_path: Some("./nats.creds".into()),

    // JetStream (persistent)
    use_jetstream: true,
    stream_name: Some("RDF_STREAM".to_string()),
    durable_name: Some("oxirs-consumer".to_string()),
};

Windowing

Tumbling Windows

Fixed-size, non-overlapping windows:

use oxirs_stream::{WindowConfig, WindowType};
use std::time::Duration;

let config = WindowConfig {
    window_type: WindowType::Tumbling,
    size: Duration::from_secs(60),
    ..Default::default()
};

// Process 60-second windows

Sliding Windows

Overlapping windows:

let config = WindowConfig {
    window_type: WindowType::Sliding,
    size: Duration::from_secs(60),
    slide: Duration::from_secs(30),  // 30-second slide
    ..Default::default()
};

// Windows: [0-60s], [30-90s], [60-120s], ...
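
The window boundaries in the comment above follow directly from the size/slide arithmetic. As a standalone illustration (not the crate's API), this computes which sliding windows contain a given event time:

```rust
// Standalone illustration (not the oxirs-stream API): a sliding window
// starting at `start` covers [start, start + size), and window starts are
// multiples of `slide`. Return the starts of all windows containing `t`.
fn window_starts(t: u64, size: u64, slide: u64) -> Vec<u64> {
    let mut starts = Vec::new();
    // Earliest window start that can still contain t: the smallest multiple
    // of `slide` strictly greater than t - size (or 0 for early events).
    let first = if t < size { 0 } else { ((t - size) / slide + 1) * slide };
    let mut start = first;
    while start <= t {
        starts.push(start);
        start += slide;
    }
    starts
}

fn main() {
    // size = 60s, slide = 30s: an event at t = 65 falls in [30,90) and [60,120)
    println!("{:?}", window_starts(65, 60, 30)); // [30, 60]
}
```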

Session Windows

Dynamic windows based on inactivity gaps:

let config = WindowConfig {
    window_type: WindowType::Session,
    gap: Duration::from_secs(300),  // 5-minute inactivity closes window
    ..Default::default()
};
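
The closing rule can be shown with a standalone sketch (again, not the crate's API) that splits sorted event timestamps into sessions whenever the gap between consecutive events exceeds the threshold:

```rust
// Standalone illustration (not the oxirs-stream API): group sorted event
// timestamps into sessions; a gap larger than `gap` closes the current
// session and opens a new one.
fn sessionize(timestamps: &[u64], gap: u64) -> Vec<Vec<u64>> {
    let mut sessions: Vec<Vec<u64>> = Vec::new();
    for &t in timestamps {
        match sessions.last_mut() {
            // Within the gap of the previous event: extend the session.
            Some(current) if t - current.last().copied().unwrap() <= gap => {
                current.push(t);
            }
            // Otherwise (or for the first event): start a new session.
            _ => sessions.push(vec![t]),
        }
    }
    sessions
}

fn main() {
    // With a 300s gap, events at 0, 100, 200 share a session; 900 starts a new one.
    println!("{:?}", sessionize(&[0, 100, 200, 900], 300));
}
```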

Stream Operations

Filtering

use oxirs_stream::filters::SparqlFilter;

let filter = SparqlFilter::new(r#"
    PREFIX foaf: <http://xmlns.com/foaf/0.1/>
    FILTER EXISTS {
        ?s a foaf:Person .
        ?s foaf:age ?age .
        FILTER (?age >= 18)
    }
"#)?;

let filtered_stream = stream.filter(filter);

Mapping

let transformed_stream = stream.map(|triple| {
    // Transform each triple
    transform_triple(triple)
});

Aggregation

use oxirs_stream::aggregation::{Count, Sum, Average};

let processor = StreamProcessor::builder()
    .source(source)
    .window(WindowConfig::tumbling(Duration::from_secs(60)))
    .aggregate(Count::new("?person", "foaf:Person"))
    .aggregate(Average::new("?age", "foaf:age"))
    .build()?;

let results = processor.process().await?;

Pattern Matching

Temporal Patterns

use oxirs_stream::patterns::TemporalPattern;

let pattern = TemporalPattern::builder()
    .event("A", "?person foaf:login ?time")
    .followed_by("B", "?person foaf:logout ?time2", Duration::from_secs(3600))
    .within(Duration::from_secs(24 * 3600))  // 24 hours
    .build()?;

let matches = stream.detect_pattern(pattern).await?;

Graph Patterns

use oxirs_stream::patterns::GraphPattern;

let pattern = GraphPattern::parse(r#"
    {
        ?person a foaf:Person .
        ?person foaf:knows ?friend .
        ?friend foaf:age ?age .
        FILTER (?age > 18)
    }
"#)?;

let matches = stream.match_pattern(pattern).await?;

Reliability

Checkpointing

use oxirs_stream::checkpoint::{CheckpointConfig, CheckpointStorage};

let checkpoint_config = CheckpointConfig {
    interval: Duration::from_secs(60),
    storage: CheckpointStorage::File("./checkpoints".into()),
    max_failures: 3,
};

let processor = StreamProcessor::builder()
    .source(source)
    .checkpoint(checkpoint_config)
    .build()?;

// Automatically recovers from last checkpoint on failure

Error Handling

use oxirs_stream::error_handling::{ErrorPolicy, RetryPolicy};

let error_policy = ErrorPolicy {
    retry: RetryPolicy::exponential_backoff(3),
    dead_letter_topic: Some("rdf-errors".to_string()),
    log_errors: true,
};

let processor = StreamProcessor::builder()
    .source(source)
    .error_policy(error_policy)
    .build()?;
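
The exponential-backoff policy above doubles the wait between retries. As a standalone sketch (the base delay and cap here are illustrative assumptions, not oxirs-stream defaults), the schedule looks like:

```rust
use std::time::Duration;

// Standalone sketch of an exponential backoff schedule: the delay for
// attempt n is base * 2^n, capped at `max`. Base and cap are illustrative
// assumptions, not oxirs-stream defaults.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let scaled = base
        .checked_mul(2u32.saturating_pow(attempt))
        .unwrap_or(max); // overflow means we are past the cap anyway
    scaled.min(max)
}

fn main() {
    let base = Duration::from_millis(100);
    let max = Duration::from_secs(10);
    for attempt in 0..4 {
        println!("attempt {attempt}: wait {:?}", backoff_delay(attempt, base, max));
    }
}
```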

Integration

With oxirs-shacl (Streaming Validation)

use oxirs_stream::StreamProcessor;
use oxirs_shacl::ValidationEngine;

// `shapes` and `config` come from your oxirs-shacl setup
let validator = ValidationEngine::new(&shapes, config);

let processor = StreamProcessor::builder()
    .source(kafka_source)
    .window(WindowConfig::tumbling(Duration::from_secs(10)))
    .validate_with(validator)
    .build()?;

let mut results = processor.process().await?;

while let Some(window_result) = results.next().await {
    let (triples, validation_report) = window_result?;

    if !validation_report.conforms {
        eprintln!("Validation failed: {} violations",
            validation_report.violations.len());
    }
}

With oxirs-arq (Stream Queries)

use oxirs_stream::StreamProcessor;
use oxirs_arq::StreamingQueryEngine;

let query_engine = StreamingQueryEngine::new();

let query = r#"
    PREFIX foaf: <http://xmlns.com/foaf/0.1/>

    SELECT ?person (COUNT(?friend) as ?friendCount)
    WHERE {
        ?person a foaf:Person .
        ?person foaf:knows ?friend .
    }
    GROUP BY ?person
    HAVING (COUNT(?friend) > 10)
"#;

let processor = StreamProcessor::builder()
    .source(source)
    .window(WindowConfig::tumbling(Duration::from_secs(60)))
    .query(query_engine, query)
    .build()?;

Performance

Throughput Benchmarks

Message Broker   Throughput        Latency (p99)
Kafka            100K triples/s    15ms
NATS             80K triples/s     8ms
RabbitMQ         50K triples/s     20ms

Benchmarked on M1 Mac with local brokers

Optimization Tips

use oxirs_stream::{BackpressureStrategy, StreamProcessor};

// Batch processing
let processor = StreamProcessor::builder()
    .source(source)
    .batch_size(1000)  // Process in batches of 1000
    .parallelism(4)    // 4 parallel workers
    .build()?;

// Backpressure control
let processor = StreamProcessor::builder()
    .source(source)
    .buffer_size(10000)
    .backpressure_strategy(BackpressureStrategy::Block)
    .build()?;

Status

Alpha Release (v0.1.0-alpha.3)

  • ✅ Kafka/NATS integrations with persisted offset checkpoints
  • ✅ Windowing, filtering, and mapping tied into CLI persistence workflows
  • ✅ SPARQL stream federation with SERVICE bridging to remote endpoints
  • ✅ Prometheus/SciRS2 metrics for throughput, lag, and error rates
  • 🚧 Aggregation operators (tumbling/sliding) final polish (in progress)
  • 🚧 Pattern matching DSL and CEP (in progress)
  • ⏳ Exactly-once semantics (planned for beta)
  • ⏳ Distributed stream processing (planned for v0.2.0)

Contributing

This is an experimental module. Feedback welcome!

License

MIT OR Apache-2.0
