21 releases
| new 0.0.22 | Mar 4, 2026 |
|---|---|
| 0.0.21 | Feb 25, 2026 |
| 0.0.4 | Jan 30, 2026 |
#341 in Compression
Used in 4 crates
2MB
35K
SLoC
rivven-client
Native Rust client library for the Rivven event streaming platform.
Overview
rivven-client is a production-grade async client with connection pooling, automatic failover, circuit breakers, and exactly-once semantics.
Features
| Category | Features |
|---|---|
| Connectivity | Connection pooling, request pipelining, automatic failover |
| Resilience | Circuit breaker, exponential backoff with jitter, reconnection, health monitoring |
| Security | TLS/mTLS (rustls) for all clients including Producer and Consumer, SCRAM-SHA-256 authentication |
| Semantics | Transactions, idempotent producer, exactly-once delivery |
| Compression | LZ4, Snappy, Zstd (Gzip returns an error) |
Installation
[dependencies]
rivven-client = "0.2"
# With TLS support
rivven-client = { version = "0.2", features = ["tls"] }
Usage
Basic Client
For simple use cases, use the basic Client:
use rivven_client::Client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("localhost:9092").await?;
// Publish a message
client.publish("my-topic", b"value").await?;
// Consume messages
let messages = client.consume("my-topic", 0, 0, 100).await?;
Ok(())
}
Authentication
Rivven supports multiple authentication methods:
use rivven_client::Client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("localhost:9092").await?;
// Simple authentication (use with TLS in production)
let session = client.authenticate("alice", "password123").await?;
println!("Session ID: {}", session.session_id);
// SCRAM-SHA-256 authentication (recommended)
// Password never sent over the wire, mutual authentication
let session = client.authenticate_scram("alice", "password123").await?;
println!("Authenticated! Expires in {}s", session.expires_in);
// Now use the authenticated session for operations
client.publish("my-topic", b"secure message").await?;
Ok(())
}
Production-Grade Resilient Client
For production deployments, use ResilientClient which provides:
- Connection pooling across multiple servers
- Automatic retry with exponential backoff and jitter
- Circuit breaker pattern for fault isolation
- Real-time health monitoring
use rivven_client::{ResilientClient, ResilientClientConfig};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure the resilient client
let config = ResilientClientConfig::builder()
.servers(vec![
"node1:9092".to_string(),
"node2:9092".to_string(),
"node3:9092".to_string(),
])
.pool_size_per_server(5)
.max_retries(3)
.retry_initial_delay(Duration::from_millis(100))
.retry_max_delay(Duration::from_secs(5))
.circuit_breaker_failure_threshold(5)
.circuit_breaker_recovery_timeout(Duration::from_secs(30))
.build();
// Create the resilient client
let client = ResilientClient::new(config);
// All operations automatically use connection pooling,
// retries, and circuit breakers
client.publish("my-topic", Some(b"key"), b"value").await?;
// Check client health
let stats = client.stats().await;
println!("Active connections: {}", stats.active_connections);
println!("Healthy servers: {}", stats.healthy_servers);
Ok(())
}
Circuit Breaker Behavior
The circuit breaker protects against cascading failures:
- Closed (Normal): Requests flow normally. Failures are counted.
- Open (Failing): After threshold failures, the circuit opens. All requests fail fast without attempting connection.
- Half-Open (Recovery): After recovery timeout, one request is allowed through. If successful, circuit closes; if failed, circuit reopens.
// Circuit breaker configuration
let config = ResilientClientConfig::builder()
.servers(vec!["localhost:9092".to_string()])
.circuit_breaker_failure_threshold(5) // Open after 5 failures
.circuit_breaker_recovery_timeout(Duration::from_secs(30)) // Try recovery after 30s
.build();
Retry with Exponential Backoff
Failed operations are automatically retried with exponential backoff and jitter:
let config = ResilientClientConfig::builder()
.servers(vec!["localhost:9092".to_string()])
.max_retries(3) // Retry up to 3 times
.retry_initial_delay(Duration::from_millis(100)) // Start with 100ms delay
.retry_max_delay(Duration::from_secs(5)) // Cap at 5 seconds
.retry_multiplier(2.0) // Double delay each retry
.build();
High-Throughput Pipelined Client
For maximum throughput, use PipelinedClient which allows multiple in-flight requests over a single connection. Supports optional TLS and authentication.
Handshake: PipelinedClient performs a version handshake before authentication, matching the basic Client connection sequence. This ensures protocol version compatibility is verified before any credentials are exchanged.
Connection safety: The pipelined client tracks wire state via a bytes_sent flag. If an I/O error occurs after bytes have been written (partial send, broken read), the connection is automatically poisoned to prevent TCP stream desync from stale responses. Non-pipelined requests poison on every I/O operation (write, flush, read). ProtocolError, ResponseTooLarge, and request timeouts also trigger poisoning and consumer auto-reconnect. Timeout cancellation mid-I/O poisons the stream because the dropped future may leave partial frames on the wire.
Concurrency improvement: flush_batch registers pending responses under the lock, then writes to the socket outside the lock. This prevents TCP backpressure from blocking the reader task's response dispatch. If Phase 2 (socket write) fails, all pending entries registered in Phase 1 are cleaned up and callers receive a ConnectionError, preventing leaked oneshot channels.
Security: SASL/PLAIN authentication requires a TLS connection. The client refuses to send plaintext credentials over unencrypted connections, returning an error before any bytes are written. TLS connections automatically enable TCP_NODELAY to minimize latency.
use rivven_client::{PipelinedClient, PipelineConfig};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// High-throughput configuration
let config = PipelineConfig::high_throughput();
let client = PipelinedClient::connect("localhost:9092", config).await?;
// Send 1000 requests concurrently - all pipelined over single connection
let handles: Vec<_> = (0..1000)
.map(|i| {
let client = client.clone();
tokio::spawn(async move {
client.publish("topic", format!("msg-{}", i)).await
})
})
.collect();
for handle in handles {
handle.await??;
}
// Check pipeline statistics
let stats = client.stats();
println!("Requests sent: {}", stats.requests_sent);
println!("Responses received: {}", stats.responses_received);
println!("Success rate: {:.1}%", stats.success_rate() * 100.0);
Ok(())
}
Pipeline Configuration
| Config | Default | High-Throughput | Low-Latency |
|---|---|---|---|
max_in_flight |
100 | 1000 | 32 |
batch_linger_us |
1000 | 5000 | 0 |
max_batch_size |
64 | 256 | 1 |
request_timeout |
30s | 60s | 10s |
close_timeout |
5s | 10s | 3s |
High-Performance Producer
For maximum throughput with all best practices, use Producer:
use rivven_client::{Producer, ProducerConfig, CompressionType};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure with Kafka-like semantics
let config = ProducerConfig::builder()
.bootstrap_servers(vec!["localhost:9092".to_string()])
.batch_size(16384) // Batch up to 16KB
.linger_ms(5) // Wait 5ms for batch
.buffer_memory(32 * 1024 * 1024) // 32MB buffer
.compression_type(CompressionType::Lz4) // LZ4 batch compression
.enable_idempotence(true) // Exactly-once semantics
.retries(5) // Retry failed batches up to 5 times
.retry_backoff_ms(100) // Start with 100ms backoff
.retry_backoff_max_ms(2000) // Cap at 2s
.auth("producer-app", "password") // SCRAM-SHA-256 auth
.build();
// Producer::new() connects with auto-handshake and authentication
let producer = Arc::new(Producer::new(config).await?);
// Share across tasks (sticky partitioning for keyless messages)
for i in 0..1000 {
let producer = Arc::clone(&producer);
tokio::spawn(async move {
producer.send("topic", format!("msg-{}", i)).await
});
}
// With key (consistent partition assignment)
producer.send_with_key("topic", Some("user-123"), "event").await?;
// Flush ensures all pending records are delivered
producer.flush().await?;
// Check producer statistics
let stats = producer.stats();
println!("Records sent: {}", stats.records_sent);
println!("Success rate: {:.1}%", stats.success_rate() * 100.0);
Ok(())
}
Producer Features
| Feature | Description |
|---|---|
| Authentication | SCRAM-SHA-256 auto-auth via ProducerAuthConfig |
| Auto-Handshake | Protocol version negotiated on connect |
| Compression | LZ4/Snappy/Zstd batch compression (feature-gated) |
| Idempotency | Sequence tracking + IdempotentPublish wire type; is_idempotent() detects silent degradation |
| Metadata Cache | TTL-based caching with persistent metadata client (avoids per-topic connection churn) |
| Sticky Partitioning | Batches keyless messages to same partition |
| Backpressure | Memory-bounded buffers prevent OOM; applies to standard, idempotent, and transactional publish paths |
| Batch Retry | Per-batch retry with exponential backoff; undelivered records are returned and resent on a fresh connection. Configurable via retries, retry_backoff_ms, retry_backoff_max_ms. Permanent errors (PRODUCER_FENCED, INVALID_TOPIC, auth) skip retries via Error::is_retriable() |
| Error Propagation | Permanent server errors are propagated as ServerError (not ConnectionError), preserving is_retriable() = false so callers can programmatically distinguish permanent rejections from transient failures. needs_reconnect is only set for transient errors |
| Zero-Duplication Timeout | read_batch_responses drains delivered records from the vectors as it goes (Vec::remove(0) instead of dummy oneshot channels). On timeout cancellation, only truly undelivered records remain — eliminating duplicate retries in non-idempotent mode and removing per-record oneshot allocation overhead |
| Hot-Path Allocation | heartbeat() and remove_member() accept &str instead of &String, avoiding a heap allocation on every heartbeat call (3–10 second intervals per consumer) |
| Per-Batch Timeout | Response-reading phase in send_batch is wrapped in tokio::time::timeout(request_timeout), preventing stalled broker responses from blocking the producer |
| Murmur2 Hashing | Kafka-compatible key partitioning (optimized) |
| Batched I/O | Single flush per batch minimizes syscalls |
| Pipelined Responses | Write-all, then read-all for throughput |
| Pipelined Consumer Fetch | Sends all partition fetch requests in one flush, reads all responses |
| Pipelined Offset Commit | Batch-commits all partition offsets in one flush, collects all errors |
| Multi-Server Failover | Tries all bootstrap servers on connect |
| Flush Safety | pending_records correctly decremented on batch failure; flush() always terminates |
| Completion Tracking | flush() waits for all pending records |
| Metadata Refresh | refresh_metadata() fetches partition info |
Producer Configuration
| Config | Default | High-Throughput | Low-Latency | Exactly-Once |
|---|---|---|---|---|
batch_size |
16KB | 64KB | 1 | 16KB |
linger_ms |
0 | 10 | 0 | 0 |
compression_type |
None | Lz4 | None | None |
max_in_flight_requests |
5 | 10 | 1 | 5 |
enable_idempotence |
false | false | false | true |
acks |
1 | 1 | 1 | -1 (all) |
auth |
None | — | — | — |
Idempotent producer constraint: When
enable_idempotenceistrue,max_in_flight_requestsmust be ≤ 5 (matching Kafka's KIP-98). The builder returns an error otherwise.
Health Monitoring
Monitor client and server health in real-time:
let stats = client.stats().await;
println!("Client Statistics:");
println!(" Total servers: {}", stats.total_servers);
println!(" Healthy servers: {}", stats.healthy_servers);
println!(" Active connections: {}", stats.active_connections);
println!(" Available connections: {}", stats.available_connections);
for server in &stats.servers {
println!("\n Server: {}", server.address);
println!(" Circuit state: {:?}", server.circuit_state);
println!(" Active connections: {}", server.active_connections);
println!(" Available connections: {}", server.available_connections);
}
Admin Operations
use rivven_client::Client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("localhost:9092").await?;
// Create topic
client.create_topic("my-topic", Some(3)).await?;
// List topics
let topics = client.list_topics().await?;
for topic in topics {
println!("Topic: {}", topic);
}
// Delete topic
client.delete_topic("my-topic").await?;
Ok(())
}
Advanced Admin API
Rivven supports advanced admin operations for topic configuration management:
use rivven_client::Client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("localhost:9092").await?;
// Create topic
client.create_topic("events", Some(3)).await?;
// Describe topic configurations
let configs = client.describe_topic_configs(&["events"]).await?;
for (topic, config) in &configs {
println!("Topic '{}' configuration:", topic);
for (key, value) in config {
println!(" {}: {}", key, value);
}
}
// Alter topic configuration
let result = client.alter_topic_config("events", &[
("retention.ms", Some("86400000")), // 1 day retention
("cleanup.policy", Some("compact")), // Log compaction
("max.message.bytes", Some("2097152")), // 2 MB max message
]).await?;
println!("Changed {} config entries", result.changed_count);
// Reset configuration to default
client.alter_topic_config("events", &[
("retention.ms", None), // Reset to broker default
]).await?;
// Increase partition count
let new_count = client.create_partitions("events", 6).await?;
println!("Topic now has {} partitions", new_count);
// Delete records before offset (log truncation)
let results = client.delete_records("events", &[
(0, 1000), // Delete records before offset 1000 in partition 0
(1, 500), // Delete records before offset 500 in partition 1
]).await?;
for result in results {
println!("Partition {}: low watermark now {}",
result.partition, result.low_watermark);
}
Ok(())
}
Supported Topic Configurations
| Configuration | Description | Example |
|---|---|---|
retention.ms |
Message retention time | 86400000 (1 day) |
retention.bytes |
Max partition size | 1073741824 (1 GB) |
max.message.bytes |
Max message size | 2097152 (2 MB) |
segment.bytes |
Segment file size | 536870912 (512 MB) |
segment.ms |
Segment rotation time | 604800000 (7 days) |
cleanup.policy |
delete or compact |
compact |
min.insync.replicas |
Min ISR for acks=all | 2 |
compression.type |
lz4, zstd, snappy, gzip |
lz4 |
### Schema Registration
Register schemas with the Rivven Schema Registry (`rivven-schema`) directly from the client using a lightweight HTTP/1.1 call — no external HTTP dependencies required:
```rust
use rivven_client::Client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("localhost:9092").await?;
// Register an Avro schema with the schema registry
let schema_id = client.register_schema(
"http://localhost:8081", // Schema registry URL
"users-value", // Subject name
"AVRO", // Schema type: AVRO, PROTOBUF, or JSON
r#"{"type":"record","name":"User","fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]}"#,
).await?;
println!("Registered schema with ID: {}", schema_id);
Ok(())
}
Note: For advanced schema registry operations (compatibility checks, Glue integration, codec management), use
rivven-connect'sSchemaRegistryClient. TheClient::register_schema()method is designed for quick schema bootstrapping without additional dependencies.
Security: HTTP responses from the registry are bounded by
MAX_CHUNK_SIZE(16 MB per chunk) andMAX_RESPONSE_SIZE(16 MB total) to prevent memory exhaustion from malicious or misconfigured registries.
Transactions & Idempotent Producer
Rivven supports native transactions and idempotent producers for exactly-once semantics:
Idempotent Producer
Automatic message deduplication using producer IDs and sequence numbers:
use rivven_client::Client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("localhost:9092").await?;
// Initialize idempotent producer (assigns producer_id and epoch)
let mut producer = client.init_producer_id(None).await?;
println!("Producer ID: {}, Epoch: {}", producer.producer_id, producer.producer_epoch);
// Publish with deduplication
let (offset, partition, was_duplicate) = client
.publish_idempotent("orders", None::<Vec<u8>>, b"order-data".to_vec(), &mut producer)
.await?;
if was_duplicate {
println!("Message was a duplicate (already delivered)");
} else {
println!("Published to partition {} at offset {}", partition, offset);
}
Ok(())
}
Transactions
Atomic, all-or-nothing message delivery across partitions and topics:
use rivven_client::Client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("localhost:9092").await?;
// Initialize transactional producer
let mut producer = client.init_producer_id(None).await?;
// Begin transaction
let txn_id = "payment-processor";
client.begin_transaction(txn_id, &producer, None).await?;
// Register partitions before writing
client.add_partitions_to_txn(txn_id, &producer, &[
("orders", 0),
("payments", 0),
]).await?;
// Atomic writes across multiple topics
client.publish_transactional(txn_id, "orders", None::<Vec<u8>>, b"order-created".to_vec(), &mut producer).await?;
client.publish_transactional(txn_id, "payments", None::<Vec<u8>>, b"payment-processed".to_vec(), &mut producer).await?;
// Commit (all-or-nothing)
client.commit_transaction(txn_id, &producer).await?;
println!("Transaction committed atomically!");
Ok(())
}
Exactly-Once Consume-Transform-Produce
For stream processing with exactly-once semantics:
use rivven_client::Client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("localhost:9092").await?;
let mut producer = client.init_producer_id(None).await?;
let txn_id = "stream-processor";
let consumer_group = "processor-group";
// Begin transaction
client.begin_transaction(txn_id, &producer, None).await?;
// Add output partition to transaction
client.add_partitions_to_txn(txn_id, &producer, &[("output-topic", 0)]).await?;
// Consume input
let messages = client.consume("input-topic", 0, 0, 100).await?;
// Transform and produce
for msg in &messages {
let transformed = format!("processed: {:?}", msg.value);
client.publish_transactional(
txn_id, "output-topic", None::<Vec<u8>>,
transformed.into_bytes(), &mut producer
).await?;
}
// Commit consumer offsets as part of transaction
client.add_offsets_to_txn(
txn_id, &producer, consumer_group,
&[("input-topic", 0, messages.len() as i64)]
).await?;
// Atomic commit (output messages + consumed offsets)
client.commit_transaction(txn_id, &producer).await?;
Ok(())
}
Transaction Error Handling
use rivven_client::{Client, Error};
// On error, abort the transaction
match client.commit_transaction(txn_id, &producer).await {
Ok(()) => println!("Committed successfully"),
Err(e) => {
eprintln!("Commit failed: {}", e);
// Abort to discard all writes
client.abort_transaction(txn_id, &producer).await?;
}
}
Configuration Reference
ResilientClientConfig
| Option | Default | Description |
|---|---|---|
servers |
Required | List of server addresses |
pool_size_per_server |
10 | Maximum connections per server |
connection_timeout |
10s | Timeout for establishing connections |
request_timeout |
30s | Timeout for individual requests |
max_retries |
3 | Maximum retry attempts |
retry_initial_delay |
100ms | Initial retry delay |
retry_max_delay |
5s | Maximum retry delay |
retry_multiplier |
2.0 | Delay multiplier between retries |
circuit_breaker_failure_threshold |
5 | Failures before circuit opens |
circuit_breaker_recovery_timeout |
30s | Time before attempting recovery |
max_connection_lifetime |
300s | Maximum time a pooled connection can be reused before recycling |
idle_timeout |
60s | Maximum time a pooled connection can sit idle before eviction |
ConsumerConfig
| Option | Default | Description |
|---|---|---|
bootstrap_servers |
["127.0.0.1:9092"] |
Bootstrap server addresses |
group_id |
"default-group" |
Consumer group ID |
max_poll_records |
500 | Maximum records per poll |
max_poll_interval_ms |
5000 | Maximum poll interval in ms |
auto_commit_interval |
5s | Auto-commit interval (None to disable) |
isolation_level |
0 | 0 = read_uncommitted, 1 = read_committed |
metadata_refresh_interval |
300s | Interval for re-discovering partitions |
reconnect_backoff_ms |
100 | Initial reconnect backoff delay in ms |
reconnect_backoff_max_ms |
10 000 | Maximum reconnect backoff delay in ms |
max_reconnect_attempts |
10 | Maximum reconnect attempts before giving up |
session_timeout_ms |
10 000 | Session timeout for group coordination in ms |
rebalance_timeout_ms |
30 000 | Maximum time the coordinator waits for all members to join during rebalance |
heartbeat_interval_ms |
3 000 | Heartbeat interval in ms (clamped to ≤ 1/3 of session timeout at build time) |
tls_config |
None |
TLS configuration (requires tls feature) |
tls_server_name |
None |
Override SNI hostname for TLS handshake |
Coordination mode: When no explicit partition assignments are configured (i.e.,
partitionsis empty), the consumer automatically uses server-side group coordination via the JoinGroup/SyncGroup/Heartbeat/LeaveGroup protocol. Set explicit partitions via.assign("topic", vec![0, 1, 2])to bypass coordination and use static partition assignment.
Error Handling
The client provides typed errors for different failure modes:
use rivven_client::{ResilientClient, Error};
match client.publish("topic", None, b"data").await {
Ok(offset) => println!("Published at offset {}", offset),
Err(Error::CircuitBreakerOpen(server)) => {
println!("Server {} is unhealthy, circuit breaker open", server);
}
Err(Error::AllServersUnavailable) => {
println!("All servers are unavailable");
}
Err(Error::AuthenticationFailed(msg)) => {
println!("Authentication failed: {}", msg);
}
Err(Error::ConnectionError(msg)) => {
println!("Connection failed: {}", msg);
}
Err(Error::IoError(kind, msg)) => {
println!("I/O error ({:?}): {}", kind, msg);
// ErrorKind preserved for retry classification
}
Err(e) => println!("Other error: {}", e),
}
Auth config structs (ProducerAuthConfig, ConsumerAuthConfig, PipelineAuthConfig) use custom Debug impls that redact passwords as [REDACTED], so debug logging never leaks credentials.
TLS Configuration
Enable TLS for secure connections:
rivven-client = { version = "0.2", features = ["tls"] }
use rivven_client::{Client, TlsConfig};
let tls_config = TlsConfig::builder()
.ca_cert_path("/path/to/ca.crt")
.client_cert_path("/path/to/client.crt")
.client_key_path("/path/to/client.key")
.build()?;
let client = Client::connect_with_tls("localhost:9093", tls_config).await?;
Consumer with TLS
use rivven_client::{Consumer, ConsumerConfig, TlsConfig};
let tls = TlsConfig::builder()
.ca_cert_path("/path/to/ca.crt")
.build()?;
let config = ConsumerConfig::builder()
.bootstrap_servers(vec!["localhost:9093".into()])
.group_id("my-group")
.tls(tls, None) // None = use bootstrap server hostname for SNI
.build()?;
let mut consumer = Consumer::new(config).await?;
Rebalance Listener
Register callbacks for partition assignment changes during consumer group rebalances:
use rivven_client::{Consumer, ConsumerConfig, RebalanceListener, TopicPartition};
use async_trait::async_trait;
struct MyListener;
#[async_trait]
impl RebalanceListener for MyListener {
async fn on_partitions_revoked(&self, partitions: &[TopicPartition]) {
// Commit offsets before partitions are taken away
for tp in partitions {
println!("Revoking {}/{}", tp.topic, tp.partition);
}
}
async fn on_partitions_assigned(&self, partitions: &[TopicPartition]) {
// Initialize state for newly assigned partitions
for tp in partitions {
println!("Assigned {}/{}", tp.topic, tp.partition);
}
}
}
let mut consumer = Consumer::new(config).await?;
consumer.set_rebalance_listener(MyListener);
Documentation
License
Apache-2.0. See LICENSE.
Dependencies
~20–49MB
~841K SLoC