Newton Prover Aggregator
Note: This is a library crate embedded within the Gateway service, not a standalone service. There is no separate aggregator binary or deployment. The Gateway imports and uses this library to perform BLS signature aggregation.
Overview
The aggregator crate is the core library of the Newton Prover AVS (Actively Validated Service) responsible for orchestrating BLS signature aggregation from multiple operators. Embedded within the Gateway, it serves as the central coordinator that:
- Initializes tasks for BLS signature aggregation with quorum requirements
- Processes signed task responses from operators, buffering signatures that arrive before task initialization
- Coordinates with the BLS Aggregation Service to aggregate signatures and verify quorum thresholds
- Submits aggregated responses to on-chain contracts once quorum is reached
- Manages memory and resources to ensure reliable operation under load
The aggregator is designed for high availability, low latency, and robustness. It handles edge cases gracefully, prevents memory leaks through bounded data structures and cleanup mechanisms, and ensures errors in one task don't affect others.
Architecture Overview
The aggregator architecture consists of two main components that work together:
```mermaid
flowchart TD
    subgraph Aggregator["AggregatorCore (core.rs)"]
        A["Task initialization & validation"]
        B["Signature processing & buffering"]
        C["Task response storage (task_responses)"]
        D["Pending signature buffer (pending_signatures)"]
        E["Background cleanup tasks"]
    end
    Aggregator -->|"Channels (ServiceHandle)"| BLS
    subgraph BLS["BlsAggregatorService (bls.rs)"]
        F["BLS signature aggregation engine"]
        G["Per-task aggregator tasks"]
        H["Signature verification"]
        I["Quorum threshold checking"]
        J["Aggregated response generation"]
    end
```
Component Relationships
AggregatorCore (core.rs):
- Central orchestrator that manages the lifecycle of aggregation tasks
- Handles operator signature submission and buffering
- Manages task response storage and cleanup
- Coordinates with the BLS service via `ServiceHandle`
BlsAggregatorService (bls.rs):
- Low-level BLS signature aggregation engine
- Runs per-task aggregation loops in isolated spawned tasks
- Performs cryptographic signature verification
- Aggregates signatures and checks quorum thresholds
- Returns aggregated responses via channels
Data Flow
- Task Initialization: `initialize_task()` → BLS service spawns `single_task_aggregator` task → Creates per-task response channel → Returns receiver to AggregatorCore
- Signature Processing: `process_signed_response()` → Buffered if task not initialized, otherwise sent to BLS service
- Aggregation: BLS service aggregates signatures per `task_response_digest`, checks quorum thresholds
- Response: When quorum reached, aggregated response sent via task-specific channel (direct routing, no mutex contention)
- Wait for Aggregation: `wait_for_aggregation(task_id, timeout)` → Receives from task-specific channel → Returns response
- Submission: `submit_aggregated_response()` submits to contract and cleans up task state
Consensus Module (consensus.rs)
The consensus module handles median-based normalization when operators return different values for time-sensitive data (e.g., prices).
Two-Digest System: BLS signature aggregation requires all operators to sign the same message. However, operators independently generate unique ECDSA attestations. The consensus module uses two digest types:
| Digest Type | Used For | Attestations |
|---|---|---|
| Consensus Digest | BLS signing/verification | Excluded |
| Full Digest | Contract storage, challenge verification | Included |
Median-Based Normalization Algorithm:
- Extract numeric fields from each operator's `policyTaskData.dataJSON`
- For each numeric field, compute the median across all operators
- Verify all values are within tolerance (default 10%) of the median
- If within tolerance, normalize all responses to use median values
- Recompute consensus digest (attestations excluded) after normalization
Values are considered "in tolerance" if: `|value - median| / median <= tolerance_pct / 100`
Key Functions:
- `build_consensus(responses, tolerance_pct)` - Attempts consensus on signed responses
- `compute_consensus_from_unsigned(responses, tolerance_pct)` - Prepare-phase consensus on unsigned `policyTaskData`
- `check_early_consensus(responses)` - Fast path when all data hashes are identical
Core Components Deep Dive
AggregatorCore (core.rs)
Purpose
AggregatorCore is the central orchestrator that manages the complete lifecycle of aggregation tasks. It provides a high-level API for task initialization, signature processing, and response submission while managing memory, concurrency, and error handling.
Key Data Structures
```rust
pub struct AggregatorCore {
    /// Service handle to interact with the BLS Aggregator Service.
    /// ServiceHandle is Clone with a thread-safe UnboundedSender
    pub service_handle: ServiceHandle,

    /// Per-task response receivers for direct routing (no mutex contention).
    /// Each task gets its own channel, eliminating response stealing and lock contention.
    /// DashMap enables lock-free concurrent access for different TaskIds.
    task_response_receivers:
        DashMap<TaskId, UnboundedReceiver<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>>,

    /// DashMap for lock-free concurrent access - different tasks can access their states simultaneously.
    /// Per-task locking eliminates contention between tasks accessing different TaskIds.
    pub task_states: Arc<DashMap<TaskId, TaskState>>,

    /// Cancellation token for background tasks
    cancellation_token: CancellationToken,
}

/// Task state to reduce lock contention
#[derive(Debug, Clone)]
pub struct TaskState {
    /// Quorum numbers for reference timestamp queries
    quorum_nums: Vec<u8>,
    /// Operator errors for this task
    operator_errors: Vec<OperatorErrorResponse>,
    /// Expected operator count (for early exit detection)
    expected_operators: usize,
    /// Task responses by digest
    task_responses: HashMap<TaskResponseDigest, BindingTaskResponse>,
}
```
Design Decisions:
- `task_response_receivers` uses `DashMap`: Lock-free concurrent access for different TaskIds. Per-task channels enable zero mutex contention. Each task has its own dedicated channel, eliminating response stealing and serialization bottlenecks.
- `task_states` uses `Arc<DashMap>`: Lock-free concurrent access for different tasks. Per-task locking eliminates contention between tasks accessing different TaskIds. Critical for high-throughput scenarios with 10k+ concurrent tasks.
- Simplified structure: Moved task responses into `TaskState` for better locality and reduced lock scope.
- Eliminated insertion tracking: Removed separate insertion order tracking in favor of simpler cleanup strategies.
Key Methods
new()
Initializes the aggregator core and spawns background tasks:
```rust
pub async fn new(
    avs_registry_reader: AvsRegistryChainReader,
    operator_registry_address: Address,
    ws_rpc_url: Option<String>,
    http_rpc_url: String,
) -> Result<Self, eyre::Error>
```
Responsibilities:
- Creates BLS aggregation service (either in-memory or on-chain operator info)
- Initializes all data structures with appropriate synchronization primitives
- Spawns three background tasks:
  - `process_pending_signatures_loop`: Processes buffered signatures when tasks are initialized
  - `cleanup_expired_pending_signatures_loop`: Removes expired pending signatures (5s timeout)
  - `cleanup_stale_task_responses_loop`: Evicts stale task responses (60s interval)
Memory Safety: All background tasks use CancellationToken for graceful shutdown, ensuring resources are cleaned up when AggregatorCore is dropped.
initialize_task()
Initializes a new aggregation task with validation:
```rust
pub async fn initialize_task(
    &self,
    task_id: TaskId,
    task_created_block: u64,
    quorum_nums: Vec<u8>,
    quorum_threshold_percentage: u8,
    time_to_expiry: Duration,
) -> Result<(), eyre::Error>
```
Input Validation:
- Task ID must be non-zero
- Quorum numbers must be non-empty
- Threshold percentage must be between 1 and 100
- Time to expiry must be non-zero
Flow:
- Validates all inputs
- Creates `TaskMetadata` with task configuration
- Clones `ServiceHandle` before async call (avoids holding lock during async operation)
- Sends `InitializeTask` message to BLS service
- Receives task-specific response receiver from BLS service (per-task channel for direct routing)
- Stores receiver in `task_response_receivers` for `wait_for_aggregation()` to use
- Notifies pending signatures loop to retry buffered signatures for this task
Error Handling: Returns structured errors with context. Errors in initialization don't affect other tasks.
Performance: Per-task channels eliminate mutex contention and response stealing, enabling true concurrent processing of multiple tasks.
process_signed_response()
Processes a signed task response from an operator:
```rust
pub async fn process_signed_response(
    &self,
    signed_response: SignedTaskResponse,
) -> Result<(), eyre::Error>
```
Flow:
- Validates `task_id` and `operator_id` (both must be non-zero)
- Computes `task_response_digest` via Keccak256 hash
- Creates `TaskSignature` and sends to BLS service
- If task not initialized: Buffers signature in `pending_signatures` (with size limit check)
- If task initialized: Stores successful response in `task_responses` with size limit enforcement
Memory Management:
- Checks `MAX_TASK_RESPONSES` limit before adding new task entries
- Uses read lock to find oldest entry (non-blocking)
- Uses write lock only when evicting (minimizes lock duration)
- Lock-free counter increment for insertion order
Error Handling:
- `TaskNotFound`: Buffers signature for later processing
- Other errors: Logs with full context (task_id, operator_id, timing) and returns error
- Errors in one signature don't affect others
wait_for_aggregation()
Waits for aggregated response with timeout using per-task channels:
```rust
pub async fn wait_for_aggregation(
    &self,
    task_id: TaskId,
    timeout_duration: Duration,
) -> Result<BlsAggregationServiceResponse, AggregatorCoreError>
```
Flow:
- Validates timeout duration and task_id (must be non-zero)
- Removes task-specific receiver from HashMap (ensures only one waiter per task, receiver can't be cloned)
- Receives directly from task-specific channel (no mutex lock needed, no response stealing)
- Returns response or error with timing information
- Receiver automatically dropped when function returns (no manual cleanup needed)
Performance Benefits:
- Zero mutex contention: Each task has its own channel, no shared lock needed
- Zero response stealing: Responses go directly to the correct task's channel
- Low latency: ~0.1-0.5ms vs 5-500ms under high concurrency (old approach)
- High throughput: Supports 10k+ concurrent tasks efficiently
- Natural cancellation: Dropping receiver = cancellation, no explicit cleanup needed
Error Handling:
- `TaskNotInitialized`: Task not found in receivers map
- `Timeout`: Includes operator errors for the specific task
- `AggregationServiceError`: Task-specific errors (guaranteed to be for this task_id)
- `Cancelled`: Operation was cancelled via cancellation token
Isolation: Each task's response channel is independent. One task's channel closure doesn't affect others.
submit_aggregated_response()
Submits aggregated response to contract:
```rust
pub async fn submit_aggregated_response(
    &self,
    avs_writer: &AvsWriter,
    task: Task,
    task_response: BindingTaskResponse,
    service_response: BlsAggregationServiceResponse,
) -> Result<TransactionReceipt, eyre::Error>
```
Flow:
- Converts BLS response to contract format
- Submits to contract via `AvsWriter`
- On success: Cleans up `task_responses` entry and insertion order
- Records metrics for success/failure and duration
Cleanup: Automatically removes task from task_responses and task_responses_insertion_order after successful submission to prevent memory leaks.
update_response_indices()
Updates the check signatures indices in a BlsAggregationServiceResponse for a new task_created_block:
```rust
pub async fn update_response_indices(
    &self,
    task_id: TaskId,
    service_response: BlsAggregationServiceResponse,
    new_task_created_block: u64,
    quorum_numbers: &[u8],
) -> Result<BlsAggregationServiceResponse, eyre::Error>
```
Purpose: When a task is created on-chain, the actual taskCreatedBlock may differ from the block used during initial aggregation (due to transaction confirmation timing). This method efficiently recalculates the index arrays for the correct block.
Background Tasks
process_pending_signatures_loop
Continuously processes buffered signatures when notified:
- Listens on `pending_notify_rx` channel for task IDs
- When notified, processes all buffered signatures for that task
- Uses cloned `ServiceHandle` to avoid holding lock during async operations
- Batch inserts successful responses to minimize lock contention
- Drops failed signatures (no retry mechanism)
Performance: Processes signatures in batch, collecting successful responses before single lock acquisition.
cleanup_expired_pending_signatures_loop
Periodically removes expired pending signatures:
- Runs every `PENDING_SIGNATURE_CLEANUP_INTERVAL` (10 seconds)
- Removes entries older than `PENDING_SIGNATURE_TIMEOUT` (5 seconds)
- Prevents unbounded growth if tasks never initialize
cleanup_stale_task_responses_loop
Periodically evicts stale task responses:
- Runs every `TASK_RESPONSES_CLEANUP_INTERVAL` (60 seconds)
- If over `MAX_TASK_RESPONSES` limit, evicts oldest 10% of entries
- Uses read lock to find oldest entries, write lock only when removing
BLS Aggregation Service (bls.rs)
Purpose
The BLS Aggregation Service is the low-level engine that performs cryptographic BLS signature aggregation. It runs per-task aggregation loops in isolated spawned tasks, verifying signatures, aggregating them, and checking quorum thresholds.
Key Data Structures
```rust
/// Aggregated operators information for a specific task_response_digest
pub struct AggregatedOperators {
    signers_apk_g2: BlsG2Point,                              // Aggregated public key (G2)
    signers_agg_sig_g1: Signature,                           // Aggregated signature (G1)
    signers_total_stake_per_quorum: HashMap<u8, U256>,       // Total stake per quorum
    signers_operator_ids_set: HashMap<FixedBytes<32>, bool>, // Set of signer operator IDs
}

/// Task metadata for initialization
pub struct TaskMetadata {
    task_id: TaskId,
    quorum_numbers: Vec<u8>,
    quorum_threshold_percentages: QuorumThresholdPercentages,
    time_to_expiry: Duration,
    window_duration: Duration,
    task_created_block: u64,
}

/// Response from BLS aggregation service
pub struct BlsAggregationServiceResponse {
    pub task_id: TaskId,
    pub task_created_block: u64,
    pub task_response_digest: TaskResponseDigest,
    pub non_signers_pub_keys_g1: Vec<BlsG1Point>,
    pub non_signers_operators_ids: Vec<FixedBytes<32>>, // Stored for efficient index updates
    pub quorum_apks_g1: Vec<BlsG1Point>,
    pub signers_apk_g2: BlsG2Point,
    pub signers_agg_sig_g1: Signature,
    // Index arrays for on-chain verification
    pub non_signer_quorum_bitmap_indices: Vec<u32>,
    pub quorum_apk_indices: Vec<u32>,
    pub total_stake_indices: Vec<u32>,
    pub non_signer_stake_indices: Vec<Vec<u32>>,
}
```
Design Decisions:
- Per-`task_response_digest` aggregation: Different operators may propose different responses (different `task_response_digest` values). Each digest has its own aggregation state, allowing multiple valid responses to be aggregated simultaneously.
- Stake tracking per quorum: Operators may have stake in multiple quorums. The service tracks stake per quorum to check thresholds independently.
- Storing `non_signers_operators_ids`: The `BlsAggregationServiceResponse` stores non-signer operator IDs computed during aggregation. This enables efficient index updates via `update_response_indices()` without re-fetching operator state or performing O(n²) public key matching. The operator IDs are already computed in `build_aggregated_response()`, and preserving them eliminates a 50-200ms RPC call when recalculating indices for a new `taskCreatedBlock`.
Key Methods
start()
Initializes the BLS service and spawns main loop:
```rust
pub fn start(self) -> (ServiceHandle, AggregateReceiver)
```
Responsibilities:
- Creates message channels for task initialization and signature processing
- Creates aggregate response channel
- Spawns main `run()` loop in background task
- Returns `ServiceHandle` (for sending messages) and `AggregateReceiver` (for receiving responses)
run()
Main message processing loop:
```rust
async fn run(
    self,
    mut msg_receiver: UnboundedReceiver<AggregationMessage>,
    aggregate_sender: UnboundedSender<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>,
)
```
Message Types:
- `InitializeTask`: Creates new task aggregator, spawns `single_task_aggregator` task
- `ProcessSignature`: Forwards signature to appropriate task aggregator via per-task channel
Memory Management:
- Maintains `task_channels` HashMap with FIFO eviction (`MAX_ACTIVE_TASKS = 10000`)
- Tracks insertion order for eviction
- Detects finished tasks via channel closure
Error Isolation: Each task runs in its own spawned task. Panics are caught and logged without affecting other tasks.
single_task_aggregator()
Per-task aggregation logic:
```rust
async fn single_task_aggregator(
    avs_registry_service: A,
    metadata: TaskMetadata,
    aggregated_response_sender: UnboundedSender<...>,
    signatures_rx: UnboundedReceiver<SignedTaskResponseDigest>,
) -> Result<(), BlsAggregationServiceError>
```
Flow:
- Fetches operator AVS state at task creation block
- Fetches quorum AVS state (total stakes, aggregate public keys)
- Enters `loop_task_aggregator()` to process signatures
- Handles task expiry timer
- Handles window duration for additional signatures after quorum
Isolation: Each task runs in isolated spawned task. Errors don't propagate to other tasks.
loop_task_aggregator()
Main signature processing loop for a task:
```rust
async fn loop_task_aggregator(
    avs_registry_service: A,
    task_id: TaskId,
    task_created_block: u64,
    time_to_expiry: Duration,
    aggregated_response_sender: UnboundedSender<...>,
    mut signatures_rx: UnboundedReceiver<SignedTaskResponseDigest>,
    operator_state_avs: HashMap<FixedBytes<32>, OperatorAvsState>,
    total_stake_per_quorum: HashMap<u8, Uint<256, 4>>,
    quorum_threshold_percentage_map: HashMap<u8, u8>,
    quorum_apks_g1: Vec<BlsG1Point>,
    quorum_nums: Vec<u8>,
    window_duration: Duration,
) -> Result<(), BlsAggregationServiceError>
```
Flow:
- Initializes `aggregated_operators` HashMap (keyed by `task_response_digest`)
- Selects between signature channel and task expiry timer
- For each signature:
  - Calls `handle_new_signature()` to process
  - Updates aggregation state
  - Checks quorum thresholds
  - If thresholds met: aggregates and sends response, opens window
- Handles window duration for additional signatures
- Cleans up on task expiry
Memory Management:
- `aggregated_operators` limited to `MAX_AGGREGATED_OPERATORS_PER_TASK = 100` per task
- If limit reached, new digests are ignored (logged as warning); this scenario should never happen in practice
- Cleared when task completes
handle_new_signature()
Processes a new signature:
```rust
async fn handle_new_signature(
    avs_registry_service: &A,
    aggregated_operators: &mut HashMap<FixedBytes<32>, AggregatedOperators>,
    open_window: &mut bool,
    current_aggregated_response: &mut Option<BlsAggregationServiceResponse>,
    window_tx: &UnboundedSender<bool>,
    task_id: TaskId,
    task_created_block: u64,
    operator_state_avs: &HashMap<FixedBytes<32>, OperatorAvsState>,
    total_stake_per_quorum: &HashMap<u8, Uint<256, 4>>,
    quorum_threshold_percentage_map: &HashMap<u8, u8>,
    quorum_apks_g1: &[BlsG1Point],
    quorum_nums: &[u8],
    window_duration: Duration,
    signed_task_digest: Option<SignedTaskResponseDigest>,
) -> Result<(), BlsAggregationServiceError>
```
Flow:
- Validates inputs (operator_id, task_response_digest, quorum_nums)
- Checks for duplicate signatures (same operator signing same digest)
- Verifies signature cryptographically
- Sends verification result to result channel (handles receiver drop gracefully)
- If valid: Updates `aggregated_operators` for the `task_response_digest`
- Checks if quorum thresholds are met
- If met: Aggregates and sends response, opens window
Error Handling:
- Invalid signatures: Logged and returned via result channel
- Duplicate signatures: Detected and rejected
- Missing operator state: Returns `RegistryError`
- Receiver drop (timeout/cancellation): Handled gracefully, doesn't propagate error
update_aggregated_operators()
Updates aggregation state for a task_response_digest:
```rust
fn update_aggregated_operators(
    task_id: TaskId,
    aggregated_operators: &mut HashMap<FixedBytes<32>, AggregatedOperators>,
    operator_state: &OperatorAvsState,
    task_response_digest: FixedBytes<32>,
    bls_signature: Signature,
    operator_id: FixedBytes<32>,
) -> Result<AggregatedOperators, BlsAggregationServiceError>
```
Flow:
- If digest already exists: Calls `aggregate_new_operator(task_id, ...)` to add operator to existing aggregation
- If new digest: Creates new `AggregatedOperators` entry
- Returns updated aggregation state
Error Handling:
Error Handling:
- Returns `RegistryError { task_id, operator_context, reason }` if operator public keys are missing
- Includes `task_id` parameter for error context (replaces previous `.unwrap()` panics)
- Error reason includes operator ID hex for debugging
Memory Management & Safety
Memory Limits & Eviction
The aggregator implements multiple layers of memory protection to prevent unbounded growth:
Task Responses (task_responses)
- Limit: `MAX_TASK_RESPONSES = 10000` tasks
- Eviction Strategy: FIFO (First-In-First-Out) using insertion order tracking
- Implementation:
  - `task_responses_insertion_order`: `RwLock<HashMap<TaskId, u64>>` tracks insertion order
  - `task_responses_insertion_counter`: `AtomicU64` for lock-free counter increments
  - When limit reached: Finds oldest entry (read lock, non-blocking), evicts it (write lock, minimal duration)
- Cleanup:
  - Automatic cleanup after successful submission
  - Periodic cleanup every 60 seconds (`TASK_RESPONSES_CLEANUP_INTERVAL`)
  - Evicts 10% extra entries when over limit for headroom
Pending Signatures (pending_signatures)
- Limit: `MAX_PENDING_SIGNATURE_TASKS = 1000` tasks
- Timeout: `PENDING_SIGNATURE_TIMEOUT = 5 seconds`
- Eviction Strategy: Time-based expiration + size limit
- Implementation:
  - Each entry has a `created_at: Instant` timestamp
  - Periodic cleanup every 10 seconds (`PENDING_SIGNATURE_CLEANUP_INTERVAL`)
  - Removes entries older than timeout
  - Rejects new tasks if at limit (prevents DoS)
- Use Case: Handles signatures that arrive before task initialization
BLS Service Limits
- Active Tasks: `MAX_ACTIVE_TASKS = 10000` concurrent tasks
  - FIFO eviction when limit reached
  - Tracks insertion order for eviction
- Aggregated Operators Per Task: `MAX_AGGREGATED_OPERATORS_PER_TASK = 100` response digests
  - Prevents memory bloat for tasks with many different responses
  - FIFO eviction with insertion order tracking
Lock-Free & Non-Blocking Patterns
AtomicU64 Counter
The insertion counter uses AtomicU64 for lock-free increments:
```rust
task_responses_insertion_counter: Arc<AtomicU64>

// Increment (lock-free)
let counter_value = task_responses_insertion_counter.fetch_add(1, Ordering::Relaxed) + 1;
```
Benefits: No blocking, no contention, constant-time operation.
RwLock for Insertion Order
The insertion order map uses RwLock to allow concurrent reads:
```rust
task_responses_insertion_order: Arc<RwLock<HashMap<TaskId, u64>>>

// Read (non-blocking, allows concurrent readers)
let insertion_order_read = task_responses_insertion_order.read().await;
let oldest = insertion_order_read.iter().min_by_key(|(_, order)| *order);

// Write (exclusive, but only when modifying)
let mut insertion_order_write = task_responses_insertion_order.write().await;
insertion_order_write.insert(task_id, counter_value);
```
Benefits:
- Multiple readers can check insertion order concurrently
- Writers only block other writers, not readers
- Minimizes lock contention
Lock Minimization
ServiceHandle is cloned before async operations to avoid holding locks:
```rust
// Clone handle before async call to avoid holding lock during async operation
let handle = {
    let locked_handle = self.service_handle.lock().await;
    locked_handle.clone()
};
let result = handle.process_signature(task_signature).await;
```
Benefits: Lock is released immediately after cloning, allowing other operations to proceed during async call.
Resource Cleanup
Automatic Cleanup
- After successful submission: `task_responses` entry removed immediately
- On task expiry: BLS service cleans up task state, channels closed
- On channel closure: Detected in `ProcessSignature` handling, triggers cleanup
Timeout-Based Expiration
- Pending signatures: Auto-removed after 5 seconds if task never initializes
- Task expiry: Handled by timer in `single_task_aggregator`
Graceful Shutdown
- CancellationToken: All background tasks check cancellation token
- Drop implementation: `AggregatorCore::Drop` cancels background tasks
- Channel closure: Detected gracefully, doesn't panic
Concurrency & Async Patterns
Task Isolation
Each aggregation task runs in its own spawned task with dedicated channels, ensuring complete isolation:
```rust
// In BLS service run() loop
tokio::spawn(async move {
    let result = Self::single_task_aggregator(
        avs_registry_service,
        metadata,
        task_response_sender, // Task-specific response sender
        signature_rx,
    ).await;
    // Handle result, log errors
    // Response sender dropped here, channel closes naturally
});

// Monitor task for panic detection
tokio::spawn(async move {
    if let Err(e) = join_handle.await {
        error!("Task aggregator panicked: {:?}", e);
    }
});
```
Benefits:
- Complete isolation: Errors in one task don't affect others
- Channel isolation: Each task has its own response channel, no cross-task interference
- Panic safety: Panics are caught and logged, don't crash the service
- Independent cancellation: Tasks can be cancelled independently via channel closure
- Resource cleanup: Channel closure automatically triggers cleanup
Channel-Based Communication
The system uses unbounded channels for message passing with per-task isolation:
```rust
// Task initialization - creates TWO channels per task:
// 1. Signature channel (for sending signatures to task aggregator)
// 2. Response channel (for receiving aggregated responses)
let (signature_tx, signature_rx) = mpsc::unbounded_channel::<SignedTaskResponseDigest>();
let (response_tx, response_rx) =
    mpsc::unbounded_channel::<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>();

// Store both channels
task_channels.insert(task_id, (signature_tx, response_tx, timestamp));

// Return response receiver to caller (AggregatorCore)
result_sender.send(Ok(response_rx))?;

// Result channel (oneshot for verification results)
let (result_tx, result_rx) = oneshot::channel();
```
Benefits:
- Per-task isolation: Each task has dedicated channels, no interference
- Zero contention: No shared mutex for response waiting
- Direct routing: Responses go directly to the correct task
- Non-blocking sends: Unbounded channels allow immediate sends
- Natural cancellation: Dropping receiver cancels wait operation
- Clean separation: Signature processing and response delivery are decoupled
Lock Contention Minimization
Multiple strategies minimize lock contention:
- Per-task channels: Eliminates mutex contention for response waiting (biggest win)
- Read locks for lookups: `RwLock::read()` allows concurrent reads
- Write locks only when modifying: Acquired just before modification, released immediately
- Batch operations: Collect data before acquiring lock, minimize lock duration
- Handle cloning: Clone before async operations to release lock early
- Receiver removal: Receivers removed from HashMap at start of `wait_for_aggregation()`, ensuring only one waiter per task
Example:
```rust
// Read lock to find oldest (non-blocking)
let oldest_task_id_opt = {
    let insertion_order_read = self.task_responses_insertion_order.read().await;
    insertion_order_read.iter().min_by_key(|(_, order)| *order).map(|(id, _)| *id)
};

// Write lock only when evicting
if let Some(oldest_task_id) = oldest_task_id_opt {
    if task_responses_map.remove(&oldest_task_id).is_some() {
        let mut insertion_order_write = self.task_responses_insertion_order.write().await;
        insertion_order_write.remove(&oldest_task_id);
    }
}
```
Error Handling & Robustness
Input Validation
All public methods validate inputs before processing:
- Task ID: Must be non-zero (`TaskId::ZERO` check)
- Operator ID: Must be non-zero (all bytes checked)
- Quorum numbers: Must be non-empty
- Threshold percentage: Must be between 1 and 100
- Timeouts: Must be non-zero duration
Example:
```rust
if task_id == TaskId::ZERO {
    return Err(eyre::eyre!("Invalid task_id: zero task ID"));
}
if operator_id.as_slice().iter().all(|&b| b == 0) {
    return Err(eyre::eyre!("Invalid operator_id: zero operator ID"));
}
```
Error Types & Context
All BlsAggregationServiceError variants are enriched with structured context fields for comprehensive debugging:
TaskExpired
```rust
BlsAggregationServiceError::TaskExpired {
    task_id: TaskId,
    reason: String, // e.g., "task expired without reaching quorum threshold"
}
```
TaskNotFound
```rust
BlsAggregationServiceError::TaskNotFound {
    task_id: TaskId,
    reason: String, // e.g., "task not found in task_channels (task may not be initialized yet)"
}
```
SignatureVerificationError
```rust
BlsAggregationServiceError::SignatureVerificationError {
    task_id: TaskId,
    operator_id: FixedBytes<32>,
    verification_error: SignatureVerificationError, // DuplicateSignature, IncorrectSignature, etc.
}
```
SignaturesChannelClosed
```rust
BlsAggregationServiceError::SignaturesChannelClosed {
    task_id: TaskId,
    reason: String, // e.g., "signature channel receiver dropped (task aggregator may have finished)"
}
```
RegistryError
```rust
BlsAggregationServiceError::RegistryError {
    task_id: TaskId,
    operator_context: String, // e.g., " from operator 0x1234..." or empty
    reason: String,           // e.g., "failed to get operator AVS state at block 12345: ..."
}
```
DuplicateTaskId
```rust
BlsAggregationServiceError::DuplicateTaskId {
    task_id: TaskId,
    reason: String, // e.g., "task already exists in task_channels (message #42)"
}
```
Benefits of Structured Errors:
- Complete Context: Every error includes `task_id` and a specific `reason` for immediate debugging
- Operator Identification: Signature errors include `operator_id` to identify problematic operators
- Operation Context: Registry errors include `operator_context` when applicable
- Detailed Reasons: Human-readable `reason` strings explain exactly what went wrong and where
- Structured Logging: Errors can be logged with full context using structured logging:
```rust
match result {
    Err(BlsAggregationServiceError::TaskNotFound { task_id, reason }) => {
        error!(
            task_id = %task_id,
            reason = %reason,
            "Task not found - buffering signature for later processing"
        );
    }
    Err(BlsAggregationServiceError::SignatureVerificationError {
        task_id,
        operator_id,
        verification_error,
    }) => {
        error!(
            task_id = %task_id,
            operator_id = %hex::encode(operator_id.as_slice()),
            verification_error = ?verification_error,
            "Signature verification failed"
        );
    }
    // ... other error variants
}
```
Error Isolation
Errors are isolated at multiple levels:
- Signature-level: Failed signature processing doesn't affect other signatures
- Task-level: Errors in one task don't affect other tasks
- Service-level: Panics in spawned tasks are caught and logged
Example: In process_pending_signatures_for_task(), failed signatures are dropped and logged, but processing continues for remaining signatures.
Graceful Degradation
The system degrades gracefully under failure:
- Invalid signatures: Logged and dropped, processing continues
- Missing operator state: Returns error, doesn't panic
- Channel closure: Detected gracefully, doesn't crash
- Memory pressure: Evicts oldest entries, continues operating
Performance Optimizations
Per-Task Channel Architecture with DashMap
The most significant performance improvements come from two architectural changes:
1. Per-Task Channels (eliminates response stealing):
- Each task gets its own `UnboundedReceiver<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>`
- Zero response stealing: Responses go directly to the correct task
- Natural cancellation via receiver drop
2. DashMap for Task States (eliminates lock contention):
- Lock-free concurrent access for different TaskIds
- Per-entry locking: Only conflicting operations on same TaskId block each other
- Scales linearly with number of concurrent tasks
Performance Comparison:
| Metric | Before (RwLock) | After (DashMap) | Improvement |
|---|---|---|---|
| Latency (10k tasks) | 5-500ms | 0.1-0.5ms | 10-1000x faster |
| Throughput | Limited by mutex | 10k+ concurrent | Linear scaling |
| Contention | High (global lock) | Zero (per-entry) | Eliminated |
| Response Stealing | Possible | Impossible | Guaranteed correctness |
Implementation:
```rust
// In initialize_task()
self.task_response_receivers.insert(task_id, response_receiver);
self.task_states.insert(task_id, TaskState::new(quorum_nums, broadcast_count));

// In wait_for_aggregation()
let receiver = self.task_response_receivers.remove(&task_id).map(|(_, v)| v)?;
// Receive directly from the task-specific channel (no mutex lock)
match receiver.recv().await { ... }

// In process_signed_response()
if let Some(mut state) = self.task_states.get_mut(&task_id) {
    state.task_responses.insert(digest, response);
}
```
Benefits:
- Scalability: Linear scaling with number of tasks (no contention)
- Latency: Constant-time response delivery regardless of concurrent load
- Isolation: One task's operations don't affect others
- Cancellation: Natural cancellation via receiver drop
- Memory: Automatic cleanup when receivers are dropped
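The routing guarantee behind these benefits can be illustrated with a minimal std-only sketch. ResponseRouter and its methods are hypothetical names for this example; the real service uses tokio unbounded channels and DashMap rather than std::sync::mpsc and HashMap:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

type TaskId = u64;

/// Routes each response to its task's dedicated channel, so a waiter
/// can never consume another task's response ("response stealing").
struct ResponseRouter {
    senders: HashMap<TaskId, Sender<String>>,
}

impl ResponseRouter {
    fn new() -> Self {
        Self { senders: HashMap::new() }
    }

    /// Create a dedicated channel for a task; the caller keeps the receiver.
    fn register(&mut self, task_id: TaskId) -> Receiver<String> {
        let (tx, rx) = channel();
        self.senders.insert(task_id, tx);
        rx
    }

    /// Deliver a response directly to the owning task (no shared queue).
    fn deliver(&self, task_id: TaskId, response: String) -> bool {
        match self.senders.get(&task_id) {
            Some(tx) => tx.send(response).is_ok(),
            None => false, // task unknown or already completed
        }
    }

    /// Dropping the sender closes the channel: natural cancellation.
    fn remove(&mut self, task_id: TaskId) {
        self.senders.remove(&task_id);
    }
}
```

Because each receiver is owned by exactly one waiter, cancellation and cleanup fall out of ordinary Rust drop semantics rather than requiring explicit bookkeeping.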
Batch Operations
Batch insertion minimizes lock contention:
```rust
// Collect successful responses before lock acquisition
let mut successful_responses: Vec<(TaskResponseDigest, BindingTaskResponse)> = Vec::new();
for signature in signatures_to_process {
    // Process signature...
    if result.is_ok() {
        successful_responses.push((digest, response));
    }
}

// Single lock acquisition for the batch insert
if !successful_responses.is_empty() {
    let mut task_responses_map = task_responses.lock().await;
    let task_entry = task_responses_map.entry(task_id).or_default();
    for (digest, response) in successful_responses {
        task_entry.entry(digest).or_insert(response);
    }
}
```
Non-Blocking Reads
Read locks allow concurrent access:
```rust
// Multiple readers can check insertion order concurrently
let insertion_order_read = task_responses_insertion_order.read().await;
let oldest = insertion_order_read.iter().min_by_key(|(_, order)| *order);
```
Early Returns
Early returns avoid unnecessary work:
```rust
// Early return if there are no signatures to process
if signatures_to_process.is_empty() {
    return;
}

// Early return if the task already exists
if task_channels.contains_key(&task_id) {
    result_sender.send(Err(BlsAggregationServiceError::DuplicateTaskId)).ok();
    continue;
}
```
Data Flow Examples
Task Initialization Flow
1. initialize_task() called with task metadata
2. Input validation: task ID, quorum numbers, threshold, and timeout validated
3. TaskMetadata created: wraps the parameters in a structured type
4. ServiceHandle cloned: avoids holding a lock during the async operation
5. InitializeTask message sent: via ServiceHandle to the BLS service
6. BLS service receives the message: in its run() loop
7. Task aggregator spawned: single_task_aggregator task created
8. Per-task channel created: signature_tx added to task_channels
9. Pending signatures notified: pending_notify_tx.send(task_id) triggers a retry
10. Background loop processes: buffered signatures for this task are processed
Signature Processing Flow
1. process_signed_response() called with the signed response
2. Input validation: task ID and operator ID validated
3. Task response digest computed: Keccak256 hash of the encoded response
4. TaskSignature created: wraps task_id, digest, signature, operator_id
5. ServiceHandle cloned: lock released before the async call
6. process_signature() called: sends to the BLS service
7. If the task is not initialized:
   - Signature buffered in pending_signatures
   - Size limit checked (MAX_PENDING_SIGNATURE_TASKS)
   - Returns success (the signature will be processed later)
8. If the task is initialized:
   - Signature sent to the task aggregator via the per-task channel
   - BLS service verifies the signature
   - If valid: stored in task_responses with a size limit check
   - Returns success or error
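The buffer-or-deliver decision above can be sketched with plain std collections. SignatureRouter and its fields are illustrative stand-ins (a Vec plays the role of the per-task channel), not the crate's actual types:

```rust
use std::collections::HashMap;

type TaskId = u64;
type Signature = Vec<u8>;

const MAX_PENDING_SIGNATURE_TASKS: usize = 1000;

#[derive(Default)]
struct SignatureRouter {
    /// Stand-in for per-task channels of initialized tasks.
    task_channels: HashMap<TaskId, Vec<Signature>>,
    /// Buffer for signatures that arrive before initialize_task().
    pending_signatures: HashMap<TaskId, Vec<Signature>>,
}

impl SignatureRouter {
    /// Deliver to the task if initialized, otherwise buffer (bounded).
    fn process_signature(&mut self, task_id: TaskId, sig: Signature) -> Result<(), String> {
        if let Some(chan) = self.task_channels.get_mut(&task_id) {
            chan.push(sig); // task initialized: direct delivery
            return Ok(());
        }
        // Not initialized: buffer, but respect the task-count limit.
        if !self.pending_signatures.contains_key(&task_id)
            && self.pending_signatures.len() >= MAX_PENDING_SIGNATURE_TASKS
        {
            return Err("pending signature buffer full".into());
        }
        self.pending_signatures.entry(task_id).or_default().push(sig);
        Ok(())
    }

    /// On initialize_task(): create the channel and drain the buffer into it.
    fn initialize_task(&mut self, task_id: TaskId) {
        let buffered = self.pending_signatures.remove(&task_id).unwrap_or_default();
        self.task_channels.insert(task_id, buffered);
    }
}
```

The key property is that the caller sees success in both branches: buffering is transparent, and buffered signatures flow into the channel as soon as the task initializes.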
Aggregation Flow (BLS Service)
1. Signature received via the per-task channel in single_task_aggregator
2. Signature verification:
   - Duplicate check (same operator, same digest)
   - Cryptographic verification against the operator public key
   - Result sent to the result channel
3. If the signature is valid:
   - Operator state looked up
   - update_aggregated_operators() called for task_response_digest
   - Aggregation state updated (signature aggregated, stake added)
4. Quorum check:
   - check_if_stake_thresholds_met() called
   - Checks whether aggregated stake meets the threshold for each quorum
5. If quorum is met:
   - Aggregated response created (BlsAggregationServiceResponse)
   - Response sent via aggregated_response_sender
   - Window opened for additional signatures (window_duration)
6. Window handling:
   - Additional signatures accepted during the window
   - Final aggregated response sent when the window closes
7. Task expiry:
   - Timer expires after time_to_expiry
   - Task state cleaned up
   - Channel closed (detected in ProcessSignature handling)
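The quorum check in the flow above reduces to a per-quorum stake comparison. A hedged sketch, assuming an integer threshold percentage and per-quorum stake slices (the real check operates on richer operator state):

```rust
/// Returns true when, for every quorum, the aggregated signed stake
/// reaches the threshold percentage of that quorum's total stake.
/// Cross-multiplication avoids integer division truncation.
fn stake_thresholds_met(
    signed_stake: &[u128],      // aggregated stake per quorum
    total_stake: &[u128],       // total registered stake per quorum
    threshold_percentage: u128, // e.g. 67 for 67%
) -> bool {
    signed_stake
        .iter()
        .zip(total_stake)
        .all(|(signed, total)| signed * 100 >= total * threshold_percentage)
}
```

Note the `all`: every quorum must individually clear its threshold; a surplus in one quorum cannot compensate for a shortfall in another.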
Edge Cases & Failure Modes
Signature Arrives Before Task Initialization
Scenario: Operator submits signature before initialize_task() is called.
Handling:
- Signature buffered in the pending_signatures HashMap
- Entry created with a created_at timestamp
- When the task initializes: pending_notify_tx.send(task_id) notifies the background loop
- Background loop processes all buffered signatures for that task
- If the task never initializes: entry auto-removed after a 5-second timeout

Memory Safety: Limited to MAX_PENDING_SIGNATURE_TASKS = 1000 tasks. New tasks are rejected at the limit.
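The timeout-based removal can be sketched as a retain over timestamped entries. This is a simplification of the background cleanup task, and PendingEntry is an illustrative type:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

type TaskId = u64;

const PENDING_SIGNATURE_TIMEOUT: Duration = Duration::from_secs(5);

struct PendingEntry {
    created_at: Instant,
    signatures: Vec<Vec<u8>>,
}

/// Drop buffered entries whose task never initialized within the timeout.
/// (In the real service a background task runs this on a fixed interval.)
fn cleanup_expired(pending: &mut HashMap<TaskId, PendingEntry>, now: Instant) {
    pending.retain(|_, entry| now.duration_since(entry.created_at) < PENDING_SIGNATURE_TIMEOUT);
}
```

Taking `now` as a parameter rather than calling Instant::now() inside keeps the function deterministic and easy to test.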
Memory Pressure
Scenario: System receives more tasks than memory limits allow.
Handling:
- Task responses: when MAX_TASK_RESPONSES is reached, the oldest entry is evicted (FIFO)
- Pending signatures: when MAX_PENDING_SIGNATURE_TASKS is reached, new tasks are rejected
- BLS service tasks: when MAX_ACTIVE_TASKS is reached, the oldest task is evicted
- Aggregated operators: when MAX_AGGREGATED_OPERATORS_PER_TASK is reached, the oldest digest is evicted
Logging: All evictions logged with warning level, including which entry was evicted and current size.
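The FIFO eviction shared by these limits can be sketched with a HashMap plus an insertion-order queue. Types are illustrative, and the cap is shrunk to 3 to keep the example small (the real limit is 10,000):

```rust
use std::collections::{HashMap, VecDeque};

const MAX_TASK_RESPONSES: usize = 3; // shrunk for the sketch

/// Bounded map with FIFO eviction: when full, the oldest-inserted
/// entry is evicted to make room (mirrors the task_responses limit).
struct BoundedResponses {
    map: HashMap<u64, String>,
    insertion_order: VecDeque<u64>,
}

impl BoundedResponses {
    fn new() -> Self {
        Self { map: HashMap::new(), insertion_order: VecDeque::new() }
    }

    fn insert(&mut self, task_id: u64, response: String) {
        if !self.map.contains_key(&task_id) && self.map.len() >= MAX_TASK_RESPONSES {
            if let Some(oldest) = self.insertion_order.pop_front() {
                self.map.remove(&oldest); // evicted (the real code logs this at warn level)
            }
        }
        if self.map.insert(task_id, response).is_none() {
            self.insertion_order.push_back(task_id); // track order for new keys only
        }
    }
}
```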
Task Expiry
Scenario: Task expires before quorum is reached.
Handling:
- Timer in single_task_aggregator expires after time_to_expiry
- Task state cleaned up
- Channel closed (receiver dropped)
- Subsequent ProcessSignature messages detect the channel closure
- Entry removed from task_channels
- No panic, graceful cleanup
Duplicate Signatures
Scenario: Same operator submits multiple signatures for same task_response_digest.
Handling:
- is_duplicate_signature() checks whether the operator is already in signers_operator_ids_set
- If duplicate, a structured error is sent to the result channel:

```rust
BlsAggregationServiceError::SignatureVerificationError {
    task_id,
    operator_id: signed_digest.operator_id,
    verification_error: SignatureVerificationError::DuplicateSignature,
}
```

- The signature is not aggregated
- Processing continues for other signatures
- The error is logged with full context (task_id, operator_id, verification_error type)
Note: Different operators can sign same digest (aggregated), but same operator cannot sign twice.
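This rule can be sketched with a HashSet of operator ids per digest. DigestSigners is an illustrative stand-in for the real signers_operator_ids_set bookkeeping:

```rust
use std::collections::{HashMap, HashSet};

type OperatorId = [u8; 4]; // shortened for the sketch; real ids are longer

/// Tracks which operators have already signed each response digest.
#[derive(Default)]
struct DigestSigners {
    signers: HashMap<u64, HashSet<OperatorId>>, // digest -> signer set
}

impl DigestSigners {
    /// Returns Err for a duplicate (same operator, same digest);
    /// different operators signing the same digest are accepted.
    fn record(&mut self, digest: u64, operator_id: OperatorId) -> Result<(), String> {
        let set = self.signers.entry(digest).or_default();
        if !set.insert(operator_id) {
            return Err(format!("duplicate signature from operator {:02x?}", operator_id));
        }
        Ok(())
    }
}
```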
Channel Closure (Receiver Drop)
Scenario: Client times out or cancels request, receiver dropped.
Handling:
- result_channel.send() returns Err if the receiver was dropped
- Error logged as a warning (expected in timeout scenarios)
- Error not propagated (the caller already cancelled)
- Processing continues normally
Example:
```rust
if signed_digest.result_channel.send(verification_result).is_err() {
    warn!("Failed to send verification result (receiver dropped - likely timeout)");
    return Ok(()); // Don't propagate the error
}
```
Missing Operator Public Keys
Scenario: Operator state exists but public keys are None.
Handling:
- update_aggregated_operators() checks that pub_keys exist
- Returns BlsAggregationServiceError::RegistryError { task_id, operator_context, reason } if missing
- The error is logged with full context:

```rust
BlsAggregationServiceError::RegistryError {
    task_id,
    operator_context: format!(" from operator {}", hex::encode(operator_id.as_slice())),
    reason: "operator public keys not found in operator state".to_string(),
}
```

- The signature is not aggregated, but processing continues

Previous Issue: This path used .unwrap(), which would panic. It is now handled with a structured error return including task_id, operator_context, and a detailed reason.
Configuration Constants
All configuration constants are defined in the respective modules:
AggregatorCore (core.rs)
- MAX_PENDING_SIGNATURE_TASKS: usize = 1000
  - Maximum number of tasks that can have pending signatures
  - Prevents unbounded memory growth if tasks never initialize
  - New tasks rejected at the limit
- MAX_TASK_RESPONSES: usize = 10000
  - Maximum number of tasks that can have stored responses
  - Prevents unbounded memory growth
  - FIFO eviction when the limit is reached
- PENDING_SIGNATURE_TIMEOUT: Duration = Duration::from_secs(5)
  - Timeout before a pending signature is automatically removed
  - Prevents permanent memory leaks if tasks never initialize
  - Entries older than this are removed by the cleanup task
- PENDING_SIGNATURE_CLEANUP_INTERVAL: Duration = Duration::from_secs(10)
  - Interval for checking and cleaning up expired pending signatures
  - Background task runs every 10 seconds
  - Removes entries older than PENDING_SIGNATURE_TIMEOUT
- TASK_RESPONSES_CLEANUP_INTERVAL: Duration = Duration::from_secs(60)
  - Interval for checking and cleaning up stale task responses
  - Background task runs every 60 seconds
  - Evicts the oldest entries if over the MAX_TASK_RESPONSES limit
BLS Aggregation Service (bls.rs)
- MAX_ACTIVE_TASKS: usize = 10000
  - Maximum number of active tasks allowed in task_channels
  - Prevents memory leaks from unbounded task growth
  - FIFO eviction when the limit is reached
- MAX_AGGREGATED_OPERATORS_PER_TASK: usize = 100
  - Maximum number of distinct response digests per task in aggregated_operators
  - Prevents memory bloat for tasks with many different responses
  - If the limit is reached, new digests are ignored and logged as warnings; this should never happen in practice
Testing Considerations
Memory Leak Testing
Test scenarios:
- Run aggregator for extended period (24+ hours)
- Monitor memory usage over time
- Verify cleanup tasks are running
- Check that eviction is working when limits reached
Concurrency Testing
Test scenarios:
- Multiple tasks initialized simultaneously
- Signatures arriving concurrently for different tasks
- High signature throughput
- Verify task isolation (errors in one task don't affect others)
Error Injection Testing
Test scenarios:
- Invalid signatures
- Missing operator state
- Channel closure
- Task expiry before quorum
- Memory pressure (limits reached)
Performance Testing
Test scenarios:
- Throughput: Signatures processed per second
- Latency: Time from signature submission to aggregation
- Lock contention: Measure time spent waiting for locks
- Memory usage: Peak memory under load
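A crude harness for the throughput scenario can be written with std::time::Instant. This is a sketch, not the project's benchmark suite, and measure_throughput is a hypothetical helper:

```rust
use std::time::Instant;

/// Measure operations per second for a closure.
/// A real benchmark would add warm-up runs and multiple samples.
fn measure_throughput<F: FnMut()>(iterations: u32, mut op: F) -> f64 {
    let start = Instant::now();
    for _ in 0..iterations {
        op();
    }
    iterations as f64 / start.elapsed().as_secs_f64()
}
```

In practice the closure would wrap a full process_signed_response() call against a running aggregator; latency is measured per call the same way, dividing elapsed time by the iteration count.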
Code Examples
Basic Usage
```rust
use newton_prover_aggregator::AggregatorCore;
use eigensdk::client_avsregistry::reader::AvsRegistryChainReader;

// Initialize the aggregator
let aggregator = AggregatorCore::new(
    avs_registry_reader,
    operator_registry_address,
    ws_rpc_url,
    http_rpc_url,
).await?;

// Initialize a task
aggregator.initialize_task(
    task_id,
    task_created_block,
    quorum_nums,
    quorum_threshold_percentage,
    time_to_expiry,
).await?;

// Process a signed response from an operator
aggregator.process_signed_response(signed_response).await?;

// Wait for aggregation (with timeout) - requires the task_id parameter
let aggregated_response = aggregator.wait_for_aggregation(task_id, timeout_duration).await?;

// Submit to the contract
let receipt = aggregator.submit_aggregated_response(
    &avs_writer,
    task,
    task_response,
    aggregated_response,
).await?;
```
Error Handling
All errors include structured context for debugging. Pattern matching on error variants provides access to detailed information:
```rust
match aggregator.process_signed_response(signed_response).await {
    Ok(()) => {
        info!("Signature processed successfully");
    }
    Err(BlsAggregationServiceError::TaskNotFound { task_id, reason }) => {
        warn!(
            task_id = %task_id,
            reason = %reason,
            "Task not initialized yet - signature buffered for later processing"
        );
        // The signature will be processed when the task is initialized
    }
    Err(BlsAggregationServiceError::SignatureVerificationError {
        task_id,
        operator_id,
        verification_error,
    }) => {
        error!(
            task_id = %task_id,
            operator_id = %hex::encode(operator_id.as_slice()),
            verification_error = ?verification_error,
            "Signature verification failed"
        );
    }
    Err(BlsAggregationServiceError::RegistryError {
        task_id,
        operator_context,
        reason,
    }) => {
        error!(
            task_id = %task_id,
            operator_context = %operator_context,
            reason = %reason,
            "AVS registry error"
        );
    }
    Err(e) => {
        error!("Failed to process signature: {}", e);
        // All errors include task_id and reason in their Display implementation
    }
}
```
```rust
match aggregator.wait_for_aggregation(task_id, timeout_duration).await {
    Ok(response) => {
        info!("Aggregation successful: {} non-signers", response.non_signers_pub_keys_g1.len());
    }
    Err(AggregatorCoreError::TaskNotInitialized { task_id }) => {
        warn!(task_id = %task_id, "Task not initialized - call initialize_task first");
    }
    Err(AggregatorCoreError::Timeout { duration_ms, timeout_ms, operator_errors }) => {
        warn!(
            task_id = %task_id,
            duration_ms,
            "Aggregation timed out after {} ms",
            timeout_ms
        );
        if let Some(errors) = operator_errors {
            warn!("Operator errors: {:?}", errors);
        }
    }
    Err(AggregatorCoreError::AggregationServiceError(
        BlsAggregationServiceError::TaskExpired { task_id, reason },
    )) => {
        warn!(
            task_id = %task_id,
            reason = %reason,
            "Task expired before aggregation completed"
        );
    }
    Err(AggregatorCoreError::Cancelled) => {
        warn!(task_id = %task_id, "Aggregation cancelled");
    }
    Err(e) => {
        error!(task_id = %task_id, "Aggregation failed: {}", e);
    }
}
```
Error Context Access:
- Pattern Matching: destructure error variants to access task_id, operator_id, reason, etc.
- Display Implementation: all errors implement Display with formatted context (e.g., "task 123 expired: quorum not reached")
- Structured Logging: use error fields directly in logging macros for better observability
Customization Points
The aggregator can be customized by:
- Adjusting memory limits: modify constants (MAX_TASK_RESPONSES, etc.) based on expected load
- Custom cleanup intervals: adjust PENDING_SIGNATURE_CLEANUP_INTERVAL and TASK_RESPONSES_CLEANUP_INTERVAL
- Error handling: all errors are logged with context and can be extended with custom error types
- Metrics: integration points for metrics collection (see newton_prover_metrics usage)
Performance & Scalability
Throughput & Latency
The per-task channel architecture provides significant performance improvements:
Metrics (under high concurrency with 10k+ concurrent tasks):
- Latency: ~0.1-0.5 ms per response (constant time, independent of concurrent load)
  - Previous approach: 5-500 ms, increasing with concurrent tasks due to mutex contention
- Throughput: supports 10k+ concurrent tasks efficiently
  - Previous approach: limited by mutex serialization
- Mutex contention: zero for response waiting (per-task channels)
  - Previous approach: high contention on the shared AggregateReceiver mutex
- Response stealing: zero (direct routing to task-specific channels)
  - Previous approach: responses could be consumed by the wrong task's waiter
Scalability Characteristics:
- Linear scaling: Performance scales linearly with number of tasks
- Constant latency: Response delivery time is constant regardless of concurrent load
- No serialization bottlenecks: Each task operates independently
- Memory efficient: Receivers automatically cleaned up when tasks complete
Robustness & Resilience
Fault Tolerance:
- Isolated failures: One task's channel closure doesn't affect others
- Natural cancellation: Dropping receiver = cancellation, no explicit cleanup needed
- Graceful degradation: System continues operating even if individual tasks fail
- Resource cleanup: Automatic cleanup when receivers are dropped
Error Handling:
- Task-specific errors: Errors are guaranteed to be for the correct task_id (no cross-task error leakage)
- Structured errors: All errors include task_id, reason, and context for debugging
- Timeout handling: Per-task timeouts with operator error collection
- Cancellation support: Optional cancellation tokens for request cancellation
Memory Management:
- Automatic cleanup: receivers removed from the HashMap when wait_for_aggregation() starts
- No memory leaks: receivers dropped when the function returns
- Bounded growth: HashMap size limited by number of active tasks
- Efficient storage: Only active tasks have receivers stored
Summary
The Newton Prover Aggregator is designed for reliability, scalability, and high performance.
- Memory Safety: Bounded data structures with FIFO eviction prevent memory leaks
- Concurrency: Per-task channels eliminate mutex contention, enabling true concurrent processing
- Performance: ~0.1-0.5ms latency, supports 10k+ concurrent tasks efficiently
- Scalability: Linear scaling with constant latency regardless of concurrent load
- Error Isolation: Errors in one task don't affect others, task-specific error channels
- Robustness: Graceful handling of edge cases, natural cancellation, automatic cleanup
- Fault Tolerance: Isolated failures, graceful degradation, resource cleanup
- Observability: Comprehensive error logging with structured context (task_id, operator_id, reason) for debugging
- Error Enrichment: All errors include structured fields (task_id, reason, operator_context) for immediate debugging and investigation
The architecture separates concerns cleanly: AggregatorCore handles high-level orchestration while BlsAggregatorService handles low-level cryptographic aggregation. The per-task channel architecture eliminates bottlenecks and enables true concurrent processing of multiple aggregation tasks.