15 unstable releases (3 breaking)
| new 0.4.0 | Nov 8, 2025 |
|---|---|
| 0.3.4 | Sep 4, 2025 |
| 0.3.3 | Aug 31, 2025 |
| 0.2.1 | Aug 24, 2025 |
| 0.1.5 | Jun 25, 2025 |
KiteTicker Async Manager
High-performance async WebSocket client for the Kite Connect API with multi-connection support and dynamic subscription management.
Documentation | Getting Started | Examples | API Reference
Key Features
- Multi-Connection Support - Utilize all 3 allowed WebSocket connections (9,000 symbol capacity)
- Multi-API Support - Manage multiple Kite Connect accounts simultaneously (18,000+ symbols)
- High Performance - Dedicated parser tasks, optimized buffers, sub-microsecond latency
- Dynamic Subscriptions - Add/remove symbols at runtime without reconnection
- Load Balancing - Automatic symbol distribution across connections
- Production Ready - Comprehensive error handling, health monitoring, reconnection
- Async-First Design - Built with Tokio, follows Rust async best practices
- Zero-Copy Raw Access - Optional, fully safe, endian-correct views over packet bytes
Quick Start
Installation
Add to your Cargo.toml:
[dependencies]
kiteticker-async-manager = "0.2.1"
tokio = { version = "1.0", features = ["full"] }
Basic Usage
use kiteticker_async_manager::{KiteTickerManager, KiteManagerConfig, Mode, TickerMessage};
#[tokio::main]
async fn main() -> Result<(), String> {
    // Setup credentials
    let api_key = std::env::var("KITE_API_KEY").unwrap();
    let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap();
    // Create high-performance manager
    let config = KiteManagerConfig {
        max_connections: 3,
        max_symbols_per_connection: 3000,
        enable_dedicated_parsers: true,
        default_mode: Mode::LTP,
        ..Default::default()
    };
    // Start manager
    let mut manager = KiteTickerManager::new(api_key, access_token, config);
    manager.start().await?;
    // Subscribe to symbols (automatically distributed across connections)
    let symbols = vec![256265, 408065, 738561]; // NIFTY 50, HDFC Bank, Reliance
    manager.subscribe_symbols(&symbols, Some(Mode::Quote)).await?;
    // Process data from independent channels
    let channels = manager.get_all_channels();
    for (channel_id, mut receiver) in channels {
        tokio::spawn(async move {
            while let Ok(message) = receiver.recv().await {
                if let TickerMessage::Ticks(ticks) = message {
                    for tick in ticks {
                        println!("Channel {:?}: {} @ ₹{:.2}",
                            channel_id,
                            tick.instrument_token,
                            tick.content.last_price.unwrap_or(0.0));
                    }
                }
            }
        });
    }
    // Add symbols dynamically
    manager.subscribe_symbols(&[5633, 884737], Some(Mode::Full)).await?;
    // Remove symbols
    manager.unsubscribe_symbols(&[408065]).await?;
    // Change subscription mode
    manager.change_mode(&[256265], Mode::Full).await?;
    Ok(())
}
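The example above calls unwrap() on both environment variables, which panics when either one is missing. A minimal sketch of a lookup that reports the missing variable through the same Result<(), String> error type follows. The helper names credentials_from and credentials_from_env are hypothetical and not part of the crate; only the variable names KITE_API_KEY and KITE_ACCESS_TOKEN come from the example.

```rust
/// Looks up both credentials through `lookup` and names the missing
/// variable on failure. Hypothetical helper, not part of the crate.
fn credentials_from<F>(lookup: F) -> Result<(String, String), String>
where
    F: Fn(&str) -> Option<String>,
{
    let api_key = lookup("KITE_API_KEY")
        .ok_or_else(|| "KITE_API_KEY is not set".to_string())?;
    let access_token = lookup("KITE_ACCESS_TOKEN")
        .ok_or_else(|| "KITE_ACCESS_TOKEN is not set".to_string())?;
    Ok((api_key, access_token))
}

/// Convenience wrapper that reads from the process environment.
fn credentials_from_env() -> Result<(String, String), String> {
    credentials_from(|name| std::env::var(name).ok())
}
```

Inside main, `let (api_key, access_token) = credentials_from_env()?;` would then replace the two unwrap() calls. Taking the lookup as a closure keeps the logic testable without touching the real environment.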
Multi-API Manager (NEW!)
Manage multiple Kite Connect accounts in a single manager:
use kiteticker_async_manager::{
    MultiApiKiteTickerManager,
    DistributionStrategy,
    Mode,
};
#[tokio::main]
async fn main() -> Result<(), String> {
    // Create multi-API manager
    let mut manager = MultiApiKiteTickerManager::builder()
        .add_api_key("account1", "api_key_1", "token_1")
        .add_api_key("account2", "api_key_2", "token_2")
        .max_connections_per_api(3)
        .distribution_strategy(DistributionStrategy::RoundRobin)
        .build();
    manager.start().await?;
    // Subscribe symbols (auto-distributed across API keys)
    let symbols = vec![256265, 408065, 738561];
    manager.subscribe_symbols(&symbols, Some(Mode::Quote)).await?;
    // Or assign to specific API key
    manager.subscribe_symbols_to_api("account1", &symbols, Some(Mode::LTP)).await?;
    // Receive unified stream from all API keys
    let mut unified = manager.get_unified_channel();
    while let Ok((api_key_id, message)) = unified.recv().await {
        println!("From {}: {:?}", api_key_id.0, message);
    }
    Ok(())
}
Multi-API Guide - Complete multi-API documentation
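To make the idea behind DistributionStrategy::RoundRobin concrete, here is a small standalone sketch of round-robin assignment of instrument tokens to accounts. It only illustrates the general technique; the function round_robin_assign is hypothetical, and the crate's actual strategy may weigh other factors such as existing load.

```rust
use std::collections::BTreeMap;

/// Assigns each instrument token to an account in rotation.
/// Illustrative only: this is not the crate's implementation.
fn round_robin_assign(tokens: &[u32], accounts: &[&str]) -> BTreeMap<String, Vec<u32>> {
    let mut plan: BTreeMap<String, Vec<u32>> = BTreeMap::new();
    if accounts.is_empty() {
        // Nothing to assign to, so the plan stays empty.
        return plan;
    }
    for (i, &token) in tokens.iter().enumerate() {
        // Token i goes to account i modulo the number of accounts.
        let account = accounts[i % accounts.len()];
        plan.entry(account.to_string()).or_default().push(token);
    }
    plan
}
```

With the three tokens and two accounts from the example above, the first and third token land on account1 and the second on account2.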
Performance Comparison
| Feature | Single Connection | Multi-Connection Manager | Multi-API Manager | Improvement |
|---|---|---|---|---|
| Max Symbols | 3,000 | 9,000 | 18,000+ (9K × N APIs) | 6x+ capacity |
| Max API Keys | 1 | 1 | Unlimited | Multi-account |
| Throughput | Limited by 1 connection | 3 parallel connections | 3 × N connections | N × 3x throughput |
| Latency | ~5-10µs | ~1-2µs | ~1-2µs | 5x faster |
| Resilience | Single point of failure | 3 independent connections | Multi-account redundancy | High availability |
| Dynamic Ops | Manual reconnection | Runtime add/remove | Runtime add/remove | Zero downtime |
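The Max Symbols row is plain arithmetic: 3,000 symbols per connection, 3 connections per API key, N API keys. A short sketch of that calculation, using only the limits quoted in the table (the function name max_symbols is hypothetical):

```rust
/// Per-connection symbol limit quoted in the table above.
const SYMBOLS_PER_CONNECTION: usize = 3_000;
/// WebSocket connections allowed per API key, as quoted above.
const CONNECTIONS_PER_API_KEY: usize = 3;

/// Upper bound on subscribed symbols for a given number of API keys.
fn max_symbols(api_keys: usize) -> usize {
    api_keys * CONNECTIONS_PER_API_KEY * SYMBOLS_PER_CONNECTION
}
```

One API key gives 9,000 symbols and two give 18,000, which is where the "18,000+" figure comes from.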
Architecture
┌─────────────────────────────────────────────────────────────┐
│                      KiteTickerManager                      │
├─────────────────────────────────────────────────────────────┤
│  Symbol Distribution (9,000 symbols max)                    │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │Connection 1 │  │Connection 2 │  │Connection 3 │          │
│  │3,000 symbols│  │3,000 symbols│  │3,000 symbols│          │
│  └─────────────┘  └─────────────┘  └─────────────┘          │
├─────────────────────────────────────────────────────────────┤
│  Dedicated Parser Tasks (CPU Optimized)                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │  Parser 1   │  │  Parser 2   │  │  Parser 3   │          │
│  │~1µs latency │  │~1µs latency │  │~1µs latency │          │
│  └─────────────┘  └─────────────┘  └─────────────┘          │
├─────────────────────────────────────────────────────────────┤
│  Independent Output Channels                                │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │  Channel 1  │  │  Channel 2  │  │  Channel 3  │          │
│  │broadcast::Rx│  │broadcast::Rx│  │broadcast::Rx│          │
│  └─────────────┘  └─────────────┘  └─────────────┘          │
└─────────────────────────────────────────────────────────────┘
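The symbol distribution layer in the diagram can be pictured as a least-loaded placement with a per-connection cap. The sketch below is one plausible way to do it, written against plain vectors; distribute is a hypothetical function and the manager's real balancing logic may differ.

```rust
/// Places each token on the currently least-loaded connection and
/// refuses the request when the total would exceed the capacity.
/// Illustrative sketch, not the crate's implementation.
fn distribute(tokens: &[u32], connections: usize, cap: usize) -> Result<Vec<Vec<u32>>, String> {
    if tokens.len() > connections * cap {
        return Err(format!(
            "{} symbols exceed capacity of {} connections x {} symbols",
            tokens.len(),
            connections,
            cap
        ));
    }
    let mut slots: Vec<Vec<u32>> = vec![Vec::new(); connections];
    for &token in tokens {
        // min_by_key returns the first minimum, so ties go to the
        // lowest-numbered connection.
        let target = slots
            .iter()
            .enumerate()
            .min_by_key(|(_, slot)| slot.len())
            .map(|(index, _)| index)
            .expect("capacity check guarantees at least one connection");
        slots[target].push(token);
    }
    Ok(slots)
}
```

Because the capacity check runs first and the least-loaded slot is always chosen, no connection can end up above its cap.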
Documentation
- Getting Started - Complete beginner's guide
- Multi-API Guide - Manage multiple API keys (NEW!)
- API Reference - Detailed API documentation
- Examples - Practical code examples
- Dynamic Subscriptions - Runtime symbol management
- Performance Guide - Optimization techniques
Examples
Basic Examples
- Single Connection - Simple WebSocket usage
- Portfolio Monitor - Track portfolio stocks
- Runtime Subscriptions - Dynamic symbol management
Multi-API Examples
- Multi-API Demo - Manage multiple Kite Connect accounts (NEW!)
Advanced Examples
- Dynamic Demo - Complete dynamic workflow
- Manager Demo - Multi-connection setup
- Market Scanner - High-volume scanning
Performance Examples
- Performance Demo - Benchmarking
- High Frequency - Maximum throughput
- Raw vs Parsed - Micro-benchmark of raw vs parsed
- Raw Full Peek - Zero-copy field peeking for all packet sizes
Use Cases
| Use Case | Configuration | Symbols | Example |
|---|---|---|---|
| Portfolio Monitoring | 1 connection, Quote mode | 10-50 | Track personal investments |
| Algorithmic Trading | 3 connections, Quote mode | 100-1,000 | Trading strategies |
| Market Scanner | 3 connections, LTP mode | 1,000-9,000 | Scan entire market |
| High-Frequency Trading | 3 connections, Full mode | 500-3,000 | Order book analysis |
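For sizing, the minimum number of connections follows from the symbol count and the per-connection limit. The sketch below computes that lower bound; connections_needed is a hypothetical helper. Note that the table sometimes recommends more connections than this minimum (for example 3 connections for 100-1,000 symbols), presumably to spread load rather than to gain capacity.

```rust
/// Smallest number of connections that can hold `symbols`, or None
/// when the request does not fit within `max_connections`.
fn connections_needed(symbols: usize, per_connection: usize, max_connections: usize) -> Option<usize> {
    if per_connection == 0 {
        return None;
    }
    // Ceiling division, with a floor of one connection.
    let needed = ((symbols + per_connection - 1) / per_connection).max(1);
    if needed <= max_connections {
        Some(needed)
    } else {
        None
    }
}
```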
Configuration Presets
Development
let config = KiteManagerConfig {
    max_connections: 1,
    max_symbols_per_connection: 100,
    default_mode: Mode::Full,
    ..Default::default()
};
Production
let config = KiteManagerConfig {
    max_connections: 3,
    max_symbols_per_connection: 3000,
    connection_buffer_size: 20000,
    parser_buffer_size: 50000,
    enable_dedicated_parsers: true,
    default_mode: Mode::LTP,
    ..Default::default()
};
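Both presets rely on Rust's struct update syntax: the listed fields are overridden and `..Default::default()` fills in the rest. The sketch below shows the mechanism on a stand-in struct. SketchConfig and its default values are assumptions made for illustration; they are not KiteManagerConfig or its real defaults.

```rust
/// Stand-in for a config type; field names mirror the presets above,
/// but the struct itself is hypothetical.
#[derive(Debug, Clone, PartialEq)]
struct SketchConfig {
    max_connections: usize,
    max_symbols_per_connection: usize,
    enable_dedicated_parsers: bool,
}

impl Default for SketchConfig {
    fn default() -> Self {
        // Assumed defaults, chosen only for this illustration.
        Self {
            max_connections: 3,
            max_symbols_per_connection: 3000,
            enable_dedicated_parsers: true,
        }
    }
}

/// Overrides two fields and inherits the rest from Default.
fn development_preset() -> SketchConfig {
    SketchConfig {
        max_connections: 1,
        max_symbols_per_connection: 100,
        ..Default::default()
    }
}

/// Total symbols a configuration can hold across its connections.
fn total_capacity(config: &SketchConfig) -> usize {
    config.max_connections * config.max_symbols_per_connection
}
```

A field left out of the preset, such as enable_dedicated_parsers here, silently takes its default, so it is worth checking the crate's documented defaults before relying on them.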
Comparison with Official Library
| Feature | Official kiteconnect-rs | kiteticker-async-manager |
|---|---|---|
| Maintenance | Unmaintained | Actively maintained |
| Async Support | Callback-based | Full async/await |
| Type Safety | Untyped JSON | Fully typed structs |
| Multi-Connection | Single connection | Up to 3 connections |
| Dynamic Subscriptions | Manual reconnection | Runtime add/remove |
| Performance | Basic | High-performance optimized |
| Error Handling | Limited | Comprehensive |
Development
Prerequisites
# Install Rust and tools
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
cargo install just # Task runner
Building
# Clone and build
git clone https://github.com/SPRAGE/kiteticker-async-manager.git
cd kiteticker-async-manager
just build
Running Examples
# Set API credentials
export KITE_API_KEY=your_api_key
export KITE_ACCESS_TOKEN=your_access_token
# Run examples
cargo run --example single_connection
cargo run --example dynamic_subscription_demo
cargo run --example raw_full_peek --release
cargo run --example raw_vs_parsed --release
Available Tasks
just --list
Features
- Multi-Connection Management - Utilize all 3 WebSocket connections
- Dynamic Subscriptions - Add/remove symbols without reconnection
- Load Balancing - Automatic symbol distribution
- High Performance - Dedicated parsers, optimized buffers
- Type Safety - Fully typed market data structures
- Error Resilience - Comprehensive error handling and recovery
- Health Monitoring - Real-time connection health tracking
- Async-First - Built for modern Rust async ecosystems
Contributing
Contributions are welcome! Please see our contribution guidelines.
Development Setup
Use just to run development tasks:
just --list # Show available tasks
just build # Build the project
just check # Check code formatting and lints
License
Licensed under the Apache License, Version 2.0. See LICENSE for details.
Zero-copy raw access (advanced)
For maximum throughput with minimal allocations, you can work directly with raw WebSocket frame bytes and view packet bodies using endian-safe, zero-copy structs.
Key points:
- Subscribe to raw frames via subscribe_raw_frames() on KiteTickerAsync, or via the manager using get_raw_frame_channel(ChannelId) or get_all_raw_frame_channels()
- Extract packet bodies using their length prefixes
- Create typed views with as_tick_raw, as_index_quote_32, or as_inst_header_64
- The returned zerocopy::Ref<&[u8], T> dereferences to &T and is valid while the backing bytes live (store Bytes to keep alive)
Example snippet:
use kiteticker_async_manager::{KiteTickerAsync, as_tick_raw};
use bytes::Bytes;
async fn demo(mut ticker: KiteTickerAsync) -> Result<(), String> {
    let mut frames = ticker.subscribe_raw_frames();
    let frame: Bytes = frames.recv().await.map_err(|e| format!("raw frame receive failed: {e:?}"))?;
    // A frame starts with a big-endian u16 packet count
    if frame.len() < 2 {
        return Ok(());
    }
    let num = u16::from_be_bytes([frame[0], frame[1]]) as usize;
    let mut off = 2usize;
    for _ in 0..num {
        // Stop on a truncated frame instead of panicking on an out-of-range index
        if off + 2 > frame.len() { break; }
        let len = u16::from_be_bytes([frame[off], frame[off + 1]]) as usize;
        if off + 2 + len > frame.len() { break; }
        let body = frame.slice(off + 2..off + 2 + len);
        // 184-byte bodies are Full mode packets
        if len == 184 {
            if let Some(view_ref) = as_tick_raw(&body) {
                let tick = &*view_ref;
                println!("token={} ltp_scaled={}", tick.header.instrument_token.get(), tick.header.last_price.get());
            }
        }
        off += 2 + len;
    }
    Ok(())
}
Safety: All raw structs derive Unaligned and use big-endian wrappers; no unsafe is required.
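The length-prefix walk used in the snippet can also be written without any crate types, which makes the bounds handling easy to test. The sketch below is a dependency-free illustration: split_packets and decode_header are hypothetical helpers, and the header layout (a big-endian u32 token followed by a big-endian i32 scaled price) is an assumption inferred from the header.instrument_token and header.last_price fields printed above.

```rust
/// Splits a frame into packet bodies using the u16 count and the
/// per-packet u16 length prefixes. Returns None on a truncated frame.
fn split_packets(frame: &[u8]) -> Option<Vec<&[u8]>> {
    let count = u16::from_be_bytes([*frame.get(0)?, *frame.get(1)?]) as usize;
    let mut off = 2usize;
    let mut packets = Vec::with_capacity(count);
    for _ in 0..count {
        let len = u16::from_be_bytes([*frame.get(off)?, *frame.get(off + 1)?]) as usize;
        // `get` with a range returns None instead of panicking.
        let body = frame.get(off + 2..off + 2 + len)?;
        packets.push(body);
        off += 2 + len;
    }
    Some(packets)
}

/// Reads (instrument_token, scaled_last_price) from the first 8 bytes.
/// The field layout is an assumption, see the text above.
fn decode_header(body: &[u8]) -> Option<(u32, i32)> {
    if body.len() < 8 {
        return None;
    }
    let token = u32::from_be_bytes([body[0], body[1], body[2], body[3]]);
    let ltp_scaled = i32::from_be_bytes([body[4], body[5], body[6], body[7]]);
    Some((token, ltp_scaled))
}
```

Unlike the zero-copy views, this copies the two header fields out of the buffer; it is meant to show the framing, not to replace as_tick_raw.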
Manager-level raw frames
use kiteticker_async_manager::{KiteTickerManagerBuilder, Mode, as_tick_raw};
#[tokio::main]
async fn main() -> Result<(), String> {
    let api_key = std::env::var("KITE_API_KEY").unwrap();
    let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap();
    let mut mgr = KiteTickerManagerBuilder::new(api_key, access_token)
        .raw_only(true)
        .build();
    mgr.start().await?;
    mgr.subscribe_symbols(&[256265], Some(Mode::Full)).await?;
    for (id, mut rx) in mgr.get_all_raw_frame_channels() {
        tokio::spawn(async move {
            while let Ok(frame) = rx.recv().await {
                if frame.len() < 2 { continue; }
                let num = u16::from_be_bytes([frame[0], frame[1]]) as usize;
                let mut off = 2usize;
                for _ in 0..num {
                    if off + 2 > frame.len() { break; }
                    let len = u16::from_be_bytes([frame[off], frame[off + 1]]) as usize;
                    // Guard the body as well: slice() panics past the end of the buffer
                    if off + 2 + len > frame.len() { break; }
                    let body = frame.slice(off + 2..off + 2 + len);
                    if len == 184 {
                        if let Some(view) = as_tick_raw(&body) {
                            let token = view.header.instrument_token.get();
                            println!("conn={:?} token={}", id, token);
                        }
                    }
                    off += 2 + len;
                }
            }
        });
    }
    Ok(())
}
Or, if you only want Full depth packets, use the helper:
use kiteticker_async_manager::{KiteTickerManagerBuilder, Mode, ChannelId, KiteTickerRawSubscriber184};
#[tokio::main]
async fn main() -> Result<(), String> {
    let api_key = std::env::var("KITE_API_KEY").unwrap();
    let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap();
    let mut mgr = KiteTickerManagerBuilder::new(api_key, access_token)
        .raw_only(true)
        .build();
    mgr.start().await?;
    mgr.subscribe_symbols(&[256265], Some(Mode::Full)).await?;
    if let Some(mut sub) = mgr.get_full_raw_subscriber(ChannelId::Connection1) {
        tokio::spawn(async move {
            while let Ok(Some(view)) = sub.recv_raw_tickraw().await {
                let t = &*view; // &TickRaw
                println!("token={} ltp={}", t.header.instrument_token.get(), t.header.last_price.get());
            }
        });
    }
    Ok(())
}
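The raw views expose last_price as a scaled integer (the earlier snippet prints it as ltp_scaled), so it needs a division before display. A minimal sketch of that conversion follows. The helper scaled_to_price is hypothetical, and the divisor of 100 used in the note below is an assumption based on Kite Connect's convention of quoting most prices in paise; some segments may use a different divisor.

```rust
/// Converts a scaled integer price to a floating-point price.
/// Returns None for a zero divisor. Hypothetical helper.
fn scaled_to_price(scaled: i32, divisor: u32) -> Option<f64> {
    if divisor == 0 {
        return None;
    }
    Some(f64::from(scaled) / f64::from(divisor))
}
```

Under that assumption, `scaled_to_price(t.header.last_price.get(), 100)` would give the price in rupees for the tick printed above.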