Deep Dive: Structured Concurrency

What you'll learn

How ClawDesk uses Tokio's structured concurrency primitives — CancellationToken hierarchies for graceful shutdown, JoinSet for parallel tool execution, and spawn_blocking for CPU-bound work. Includes comparison to Node.js and throughput analysis.

Why Structured Concurrency?

In a multi-channel AI gateway, concurrency is everywhere:

  • Multiple channels receiving messages simultaneously
  • Multiple pipeline stages processing in sequence (but with parallelizable substages)
  • Multiple tool calls from a single LLM response
  • Multiple provider attempts during fallback
  • Background tasks: cron jobs, health checks, metrics collection

Unstructured concurrency (fire-and-forget tokio::spawn) leads to:

  • Leaked tasks that outlive their parent
  • Uncancellable operations that block shutdown
  • Error propagation black holes
  • Resource leaks (connections, file handles)

Structured concurrency ensures: every spawned task has an owner, and when the owner dies, so do all its children.


CancellationToken Hierarchy

ClawDesk builds a tree of cancellation tokens that mirrors the runtime hierarchy: a root token at the top, with child tokens for each subsystem, each channel, and each in-flight task.

Key Property

When a parent token is cancelled, all child tokens are automatically cancelled:

$$ \text{cancel}(\text{parent}) \Rightarrow \forall c \in \text{children}(\text{parent}): \text{cancel}(c) $$

This propagation is immediate and free — no polling, no callbacks, no manual plumbing.

Implementation

/// In clawdesk-runtime/src/lifecycle.rs

use tokio_util::sync::CancellationToken;

pub struct Runtime {
    /// Root cancellation token — cancelling this shuts down everything.
    root_token: CancellationToken,
}

impl Runtime {
    pub fn new() -> Self {
        Self {
            root_token: CancellationToken::new(),
        }
    }

    /// Create a child token for a subsystem.
    pub fn subsystem_token(&self) -> CancellationToken {
        self.root_token.child_token()
    }

    /// Graceful shutdown: cancel root, wait for all children.
    pub async fn shutdown(&self) {
        tracing::info!("Initiating graceful shutdown");
        self.root_token.cancel();
        // All child tokens are now cancelled.
        // Tasks watching these tokens will wind down.
    }
}

Usage in Channels

/// In clawdesk-channels/src/telegram/mod.rs

pub async fn start(
    &self,
    processor: Arc<MessageProcessor>,
    parent_token: CancellationToken,
) -> Result<(), ChannelError> {
    // Create a child token for this channel
    let channel_token = parent_token.child_token();

    // Spawn the webhook listener
    let listener_token = channel_token.child_token();
    tokio::spawn(async move {
        axum::serve(listener, app)
            .with_graceful_shutdown(listener_token.cancelled_owned())
            .await
            .ok();
    });

    // Spawn the polling loop (for updates/health)
    let poll_token = channel_token.child_token();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = poll_token.cancelled() => {
                    tracing::info!("Telegram polling stopped");
                    break;
                }
                _ = tokio::time::sleep(Duration::from_secs(30)) => {
                    // Health check
                }
            }
        }
    });

    Ok(())
}

Cancellation in the Agent Pipeline

/// In clawdesk-agents/src/pipeline/mod.rs

