Deep Dive: Structured Concurrency
How ClawDesk uses Tokio's structured concurrency primitives — CancellationToken hierarchies for graceful shutdown, JoinSet for parallel tool execution, and spawn_blocking for CPU-bound work. Includes comparison to Node.js and throughput analysis.
Why Structured Concurrency?
In a multi-channel AI gateway, concurrency is everywhere:
- Multiple channels receiving messages simultaneously
- Multiple pipeline stages processing in sequence (but with parallelizable substages)
- Multiple tool calls from a single LLM response
- Multiple provider attempts during fallback
- Background tasks: cron jobs, health checks, metrics collection
Unstructured concurrency (fire-and-forget tokio::spawn) leads to:
- Leaked tasks that outlive their parent
- Uncancellable operations that block shutdown
- Error propagation black holes
- Resource leaks (connections, file handles)
Structured concurrency ensures: every spawned task has an owner, and when the owner dies, so do all its children.
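The ownership principle shows up even without Tokio: Rust's standard library offers `std::thread::scope`, which enforces the same guarantee at the API level. A minimal illustration (the function name here is ours, not ClawDesk's):

```rust
use std::thread;

// Doubles each element on its own thread. thread::scope guarantees every
// spawned thread joins before the scope returns: no worker can outlive
// its owner, which is the core structured-concurrency property.
fn parallel_double_sum(data: &[i32]) -> i32 {
    thread::scope(|s| {
        let handles: Vec<_> = data.iter().map(|&x| s.spawn(move || x * 2)).collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    })
}

fn main() {
    println!("{}", parallel_double_sum(&[1, 2, 3])); // prints 12
}
```

Tokio's `CancellationToken` and `JoinSet` bring the same discipline to async tasks, where plain scoping is not enough.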
CancellationToken Hierarchy
ClawDesk builds a tree of cancellation tokens that mirrors the runtime hierarchy:
Key Property
When a parent token is cancelled, all child tokens are automatically cancelled:
$$ \text{cancel}(\text{parent}) \Rightarrow \forall c \in \text{children}(\text{parent}): \text{cancel}(c) $$
This propagation is immediate and free — no polling, no callbacks, no manual plumbing.
Implementation
```rust
// In clawdesk-runtime/src/lifecycle.rs
use tokio_util::sync::CancellationToken;

pub struct Runtime {
    /// Root cancellation token — cancelling this shuts down everything.
    root_token: CancellationToken,
}

impl Runtime {
    pub fn new() -> Self {
        Self {
            root_token: CancellationToken::new(),
        }
    }

    /// Create a child token for a subsystem.
    pub fn subsystem_token(&self) -> CancellationToken {
        self.root_token.child_token()
    }

    /// Graceful shutdown: cancel root, wait for all children.
    pub async fn shutdown(&self) {
        tracing::info!("Initiating graceful shutdown");
        self.root_token.cancel();
        // All child tokens are now cancelled.
        // Tasks watching these tokens will wind down.
    }
}
```
Usage in Channels
```rust
// In clawdesk-channels/src/telegram/mod.rs
pub async fn start(
    &self,
    processor: Arc<MessageProcessor>,
    parent_token: CancellationToken,
) -> Result<(), ChannelError> {
    // Create a child token for this channel
    let channel_token = parent_token.child_token();

    // Spawn the webhook listener (`listener` and `app` are built earlier, elided here)
    let listener_token = channel_token.child_token();
    tokio::spawn(async move {
        axum::serve(listener, app)
            .with_graceful_shutdown(listener_token.cancelled_owned())
            .await
            .ok();
    });

    // Spawn the polling loop (for updates/health)
    let poll_token = channel_token.child_token();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = poll_token.cancelled() => {
                    tracing::info!("Telegram polling stopped");
                    break;
                }
                _ = tokio::time::sleep(Duration::from_secs(30)) => {
                    // Health check
                }
            }
        }
    });

    Ok(())
}
```
Cancellation in the Agent Pipeline
```rust
// In clawdesk-agents/src/pipeline/mod.rs
pub async fn run_pipeline(
    msg: NormalizedMessage,
    mut ctx: PipelineContext,
    cancel: CancellationToken,
) -> Result<OutboundMessage, PipelineError> {
    // Each stage checks cancellation; run_until_cancelled returns None
    // if the token fires before the wrapped future completes.
    let auth = cancel
        .run_until_cancelled(auth_resolve(&msg, &ctx.security))
        .await
        .ok_or(PipelineError::Cancelled)?;

    let history = cancel
        .run_until_cancelled(history_sanitize(&msg, &ctx.memory, &auth))
        .await
        .ok_or(PipelineError::Cancelled)?;

    let guarded = context_guard(&msg, &history, &auth).await?;

    // Tool execution is parallelized with JoinSet (see below)
    let skills = cancel
        .run_until_cancelled(tool_split(&msg, &ctx.skills, guarded.remaining_tokens()))
        .await
        .ok_or(PipelineError::Cancelled)?;

    // Keep the request around: failover_decide needs it for retries.
    let request = build_request(&msg, &history, &skills);
    let response = cancel
        .run_until_cancelled(execute(request.clone(), &ctx.providers, cancel.clone()))
        .await
        .ok_or(PipelineError::Cancelled)?;

    failover_decide(response, &mut ctx.fsm, &ctx.providers, &request).await
}
```
JoinSet for Parallel Tool Execution
When an LLM returns multiple tool calls, ClawDesk executes them in parallel using Tokio's JoinSet:
```rust
// In clawdesk-agents/src/tools/executor.rs
use tokio::task::JoinSet;

pub async fn execute_tools(
    tool_calls: Vec<ToolCall>,
    registry: &ToolRegistry,
    cancel: CancellationToken,
) -> Vec<ToolResult> {
    let mut join_set = JoinSet::new();

    // Spawn all tool executions in parallel
    for call in tool_calls {
        let registry = registry.clone();
        let token = cancel.child_token();
        join_set.spawn(async move {
            let tool = registry.get(&call.name)?;
            tokio::select! {
                result = tool.execute(call.arguments, &ToolContext::new()) => result,
                _ = token.cancelled() => Err(ToolError::Cancelled),
            }
        });
    }

    // Collect results as they complete (completion order, not call order)
    let mut results = Vec::with_capacity(join_set.len());
    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(Ok(tool_result)) => results.push(tool_result),
            Ok(Err(tool_error)) => {
                tracing::warn!(error = %tool_error, "Tool execution failed");
                results.push(ToolResult::Error(tool_error.to_string()));
            }
            Err(join_error) => {
                tracing::error!(error = %join_error, "Tool task panicked");
                results.push(ToolResult::Error("Internal error".to_string()));
            }
        }
    }
    results
}
```
Why JoinSet, Not Just tokio::spawn?
| Feature | tokio::spawn | JoinSet |
|---|---|---|
| Ownership of tasks | Fire-and-forget | Owned by the set |
| Wait for all | Manual tracking | join_next() loop |
| Cancel all | Manual token per task | abort_all() or drop |
| Collect results | Manual channel | Built-in |
| Task count | Unknown | .len() |
spawn_blocking for CPU-Bound Work
Some operations in ClawDesk are CPU-bound and would block the async runtime if run on a Tokio worker thread:
- Token counting / estimation
- Regex-based content scanning (security layer)
- TOON format serialization of large contexts
- Cryptographic operations (HMAC signature verification)
```rust
// In clawdesk-security/src/scanner.rs
pub async fn scan_content(
    content: &str,
    rules: &[ScanRule],
) -> Result<ScanResult, SecurityError> {
    let content = content.to_string();
    let rules = rules.to_vec();

    // Move CPU-bound regex scanning to the blocking threadpool
    tokio::task::spawn_blocking(move || {
        let mut findings = Vec::new();
        for rule in &rules {
            // Regex compilation + matching is CPU-bound
            let re = regex::Regex::new(&rule.pattern)
                .map_err(|e| SecurityError::InvalidRule(e.to_string()))?;
            for mat in re.find_iter(&content) {
                findings.push(ScanFinding {
                    rule_id: rule.id.clone(),
                    matched_text: mat.as_str().to_string(),
                    position: mat.start()..mat.end(),
                    severity: rule.severity,
                });
            }
        }
        Ok(ScanResult {
            passed: findings.is_empty(),
            findings,
        })
    })
    .await
    .map_err(|e| SecurityError::Internal(e.to_string()))?
}
```
When to Use spawn_blocking
| Operation | Blocking? | Use spawn_blocking? |
|---|---|---|
| HTTP request | No (async I/O) | ❌ |
| Database query | No (async I/O) | ❌ |
| Regex matching (large text) | Yes (CPU) | ✅ |
| Token estimation | Yes (CPU) | ✅ (if > 10KB text) |
| JSON serialization | Depends | ✅ (if > 1MB) |
| File I/O | Yes (syscalls) | ✅ |
| Cryptographic hashing | Yes (CPU) | ✅ |
Rule of thumb: If an operation takes >1ms of CPU time, move it to spawn_blocking.
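One way to apply that rule of thumb is to measure once and decide. Here `exceeds_blocking_budget` is a hypothetical helper for illustration, not part of ClawDesk:

```rust
use std::time::Instant;

// Hypothetical helper encoding the ~1ms rule of thumb from the text.
fn exceeds_blocking_budget(elapsed_us: u128) -> bool {
    elapsed_us > 1_000 // 1ms, in microseconds
}

fn main() {
    let payload = "a".repeat(1_000_000); // ~1 MB of text
    let start = Instant::now();
    let hits = payload.bytes().filter(|&b| b == b'a').count(); // stand-in CPU work
    let elapsed_us = start.elapsed().as_micros();
    println!(
        "scanned {} bytes in {}us -> spawn_blocking: {}",
        hits,
        elapsed_us,
        exceeds_blocking_budget(elapsed_us)
    );
}
```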
Comparison to Node.js
AbortSignal Manual Plumbing
In Node.js (as used in OpenClaw), cancellation requires manual AbortController/AbortSignal plumbing:
```javascript
// ❌ Node.js approach — manual plumbing everywhere
async function runPipeline(message, parentSignal) {
  // Must create a new controller AND wire up the parent signal
  const controller = new AbortController();
  const signal = controller.signal;

  // Manual: if the parent aborts, abort this controller
  parentSignal.addEventListener('abort', () => controller.abort());

  try {
    // Must pass the signal to EVERY async operation
    const auth = await authResolve(message, { signal });
    const history = await fetchHistory(message, { signal });

    // For parallel operations, must create ANOTHER controller
    const toolController = new AbortController();
    signal.addEventListener('abort', () => toolController.abort());
    const results = await Promise.all(
      toolCalls.map(call =>
        executeTool(call, { signal: toolController.signal })
      )
    );
  } catch (err) {
    if (err.name === 'AbortError') {
      // Manual: must catch and re-throw as your own error type
      throw new PipelineError('Cancelled');
    }
    throw err;
  }
}
```
Problems:
- Every function needs an explicit `signal` parameter
- Parent-child wiring is manual and error-prone
- Forgetting `signal` in one call = an uncancellable operation
- No compiler checks for missing signal propagation
ClawDesk's Approach
```rust
// ✅ Rust/Tokio approach — automatic propagation
async fn run_pipeline(
    msg: NormalizedMessage,
    ctx: PipelineContext,
    cancel: CancellationToken, // One token covers everything
) -> Result<OutboundMessage, PipelineError> {
    let auth = auth_resolve(&msg, &ctx.security).await?;

    // No need to thread `cancel` through every call —
    // child tasks inherit it via child_token().
    let mut tools = JoinSet::new();
    for call in tool_calls {
        let token = cancel.child_token(); // Linked to parent
        tools.spawn(async move {
            tokio::select! {
                r = execute(call) => r,
                _ = token.cancelled() => Err(ToolError::Cancelled),
            }
        });
    }
    // When `cancel` is cancelled, ALL child tokens fire automatically.
    // ... drain the JoinSet and build the response, as in execute_tools ...
}
```
Comparison Table
| Property | Node.js (AbortSignal) | Tokio (CancellationToken) |
|---|---|---|
| Hierarchy | Manual wiring | child_token() automatic |
| Propagation | Event listener per level | Instant, zero-cost |
| Scope enforcement | None (runtime errors) | Ownership (compile-time) |
| Parallel cancellation | Manual per-Promise | JoinSet::abort_all() |
| Forgetting to propagate | Silent bug | Compiler warning (unused) |
| Thread safety | N/A (single-threaded) | Send + Sync enforced |
Throughput Analysis
Sequential vs. Parallel Tool Execution
With $k$ tool calls, each taking average time $t$:
Sequential:
$$ T_{\text{seq}} = \sum_{i=1}^{k} t_i = k \cdot \bar{t} $$
Parallel (JoinSet):
$$ T_{\text{par}} = \max_{i=1}^{k} t_i \approx \bar{t} + \sigma \cdot \sqrt{2 \ln k} $$
(By extreme value theory for independent tasks.)
Speedup:
$$ S = \frac{T_{\text{seq}}}{T_{\text{par}}} = \frac{k \cdot \bar{t}}{\bar{t} + \sigma \sqrt{2 \ln k}} \approx k \quad \text{(for large } k \text{ with similar } t_i \text{)} $$
For a typical case with 3 tool calls at ~200ms each:
$$ T_{\text{seq}} = 600\text{ms}, \quad T_{\text{par}} \approx 220\text{ms}, \quad S \approx 2.7\times $$
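Plugging numbers into the formulas above confirms the worked example (sigma is assumed to be about 13.5 ms so the parallel estimate lands on the ~220 ms figure):

```rust
// T_seq = k * t_bar; T_par ≈ t_bar + sigma * sqrt(2 ln k), per the text.
fn t_seq(k: f64, t_bar_ms: f64) -> f64 {
    k * t_bar_ms
}

fn t_par(t_bar_ms: f64, sigma_ms: f64, k: f64) -> f64 {
    t_bar_ms + sigma_ms * (2.0 * k.ln()).sqrt()
}

fn main() {
    let (k, t_bar, sigma) = (3.0, 200.0, 13.5); // 3 tools, 200ms each, assumed spread
    let seq = t_seq(k, t_bar);
    let par = t_par(t_bar, sigma, k);
    println!("seq={seq:.0}ms par={par:.0}ms speedup={:.1}x", seq / par);
    // prints: seq=600ms par=220ms speedup=2.7x
}
```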
Multi-Channel Throughput
With $n$ channels, each handling $m$ messages/second:
Single-threaded (Node.js event loop):
$$ \text{Throughput} = \frac{1}{t_{\text{pipeline}}} \quad \text{(limited by one core)} $$
Multi-threaded (Tokio, $w$ worker threads):
$$ \text{Throughput} = \frac{w}{t_{\text{pipeline}}} = w \times \text{single-threaded} $$
With 8 Tokio worker threads and a 500ms average pipeline time:
| Metric | Node.js (1 thread) | Tokio (8 threads) | Improvement |
|---|---|---|---|
| Max concurrent pipelines | 1 (with async I/O wait) | 8 (truly parallel) | 8× |
| Pipeline throughput | ~2/sec | ~16/sec | 8× |
| Tool execution (3 tools) | 600ms seq | 200ms par | 3× |
| Combined | 2/sec | ~48/sec | ~24× |
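The table's arithmetic follows directly from the throughput formula, with the ~3x gain from parallel tool execution multiplied in for the combined row:

```rust
// Throughput = workers / pipeline_time, per the formula above.
fn throughput(workers: f64, pipeline_secs: f64) -> f64 {
    workers / pipeline_secs
}

fn main() {
    let node = throughput(1.0, 0.5); // single event-loop thread
    let tokio = throughput(8.0, 0.5); // 8 worker threads
    let combined = tokio * 3.0; // plus ~3x from parallel tool execution
    println!("node={node}/s tokio={tokio}/s combined={combined}/s");
    // prints: node=2/s tokio=16/s combined=48/s
}
```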
Structured Concurrency Patterns in ClawDesk
Pattern 1: Scoped Tasks
```rust
/// Task lifetime is bounded by the scope
async fn process_message(msg: NormalizedMessage, cancel: CancellationToken) {
    let child = cancel.child_token();

    // This task exits when `child` (and therefore `cancel`) is cancelled
    let handle = tokio::spawn(async move {
        tokio::select! {
            result = do_work() => result,
            _ = child.cancelled() => Err(Error::Cancelled),
        }
    });

    // Wait for completion or cancellation
    handle.await.ok();
}
// Cancelling `cancel` anywhere up the tree fires `child`,
// so the spawned task always winds down — no leaks.
```
Pattern 2: Fan-Out / Fan-In
```rust
/// Execute N operations in parallel, collect all results
async fn fan_out_fan_in<T, R, F, Fut>(
    items: Vec<T>,
    f: F,
    cancel: CancellationToken,
) -> Vec<Result<R, Error>>
where
    F: Fn(T, CancellationToken) -> Fut,
    Fut: Future<Output = Result<R, Error>> + Send + 'static,
    R: Send + 'static,
{
    let mut set = JoinSet::new();
    for item in items {
        let token = cancel.child_token();
        set.spawn(f(item, token));
    }
    let mut results = Vec::new();
    while let Some(r) = set.join_next().await {
        // A panicked task (JoinError) is folded into the result list
        results.push(r.unwrap_or(Err(Error::Panicked)));
    }
    results
}
```
Pattern 3: Graceful Shutdown Cascade
```rust
/// Application shutdown — cascades through the entire hierarchy
pub async fn shutdown(runtime: &Runtime) {
    // 1. Cancel root token — all children receive cancellation
    runtime.root_token.cancel();

    // 2. Wait for graceful shutdown with timeout
    tokio::select! {
        _ = runtime.join_all_tasks() => {
            tracing::info!("All tasks shut down gracefully");
        }
        _ = tokio::time::sleep(Duration::from_secs(30)) => {
            tracing::warn!("Shutdown timeout — forcing exit");
        }
    }
}
```
Key Takeaways
- `CancellationToken::child_token()` creates a linked hierarchy — parent cancellation propagates automatically
- `JoinSet` owns spawned tasks and provides structured collection of results
- `spawn_blocking` offloads CPU-bound work to a separate threadpool, keeping async workers free
- Structured concurrency prevents resource leaks — every task has an owner
- vs. Node.js: automatic propagation replaces manual `AbortSignal` plumbing, eliminating an entire class of bugs
- Throughput gains: true parallelism (multi-core) + parallel tool execution = ~24× improvement over single-threaded
Further Reading
- Concurrency Model Architecture — full concurrency reference
- Fallback FSM — how cancellation interacts with the state machine
- Message Flow Tutorial — see concurrency in the pipeline