12 releases

new 0.1.11 Dec 10, 2024
0.1.10 Dec 10, 2024

#1623 in Database interfaces

Download history

257 downloads per month

Apache-2.0

38KB
827 lines

Rust Source SDK for Drasi

This library provides the building blocks and infrastructure to implement a Drasi Source in Rust.

Getting started

Install the package

cargo add drasi-source-sdk

Example

Proxy

/// Proxy example entry point: builds a source proxy whose stream producer is
/// `my_stream`, then starts it and awaits the result of `start`.
#[tokio::main]
async fn main() {
    SourceProxyBuilder::new()
        .with_stream_producer(my_stream)
        .build()
        .start()
        .await;
}

async fn my_stream(req: BootstrapRequest) -> Result<BootstrapStream, BootstrapError> {
    let stream = stream! {
        if req.node_labels.contains(&"Location".to_string()) {
            yield SourceElement::Node { 
                id: "Location-A".to_string(), 
                labels: vec!["Location".to_string()], 
                properties: vec![
                    ("longitude".to_string(), Value::Number(Number::from_f64(50.1).unwrap())),
                    ("latitude".to_string(), Value::Number(Number::from_f64(60.7).unwrap())),
                ].into_iter().collect(),
            };    

            yield SourceElement::Node { 
                id: "Location-B".to_string(), 
                labels: vec!["Location".to_string()], 
                properties: vec![
                    ("longitude".to_string(), Value::Number(Number::from_f64(58.9).unwrap())),
                    ("latitude".to_string(), Value::Number(Number::from_f64(72.1).unwrap())),
                ].into_iter().collect(),
            };    
        }
    };

    Ok(Box::pin(stream))
}

Reactivator

/// Reactivator example entry point: builds a reactivator whose stream producer
/// is `my_stream` (the build step is awaited), then starts it and awaits the
/// result of `start`.
#[tokio::main]
async fn main() {
    ReactivatorBuilder::new()
        .with_stream_producer(my_stream)
        .build()
        .await
        .start()
        .await;
}

/// Change stream producer for the reactivator example.
///
/// Reads the `"cursor"` entry from the state store (8 big-endian bytes, or 0
/// when absent), then returns an endless stream that, every five seconds:
/// 1. yields a `Create` change for a new `Vehicle` node,
/// 2. yields a `Create` change for a `LOCATED_AT` relation from that vehicle
///    to `Location-A`,
/// 3. writes the advanced cursor back to the state store.
///
/// Panics (via `unwrap`) if the state store call fails, if the stored cursor
/// cannot be converted to 8 bytes, or if the system clock is before the Unix
/// epoch.
async fn my_stream(state_store: Arc<dyn StateStore + Send + Sync>) -> Result<ChangeStream, ReactivatorError> {
    // Resume from the persisted cursor when one exists; otherwise start at zero.
    let mut cursor = state_store
        .get("cursor")
        .await
        .unwrap()
        .map_or(0, |bytes| u64::from_be_bytes(bytes.try_into().unwrap()));

    let result = stream! {
        let start_location_id = "Location-A";
        let pause = std::time::Duration::from_secs(5);

        loop {
            tokio::time::sleep(pause).await;

            // Whole seconds since the Unix epoch, multiplied by 1000.
            let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
            let time = since_epoch.as_secs() * 1000;

            // First change: the vehicle node. The cursor advances once per emitted change.
            cursor += 1;
            let vehicle_id = format!("vehicle-{}", cursor);
            let vehicle_name = Value::String(format!("Vehicle {}", cursor));
            let vehicle_node = SourceElement::Node {
                id: vehicle_id.clone(),
                labels: vec!["Vehicle".to_string()],
                properties: std::iter::once(("name".to_string(), vehicle_name)).collect(),
            };
            let vehicle_created = SourceChange::new(ChangeOp::Create, vehicle_node, time, cursor, None);
            yield vehicle_created;

            // Second change: the relation linking the vehicle to its starting location.
            cursor += 1;
            let vehicle_location_relation = SourceElement::Relation {
                id: format!("vehicle-loc-{}", cursor),
                start_id: vehicle_id,
                end_id: start_location_id.to_string(),
                labels: vec!["LOCATED_AT".to_string()],
                properties: Map::new(),
            };
            let relation_created = SourceChange::new(ChangeOp::Create, vehicle_location_relation, time, cursor, None);
            yield relation_created;

            // Persist the cursor only after both changes have been yielded.
            let encoded_cursor = Vec::from(cursor.to_be_bytes());
            state_store.put("cursor", encoded_cursor).await.unwrap();
        }
    };

    Ok(Box::pin(result))
}

Dependencies

~21–33MB
~491K SLoC