# Concurrency Model

ClawDesk runs on a single Tokio runtime with structured concurrency enforced through `CancellationToken` hierarchies and `JoinSet` scoping. Every spawned task is owned, tracked, and cancellable; there are no fire-and-forget spawns.
## Runtime Architecture

### Runtime Configuration
```rust
// crates/clawdesk-cli/src/main.rs
use tokio_util::sync::CancellationToken;

fn main() -> Result<(), ClawDeskError> {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(num_cpus::get())
        .max_blocking_threads(32)
        .thread_name("clawdesk-worker")
        .enable_all()
        .build()?;

    runtime.block_on(async_main())
}

async fn async_main() -> Result<(), ClawDeskError> {
    // Single root cancellation token for the entire application
    let root_token = CancellationToken::new();

    // Install signal handlers
    let shutdown_token = root_token.clone();
    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.ok();
        tracing::info!("shutdown signal received");
        shutdown_token.cancel();
    });

    // Bootstrap and run
    let result = run_application(root_token.clone()).await;

    // Ensure all children are cancelled
    root_token.cancel();
    result
}
```
## CancellationToken Hierarchy

ClawDesk uses a tree of `CancellationToken`s that mirrors the logical task hierarchy. Cancelling a parent token automatically cancels all children:
```rust
/// Structured cancellation token tree.
/// Child tokens are cancelled when their parent is cancelled.
pub struct TokenTree {
    root: CancellationToken,
}

impl TokenTree {
    pub fn new() -> Self {
        Self {
            root: CancellationToken::new(),
        }
    }

    /// Create a child token scoped to a subsystem.
    pub fn child(&self, _name: &str) -> CancellationToken {
        self.root.child_token()
    }

    /// Shut down everything.
    pub fn shutdown(&self) {
        self.root.cancel();
    }

    /// Check if shutdown has been requested.
    pub fn is_shutting_down(&self) -> bool {
        self.root.is_cancelled()
    }
}
```
### Propagation Semantics

| Action | Effect |
|---|---|
| Cancel root token | All gateway, channel, cron, and watcher tasks terminate |
| Cancel channels token | All channel adapters stop; gateway and cron continue |
| Cancel a single channel | Only that channel stops; sibling channels continue |
| Cancel gateway token | HTTP server stops; channels continue receiving (buffered) |
When a `CancellationToken` is cancelled, tasks don't abort immediately. Each task checks `token.is_cancelled()` or uses `tokio::select!` with `token.cancelled()` to perform cleanup before exiting.
## JoinSet Scoping

`JoinSet` provides structured task ownership: when a `JoinSet` is dropped, all tasks in it are aborted. This prevents task leaks:
```rust
use tokio::task::JoinSet;

/// Manages all channel adapter tasks.
pub struct ChannelManager {
    tasks: JoinSet<Result<(), ChannelError>>,
    token: CancellationToken,
}

impl ChannelManager {
    pub fn new(token: CancellationToken) -> Self {
        Self {
            tasks: JoinSet::new(),
            token,
        }
    }

    /// Start a channel adapter as a managed task.
    pub fn spawn_channel(&mut self, mut adapter: Box<dyn Channel>) {
        let token = self.token.child_token();
        self.tasks.spawn(async move {
            adapter.start().await?;
            // Run until cancelled
            token.cancelled().await;
            // Graceful shutdown
            adapter.stop().await?;
            Ok(())
        });
    }

    /// Wait for all channels to complete.
    pub async fn join_all(&mut self) -> Vec<Result<(), ChannelError>> {
        let mut results = Vec::new();
        while let Some(result) = self.tasks.join_next().await {
            match result {
                Ok(inner) => results.push(inner),
                Err(e) if e.is_cancelled() => {
                    // Expected during shutdown
                }
                Err(e) => {
                    tracing::error!("channel task panicked: {e}");
                    results.push(Err(ChannelError::TaskPanicked));
                }
            }
        }
        results
    }
}

// When ChannelManager is dropped, all channel tasks are aborted.
// This is the "structured" part of structured concurrency.
```
## The select! Pattern

