Agents

The agent system is the brain of ClawDesk. It takes inbound messages from channels, processes them through a configurable pipeline, invokes AI providers, and routes responses back. The agent architecture is modular—every stage can be customized, reordered, or replaced.

Architecture Overview


Pipeline Stages

The default pipeline has six stages, executed sequentially for every message:

1. AuthResolve

Resolves the sender identity and permissions from the inbound message metadata.

| Responsibility | Details |
| --- | --- |
| Identity resolution | Maps platform user IDs to ClawDesk identities |
| Permission loading | Retrieves ACL rules for the resolved identity |
| Session binding | Creates or resumes a conversation session |
| Rate limiting | Enforces per-user rate limits |

pub struct AuthResolve {
    acl_manager: Arc<AclManager>,
    identity_store: Arc<IdentityStore>,
    rate_limiter: Arc<RateLimiter>,
}

#[async_trait]
impl PipelineStep for AuthResolve {
    async fn process(&self, ctx: &mut PipelineContext) -> Result<StepOutcome> {
        let identity = self.identity_store.resolve(&ctx.sender).await?;
        let permissions = self.acl_manager.get_permissions(&identity).await?;
        self.rate_limiter.check(&identity).await?;

        ctx.identity = Some(identity);
        ctx.permissions = Some(permissions);
        Ok(StepOutcome::Continue)
    }
}

2. HistorySanitize

Cleans and prepares conversation history for the context window.

| Responsibility | Details |
| --- | --- |
| Token counting | Counts tokens in the conversation history |
| Truncation | Removes oldest messages to fit the context window |
| Redaction | Strips sensitive data from history (tool outputs, secrets) |
| Compaction | Summarizes long conversations when approaching limits |

[agent.history]
max_tokens = 8192 # maximum history tokens
strategy = "sliding" # "sliding" | "summary" | "hybrid"
redact_tool_outputs = true
summary_threshold = 4096 # trigger summary when history exceeds this
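
Under the "sliding" strategy, the oldest messages are dropped until the history fits within max_tokens. A minimal sketch of that truncation logic (the `Msg` type and precomputed per-message token counts are assumptions for illustration; real token counting is model-specific):

```rust
/// Hypothetical history entry with a precomputed token count.
struct Msg {
    text: String,
    tokens: u32,
}

/// Sliding-window truncation: drop the oldest messages until the
/// total token count fits within `max_tokens`. Newest messages win.
fn truncate_sliding(history: &mut Vec<Msg>, max_tokens: u32) {
    let mut total: u32 = history.iter().map(|m| m.tokens).sum();
    while total > max_tokens && !history.is_empty() {
        total -= history.remove(0).tokens; // drop the oldest entry
    }
}

fn main() {
    let mut history = vec![
        Msg { text: "oldest".into(), tokens: 3000 },
        Msg { text: "middle".into(), tokens: 3000 },
        Msg { text: "newest".into(), tokens: 3000 },
    ];
    truncate_sliding(&mut history, 8192);
    // 9000 tokens exceed the 8192 budget, so the oldest message goes.
    assert_eq!(history.len(), 2);
    assert_eq!(history[0].text, "middle");
}
```

The "summary" and "hybrid" strategies replace or augment this drop step with summarization once summary_threshold is crossed.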

3. ContextGuard

Validates that the assembled context is safe and within bounds.

| Check | Description |
| --- | --- |
| Token budget | Ensures total tokens (system + history + message) fit the model limit |
| Content policy | Scans for prohibited content patterns |
| Injection detection | Detects prompt injection attempts |
| Resource limits | Validates attachment sizes and counts |

Warning: ContextGuard is a security-critical stage. Disabling it is possible but strongly discouraged in production:

[agent.pipeline]
skip_stages = [] # Do NOT add "context_guard" here in production
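
The token-budget check itself reduces to simple arithmetic: everything that goes into the request, plus the output tokens reserved for the response, must fit the model's context window. A sketch (the function name and parameters are illustrative, not ClawDesk's actual API):

```rust
/// Sketch of ContextGuard's token-budget check: system prompt, history,
/// and the new message, plus the reserved output budget, must all fit
/// within the model's context window.
fn budget_ok(system: u32, history: u32, message: u32, max_output: u32, model_limit: u32) -> bool {
    system + history + message + max_output <= model_limit
}

fn main() {
    // 1k system + 8k history + 1k message + 4k output reserve fits a 200k window...
    assert!(budget_ok(1_000, 8_192, 1_000, 4_096, 200_000));
    // ...but not an 8k window.
    assert!(!budget_ok(1_000, 8_192, 1_000, 4_096, 8_192));
}
```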

4. ToolSplit

Analyzes the message to determine which tools might be needed and prepares the tool context.

| Responsibility | Details |
| --- | --- |
| Tool selection | Filters available tools based on user permissions and context |
| Schema injection | Adds tool schemas to the prompt |
| Budget allocation | Distributes token budget between response and tool calls |
| Dependency resolution | Orders tools with dependencies |

5. Execute

The core execution stage that sends the prepared context to the AI provider and handles the response.

The Execute stage supports:

  • Streaming: Token-by-token response delivery to channels that support it
  • Multi-turn tool use: Iterative tool calling until the model produces a final response
  • Parallel tool execution: Independent tool calls run concurrently
  • Timeout enforcement: Per-call and total execution timeouts

[agent.execution]
max_tool_rounds = 5 # maximum tool call iterations
tool_timeout_secs = 30 # per-tool timeout
total_timeout_secs = 120 # total execution timeout
parallel_tools = true # run independent tools concurrently
stream = true # enable streaming responses

6. FailoverDecide

Handles provider failures by deciding whether to retry, fall back, or abort.

[agent.failover]
enabled = true
max_retries_per_provider = 2
retry_delay_ms = 1000
backoff_factor = 2.0
fallback_order = ["anthropic", "openai", "ollama"]
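
With these settings, the nth retry against a provider waits retry_delay_ms multiplied by backoff_factor raised to n. A sketch of that schedule (assuming a zero-based retry counter; the helper is illustrative, not ClawDesk's internal API):

```rust
/// Exponential backoff: delay before the nth retry (zero-based),
/// derived from the [agent.failover] settings retry_delay_ms and
/// backoff_factor.
fn retry_delay_ms(base_ms: u64, factor: f64, attempt: u32) -> u64 {
    (base_ms as f64 * factor.powi(attempt as i32)).round() as u64
}

fn main() {
    // retry_delay_ms = 1000, backoff_factor = 2.0 → 1s, 2s, 4s, ...
    assert_eq!(retry_delay_ms(1000, 2.0, 0), 1000);
    assert_eq!(retry_delay_ms(1000, 2.0, 1), 2000);
    assert_eq!(retry_delay_ms(1000, 2.0, 2), 4000);
}
```

Once max_retries_per_provider is exhausted, the stage moves to the next entry in fallback_order.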

Agent Configuration

Full Example

[agent]
# Default provider and model
provider = "anthropic"
model = "claude-sonnet-4-20250514"

# System prompt
system_prompt = """
You are ClawDesk, a helpful AI assistant. Be concise and accurate.
When using tools, explain what you're doing.
"""

# Sampling parameters
temperature = 0.7
max_tokens = 4096
top_p = 0.95
top_k = 40

# Pipeline configuration
[agent.pipeline]
stages = [
    "auth_resolve",
    "history_sanitize",
    "context_guard",
    "tool_split",
    "execute",
    "failover_decide",
]
skip_stages = [] # stages to skip

[agent.history]
max_tokens = 8192
strategy = "hybrid"

[agent.execution]
max_tool_rounds = 5
total_timeout_secs = 120
stream = true

[agent.failover]
enabled = true
fallback_order = ["anthropic", "openai", "ollama"]

Key Types

AgentPipeline

The main pipeline orchestrator:

pub struct AgentPipeline {
    steps: Vec<Box<dyn PipelineStep>>,
    config: AgentConfig,
    trace: TraceCollector,
}

impl AgentPipeline {
    pub async fn process(&self, message: InboundMessage) -> Result<Response> {
        let mut ctx = PipelineContext::new(message, &self.config);

        for step in &self.steps {
            match step.process(&mut ctx).await? {
                StepOutcome::Continue => continue,
                StepOutcome::Skip => continue,
                StepOutcome::Abort(reason) => return Err(reason.into()),
                StepOutcome::Respond(response) => return Ok(response),
            }
        }

        Ok(ctx.into_response())
    }
}

PipelineBuilder

Fluent builder for constructing custom pipelines:

let pipeline = PipelineBuilder::new()
    .with_config(config)
    .add_step(AuthResolve::new(acl_manager, identity_store))
    .add_step(HistorySanitize::new(history_config))
    .add_step(ContextGuard::new(guard_config))
    .add_step(ToolSplit::new(tool_registry.clone()))
    .add_step(MyCustomStep::new()) // insert custom logic
    .add_step(Execute::new(provider_registry.clone()))
    .add_step(FailoverDecide::new(failover_config))
    .build()?;

PipelineStep Trait

#[async_trait]
pub trait PipelineStep: Send + Sync {
    /// Human-readable name for logging and tracing
    fn name(&self) -> &str;

    /// Process the pipeline context
    async fn process(&self, ctx: &mut PipelineContext) -> Result<StepOutcome>;

    /// Whether this step can be skipped
    fn skippable(&self) -> bool { true }
}

pub enum StepOutcome {
    Continue,
    Skip,
    Abort(AbortReason),
    Respond(Response),
}

AgentRunner

High-level runner that manages the pipeline lifecycle:

pub struct AgentRunner {
    pipeline: AgentPipeline,
    session_store: Arc<SessionStore>,
    memory: Arc<MemoryManager>,
}

impl AgentRunner {
    pub async fn handle_message(
        &self,
        channel_id: &ChannelId,
        message: InboundMessage,
    ) -> Result<Response> {
        // 1. Load or create session
        let session = self.session_store
            .get_or_create(&message.sender, channel_id)
            .await?;

        // 2. Process through pipeline
        let response = self.pipeline.process(message).await?;

        // 3. Store in memory
        self.memory.ingest(&session, &response).await?;

        Ok(response)
    }
}

AgentConfig

pub struct AgentConfig {
    pub provider: String,
    pub model: String,
    pub system_prompt: String,
    pub temperature: f32,
    pub max_tokens: u32,
    pub top_p: f32,
    pub top_k: u32,
    pub tools: Vec<String>,
    pub history: HistoryConfig,
    pub execution: ExecutionConfig,
    pub failover: FailoverConfig,
}

Tool Registration

Tools extend the agent's capabilities. They're registered through the ToolRegistry.

Built-in Tools

| Tool | Description | Default |
| --- | --- | --- |
| web_search | Search the web via configurable provider | Enabled |
| code_execution | Execute code in a sandboxed environment | Disabled |
| file_read | Read files from the workspace | Enabled |
| file_write | Write files to the workspace | Disabled |
| knowledge_base | Query the memory/RAG system | Enabled |
| calculator | Evaluate mathematical expressions | Enabled |
| http_request | Make HTTP requests | Disabled |
| shell | Execute shell commands (sandboxed) | Disabled |

Registering a Custom Tool

use clawdesk_agents::tools::{Tool, ToolParams, ToolPolicy, ToolRegistry, ToolResult, ToolSchema};
use async_trait::async_trait;

#[derive(Debug)]
pub struct WeatherTool {
    api_key: String,
}

#[async_trait]
impl Tool for WeatherTool {
    fn name(&self) -> &str {
        "get_weather"
    }

    fn description(&self) -> &str {
        "Get current weather for a location"
    }

    fn schema(&self) -> ToolSchema {
        ToolSchema::new()
            .param("location", "string", "City name or coordinates", true)
            .param("units", "string", "celsius or fahrenheit", false)
    }

    fn policy(&self) -> ToolPolicy {
        ToolPolicy {
            requires_confirmation: false,
            max_calls_per_session: 10,
            timeout_secs: 15,
            allowed_roles: vec!["user".into(), "admin".into()],
        }
    }

    async fn execute(&self, params: ToolParams) -> Result<ToolResult> {
        let location = params.get_string("location")?;
        let units = params.get_string_or("units", "celsius");

        // Call weather API...
        let weather = fetch_weather(&self.api_key, &location, &units).await?;

        Ok(ToolResult::text(format!(
            "Weather in {}: {}°{}, {}",
            location,
            weather.temp,
            units.chars().next().unwrap().to_ascii_uppercase(), // "C" or "F"
            weather.description
        )))
    }
}

// Register it
let mut registry = ToolRegistry::new();
registry.register(WeatherTool { api_key: api_key.clone() });

Tool Policy

Each tool has a policy that controls its behavior:

pub struct ToolPolicy {
    /// Whether the user must confirm before execution
    pub requires_confirmation: bool,

    /// Maximum calls per session (0 = unlimited)
    pub max_calls_per_session: u32,

    /// Execution timeout in seconds
    pub timeout_secs: u32,

    /// Roles allowed to use this tool
    pub allowed_roles: Vec<String>,
}
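
A policy gate combines these fields roughly as follows (a sketch under assumed semantics; `call_allowed` is a hypothetical helper, not ClawDesk's actual enforcement code, and confirmation handling is a separate interactive step):

```rust
/// Simplified mirror of ToolPolicy for the sketch.
struct ToolPolicy {
    requires_confirmation: bool,
    max_calls_per_session: u32, // 0 = unlimited
    allowed_roles: Vec<String>,
}

/// Returns true when `role` may invoke the tool, given how many calls
/// this session has already made.
fn call_allowed(policy: &ToolPolicy, role: &str, calls_so_far: u32) -> bool {
    let role_ok = policy.allowed_roles.iter().any(|r| r == role);
    let quota_ok = policy.max_calls_per_session == 0
        || calls_so_far < policy.max_calls_per_session;
    role_ok && quota_ok
}

fn main() {
    let policy = ToolPolicy {
        requires_confirmation: false,
        max_calls_per_session: 10,
        allowed_roles: vec!["user".into(), "admin".into()],
    };
    assert!(call_allowed(&policy, "user", 9));   // under quota
    assert!(!call_allowed(&policy, "user", 10)); // quota exhausted
    assert!(!call_allowed(&policy, "guest", 0)); // role not allowed
}
```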

Configure tool policies in TOML:

[agent.tools.web_search]
enabled = true
requires_confirmation = false
max_calls_per_session = 20
timeout_secs = 15

[agent.tools.code_execution]
enabled = true
requires_confirmation = true
max_calls_per_session = 5
timeout_secs = 30
sandbox = "docker" # "docker" | "wasm" | "none"

[agent.tools.shell]
enabled = false # disabled by default for safety

Tracing

The agent pipeline produces structured traces for debugging and observability.

pub struct TraceCollector {
    spans: Vec<TraceSpan>,
}

pub struct TraceSpan {
    pub stage: String,
    pub started_at: Instant,
    pub duration: Duration,
    pub metadata: HashMap<String, serde_json::Value>,
    pub outcome: StepOutcome,
}

Enable tracing:

[agent.trace]
enabled = true
export = "jaeger" # "jaeger" | "otlp" | "file" | "none"
endpoint = "http://localhost:4317"
sample_rate = 1.0 # 0.0 to 1.0
include_prompts = false # include full prompts in traces (privacy!)

View traces via CLI:

# Show trace for the last message
clawdesk agent trace --last

# Show trace for a specific session
clawdesk agent trace --session sess_abc123

# Example output:
# ┌─ AuthResolve 2ms ✅ identity=user_42
# ├─ HistorySanitize 5ms ✅ tokens=3,421 → 2,048
# ├─ ContextGuard 1ms ✅ budget_ok=true
# ├─ ToolSplit 3ms ✅ tools=[web_search, calculator]
# ├─ Execute 842ms ✅ provider=anthropic, tokens_in=2,100, tokens_out=512
# └─ FailoverDecide 0ms ✅ no_action (success)
# Total: 853ms

Workspace Module

The workspace module manages file-system access for agents:

pub struct Workspace {
    root: PathBuf,
    allowed_paths: Vec<PathBuf>,
    denied_patterns: Vec<Glob>,
    max_file_size: u64,
}

impl Workspace {
    pub async fn read_file(&self, path: &Path) -> Result<String> { /* ... */ }
    pub async fn write_file(&self, path: &Path, content: &str) -> Result<()> { /* ... */ }
    pub async fn list_dir(&self, path: &Path) -> Result<Vec<DirEntry>> { /* ... */ }
    pub async fn search(&self, query: &str) -> Result<Vec<SearchResult>> { /* ... */ }
}

[agent.workspace]
root = "/home/user/projects"
allowed_paths = ["src/", "docs/", "config/"]
denied_patterns = ["**/.env", "**/secrets/**", "**/*.key"]
max_file_size_mb = 10
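
The allow-list side of this check can be sketched with the standard library alone (an illustrative helper, not the Workspace implementation: denied_patterns glob matching is omitted, and real code must canonicalize paths to defeat `..` traversal):

```rust
use std::path::Path;

/// Prefix-based workspace access check: the candidate must resolve
/// inside the workspace root AND under one of the allowed sub-paths.
/// Glob-based deny patterns and path canonicalization are omitted.
fn path_allowed(root: &Path, allowed: &[&str], candidate: &Path) -> bool {
    let Ok(relative) = candidate.strip_prefix(root) else {
        return false; // outside the workspace root entirely
    };
    allowed.iter().any(|&prefix| relative.starts_with(prefix))
}

fn main() {
    let root = Path::new("/home/user/projects");
    let allowed = ["src", "docs", "config"];
    assert!(path_allowed(root, &allowed, Path::new("/home/user/projects/src/main.rs")));
    assert!(!path_allowed(root, &allowed, Path::new("/home/user/projects/target/debug/app")));
    assert!(!path_allowed(root, &allowed, Path::new("/etc/passwd")));
}
```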

Custom Pipeline Stages

Create your own pipeline stages for domain-specific logic:

use clawdesk_agents::pipeline::{PipelineStep, PipelineContext, StepOutcome};
use async_trait::async_trait;
use std::collections::HashSet;

pub struct ProfanityFilter {
    word_list: HashSet<String>,
}

#[async_trait]
impl PipelineStep for ProfanityFilter {
    fn name(&self) -> &str {
        "profanity_filter"
    }

    async fn process(&self, ctx: &mut PipelineContext) -> Result<StepOutcome> {
        let message_lower = ctx.message.text.to_lowercase();

        for word in &self.word_list {
            if message_lower.contains(word) {
                return Ok(StepOutcome::Respond(
                    Response::text("I'm sorry, I can't process that message.")
                ));
            }
        }

        Ok(StepOutcome::Continue)
    }
}

Insert into the pipeline:

let pipeline = PipelineBuilder::new()
    .with_config(config)
    .add_step(AuthResolve::new(/* ... */))
    .add_step(ProfanityFilter::new(load_word_list()?)) // custom stage
    .add_step(HistorySanitize::new(/* ... */))
    // ... rest of pipeline
    .build()?;

Tip: Custom pipeline stages can also be distributed as plugins. The stage just needs to implement PipelineStep and register itself in the plugin's activate() function.


Per-Channel Agent Overrides

Different channels can use different agent configurations:

[agent]
provider = "anthropic"
model = "claude-sonnet-4-20250514"
system_prompt = "You are a helpful assistant."

# Override for the support Telegram channel
[agent.channel_overrides.tg_support]
model = "claude-sonnet-4-20250514"
system_prompt = "You are a support agent for Acme Corp."
tools = ["knowledge_base", "ticket_create", "ticket_status"]
temperature = 0.3

# Override for the Discord community
[agent.channel_overrides.dc_community]
provider = "openai"
model = "gpt-4o"
system_prompt = "You are a friendly community helper."
tools = ["web_search", "code_execution"]
temperature = 0.8
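
Conceptually, override resolution is a field-wise merge: any field set in the channel's override wins, and everything else falls back to the base [agent] table. A sketch under that assumption (trimmed-down config structs for illustration, not ClawDesk's real types):

```rust
/// Trimmed-down resolved config for the sketch.
struct AgentSettings {
    provider: String,
    model: String,
    temperature: f32,
}

/// Fields present in a channel override; None means "inherit from [agent]".
#[derive(Default)]
struct ChannelOverride {
    provider: Option<String>,
    model: Option<String>,
    temperature: Option<f32>,
}

/// Field-wise merge: override values win, the base fills the gaps.
fn resolve(base: &AgentSettings, ov: &ChannelOverride) -> AgentSettings {
    AgentSettings {
        provider: ov.provider.clone().unwrap_or_else(|| base.provider.clone()),
        model: ov.model.clone().unwrap_or_else(|| base.model.clone()),
        temperature: ov.temperature.unwrap_or(base.temperature),
    }
}

fn main() {
    let base = AgentSettings {
        provider: "anthropic".into(),
        model: "claude-sonnet-4-20250514".into(),
        temperature: 0.7,
    };
    // Like tg_support above: override the temperature, inherit the provider.
    let ov = ChannelOverride { temperature: Some(0.3), ..Default::default() };
    let resolved = resolve(&base, &ov);
    assert_eq!(resolved.provider, "anthropic");
    assert_eq!(resolved.temperature, 0.3);
}
```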