pub async fn run_pipeline(
    msg: NormalizedMessage,
    mut ctx: PipelineContext,
    cancel: CancellationToken,
) -> Result<OutboundMessage, PipelineError> {
    // Each stage checks cancellation. run_until_cancelled returns None if the
    // token fires first; the second `?` propagates the stage's own error.
    let auth = cancel.run_until_cancelled(
        auth_resolve(&msg, &ctx.security)
    ).await.ok_or(PipelineError::Cancelled)??;

    let history = cancel.run_until_cancelled(
        history_sanitize(&msg, &ctx.memory, &auth)
    ).await.ok_or(PipelineError::Cancelled)??;

    let guarded = context_guard(&msg, &history, &auth).await?;

    // Tool execution is parallelized with JoinSet (see below)
    let skills = cancel.run_until_cancelled(
        tool_split(&msg, &ctx.skills, guarded.remaining_tokens())
    ).await.ok_or(PipelineError::Cancelled)??;

    let request = build_request(&msg, &history, &skills);
    let response = cancel.run_until_cancelled(
        execute(&request, &ctx.providers, cancel.clone())
    ).await.ok_or(PipelineError::Cancelled)??;

    failover_decide(response, &mut ctx.fsm, &ctx.providers, &request).await
}

JoinSet for Parallel Tool Execution

When an LLM returns multiple tool calls, ClawDesk executes them in parallel using Tokio's JoinSet:

/// In clawdesk-agents/src/tools/executor.rs

use tokio::task::JoinSet;

pub async fn execute_tools(
    tool_calls: Vec<ToolCall>,
    registry: &ToolRegistry,
    cancel: CancellationToken,
) -> Vec<ToolResult> {
    let mut join_set = JoinSet::new();

    // Spawn all tool executions in parallel
    for call in tool_calls {
        let registry = registry.clone();
        let token = cancel.child_token();

        join_set.spawn(async move {
            let tool = registry.get(&call.name)?;
            tokio::select! {
                result = tool.execute(call.arguments, &ToolContext::new()) => {
                    result
                }
                _ = token.cancelled() => {
                    Err(ToolError::Cancelled)
                }
            }
        });
    }

    // Collect results as they complete
    let mut results = Vec::with_capacity(join_set.len());
    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(Ok(tool_result)) => results.push(tool_result),
            Ok(Err(tool_error)) => {
                tracing::warn!(error = %tool_error, "Tool execution failed");
                results.push(ToolResult::Error(tool_error.to_string()));
            }
            Err(join_error) => {
                tracing::error!(error = %join_error, "Tool task panicked");
                results.push(ToolResult::Error("Internal error".to_string()));
            }
        }
    }

    results
}

Why JoinSet, Not Just tokio::spawn?

| Feature | `tokio::spawn` | `JoinSet` |
|---|---|---|
| Ownership of tasks | Fire-and-forget | Owned by the set |
| Wait for all | Manual tracking | `join_next()` loop |
| Cancel all | Manual token per task | `abort_all()` or drop |
| Collect results | Manual channel | Built-in |
| Task count | Unknown | `.len()` |

spawn_blocking for CPU-Bound Work

Some operations in ClawDesk are CPU-bound and would block the async runtime if run on a Tokio worker thread:

  • Token counting / estimation
  • Regex-based content scanning (security layer)
  • TOON format serialization of large contexts
  • Cryptographic operations (HMAC signature verification)

/// In clawdesk-security/src/scanner.rs

pub async fn scan_content(
    content: &str,
    rules: &[ScanRule],
) -> Result<ScanResult, SecurityError> {
    let content = content.to_string();
    let rules = rules.to_vec();

    // Move CPU-bound regex scanning to the blocking threadpool
    tokio::task::spawn_blocking(move || {
        let mut findings = Vec::new();

        for rule in &rules {
            // Regex compilation + matching is CPU-bound
            let re = regex::Regex::new(&rule.pattern)
                .map_err(|e| SecurityError::InvalidRule(e.to_string()))?;

            for mat in re.find_iter(&content) {
                findings.push(ScanFinding {
                    rule_id: rule.id.clone(),
                    matched_text: mat.as_str().to_string(),
                    position: mat.start()..mat.end(),
                    severity: rule.severity,
                });
            }
        }

        Ok(ScanResult {
            passed: findings.is_empty(),
            findings,
        })
    })
    .await
    .map_err(|e| SecurityError::Internal(e.to_string()))?
}

When to Use spawn_blocking

