gRPC Source
The gRPC Source provides a high-performance gRPC service endpoint for submitting data change events to Drasi. It exposes a Protocol Buffer-based API that supports both single event submission and streaming for high-throughput scenarios.
Overview
The gRPC Source is a server-side component that implements the drasi.v1.SourceService gRPC service. External systems can push data changes (insert, update, delete operations) to Drasi using efficient binary serialization and bidirectional streaming via HTTP/2.
Key Capabilities
- Dual submission modes: Unary RPC for single events or client streaming for bulk ingestion
- High performance: Binary Protocol Buffers over HTTP/2 with multiplexing
- Type safety: Strongly-typed messages defined in protobuf schemas
- Health monitoring: Built-in health check endpoint
- Bootstrap support: Extensible API for initial data snapshots (future enhancement)
- Error handling: Detailed error responses with validation messages
- Lifecycle management: Graceful startup/shutdown with status tracking
- Flexible dispatch: Configurable channel-based or broadcast dispatch modes
Use Cases
- IoT data ingestion: Streaming sensor data from edge devices
- Microservice integration: Real-time event streaming between services
- ETL pipelines: Bulk data loading with streaming support
- Change data capture: Pushing database changes via gRPC clients
- System integration: Language-agnostic integration using protobuf
- High-volume scenarios: Efficient handling of thousands of events per second
Configuration
Builder Pattern (Recommended)
The builder pattern provides the most ergonomic and type-safe way to construct a gRPC source:
use drasi_source_grpc::GrpcSource;
use drasi_lib::channels::DispatchMode;
// Basic configuration
let source = GrpcSource::builder("my-grpc-source")
.with_host("0.0.0.0")
.with_port(50051)
.build()?;
// Advanced configuration with custom dispatch settings
let source = GrpcSource::builder("production-grpc")
.with_host("0.0.0.0")
.with_port(50051)
.with_timeout_ms(10000)
.with_dispatch_mode(DispatchMode::Channel)
.with_dispatch_buffer_capacity(2000)
.with_auto_start(true)
.build()?;
// With bootstrap provider
let source = GrpcSource::builder("bootstrapped-grpc")
.with_host("127.0.0.1")
.with_port(50052)
.with_bootstrap_provider(my_bootstrap_provider)
.build()?;
Config Struct Approach
For programmatic configuration or deserialization from external sources:
use drasi_source_grpc::{GrpcSource, GrpcSourceConfig};
// Using config struct
let config = GrpcSourceConfig {
host: "0.0.0.0".to_string(),
port: 50051,
endpoint: None,
timeout_ms: 5000,
};
let source = GrpcSource::new("my-grpc-source", config)?;
// With custom dispatch settings
let source = GrpcSource::with_dispatch(
"my-grpc-source",
config,
Some(DispatchMode::Channel),
Some(1500)
)?;
Direct Integration with DrasiLib
use drasi_lib::DrasiLib;
use drasi_source_grpc::GrpcSource;
let drasi = DrasiLib::builder()
.with_id("my-drasi-instance")
.build()
.await?;
let source = GrpcSource::builder("events-grpc")
.with_host("0.0.0.0")
.with_port(50051)
.build()?;
drasi.add_source(source).await?;
drasi.start_source("events-grpc").await?;
Configuration Options
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| `id` | Unique identifier for the source instance | String | Any non-empty string | (Required) |
| `host` | Host address to bind the gRPC server to | String | Valid hostname or IP address (e.g., "0.0.0.0", "127.0.0.1") | "0.0.0.0" |
| `port` | Port number for the gRPC server | u16 | 1-65535 | 50051 |
| `endpoint` | Optional custom service endpoint path | `Option<String>` | Any valid path string | None |
| `timeout_ms` | Request timeout in milliseconds | u64 | Positive integer (milliseconds) | 5000 |
| `dispatch_mode` | Event dispatch strategy | `Option<DispatchMode>` | Channel (isolated, backpressure) or Broadcast (shared, no backpressure) | Channel |
| `dispatch_buffer_capacity` | Buffer size for dispatch channel | `Option<usize>` | Positive integer | 1000 |
| `bootstrap_provider` | Provider for initial data snapshots | `Option<Box<dyn BootstrapProvider>>` | Any type implementing BootstrapProvider | None |
| `auto_start` | Whether to start automatically when added to DrasiLib | bool | true, false | true |
Configuration Notes
- Auto-start: When `auto_start=true` (default), the source starts immediately if added to a running DrasiLib instance. When `auto_start=false`, start manually with `drasi.start_source("source-id")`.
- Host binding: Use `"0.0.0.0"` to accept connections from any network interface, or `"127.0.0.1"` for localhost only
- Port: Must be available (not in use by another service)
- Timeout: Applies to request processing; higher values for slow network conditions
- Dispatch mode:
  - `Channel`: Each subscriber gets an isolated channel with backpressure (prevents message loss)
  - `Broadcast`: Single shared channel across subscribers (faster but may drop messages under load)
- Buffer capacity: Higher values handle bursts better but use more memory
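The valid-value rules from the options table can be written down as a small check. The sketch below uses a hypothetical `SketchConfig` struct that only mirrors the documented fields and defaults; it is not the crate's `GrpcSourceConfig`, and the validation the crate actually performs may differ.

```rust
// Hypothetical mirror of the documented options; not the crate's real type.
#[derive(Debug, Clone)]
struct SketchConfig {
    id: String,
    host: String,
    port: u16,
    timeout_ms: u64,
    dispatch_buffer_capacity: Option<usize>,
}

impl SketchConfig {
    /// Builds a config using the defaults listed in the options table.
    fn with_defaults(id: &str) -> Self {
        SketchConfig {
            id: id.to_string(),
            host: "0.0.0.0".to_string(),
            port: 50051,
            timeout_ms: 5000,
            dispatch_buffer_capacity: None, // None means "use the default of 1000"
        }
    }

    /// Returns a list of human-readable problems; an empty list means the
    /// config satisfies the documented valid-value rules.
    fn validate(&self) -> Vec<String> {
        let mut problems = Vec::new();
        if self.id.is_empty() {
            problems.push("id must be a non-empty string".to_string());
        }
        if self.host.is_empty() {
            problems.push("host must be a hostname or IP address".to_string());
        }
        if self.port == 0 {
            problems.push("port must be in 1-65535".to_string());
        }
        if self.timeout_ms == 0 {
            problems.push("timeout_ms must be positive".to_string());
        }
        if self.dispatch_buffer_capacity == Some(0) {
            problems.push("dispatch_buffer_capacity must be positive".to_string());
        }
        problems
    }
}

fn main() {
    let ok = SketchConfig::with_defaults("my-grpc-source");
    println!("default config problems: {:?}", ok.validate());

    let mut bad = SketchConfig::with_defaults("");
    bad.port = 0;
    println!("bad config problems: {:?}", bad.validate());
}
```

Collecting every problem instead of stopping at the first one makes it easier to fix a configuration file in a single pass.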
Input Schema
The gRPC Source accepts events in Protocol Buffer format as defined in proto/drasi/v1/source.proto and proto/drasi/v1/common.proto.
Core Message Types
SourceChange (Top-level event message)
message SourceChange {
ChangeType type = 1; // INSERT, UPDATE, or DELETE
oneof change {
Element element = 2; // For INSERT/UPDATE operations
ElementMetadata metadata = 3; // For DELETE operations
}
google.protobuf.Timestamp timestamp = 4;
string source_id = 5;
}
enum ChangeType {
CHANGE_TYPE_UNSPECIFIED = 0;
CHANGE_TYPE_INSERT = 1;
CHANGE_TYPE_UPDATE = 2;
CHANGE_TYPE_DELETE = 3;
}
Element (Node or Relation)
message Element {
oneof element {
Node node = 1;
Relation relation = 2;
}
}
Node Element
message Node {
ElementMetadata metadata = 1;
google.protobuf.Struct properties = 2;
}
Fields:
- `metadata`: Required metadata containing reference, labels, and effective timestamp
- `properties`: Key-value properties using `google.protobuf.Struct` (flexible schema)
Relation Element
message Relation {
ElementMetadata metadata = 1;
ElementReference in_node = 2; // Target node (relationship points to)
ElementReference out_node = 3; // Source node (relationship comes from)
google.protobuf.Struct properties = 4;
}
Fields:
- `metadata`: Required metadata for the relation
- `in_node`: Reference to the target node (where the arrow points)
- `out_node`: Reference to the source node (where the arrow originates)
- `properties`: Key-value properties of the relationship
Direction semantics: (out_node)-[relation]->(in_node)
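Because `in_node` is the target and `out_node` is the source, the two are easy to swap by mistake. The following sketch renders a relation in arrow notation so the direction can be checked by eye. The structs are hypothetical plain-Rust stand-ins for the protobuf messages, not the generated types.

```rust
// Hypothetical plain-Rust stand-ins for the protobuf messages above.
struct ElementReference {
    source_id: String,
    element_id: String,
}

struct Relation {
    label: String,
    in_node: ElementReference,  // target: where the arrow points
    out_node: ElementReference, // source: where the arrow originates
}

/// Renders the relation as (out_node)-[:LABEL]->(in_node).
fn describe(rel: &Relation) -> String {
    format!(
        "({})-[:{}]->({})",
        rel.out_node.element_id, rel.label, rel.in_node.element_id
    )
}

fn main() {
    // Alice (user_001) FOLLOWS Bob (user_002): Alice is out_node, Bob is in_node.
    let follows = Relation {
        label: "FOLLOWS".to_string(),
        out_node: ElementReference {
            source_id: "my-grpc-source".to_string(),
            element_id: "user_001".to_string(),
        },
        in_node: ElementReference {
            source_id: "my-grpc-source".to_string(),
            element_id: "user_002".to_string(),
        },
    };
    println!("{} (source: {})", describe(&follows), follows.out_node.source_id);
}
```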
ElementMetadata
message ElementMetadata {
ElementReference reference = 1;
repeated string labels = 2;
uint64 effective_from = 3; // Unix timestamp in nanoseconds
}
Fields:
- `reference`: Unique identifier (source_id + element_id)
- `labels`: Classification labels for pattern matching (e.g., ["User", "Customer"])
- `effective_from`: Timestamp in nanoseconds since Unix epoch
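Since `effective_from` is a `uint64` in nanoseconds, a client has to convert its clock reading explicitly. A minimal sketch using only the Rust standard library follows; the helper names are illustrative and not part of the crate.

```rust
use std::convert::TryFrom;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Converts a duration since the Unix epoch into whole nanoseconds.
/// Returns None if the value does not fit in a u64 (dates centuries away).
fn duration_to_unix_nanos(since_epoch: Duration) -> Option<u64> {
    u64::try_from(since_epoch.as_nanos()).ok()
}

/// Current wall-clock time as nanoseconds since the Unix epoch.
/// Returns None if the clock is set before 1970 or the value overflows.
fn now_unix_nanos() -> Option<u64> {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).ok()?;
    duration_to_unix_nanos(since_epoch)
}

fn main() {
    // A fixed instant: 1_234_567_890 seconds after the epoch.
    let fixed = duration_to_unix_nanos(Duration::from_secs(1_234_567_890));
    println!("fixed instant: {:?}", fixed);

    // The value a client would place in ElementMetadata.effective_from.
    println!("now: {:?}", now_unix_nanos());
}
```

Working with an integer nanosecond count end to end avoids the rounding that occurs when a floating-point seconds value is multiplied by 1e9.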
ElementReference
message ElementReference {
string source_id = 1;
string element_id = 2;
}
Fields:
- `source_id`: Identifies which source owns this element
- `element_id`: Unique identifier within the source
Service Methods
The gRPC Source implements the drasi.v1.SourceService:
1. SubmitEvent (Unary RPC)
Submit a single event.
rpc SubmitEvent(SubmitEventRequest) returns (SubmitEventResponse);
message SubmitEventRequest {
SourceChange event = 1;
}
message SubmitEventResponse {
bool success = 1;
string message = 2;
string error = 3;
string event_id = 4; // UUID assigned to the event
}
2. StreamEvents (Bidirectional Streaming RPC)
Stream multiple events for bulk processing.
rpc StreamEvents(stream SourceChange) returns (stream StreamEventResponse);
message StreamEventResponse {
bool success = 1;
string message = 2;
string error = 3;
uint64 events_processed = 4;
}
Behavior:
- Accepts a stream of `SourceChange` messages
- Returns periodic progress updates (every 100 events)
- Returns final count when stream completes
- Individual event errors don't stop the stream
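The behavior listed above can be simulated without a server. This sketch assumes that `events_processed` counts only accepted events and that a final response is always sent when the stream ends; both are assumptions about the implementation, and `stream_responses` is a hypothetical helper.

```rust
/// Interval between progress responses, as stated in the behavior list.
const PROGRESS_INTERVAL: u64 = 100;

/// Consumes a stream of events in which `Err` marks an event that failed
/// validation. Returns the `events_processed` value of each response that
/// would be sent: one per 100 accepted events, plus a final one.
fn stream_responses<I>(events: I) -> Vec<u64>
where
    I: Iterator<Item = Result<u32, String>>,
{
    let mut processed: u64 = 0;
    let mut responses = Vec::new();
    for event in events {
        match event {
            Ok(_) => {
                processed += 1;
                if processed % PROGRESS_INTERVAL == 0 {
                    responses.push(processed); // periodic progress update
                }
            }
            // A failed event is skipped; the stream keeps going.
            Err(_) => continue,
        }
    }
    responses.push(processed); // final count when the stream completes
    responses
}

fn main() {
    // 250 events in which every 50th one is invalid (5 invalid, 245 valid).
    let events = (1..=250u32).map(|i| {
        if i % 50 == 0 {
            Err(format!("event {} is invalid", i))
        } else {
            Ok(i)
        }
    });
    println!("{:?}", stream_responses(events));
}
```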
3. RequestBootstrap (Server Streaming RPC)
Request initial data snapshot (extensible for future use).
rpc RequestBootstrap(BootstrapRequest) returns (stream BootstrapResponse);
message BootstrapRequest {
string query_id = 1;
repeated string node_labels = 2;
repeated string relation_labels = 3;
}
message BootstrapResponse {
repeated Element elements = 1;
uint32 total_count = 2;
}
Current behavior: Returns empty stream (placeholder for future implementation)
4. HealthCheck (Unary RPC)
Check service health status.
rpc HealthCheck(google.protobuf.Empty) returns (HealthCheckResponse);
message HealthCheckResponse {
enum Status {
STATUS_UNSPECIFIED = 0;
STATUS_HEALTHY = 1;
STATUS_DEGRADED = 2;
STATUS_UNHEALTHY = 3;
}
Status status = 1;
string message = 2;
string version = 3; // Package version
}
Property Value Types
The google.protobuf.Struct supports the following value types:
| Protobuf Type | Drasi ElementValue | Notes |
|---|---|---|
| `null_value` | Null | Null/missing value |
| `bool_value` | Bool | Boolean true/false |
| `number_value` (integer) | Integer | Whole numbers without fractional part |
| `number_value` (float) | Float | Numbers with fractional part |
| `string_value` | String | UTF-8 text |
| `list_value` | String (JSON) | Arrays converted to JSON string |
| `struct_value` | String (JSON) | Objects converted to JSON string |
Note: Complex types (lists, structs) are serialized as JSON strings for storage in Drasi's graph model.
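The mapping in the table can be sketched as a conversion function. The enums below are hypothetical mirrors of `google.protobuf.Value` and of the Drasi value variants, and the integer test (a finite number with no fractional part) is an assumed rule; the crate's real conversion and JSON formatting may differ.

```rust
/// Hypothetical mirror of the kinds carried by google.protobuf.Value.
#[derive(Debug, Clone, PartialEq)]
enum ProtoValue {
    Null,
    Bool(bool),
    Number(f64), // protobuf Struct has one numeric kind: double
    Str(String),
    List(Vec<ProtoValue>),
    Struct(Vec<(String, ProtoValue)>),
}

/// Hypothetical mirror of the Drasi value variants named in the table.
#[derive(Debug, Clone, PartialEq)]
enum ElementValue {
    Null,
    Bool(bool),
    Integer(i64),
    Float(f64),
    Str(String),
}

/// Quotes a string for JSON. Minimal: escapes only backslash and double quote.
fn quote(s: &str) -> String {
    format!("\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\""))
}

/// Minimal JSON rendering used for lists and nested structs.
fn to_json(v: &ProtoValue) -> String {
    match v {
        ProtoValue::Null => "null".to_string(),
        ProtoValue::Bool(b) => b.to_string(),
        ProtoValue::Number(n) => n.to_string(),
        ProtoValue::Str(s) => quote(s),
        ProtoValue::List(items) => {
            let parts: Vec<String> = items.iter().map(to_json).collect();
            format!("[{}]", parts.join(","))
        }
        ProtoValue::Struct(fields) => {
            let parts: Vec<String> = fields
                .iter()
                .map(|(k, v)| format!("{}:{}", quote(k), to_json(v)))
                .collect();
            format!("{{{}}}", parts.join(","))
        }
    }
}

/// Applies the table: scalars map directly, complex types become JSON text.
fn convert(v: &ProtoValue) -> ElementValue {
    match v {
        ProtoValue::Null => ElementValue::Null,
        ProtoValue::Bool(b) => ElementValue::Bool(*b),
        // Assumed rule: finite, no fractional part, and exactly representable.
        ProtoValue::Number(n) if n.is_finite() && n.fract() == 0.0 && n.abs() < 9.0e15 => {
            ElementValue::Integer(*n as i64)
        }
        ProtoValue::Number(n) => ElementValue::Float(*n),
        ProtoValue::Str(s) => ElementValue::Str(s.clone()),
        ProtoValue::List(_) | ProtoValue::Struct(_) => ElementValue::Str(to_json(v)),
    }
}

fn main() {
    let props = vec![
        ("age", ProtoValue::Number(30.0)),
        ("strength", ProtoValue::Number(0.85)),
        ("tags", ProtoValue::List(vec![ProtoValue::Str("a".to_string()), ProtoValue::Number(1.0)])),
        ("address", ProtoValue::Struct(vec![("city".to_string(), ProtoValue::Str("Oslo".to_string()))])),
        ("nickname", ProtoValue::Null),
        ("active", ProtoValue::Bool(true)),
    ];
    for (name, value) in props.iter() {
        println!("{} -> {:?}", name, convert(value));
    }
}
```

One consequence worth keeping in mind: a client that sends `30` and one that sends `30.0` produce the same protobuf `number_value`, so under this rule both arrive as an integer.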
Usage Examples
Example 1: Basic Server Setup
use drasi_source_grpc::GrpcSource;
use drasi_lib::DrasiLib;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Create Drasi instance
let drasi = DrasiLib::builder()
.with_id("my-instance")
.build()
.await?;
// Create and start gRPC source
let source = GrpcSource::builder("events")
.with_host("0.0.0.0")
.with_port(50051)
.build()?;
drasi.add_source(source).await?;
drasi.start_source("events").await?;
println!("gRPC source listening on 0.0.0.0:50051");
// Keep server running
tokio::signal::ctrl_c().await?;
Ok(())
}
Example 2: Python Client - Submit Single Node
import grpc
from google.protobuf import struct_pb2
import drasi.v1.source_pb2 as source_pb2
import drasi.v1.source_pb2_grpc as source_pb2_grpc
import drasi.v1.common_pb2 as common_pb2
import time
# Connect to gRPC source
channel = grpc.insecure_channel('localhost:50051')
stub = source_pb2_grpc.SourceServiceStub(channel)
# Create properties
props = struct_pb2.Struct()
props['name'] = 'Alice'
props['email'] = 'alice@example.com'
props['age'] = 30
props['active'] = True
# Create metadata
metadata = common_pb2.ElementMetadata(
reference=common_pb2.ElementReference(
source_id="my-grpc-source",
element_id="user_001"
),
labels=["User", "Customer"],
effective_from=time.time_ns()  # Nanoseconds since the Unix epoch (integer, no float rounding)
)
# Create node
node = common_pb2.Node(
metadata=metadata,
properties=props
)
# Create source change
change = common_pb2.SourceChange(
type=common_pb2.CHANGE_TYPE_INSERT,
element=common_pb2.Element(node=node)
)
# Submit event
request = source_pb2.SubmitEventRequest(event=change)
response = stub.SubmitEvent(request)
print(f"Success: {response.success}")
print(f"Message: {response.message}")
print(f"Event ID: {response.event_id}")
Example 3: Python Client - Stream Events
import grpc
from google.protobuf import struct_pb2
import drasi.v1.source_pb2_grpc as source_pb2_grpc
import drasi.v1.common_pb2 as common_pb2
import time
channel = grpc.insecure_channel('localhost:50051')
stub = source_pb2_grpc.SourceServiceStub(channel)
def event_generator():
    """Generate 1000 user nodes"""
    for i in range(1000):
        props = struct_pb2.Struct()
        props['name'] = f'User {i}'
        props['index'] = i
        props['active'] = i % 2 == 0
        metadata = common_pb2.ElementMetadata(
            reference=common_pb2.ElementReference(
                source_id="my-grpc-source",
                element_id=f"user_{i:04d}"
            ),
            labels=["User"],
            effective_from=time.time_ns()  # Nanoseconds since the Unix epoch
        )
        node = common_pb2.Node(metadata=metadata, properties=props)
        yield common_pb2.SourceChange(
            type=common_pb2.CHANGE_TYPE_INSERT,
            element=common_pb2.Element(node=node)
        )
# Stream events
responses = stub.StreamEvents(event_generator())
for response in responses:
    print(f"Processed: {response.events_processed} events")
    if not response.success:
        print(f"Error: {response.error}")
print("Stream completed")
Example 4: Go Client - Insert Relation
package main
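import (
"context"
"log"
"time"
pb "your-module/drasi/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/structpb"
)
func main() {
// Connect to gRPC source over plaintext (grpc.WithInsecure is deprecated)
conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))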
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := pb.NewSourceServiceClient(conn)
// Create relation properties
properties, _ := structpb.NewStruct(map[string]interface{}{
"since": "2024-01-01",
"strength": 0.85,
})
// Create relation (Alice FOLLOWS Bob)
event := &pb.SourceChange{
Type: pb.ChangeType_CHANGE_TYPE_INSERT,
Change: &pb.SourceChange_Element{
Element: &pb.Element{
Element: &pb.Element_Relation{
Relation: &pb.Relation{
Metadata: &pb.ElementMetadata{
Reference: &pb.ElementReference{
SourceId: "my-grpc-source",
ElementId: "follows_001",
},
Labels: []string{"FOLLOWS"},
EffectiveFrom: uint64(time.Now().UnixNano()),
},
OutNode: &pb.ElementReference{
SourceId: "my-grpc-source",
ElementId: "user_001", // Alice
},
InNode: &pb.ElementReference{
SourceId: "my-grpc-source",
ElementId: "user_002", // Bob
},
Properties: properties,
},
},
},
},
}
// Submit event
resp, err := client.SubmitEvent(context.Background(), &pb.SubmitEventRequest{
Event: event,
})
if err != nil {
log.Fatalf("Failed to submit: %v", err)
}
log.Printf("Success: %v, Message: %s", resp.Success, resp.Message)
}
Example 5: Rust Client - Update and Delete
use tonic::transport::Channel;
use prost_types::{Struct, Value};
use std::collections::HashMap;
pub mod drasi {
pub mod v1 {
tonic::include_proto!("drasi.v1");
}
}
use drasi::v1::{
source_service_client::SourceServiceClient,
SubmitEventRequest, SourceChange, Element, Node, ElementMetadata,
ElementReference, ChangeType,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = SourceServiceClient::connect("http://localhost:50051").await?;
// Update example
let mut update_props = HashMap::new();
update_props.insert(
"email".to_string(),
Value {
kind: Some(prost_types::value::Kind::StringValue(
"alice.new@example.com".to_string()
)),
},
);
update_props.insert(
"verified".to_string(),
Value {
kind: Some(prost_types::value::Kind::BoolValue(true)),
},
);
let update_event = SourceChange {
r#type: ChangeType::Update as i32,
change: Some(drasi::v1::source_change::Change::Element(Element {
element: Some(drasi::v1::element::Element::Node(Node {
metadata: Some(ElementMetadata {
reference: Some(ElementReference {
source_id: "my-grpc-source".to_string(),
element_id: "user_001".to_string(),
}),
labels: vec!["User".to_string(), "Verified".to_string()],
effective_from: chrono::Utc::now().timestamp_nanos_opt().expect("timestamp out of range") as u64,
}),
properties: Some(Struct { fields: update_props }),
})),
})),
timestamp: None,
source_id: String::new(),
};
let response = client.submit_event(SubmitEventRequest {
event: Some(update_event),
}).await?;
println!("Update: {}", response.into_inner().message);
// Delete example
let delete_event = SourceChange {
r#type: ChangeType::Delete as i32,
change: Some(drasi::v1::source_change::Change::Metadata(ElementMetadata {
reference: Some(ElementReference {
source_id: "my-grpc-source".to_string(),
element_id: "user_999".to_string(),
}),
labels: vec!["User".to_string()],
effective_from: chrono::Utc::now().timestamp_nanos_opt().expect("timestamp out of range") as u64,
})),
timestamp: None,
source_id: String::new(),
};
let response = client.submit_event(SubmitEventRequest {
event: Some(delete_event),
}).await?;
println!("Delete: {}", response.into_inner().message);
Ok(())
}
Example 6: Testing with grpcurl
# Install grpcurl
go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest
# List available services
grpcurl -plaintext localhost:50051 list
# Health check
grpcurl -plaintext localhost:50051 drasi.v1.SourceService/HealthCheck
# Submit a node insert event
grpcurl -plaintext -d '{
"event": {
"type": "CHANGE_TYPE_INSERT",
"element": {
"node": {
"metadata": {
"reference": {
"sourceId": "my-grpc-source",
"elementId": "test_001"
},
"labels": ["TestNode"],
"effectiveFrom": "1234567890000000000"
},
"properties": {
"name": "Test Item",
"value": 42
}
}
}
}
}' localhost:50051 drasi.v1.SourceService/SubmitEvent
Integration with Drasi Queries
Events submitted through the gRPC source flow into Drasi's continuous query engine where they can be matched against Cypher patterns.
Label Matching
// Submit a node with labels ["User", "Premium"]
// This will match Cypher patterns like:
// MATCH (u:User) ...
// MATCH (p:Premium) ...
// MATCH (u:User:Premium) ...
Property Filtering
// Submit a node with property active=true
// This will be filtered by Cypher WHERE clauses:
// MATCH (u:User) WHERE u.active = true
// MATCH (u:User) WHERE u.age > 18
Relationship Traversal
// Submit a relation (user_001)-[:FOLLOWS]->(user_002)
// This enables Cypher patterns:
// MATCH (a:User)-[:FOLLOWS]->(b:User)
// MATCH (a)-[f:FOLLOWS]->(b) WHERE f.strength > 0.8
Advanced Topics
Dispatch Modes
Channel Mode (Default):
- Each subscriber gets an isolated channel
- Provides backpressure (blocks if subscriber is slow)
- Guarantees zero message loss
- Higher memory usage under load
Broadcast Mode:
- Single shared channel for all subscribers
- No backpressure (continues if subscriber is slow)
- May drop messages if buffer fills
- Lower latency and memory usage
let source = GrpcSource::builder("events")
.with_dispatch_mode(DispatchMode::Broadcast)
.with_dispatch_buffer_capacity(5000)
.build()?;
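The trade-off between the two modes comes down to what happens when a buffer is full: wait for the subscriber, or drop the event. The sketch below illustrates only that policy difference, using a bounded `std::sync::mpsc` channel as a stand-in. It is an analogy, not the dispatcher Drasi uses, and the function names are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Broadcast-like policy: never wait for the subscriber; drop when full.
/// Returns (delivered, dropped). The subscriber here never reads.
fn send_without_backpressure(capacity: usize, events: u32) -> (u32, u32) {
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let (mut delivered, mut dropped) = (0, 0);
    for i in 0..events {
        match tx.try_send(i) {
            Ok(()) => delivered += 1,
            Err(TrySendError::Full(_)) => dropped += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (delivered, dropped)
}

/// Channel-like policy: `send` blocks while the buffer is full, so a slow
/// subscriber slows the producer down instead of losing events.
/// Returns the number of events the subscriber received.
fn send_with_backpressure(capacity: usize, events: u32) -> u32 {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let subscriber = thread::spawn(move || rx.iter().count() as u32);
    for i in 0..events {
        tx.send(i).expect("subscriber hung up");
    }
    drop(tx); // closing the sender ends the subscriber's loop
    subscriber.join().expect("subscriber panicked")
}

fn main() {
    let (delivered, dropped) = send_without_backpressure(2, 5);
    println!("no backpressure: delivered={}, dropped={}", delivered, dropped);
    println!("backpressure: received={}", send_with_backpressure(2, 5));
}
```

With a buffer of 2 and a subscriber that never reads, the non-blocking sender keeps the first two events and drops the rest, while the blocking sender delivers all of them at the cost of waiting.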
Bootstrap Providers
Enable initial data snapshots for queries:
use drasi_lib::bootstrap::BootstrapProvider;
struct MyBootstrapProvider {
// ... your implementation
}
impl BootstrapProvider for MyBootstrapProvider {
// ... implement trait methods
}
let source = GrpcSource::builder("events")
.with_bootstrap_provider(MyBootstrapProvider::new())
.build()?;
Profiling and Performance
The gRPC source includes built-in profiling metadata:
// Automatically tracked:
// - source_send_ns: Timestamp when event enters the source
// - Propagated through the query pipeline
// - Available in reactions for end-to-end latency measurement
Error Handling
Validation Errors:
- Missing required fields (metadata, in_node, out_node)
- Invalid change types
- Malformed protobuf messages
Response contains:
SubmitEventResponse {
success: false,
message: "Invalid event data",
error: "Validation error: Node element missing required 'metadata' field",
event_id: ""
}
Streaming behavior:
- Individual errors don't stop the stream
- Valid events continue processing
- Periodic responses include error counts
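A rough sketch of the required-field checks follows. The types are hypothetical stand-ins for the protobuf messages, and only the Node `metadata` message text comes from the response shown above; the other messages and the empty `element_id` check are illustrative assumptions.

```rust
// Hypothetical stand-ins for the protobuf messages. Optional message fields
// are modelled as Option, which is how prost-generated Rust exposes them.
struct Metadata {
    element_id: String,
}

enum Element {
    Node {
        metadata: Option<Metadata>,
    },
    Relation {
        metadata: Option<Metadata>,
        in_node: Option<String>,
        out_node: Option<String>,
    },
}

/// Checks that metadata is present and carries a non-empty element_id.
fn check_metadata(kind: &str, metadata: &Option<Metadata>) -> Result<(), String> {
    match metadata {
        None => Err(format!(
            "Validation error: {} element missing required 'metadata' field",
            kind
        )),
        Some(m) if m.element_id.is_empty() => Err(format!(
            "Validation error: {} element has an empty element_id",
            kind
        )),
        Some(_) => Ok(()),
    }
}

/// Validates the required fields named in the list above.
fn validate(element: &Element) -> Result<(), String> {
    match element {
        Element::Node { metadata } => check_metadata("Node", metadata),
        Element::Relation { metadata, in_node, out_node } => {
            check_metadata("Relation", metadata)?;
            if in_node.is_none() {
                return Err("Validation error: Relation element missing required 'in_node' field".to_string());
            }
            if out_node.is_none() {
                return Err("Validation error: Relation element missing required 'out_node' field".to_string());
            }
            Ok(())
        }
    }
}

fn main() {
    let batch = vec![
        Element::Node { metadata: Some(Metadata { element_id: "user_001".to_string() }) },
        Element::Node { metadata: None }, // invalid, but the batch continues
        Element::Relation {
            metadata: Some(Metadata { element_id: "follows_001".to_string() }),
            in_node: Some("user_002".to_string()),
            out_node: Some("user_001".to_string()),
        },
    ];
    let mut accepted = 0;
    for element in batch.iter() {
        match validate(element) {
            Ok(()) => accepted += 1,
            Err(e) => println!("rejected: {}", e),
        }
    }
    println!("accepted {} of {}", accepted, batch.len());
}
```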
Graceful Shutdown
// The source handles shutdown signals gracefully:
// 1. Stops accepting new connections
// 2. Completes in-flight requests
// 3. Closes channels
// 4. Releases resources
drasi.stop_source("events").await?;
Generating Client Code
Python
# Install tools
pip install grpcio-tools
# Generate code
python -m grpc_tools.protoc \
-I./proto \
--python_out=./generated \
--grpc_python_out=./generated \
proto/drasi/v1/source.proto \
proto/drasi/v1/common.proto
Go
# Install tools
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# Generate code
protoc \
--go_out=./generated \
--go-grpc_out=./generated \
--go_opt=paths=source_relative \
--go-grpc_opt=paths=source_relative \
-I ./proto \
proto/drasi/v1/source.proto \
proto/drasi/v1/common.proto
Rust
Add to your build.rs:
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_server(false)
.build_client(true)
.compile(
&[
"proto/drasi/v1/source.proto",
"proto/drasi/v1/common.proto",
],
&["proto"],
)?;
Ok(())
}
JavaScript/TypeScript
# Install tools
npm install -g grpc-tools grpc_tools_node_protoc_ts
# Generate code
grpc_tools_node_protoc \
--js_out=import_style=commonjs,binary:./generated \
--grpc_out=grpc_js:./generated \
--plugin=protoc-gen-grpc=`which grpc_tools_node_protoc_plugin` \
-I ./proto \
proto/drasi/v1/source.proto \
proto/drasi/v1/common.proto
Performance Guidelines
When to Use SubmitEvent (Unary)
- Low to moderate event rates (< 100 events/second)
- Need immediate per-event confirmation
- Interactive applications with user feedback
- Critical events requiring acknowledgment
- Testing and debugging
When to Use StreamEvents (Streaming)
- High-volume ingestion (> 1000 events/second)
- Bulk data imports or migrations
- ETL pipelines with batching
- IoT sensor streams
- Log aggregation
- When network efficiency matters
Best Practices
- Connection reuse: Create one gRPC channel and reuse for all requests
- Stream for bulk: Use streaming for bulk operations (10x+ faster)
- Buffer sizing: Tune `dispatch_buffer_capacity` based on throughput
- Connection pooling: For high concurrency scenarios
- Keep-alive: Configure gRPC keep-alive for long-lived connections
- Batch client-side: Accumulate events before streaming
- Monitor metrics: Track event processing rates and latency
Performance Benchmarks (Approximate)
| Scenario | Events/sec | Notes |
|---|---|---|
| Unary RPC (single event) | 500-1000 | Round-trip per event |
| Streaming (batched) | 10,000-50,000 | Depends on event size |
| Streaming (small events) | 100,000+ | Minimal properties |
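To put these figures in perspective, the short calculation below estimates how long a one-million-event import would take at each rate. The rates are the approximate values from the table, so the results are order-of-magnitude estimates only.

```rust
/// Seconds needed to ingest `events` at a sustained `rate` (events/second).
fn ingest_seconds(events: u64, rate: u64) -> f64 {
    events as f64 / rate as f64
}

fn main() {
    let events: u64 = 1_000_000;
    // (scenario, low rate, high rate) taken from the benchmark table.
    let scenarios: [(&str, u64, u64); 2] = [
        ("Unary RPC", 500, 1_000),
        ("Streaming (batched)", 10_000, 50_000),
    ];
    for (name, low, high) in scenarios.iter() {
        println!(
            "{}: {:.0}-{:.0} s",
            name,
            ingest_seconds(events, *high), // best case
            ingest_seconds(events, *low)   // worst case
        );
    }
}
```

At these rates the unary path needs roughly 1000 to 2000 seconds, while batched streaming needs roughly 20 to 100 seconds.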
Comparison with HTTP Source
| Feature | gRPC Source | HTTP Source |
|---|---|---|
| Protocol | HTTP/2 + Protocol Buffers | HTTP/1.1 + JSON |
| Type Safety | Strongly typed (protobuf) | JSON schema validation |
| Performance | Higher throughput, lower latency | Good for moderate loads |
| Streaming | Native bidirectional streaming | Batch POST endpoint |
| Message Size | Smaller (binary encoding) | Larger (text JSON) |
| Client Generation | Auto-generated from .proto files | Manual or OpenAPI codegen |
| Browser Support | Limited (requires grpc-web proxy) | Native browser support |
| Debugging | Requires gRPC tools (grpcurl, Postman) | Standard HTTP tools (curl) |
| Best For | High-volume, microservices, IoT | Web apps, simple integration |
Security Considerations
The gRPC source currently runs without authentication or encryption. For production deployments:
TLS/SSL
// Future enhancement: TLS configuration
// Configure server TLS certificates
// Enforce encrypted connections
Authentication
// Future enhancement: Authentication interceptors
// Implement token-based auth
// Add API key validation
Network Security
- Use firewall rules to restrict access
- Deploy behind a reverse proxy (Envoy, Nginx)
- Use Kubernetes network policies
- Implement rate limiting at proxy level
Monitoring
- Track request rates and patterns
- Alert on unusual traffic spikes
- Log failed authentication attempts
- Monitor resource usage
Troubleshooting
Connection Refused
Error: Connection refused
Solutions:
- Verify source is started: `drasi.start_source("my-grpc").await?`
- Check port availability: `lsof -i :50051`
- Verify host binding (use "0.0.0.0" not "localhost" for external access)
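Where `lsof` is not available, the port check can be approximated from Rust by trying to bind a listener. This is a sketch using only the standard library; `port_is_available` is a hypothetical helper, and the result is a snapshot that can change as soon as the function returns.

```rust
use std::net::TcpListener;

/// Returns true if a TCP listener can currently be bound to `host:port`.
/// The probe listener is closed again when the function returns.
fn port_is_available(host: &str, port: u16) -> bool {
    TcpListener::bind((host, port)).is_ok()
}

fn main() {
    let port = 50051;
    if port_is_available("127.0.0.1", port) {
        // Nothing is listening, which explains a "connection refused" error.
        println!("port {} is free: the source is probably not started", port);
    } else {
        println!("port {} is in use by the source or by another service", port);
    }
}
```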
Invalid Event Data
SubmitEventResponse { success: false, error: "Validation error: Node element missing required 'metadata' field" }
Solutions:
- Ensure all required fields are populated
- Check protobuf message structure
- Validate element_id is not empty
- Verify labels array is not empty
Slow Event Processing
Symptoms: High latency, timeouts
Solutions:
- Increase `dispatch_buffer_capacity`
- Use streaming instead of unary calls
- Check query complexity
- Monitor system resources
No Subscribers
[my-grpc-source] Failed to dispatch (no subscribers): No subscribers available
This is normal: Events are dropped if no queries are subscribed. Add a query:
let query = Query::cypher("my-query")
.query("MATCH (n) RETURN n")
.from_source("my-grpc-source")
.build();
drasi.add_query(query).await?;
Developer Notes
Source Code Structure
- `src/lib.rs`: Main source implementation and gRPC service
- `src/config.rs`: Configuration structs and validation
- `src/tests.rs`: Unit tests
- `build.rs`: Protobuf compilation
- `proto/drasi/v1/`: Protobuf schema definitions
Testing
# Run unit tests
cargo test -p drasi-source-grpc
# Run with logging
RUST_LOG=debug cargo test -p drasi-source-grpc -- --nocapture
# Test specific module
cargo test -p drasi-source-grpc proto_conversion
Contributing
When modifying the gRPC source:
- Update protobuf schemas in `proto/drasi/v1/`
- Regenerate code: `cargo build` (runs `build.rs`)
- Add tests for new functionality
- Update this README with examples
- Run `cargo clippy` and `cargo fmt`
License
Licensed under the Apache License, Version 2.0. See the LICENSE file for details.
Plugin Packaging
This source is compiled as a dynamic plugin (cdylib) that can be loaded by drasi-server at runtime.
Key files:
- `Cargo.toml`: includes `crate-type = ["lib", "cdylib"]`
- `src/descriptor.rs`: implements `SourcePluginDescriptor` with kind `"grpc"`, configuration DTO, and OpenAPI schema generation
- `src/lib.rs`: invokes `drasi_plugin_sdk::export_plugin!` to export the plugin entry point
Building:
cargo build -p drasi-source-grpc
The compiled .so (Linux) / .dylib (macOS) / .dll (Windows) is placed in target/debug/ and can be copied to the server's plugins/ directory.
For more details on the plugin descriptor pattern and configuration DTOs, see the Source Developer Guide.
Related Components
- HTTP Source: REST API alternative for web applications
- PostgreSQL Source: CDC from PostgreSQL databases
- Application Source: Programmatic event submission in Rust
- Query Engine: Continuous Cypher query evaluation
- Reactions: Output handlers for query results