ClawDesk uses `tokio::select!` extensively to race multiple futures, always including a cancellation branch:
```rust
/// Process messages from a channel with cancellation support.
async fn channel_loop(
    adapter: &dyn Channel,
    pipeline: &AgentPipeline,
    rx: &mut mpsc::Receiver<InboundMessage>,
    token: CancellationToken,
) -> Result<(), ChannelError> {
    loop {
        tokio::select! {
            // Branch 1: Process incoming message
            Some(msg) = rx.recv() => {
                let normalized = NormalizedMessage::from(msg);
                match pipeline.process(normalized).await {
                    Ok(response) => {
                        adapter.send(&response).await?;
                    }
                    Err(e) => {
                        tracing::error!("pipeline error: {e}");
                        adapter.send(&error_response(e)).await?;
                    }
                }
            }
            // Branch 2: Cancellation
            _ = token.cancelled() => {
                tracing::info!(
                    channel = %adapter.id(),
                    "shutting down channel loop"
                );
                break;
            }
            // Branch 3: Channel closed
            else => {
                tracing::warn!(
                    channel = %adapter.id(),
                    "channel receiver closed"
                );
                break;
            }
        }
    }
    Ok(())
}
```
Every `select!` loop in ClawDesk includes a `token.cancelled()` branch. Omitting it would create a task that cannot be shut down gracefully. This invariant is enforced by code review and Clippy lints.
## Backpressure Model

ClawDesk uses bounded channels (`tokio::sync::mpsc`) for inter-task communication, providing inherent backpressure:
```rust
/// Channel buffer sizes are tuned per subsystem.
pub struct BufferConfig {
    /// Inbound message buffer per channel adapter
    pub channel_buffer: usize,   // default: 1024
    /// Pipeline request buffer
    pub pipeline_buffer: usize,  // default: 256
    /// Provider response buffer (streaming)
    pub stream_buffer: usize,    // default: 64
    /// Audit log buffer
    pub audit_buffer: usize,     // default: 4096
}
```
### Backpressure Analysis

When the pipeline is slower than the incoming message rate:
$$ \text{If } \lambda_{\text{input}} > \mu_{\text{pipeline}}: \text{buffer fills in } T = \frac{B}{\lambda_{\text{input}} - \mu_{\text{pipeline}}} $$
Where:
- $\lambda_{\text{input}}$ = incoming message rate (msg/sec)
- $\mu_{\text{pipeline}}$ = pipeline processing rate (msg/sec)
- $B$ = buffer size (messages)
- $T$ = time until backpressure engages (seconds)
With $B = 1024$, $\lambda = 100$ msg/s, and $\mu = 80$ msg/s: $T = \frac{1024}{20} = 51.2$ s before `send().await` begins blocking producers.
## spawn_blocking Boundaries

CPU-intensive or blocking operations are offloaded to the blocking thread pool:
```rust
/// Operations that use spawn_blocking in ClawDesk:
///
/// 1. Cryptographic operations (hashing, signing)
/// 2. SochDB compaction
/// 3. Content scanning (regex + AST parsing)
/// 4. Media transcoding
/// 5. Plugin sandbox execution
/// 6. BM25 index building

// Example: Content scanning in the security layer
pub async fn scan_content(
    content: &str,
    scanner: Arc<CascadeScanner>,
) -> Result<ScanResult, SecurityError> {
    let content = content.to_owned();
    // Move CPU-intensive scanning to the blocking pool
    tokio::task::spawn_blocking(move || {
        scanner.scan(&content)
    })
    .await
    .map_err(|e| SecurityError::ScanTaskFailed(e.to_string()))?
}

// Example: SochDB compaction
pub async fn compact_database(db: Database) -> Result<CompactionStats, StorageError> {
    tokio::task::spawn_blocking(move || {
        db.compact()
    })
    .await
    .map_err(|e| StorageError::CompactionFailed(e.to_string()))?
}
```
Never perform blocking I/O or CPU-intensive computation on Tokio worker threads: doing so starves other tasks and degrades latency across the runtime. Use `spawn_blocking` for any operation that takes more than 1 ms.
## Timeout Patterns