| Operation | Blocking? | Use `spawn_blocking`? |
|---|---|---|
| HTTP request | No (async I/O) | ❌ |
| Database query | No (async I/O) | ❌ |
| Regex matching (large text) | Yes (CPU) | ✅ |
| Token estimation | Yes (CPU) | ✅ (if > 10KB text) |
| JSON serialization | Depends | ✅ (if > 1MB) |
| File I/O | Yes (syscalls) | ✅ |
| Cryptographic hashing | Yes (CPU) | ✅ |

Rule of thumb: If an operation takes >1ms of CPU time, move it to spawn_blocking.


Comparison to Node.js

AbortSignal Manual Plumbing

In Node.js (as used in OpenClaw), cancellation requires manual AbortController/AbortSignal plumbing:

// ❌ Node.js approach — manual plumbing everywhere
async function runPipeline(message, parentSignal) {
  // Must create a new controller AND wire up parent signal
  const controller = new AbortController();
  const signal = controller.signal;

  // Manual: if parent aborts, abort this controller
  parentSignal.addEventListener('abort', () => controller.abort());

  try {
    // Must pass signal to EVERY async operation
    const auth = await authResolve(message, { signal });
    const history = await fetchHistory(message, { signal });

    // For parallel operations, must create ANOTHER controller
    const toolController = new AbortController();
    signal.addEventListener('abort', () => toolController.abort());

    const results = await Promise.all(
      toolCalls.map(call =>
        executeTool(call, { signal: toolController.signal })
      )
    );
  } catch (err) {
    if (err.name === 'AbortError') {
      // Manual: must catch and re-throw as your own error type
      throw new PipelineError('Cancelled');
    }
    throw err;
  }
}

Problems:

  1. Every function needs an explicit signal parameter
  2. Parent-child wiring is manual and error-prone
  3. Forgetting signal in one call = uncancellable operation
  4. No compiler checks for missing signal propagation

ClawDesk's Approach

// ✅ Rust/Tokio approach — automatic propagation
async fn run_pipeline(
    msg: NormalizedMessage,
    ctx: PipelineContext,
    cancel: CancellationToken, // One token covers everything
) -> Result<OutboundMessage, PipelineError> {
    // No need to pass cancel to every sequential call —
    // spawned child tasks inherit via child_token()
    let auth = auth_resolve(&msg, &ctx.security).await?;

    let mut tools = JoinSet::new();
    for call in tool_calls {
        let token = cancel.child_token(); // Linked to parent
        tools.spawn(async move {
            tokio::select! {
                r = execute(call) => r,
                _ = token.cancelled() => Err(ToolError::Cancelled),
            }
        });
    }
    // When `cancel` is cancelled, ALL child tokens fire automatically
}

Comparison Table

| Property | Node.js (AbortSignal) | Tokio (CancellationToken) |
|---|---|---|
| Hierarchy | Manual wiring | `child_token()` automatic |
| Propagation | Event listener per level | Instant, zero-cost |
| Scope enforcement | None (runtime errors) | Ownership (compile-time) |
| Parallel cancellation | Manual per-Promise | `JoinSet::abort_all()` |
| Forgetting to propagate | Silent bug | Compiler warning (unused) |
| Thread safety | N/A (single-threaded) | `Send + Sync` enforced |

Throughput Analysis

Sequential vs. Parallel Tool Execution

With $k$ tool calls, each taking average time $t$:

Sequential:

$$ T_{\text{seq}} = \sum_{i=1}^{k} t_i = k \cdot \bar{t} $$

Parallel (JoinSet):

$$ T_{\text{par}} = \max_{i=1}^{k} t_i \approx \bar{t} + \sigma \cdot \sqrt{2 \ln k} $$

(By extreme value theory for independent tasks.)

Speedup:

