# Kinesis Data Streams

A Rust library providing utilities for AWS Kinesis Data Streams operations, with built-in retry logic and batch processing capabilities.
## Features

- **Simple API**: Easy-to-use functions for putting records to Kinesis Data Streams
- **Batch Processing**: Efficient batch record operations with automatic size and count validation
- **Records Builder**: Builder pattern for constructing batches of records with size constraints
- **Error Handling**: Comprehensive error handling with custom error types
- **Retry Logic**: Built-in retry mechanisms for handling transient failures
- **AWS SDK Integration**: Built on top of the official AWS SDK for Rust
- **Testing Support**: Comprehensive unit tests with mocking capabilities
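The retry behavior itself is not shown in the examples below. As an illustration only (this is a generic, stdlib-only sketch of the retry-with-backoff pattern, not this crate's API), a helper that retries an operation with exponentially growing delays might look like:

```rust
use std::thread::sleep;
use std::time::Duration;

/// Illustrative sketch only: retry `op` up to `max_attempts` times,
/// doubling the delay between attempts. Not this crate's actual API.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    initial_delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            // Out of attempts: surface the last error to the caller.
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                sleep(delay);
                delay *= 2; // exponential backoff
                attempt += 1;
            }
        }
    }
}

fn main() {
    // Simulate a transient failure that succeeds on the third attempt.
    let mut calls = 0;
    let result: Result<&str, &str> = retry_with_backoff(5, Duration::from_millis(1), || {
        calls += 1;
        if calls < 3 { Err("transient failure") } else { Ok("ok") }
    });
    println!("{:?} after {} calls", result, calls);
}
```

The crate applies this kind of logic internally, so callers normally do not need to wrap `add_record`/`add_records` themselves.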
## Installation

Add this to your `Cargo.toml`:

```toml
[dependencies]
kinesis_data_streams = "0.3.0"
```
## Usage

### Basic Usage

```rust
use kinesis_data_streams::make_client_with_timeout_default;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a Kinesis client with default timeout settings
    let client = make_client_with_timeout_default(None).await;

    // Put a single record
    let result = kinesis_data_streams::add_record(
        &client,
        "my-stream",
        "partition-key",
        "Hello, Kinesis!".to_string(),
    ).await?;

    println!("Record added with sequence number: {}", result.sequence_number());
    Ok(())
}
```
### Batch Processing with RecordsBuilder

```rust
use kinesis_data_streams::{make_client_with_timeout_default, RecordsBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = make_client_with_timeout_default(None).await;

    // Build a batch of records
    let mut builder = RecordsBuilder::new();
    builder.add_entry_data("Record 1".to_string())?;
    builder.add_entry_data("Record 2".to_string())?;
    builder.add_entry("Record 3".to_string(), Some("custom-partition".to_string()), None)?;

    // Send the batch
    let records = builder.build();
    let result = kinesis_data_streams::add_records(&client, "my-stream", records).await?;

    println!("Batch sent with {} failed records", result.failed_record_count().unwrap_or(0));
    Ok(())
}
```
### Custom Endpoint (for testing)

```rust
use kinesis_data_streams::make_client_with_timeout_default;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Use a custom endpoint (e.g., for LocalStack)
    let client = make_client_with_timeout_default(Some("http://localhost:4566".to_string())).await;

    // Your Kinesis operations here...
    Ok(())
}
```
### Timeout Configuration

```rust
use kinesis_data_streams::{make_client, make_client_with_timeout, make_client_with_timeout_default};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Use default timeout settings (recommended)
    let client = make_client_with_timeout_default(None).await;

    // Use custom timeout settings
    let client = make_client_with_timeout(
        None,                             // endpoint_url
        Some(Duration::from_secs(3100)),  // connect_timeout
        Some(Duration::from_secs(60)),    // operation_timeout
        Some(Duration::from_secs(55)),    // operation_attempt_timeout
        Some(Duration::from_secs(50)),    // read_timeout
    ).await;

    // Use the legacy client without timeout configuration
    let client = make_client(None, None).await;
    Ok(())
}
```
## API Reference

### Functions

- `make_client_with_timeout_default(endpoint_url: Option<String>)` - Creates a Kinesis client with default timeout settings
- `make_client_with_timeout(endpoint_url, connect_timeout, operation_timeout, operation_attempt_timeout, read_timeout)` - Creates a Kinesis client with custom timeout settings
- `make_client(endpoint_url: Option<String>, timeout_config: Option<TimeoutConfig>)` - Creates a Kinesis client with an optional custom endpoint and timeout configuration
- `kinesis_data_streams::add_record(client, stream_name, partition_key, data)` - Puts a single record
- `kinesis_data_streams::add_records(client, stream_name, records)` - Puts multiple records in a batch
### RecordsBuilder

A builder for creating batches of records with automatic size validation:

- `new()` - Creates a new builder with the default AWS limits
- `new_with_limit(single_limit, total_limit, record_limit)` - Creates a builder with custom limits
- `add_entry_data(data)` - Adds a record with an auto-generated partition key
- `add_entry(data, partition_key, explicit_hash_key)` - Adds a record with custom keys
- `build()` - Builds the final vector of records
- `len()` - Returns the number of records
- `is_empty()` - Checks whether the builder is empty
## Error Handling

The library provides comprehensive error handling through the `Error` enum, defined with the `thiserror` crate:

```rust
#[derive(Error, Debug)]
pub enum Error {
    #[error(transparent)]
    BuildError(#[from] Box<aws_sdk_kinesis::error::BuildError>),
    #[error("EntryOverAll {0}")]
    EntryOverAll(String),
    #[error("EntryOverItem {0}")]
    EntryOverItem(String),
    #[error(transparent)]
    AwsSdk(#[from] Box<aws_sdk_kinesis::Error>),
}
```
Error variants:

- `BuildError` - Errors when building AWS SDK request entries
- `EntryOverItem` - An individual record exceeds the 1MB size limit
- `EntryOverAll` - Adding a record would exceed the batch limits (5MB total or 500 records)
- `AwsSdk` - General AWS SDK errors (network issues, authentication, etc.)
### Error Handling Example

```rust
use kinesis_data_streams::{make_client_with_timeout_default, RecordsBuilder, error::Error};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = make_client_with_timeout_default(None).await;

    match kinesis_data_streams::add_record(&client, "my-stream", "key", "data".to_string()).await {
        Ok(output) => println!("Success: {}", output.sequence_number()),
        Err(Error::AwsSdk(e)) => {
            // Handle AWS SDK errors (e.g., stream not found, throttling)
            eprintln!("AWS error: {}", e);
        }
        Err(e) => eprintln!("Other error: {}", e),
    }

    // Batch operations with size limit handling
    let mut builder = RecordsBuilder::new();
    match builder.add_entry_data("Large data...".to_string()) {
        Ok(()) => println!("Record added to batch"),
        Err(Error::EntryOverItem(msg)) => {
            // Single record too large
            eprintln!("Record too large: {}", msg);
        }
        Err(Error::EntryOverAll(msg)) => {
            // Batch is full; send the current batch first
            eprintln!("Batch full: {}", msg);
        }
        Err(e) => eprintln!("Unexpected error: {}", e),
    }
    Ok(())
}
```
## AWS Kinesis Limits

The library respects the AWS Kinesis Data Streams limits:

- **Single Record**: Maximum 1MB per record
- **Batch Operation**: Maximum 5MB total payload and 500 records per batch
- **Partition Key**: Maximum 256 UTF-8 characters

These limits are enforced by the `RecordsBuilder` to prevent API errors.
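As an illustration of how such validation can work, here is a simplified, stdlib-only sketch (this is not the crate's actual `RecordsBuilder`; the type, method names, and error strings below are invented for the example):

```rust
// Limits as documented above.
const MAX_RECORD_BYTES: usize = 1_000_000; // 1MB per record
const MAX_BATCH_BYTES: usize = 5_000_000;  // 5MB per batch
const MAX_BATCH_RECORDS: usize = 500;      // 500 records per batch

/// Simplified sketch of batch-limit enforcement. The real builder also
/// handles partition keys and builds AWS SDK entry types.
struct BatchSketch {
    records: Vec<Vec<u8>>,
    total_bytes: usize,
}

impl BatchSketch {
    fn new() -> Self {
        Self { records: Vec::new(), total_bytes: 0 }
    }

    /// Reject any entry that would push the batch past a limit,
    /// leaving the batch unchanged on error.
    fn try_add(&mut self, data: Vec<u8>) -> Result<(), String> {
        if data.len() > MAX_RECORD_BYTES {
            return Err(format!("record of {} bytes exceeds the 1MB limit", data.len()));
        }
        if self.records.len() + 1 > MAX_BATCH_RECORDS
            || self.total_bytes + data.len() > MAX_BATCH_BYTES
        {
            return Err("batch limits exceeded; send the current batch first".to_string());
        }
        self.total_bytes += data.len();
        self.records.push(data);
        Ok(())
    }

    fn len(&self) -> usize {
        self.records.len()
    }
}

fn main() {
    let mut batch = BatchSketch::new();
    assert!(batch.try_add(vec![0u8; 100]).is_ok());
    // An oversized record is rejected without corrupting the batch.
    assert!(batch.try_add(vec![0u8; MAX_RECORD_BYTES + 1]).is_err());
    println!("{} record(s) in batch", batch.len());
}
```

Checking limits client-side like this avoids a round trip that AWS would reject anyway.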
## Testing

Run the test suite:

```shell
cargo test
```

For integration tests with specific environment variables:

```shell
RUST_LOG=info REALM_CODE=test cargo test test_kinesis_data_streams_records -- --nocapture --test-threads=1
```

The library includes comprehensive unit tests with mocking capabilities, using `mockito` to test without actual AWS resources.
## Configuration

### Authentication

The client uses the AWS SDK's default credential chain for authentication:

- Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`)
- ECS task role (for Fargate/ECS)
- EC2 instance profile
- AWS credentials file
- Other configured credential providers
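For local development against a custom endpoint such as LocalStack, dummy credentials supplied via environment variables are usually sufficient; the values below are placeholders for illustration, not something this crate requires:

```shell
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test
export AWS_REGION=us-east-1
```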
## Dependencies

- `aws-config` - AWS configuration management
- `aws-sdk-kinesis` - Official AWS Kinesis SDK
- `thiserror` - Error handling
- `tracing` - Logging and tracing
- `uuid` - UUID generation for partition keys
## License

This project is licensed under either of

- Apache License, Version 2.0
- MIT License

at your option.
## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.