All external calls have explicit timeouts:
```rust
/// Execute with a timeout, returning a typed error on expiration.
pub async fn with_timeout<T, E>(
    duration: Duration,
    future: impl Future<Output = Result<T, E>>,
) -> Result<T, ClawDeskError>
where
    E: Into<ClawDeskError>,
{
    match tokio::time::timeout(duration, future).await {
        Ok(Ok(value)) => Ok(value),
        Ok(Err(e)) => Err(e.into()),
        Err(_) => Err(ClawDeskError::Timeout {
            operation: std::any::type_name::<T>().to_string(),
            duration,
        }),
    }
}

// Usage in provider calls
let response = with_timeout(
    Duration::from_secs(30),
    provider.chat(&request),
).await?;
```
## Concurrency Patterns Summary

| Pattern | Mechanism | Used For |
|---|---|---|
| Structured ownership | `JoinSet` | Channel tasks, pipeline workers |
| Graceful cancellation | `CancellationToken` tree | Shutdown propagation |
| Backpressure | Bounded `mpsc` channels | Inter-task communication |
| CPU offloading | `spawn_blocking` | Crypto, scanning, compaction |
| Race with cancel | `tokio::select!` | All event loops |
| Timeout | `tokio::time::timeout` | External calls |
| State sharing | `Arc<T>`, `ArcSwap<T>` | Config, registries |
| Mutual exclusion | `tokio::sync::Mutex` | Rare; prefer message passing |
| Fan-out/fan-in | `JoinSet` + `join_next()` | Parallel provider queries |
## Anti-Patterns Avoided

| Anti-Pattern | Why It's Bad | ClawDesk Alternative |
|---|---|---|
| `tokio::spawn` without tracking | Task leak, no cancellation | `JoinSet::spawn` |
| `unwrap()` in async context | Panic kills the task silently | `Result<T, E>` propagation |
| Unbounded channels | Memory exhaustion under load | Bounded `mpsc` with backpressure |
| `std::sync::Mutex<T>` held across `.await` | Deadlock risk, blocks the executor | `tokio::sync::Mutex` or redesign |
| Blocking on an async thread | Starves the executor | `spawn_blocking` |
| Global state via `lazy_static` | Testing difficulties | Dependency injection via `AppState` |
## Resource Limits

```rust
/// Resource limits enforced by the runtime.
pub struct RuntimeLimits {
    /// Maximum concurrent channel connections
    pub max_channels: usize,             // default: 128
    /// Maximum concurrent pipeline executions
    pub max_pipeline_concurrency: usize, // default: 64
    /// Maximum concurrent provider requests
    pub max_provider_concurrency: usize, // default: 32
    /// Maximum blocking threads
    pub max_blocking_threads: usize,     // default: 32
    /// Maximum WebSocket connections
    pub max_ws_connections: usize,       // default: 1024
    /// Global memory limit (soft)
    pub memory_limit_bytes: usize,       // default: 2 GiB
}
```
Concurrency is bounded using `tokio::sync::Semaphore`:
```rust
/// Rate-limited provider access.
pub struct ProviderPool {
    provider: Arc<dyn Provider>,
    semaphore: Arc<Semaphore>,
}

impl ProviderPool {
    pub fn new(provider: Arc<dyn Provider>, max_concurrent: usize) -> Self {
        Self {
            provider,
            semaphore: Arc::new(Semaphore::new(max_concurrent)),
        }
    }

    pub async fn chat(&self, request: &ProviderRequest) -> Result<ProviderResponse, ProviderError> {
        // The permit is held for the duration of the call and released on drop.
        let _permit = self.semaphore.acquire().await
            .map_err(|_| ProviderError::PoolClosed)?;
        self.provider.chat(request).await
    }
}
```