$$ S = \frac{T_{\text{seq}}}{T_{\text{par}}} = \frac{k \cdot \bar{t}}{\bar{t} + \sigma \sqrt{2 \ln k}} \approx k \quad \text{(for large } k \text{ with similar } t_i \text{)} $$

For a typical case with 3 tool calls at ~200ms each:

$$ T_{\text{seq}} = 600\text{ms}, \quad T_{\text{par}} \approx 220\text{ms}, \quad S \approx 2.7\times $$

Multi-Channel Throughput

With $n$ channels, each handling $m$ messages/second:

Single-threaded (Node.js event loop):

$$ \text{Throughput} = \frac{1}{t_{\text{pipeline}}} \quad \text{(limited by one core)} $$

Multi-threaded (Tokio, $w$ worker threads):

$$ \text{Throughput} = \frac{w}{t_{\text{pipeline}}} = w \times \text{single-threaded} $$

With 8 Tokio worker threads and a 500ms average pipeline time:

| Metric | Node.js (1 thread) | Tokio (8 threads) | Improvement |
|---|---|---|---|
| Max concurrent pipelines | 1 (with async I/O wait) | 8 (truly parallel) | 8× |
| Pipeline throughput | ~2/sec | ~16/sec | 8× |
| Tool execution (3 tools) | 600ms seq | 200ms par | 3× |
| Combined | 2/sec | ~48/sec | ~24× |

Structured Concurrency Patterns in ClawDesk

Pattern 1: Scoped Tasks

/// Task lifetime is bounded by the scope
async fn process_message(msg: NormalizedMessage, cancel: CancellationToken) {
    let child = cancel.child_token();

    // This task exits when `child` (and therefore `cancel`) is cancelled
    let handle = tokio::spawn(async move {
        tokio::select! {
            result = do_work() => result,
            _ = child.cancelled() => Err(Error::Cancelled),
        }
    });

    // Wait for completion or cancellation
    handle.await.ok();
}
// Awaiting `handle` bounds the task to this scope — nothing outlives it

Pattern 2: Fan-Out / Fan-In

/// Execute N operations in parallel, collect all results
async fn fan_out_fan_in<T, R, F, Fut>(
    items: Vec<T>,
    f: F,
    cancel: CancellationToken,
) -> Vec<Result<R, Error>>
where
    F: Fn(T, CancellationToken) -> Fut,
    Fut: Future<Output = Result<R, Error>> + Send + 'static,
    R: Send + 'static,
{
    let mut set = JoinSet::new();

    for item in items {
        let token = cancel.child_token();
        set.spawn(f(item, token));
    }

    let mut results = Vec::new();
    while let Some(r) = set.join_next().await {
        // A JoinError (panic or abort) becomes an ordinary error value
        results.push(r.unwrap_or(Err(Error::Panicked)));
    }

    results
}

Pattern 3: Graceful Shutdown Cascade

/// Application shutdown — cascades through the entire hierarchy
pub async fn shutdown(runtime: &Runtime) {
    // 1. Cancel root token — all children receive cancellation
    runtime.root_token.cancel();

    // 2. Wait for graceful shutdown with timeout
    tokio::select! {
        _ = runtime.join_all_tasks() => {
            tracing::info!("All tasks shut down gracefully");
        }
        _ = tokio::time::sleep(Duration::from_secs(30)) => {
            tracing::warn!("Shutdown timeout — forcing exit");
        }
    }
}

Key Takeaways

  1. CancellationToken::child_token() creates a linked hierarchy — parent cancellation propagates automatically
  2. JoinSet owns spawned tasks and provides structured collection of results
  3. spawn_blocking offloads CPU-bound work to a separate threadpool, keeping async workers free
  4. Structured concurrency prevents resource leaks — every task has an owner
  5. vs. Node.js: automatic propagation replaces manual AbortSignal plumbing, eliminating an entire class of bugs
  6. Throughput gains: true parallelism (multi-core) + parallel tool execution = ~24× improvement over single-threaded

Further Reading