Skip to main content

Agent Pipeline

The agent pipeline is the core inference engine of ClawDesk. Every incoming message passes through a 6-stage linear pipeline that handles authentication, context assembly, tool execution, and fault tolerance.

Pipeline Overview

Stage Details

Stage 1: AuthResolve

Purpose: Resolve the user's identity, load their session, and verify permissions.

/// Stage 1: Resolve authentication and load session context.
pub struct AuthResolve {
    // Backing store used to load (or lazily create) the user's session.
    session_store: Arc<dyn SessionStore>,
    // Resolves per-user, per-channel ACL permissions.
    acl_manager: Arc<AclManager>,
}

impl AuthResolve {
    /// Resolve the caller's identity: load (or create) the session, verify
    /// the user may post in the channel, and carry session preferences and
    /// any model override forward into later stages.
    ///
    /// # Errors
    /// Returns `PipelineError::AccessDenied` when the ACL denies sending;
    /// storage and ACL lookup failures are propagated via `?`.
    pub async fn execute(
        &self,
        msg: &NormalizedMessage,
    ) -> Result<AuthContext, PipelineError> {
        // Fetch the existing session, or start a fresh one for this key.
        let session = match self.session_store.get_session(&msg.session_key).await? {
            Some(existing) => existing,
            None => Session::new(msg.session_key.clone()),
        };

        // Resolve ACLs for this user/channel pair and gate on send permission.
        let permissions = self
            .acl_manager
            .resolve_permissions(&msg.user_id, &msg.channel_id)
            .await?;
        if !permissions.can_send_messages() {
            return Err(PipelineError::AccessDenied {
                user: msg.user_id.clone(),
                channel: msg.channel_id.clone(),
                reason: "insufficient permissions".into(),
            });
        }

        // Session-scoped preferences and the optional model override travel
        // with the auth context.
        let preferences = session.preferences().clone();
        let model_override = session.model_override().cloned();

        Ok(AuthContext {
            session,
            permissions,
            preferences,
            model_override,
        })
    }
}
| Input | Output | Failure Mode |
| --- | --- | --- |
| NormalizedMessage | AuthContext (session + permissions) | AccessDenied, StorageError |

Stage 2: HistorySanitize

Purpose: Load conversation history and sanitize it for the LLM context window.

/// Stage 2: Load and sanitize conversation history.
pub struct HistorySanitize {
    // Source of raw per-session conversation history.
    conversation_store: Arc<dyn ConversationStore>,
    // Upper bound on messages fetched from the store.
    max_history_messages: usize,
    // Token budget the sanitized history must fit within.
    max_history_tokens: usize,
}

impl HistorySanitize {
    /// Load conversation history, strip system-internal entries, truncate to
    /// the token budget (keeping the most recent messages), and redact
    /// sensitive fields for callers without full-history permission.
    ///
    /// # Errors
    /// Propagates storage failures from the conversation store.
    pub async fn execute(
        &self,
        msg: &NormalizedMessage,
        auth: &AuthContext,
    ) -> Result<SanitizedHistory, PipelineError> {
        // 1. Load raw history (bounded by message count).
        let raw_history = self.conversation_store
            .get_history(&msg.session_key, self.max_history_messages)
            .await?;
        // Capture the pre-filter length before `raw_history` is consumed
        // below; the `truncated` flag compares against it. (The original
        // referenced an undefined `raw_history_len`.)
        let raw_history_len = raw_history.len();

        // 2. Remove system-internal messages the LLM should never see.
        let filtered: Vec<_> = raw_history
            .into_iter()
            .filter(|m| !m.is_system_internal())
            .collect();

        // 3. Truncate to the token budget, walking newest-to-oldest so the
        //    most recent messages are the ones kept.
        let mut token_count = 0;
        let mut sanitized = Vec::new();
        for message in filtered.into_iter().rev() {
            let tokens = estimate_tokens(&message.content);
            if token_count + tokens > self.max_history_tokens {
                break;
            }
            token_count += tokens;
            sanitized.push(message);
        }

        sanitized.reverse(); // Restore chronological order

        // 4. Redact sensitive content based on permissions. (Loop variable
        //    renamed from `msg` to avoid shadowing the parameter.)
        if !auth.permissions.can_view_full_history() {
            for history_msg in &mut sanitized {
                history_msg.redact_sensitive_fields();
            }
        }

        Ok(SanitizedHistory {
            messages: sanitized,
            total_tokens: token_count,
            // True when filtering or budget truncation dropped anything.
            truncated: raw_history_len > sanitized.len(),
        })
    }
}
| Input | Output | Failure Mode |
| --- | --- | --- |
| NormalizedMessage, AuthContext | SanitizedHistory | StorageError |

Stage 3: ContextGuard

Purpose: Assemble the full context window and enforce token budget constraints.

/// Stage 3: Assemble context and enforce token budget.
pub struct ContextGuard {
    // Hybrid (vector + keyword) memory search backend.
    memory: Arc<HybridSearcher>,
    // Selects skills to inject within a token budget.
    skill_selector: Arc<SkillSelector>,
    // Builds the system prompt for a session/channel.
    prompt_builder: Arc<PromptBuilder>,
    // Source of model metadata (default model, context window sizes).
    model_catalog: Arc<ModelCatalog>,
}

impl ContextGuard {
    /// Assemble the full prompt context (system prompt, history, skills,
    /// memory, current message) and enforce the model's token budget.
    ///
    /// # Errors
    /// Returns `PipelineError::ContextOverflow` when the assembled context
    /// exceeds the usable window (context limit minus the response reserve);
    /// propagates prompt-building, skill-selection, and memory-search errors.
    pub async fn execute(
        &self,
        msg: &NormalizedMessage,
        auth: &AuthContext,
        history: &SanitizedHistory,
    ) -> Result<AssembledContext, PipelineError> {
        // 1. Determine the model's context window. Bind the default by
        //    value: `unwrap_or(&self.model_catalog.default_model())` borrows
        //    a temporary dropped at the end of the statement (E0716).
        let model = auth.model_override
            .clone()
            .unwrap_or_else(|| self.model_catalog.default_model());
        let context_limit = self.model_catalog.context_window(&model)?;

        // 2. Build the system prompt
        let system_prompt = self.prompt_builder
            .build(&auth.session, &msg.channel_id)
            .await?;
        let system_tokens = estimate_tokens(&system_prompt);

        // 3. Select relevant skills within the remaining budget. Saturating
        //    subtraction: plain `-` on usize panics (debug) or wraps if the
        //    prompt + history already exceed the window.
        let available_budget = context_limit
            .saturating_sub(system_tokens)
            .saturating_sub(history.total_tokens)
            .saturating_sub(RESPONSE_RESERVE); // Reserve tokens for response

        let selected_skills = self.skill_selector
            .select(msg, available_budget)
            .await?;
        let skill_tokens: usize = selected_skills.iter()
            .map(|s| s.token_cost())
            .sum();

        // 4. Fetch relevant memory via hybrid search with what's left
        let memory_budget = available_budget.saturating_sub(skill_tokens);
        let relevant_context = self.memory
            .search(&msg.content.as_text(), memory_budget)
            .await?;

        // 5. Validate the total fits the *usable* window — limit minus the
        //    response reserve — matching the documented budget equation
        //    (the original compared against the raw limit, silently eating
        //    into the response reserve).
        let usable_limit = context_limit.saturating_sub(RESPONSE_RESERVE);
        let total_tokens = system_tokens
            + history.total_tokens
            + skill_tokens
            + relevant_context.token_cost()
            + estimate_tokens(&msg.content);

        if total_tokens > usable_limit {
            return Err(PipelineError::ContextOverflow {
                used: total_tokens,
                limit: usable_limit,
            });
        }

        Ok(AssembledContext {
            system_prompt,
            history: history.messages.clone(),
            skills: selected_skills,
            memory: relevant_context,
            current_message: msg.clone(),
            total_tokens,
            model,
        })
    }
}

Token Budget Allocation:

$$ T_{\text{total}} = T_{\text{system}} + T_{\text{history}} + T_{\text{skills}} + T_{\text{memory}} + T_{\text{message}} \leq T_{\text{limit}} - T_{\text{reserve}} $$

Stage 4: ToolSplit

Purpose: Analyze the assembled context to determine which tools should be available and prepare tool definitions.

/// Stage 4: Prepare tool definitions and split execution strategy.
pub struct ToolSplit {
    // Registry of all tools known to the system.
    tool_registry: Arc<ToolRegistry>,
    // Policy deciding which tools a user may invoke in a channel.
    tool_policy: Arc<ToolPolicy>,
}

impl ToolSplit {
    /// Compute the set of tools the LLM may invoke for this request and
    /// build the provider-facing tool definitions.
    ///
    /// # Errors
    /// No failure path is exercised in this stage today; the `Result`
    /// signature is kept for consistency with the other stages.
    pub async fn execute(
        &self,
        context: &AssembledContext,
        auth: &AuthContext,
    ) -> Result<ToolPlan, PipelineError> {
        // 1. Get all registered tools
        let all_tools = self.tool_registry.list_tools().await;

        // 2. Tools required by the selected skills. An empty set means
        //    "no skill constraint" (see the second filter below).
        let skill_tools: HashSet<_> = context.skills.iter()
            .flat_map(|s| s.required_tools())
            .collect();

        // 3. Keep only tools the policy allows in this channel, further
        //    narrowed to skill-required tools when any are declared.
        let permitted_tools: Vec<_> = all_tools
            .into_iter()
            .filter(|t| {
                self.tool_policy.is_allowed(
                    &t.name,
                    &auth.permissions,
                    &context.current_message.channel_id,
                )
            })
            .filter(|t| skill_tools.is_empty() || skill_tools.contains(&t.name))
            .collect();

        // 4. Build tool definitions for the provider
        let tool_definitions: Vec<ToolDefinition> = permitted_tools
            .iter()
            .map(|t| t.to_provider_definition())
            .collect();

        // Decide the strategy *before* moving `tool_definitions` into the
        // plan — the original read it after the move into `definitions:`,
        // which does not compile (use after move).
        let execution_strategy = if tool_definitions.is_empty() {
            ExecutionStrategy::TextOnly
        } else {
            ExecutionStrategy::ToolsEnabled
        };

        Ok(ToolPlan {
            available_tools: permitted_tools,
            definitions: tool_definitions,
            execution_strategy,
        })
    }
}

Stage 5: Execute

Purpose: Send the assembled context to the LLM provider and execute any tool calls.

/// Stage 5: Execute the LLM request, handling tool call loops.
pub struct Execute {
    // Maps provider names to chat-capable provider clients.
    provider_registry: Arc<ProviderRegistry>,
    // Executes tool calls requested by the model.
    tool_registry: Arc<ToolRegistry>,
    // Cap on LLM/tool alternations before ToolLoopExceeded is returned.
    max_tool_rounds: usize, // default: 10
}

impl Execute {
    /// Run the LLM request for the assembled context, alternating between
    /// inference and tool execution until the model returns a text-only
    /// response or `max_tool_rounds` is exceeded.
    ///
    /// # Errors
    /// Returns `PipelineError::ToolLoopExceeded` when the model keeps
    /// requesting tools past the cap; provider lookup and chat failures
    /// are propagated via `?`.
    pub async fn execute(
        &self,
        context: &AssembledContext,
        tool_plan: &ToolPlan,
    ) -> Result<ExecutionResult, PipelineError> {
        let provider = self.provider_registry
            .get_provider(&context.model.provider_name)?;

        // Mutable transcript: grows with assistant tool-call messages and
        // tool results on every round.
        let mut messages = self.build_messages(context);
        let mut tool_round = 0;

        loop {
            // Build provider request — the full transcript and the tool
            // definitions are re-sent each round.
            let request = ProviderRequest {
                model: context.model.model_id.clone(),
                messages: messages.clone(),
                tools: tool_plan.definitions.clone(),
                // Optional per-message temperature override from metadata.
                temperature: context.current_message.metadata
                    .get("temperature")
                    .and_then(|v| v.as_f64())
                    .map(|v| v as f32),
                max_tokens: None,
            };

            // Call the provider
            let response = provider.chat(&request).await?;

            // Check if the response contains tool calls
            match response.tool_calls {
                Some(calls) if !calls.is_empty() => {
                    // Enforce the round cap before executing anything.
                    tool_round += 1;
                    if tool_round > self.max_tool_rounds {
                        return Err(PipelineError::ToolLoopExceeded {
                            rounds: tool_round,
                            max: self.max_tool_rounds,
                        });
                    }

                    // Execute each tool call sequentially. Note: no `?` —
                    // the raw outcome (including any error) is recorded as
                    // the tool output rather than aborting the round.
                    let mut tool_results = Vec::new();
                    for call in &calls {
                        let result = self.tool_registry
                            .execute(&call.name, &call.arguments)
                            .await;
                        tool_results.push(ToolResult {
                            call_id: call.id.clone(),
                            output: result,
                        });
                    }

                    // Append tool calls and results to message history; the
                    // assistant's tool-call message must precede its results
                    // in the transcript.
                    messages.push(ChatMessage::assistant_tool_calls(calls));
                    for result in tool_results {
                        messages.push(ChatMessage::tool_result(result));
                    }

                    // Continue the loop for the next round
                    continue;
                }
                _ => {
                    // No tool calls (None or an empty list) — this is the
                    // final response.
                    return Ok(ExecutionResult {
                        response: response.content,
                        tool_rounds: tool_round,
                        usage: response.usage,
                        provider: provider.name().to_string(),
                        model: context.model.model_id.clone(),
                    });
                }
            }
        }
    }
}
Tool Call Loop

The Execute stage runs a tool call loop that alternates between LLM inference and tool execution. The loop terminates when either: (a) the LLM produces a text-only response, or (b) the max_tool_rounds limit is reached.

Stage 6: FailoverDecide

Purpose: Evaluate the execution result and decide whether to accept, retry, or fail over to another provider.

/// Stage 6: Decide on failover based on execution outcome.
pub struct FailoverDecide {
    // Shared fallback state machine; async Mutex because it is locked
    // across await points in this stage.
    fallback_fsm: Arc<Mutex<FallbackFsm>>,
    // Provider registry (available for failover target selection).
    provider_registry: Arc<ProviderRegistry>,
}

impl FailoverDecide {
    /// Inspect the execution outcome and decide the pipeline's fate:
    /// accept the response, signal a retry, fail over to another provider,
    /// or surface a fatal error.
    ///
    /// # Errors
    /// `PipelineError::Retry` signals retry-with-same-provider (after any
    /// rate-limit delay); `PipelineError::Failover` signals a provider
    /// switch; other errors are fatal and propagated unchanged.
    pub async fn execute(
        &self,
        result: Result<ExecutionResult, PipelineError>,
        context: &AssembledContext,
    ) -> Result<FinalResponse, PipelineError> {
        let mut fsm = self.fallback_fsm.lock().await;

        match result {
            Ok(exec_result) => {
                fsm.transition(FsmEvent::ProviderSuccess)?;
                Ok(FinalResponse {
                    content: exec_result.response,
                    metadata: ResponseMetadata {
                        provider: exec_result.provider,
                        model: exec_result.model,
                        tool_rounds: exec_result.tool_rounds,
                        usage: exec_result.usage,
                        fallback_attempts: fsm.attempt_count(),
                    },
                })
            }
            Err(PipelineError::Provider(ProviderError::RateLimited { retry_after, .. })) => {
                fsm.transition(FsmEvent::RetryableError)?;
                // Release the FSM lock before sleeping: the original held
                // the shared mutex across the backoff delay, stalling every
                // other pipeline execution sharing this FSM.
                drop(fsm);

                // Wait out the provider-suggested delay, then retry with
                // the same provider.
                if let Some(delay) = retry_after {
                    tokio::time::sleep(delay).await;
                }
                Err(PipelineError::Retry)
            }
            Err(PipelineError::Provider(ProviderError::Timeout(_))) => {
                fsm.transition(FsmEvent::RetryableError)?;

                // Fail over to next provider
                Err(PipelineError::Failover {
                    failed_provider: context.model.provider_name.clone(),
                })
            }
            Err(e) => {
                // Anything else is fatal: record it and propagate unchanged.
                fsm.transition(FsmEvent::FatalError)?;
                Err(e)
            }
        }
    }
}

Fallback FSM

The FailoverDecide stage is governed by a finite state machine with 7 states and 6 events:

FSM Definition

/// States of the fallback FSM. `Succeeded`, `Exhausted`, and `Aborted`
/// are terminal: they absorb every subsequent event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FsmState {
    /// No fallback run in progress.
    Idle,
    /// Choosing the next provider to attempt.
    Selecting,
    /// A provider request is in flight.
    Attempting,
    /// Terminal: a provider produced a successful response.
    Succeeded,
    /// A retryable failure occurred; deciding whether to retry or move on.
    Retrying,
    /// Terminal: selection failed fatally — no provider produced a response.
    Exhausted,
    /// Terminal: stopped by a fatal error or user abort.
    Aborted,
}

/// Events driving the fallback FSM. `PartialEq`/`Eq` are derived for
/// consistency with `FsmState`, so events can be compared directly in
/// tests and transition tables.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FsmEvent {
    /// Begin a fallback run (Idle -> Selecting).
    Start,
    /// The provider call completed successfully.
    ProviderSuccess,
    /// A transient failure (e.g. rate limit) that may be retried.
    RetryableError,
    /// A non-recoverable failure.
    FatalError,
    /// User-initiated abort.
    UserAbort,
    /// The provider call timed out.
    Timeout,
}

impl FallbackFsm {
    /// Transition function for the fallback FSM.
    ///
    /// Known `(state, event)` pairs map to a defined next state and terminal
    /// states absorb every event; any other pair is rejected with
    /// `FsmError::InvalidTransition`. (The original doc comment claimed the
    /// function was total, which the catch-all error arm contradicts.)
    ///
    /// # Errors
    /// `FsmError::InvalidTransition` for undefined `(state, event)` pairs.
    pub fn transition(&mut self, event: FsmEvent) -> Result<FsmState, FsmError> {
        let next = match (self.state, event) {
            (FsmState::Idle, FsmEvent::Start) => FsmState::Selecting,

            (FsmState::Selecting, FsmEvent::ProviderSuccess) => FsmState::Attempting,
            (FsmState::Selecting, FsmEvent::FatalError) => FsmState::Exhausted,

            (FsmState::Attempting, FsmEvent::ProviderSuccess) => FsmState::Succeeded,
            (FsmState::Attempting, FsmEvent::RetryableError) => FsmState::Retrying,
            (FsmState::Attempting, FsmEvent::FatalError) => FsmState::Aborted,
            (FsmState::Attempting, FsmEvent::UserAbort) => FsmState::Aborted,
            (FsmState::Attempting, FsmEvent::Timeout) => FsmState::Retrying,

            (FsmState::Retrying, FsmEvent::ProviderSuccess) => FsmState::Attempting,
            (FsmState::Retrying, FsmEvent::RetryableError) => {
                if self.retry_count >= self.max_retries {
                    // Moving on to the next provider: reset the retry budget
                    // so it gets its full allotment instead of inheriting
                    // this provider's exhausted count (the original never
                    // reset, making every later provider fail over after a
                    // single retryable error).
                    self.retry_count = 0;
                    FsmState::Selecting // Try next provider
                } else {
                    self.retry_count += 1;
                    FsmState::Attempting
                }
            }
            (FsmState::Retrying, FsmEvent::FatalError) => FsmState::Aborted,
            (FsmState::Retrying, FsmEvent::UserAbort) => FsmState::Aborted,
            (FsmState::Retrying, FsmEvent::Timeout) => {
                // Timeout during retry also advances to the next provider;
                // reset the retry budget for the same reason as above.
                self.retry_count = 0;
                FsmState::Selecting
            }

            // Terminal states ignore all events
            (FsmState::Succeeded, _) => FsmState::Succeeded,
            (FsmState::Exhausted, _) => FsmState::Exhausted,
            (FsmState::Aborted, _) => FsmState::Aborted,

            // All other combinations are undefined.
            (state, event) => {
                return Err(FsmError::InvalidTransition { state, event });
            }
        };

        self.state = next;
        self.transitions += 1;
        Ok(next)
    }
}

Termination Guarantee

The FSM is guaranteed to terminate within a bounded number of transitions:

$$ T_{\max} = n(2r + 2) + 1 $$

Where $n$ is the number of configured providers and $r$ is the per-provider max_retries. Each provider can be attempted at most once, with at most $r$ retry transitions each, plus one terminal transition.

Proof Sketch
  • There are $n$ providers and $r$ max retries per provider.
  • Each provider cycle: Selecting → Attempting → (Retrying → Attempting)^r → Selecting = $2r + 2$ transitions.
  • With $n$ providers: $n(2r + 2) + 1$ total. With $r = 1$: $4n + 1$. Bounded by $O(n)$.

Pipeline Builder

The pipeline is assembled using a builder pattern with compile-time dependency checking:

/// Builder for the agent pipeline. Each dependency starts as `None` and
/// must be supplied before `build()`; a missing one yields
/// `PipelineError::MissingDependency`.
pub struct PipelineBuilder {
    session_store: Option<Arc<dyn SessionStore>>,
    conversation_store: Option<Arc<dyn ConversationStore>>,
    vector_store: Option<Arc<dyn VectorStore>>,
    provider_registry: Option<Arc<ProviderRegistry>>,
    tool_registry: Option<Arc<ToolRegistry>>,
    security: Option<Arc<SecurityLayer>>,
}

impl PipelineBuilder {
    /// Create an empty builder with no dependencies wired.
    pub fn new() -> Self {
        Self {
            session_store: None,
            conversation_store: None,
            vector_store: None,
            provider_registry: None,
            tool_registry: None,
            security: None,
        }
    }

    /// Supply the session store dependency.
    pub fn session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
        self.session_store = Some(store);
        self
    }

    // ... other builder methods

    /// Assemble the pipeline, failing with `MissingDependency` on the first
    /// dependency that was never provided (checked in the same order as the
    /// stages are constructed).
    pub fn build(self) -> Result<AgentPipeline, PipelineError> {
        let session_store = self
            .session_store
            .ok_or(PipelineError::MissingDependency("session_store"))?;
        let acl_manager = self
            .security
            .as_ref()
            .ok_or(PipelineError::MissingDependency("security"))?
            .acl_manager();
        let conversation_store = self
            .conversation_store
            .ok_or(PipelineError::MissingDependency("conversation_store"))?;

        Ok(AgentPipeline {
            auth_resolve: AuthResolve {
                session_store,
                acl_manager,
            },
            history_sanitize: HistorySanitize {
                conversation_store,
                max_history_messages: 50,
                max_history_tokens: 8000,
            },
            // ... remaining stages
        })
    }
}

Agent Trace

Every pipeline execution produces an AgentTrace for observability:

/// Complete trace of a pipeline execution.
#[derive(Debug, Serialize)]
pub struct AgentTrace {
    // Unique identifier for this execution.
    pub trace_id: TraceId,
    // Session the traced message belonged to.
    pub session_key: SessionKey,
    // Wall-clock start and end of the pipeline run.
    pub started_at: DateTime<Utc>,
    pub completed_at: DateTime<Utc>,
    // Per-stage timing and outcome records.
    pub stages: Vec<StageTrace>,
    // Every tool invocation made during the Execute stage.
    pub tool_calls: Vec<ToolCallTrace>,
    // Provider and model that produced the final response.
    pub provider_used: String,
    pub model_used: String,
    // Aggregate token usage for the run.
    pub total_tokens: TokenUsage,
    // Number of fallback attempts recorded by the FSM.
    pub fallback_attempts: usize,
    // Final disposition of the pipeline run.
    pub outcome: PipelineOutcome,
}

/// Timing and outcome record for a single pipeline stage.
#[derive(Debug, Serialize)]
pub struct StageTrace {
    // Stage name (e.g. "AuthResolve").
    pub stage: String,
    // When the stage began executing.
    pub started_at: DateTime<Utc>,
    // How long the stage ran.
    pub duration: Duration,
    // Whether the stage succeeded or how it failed.
    pub outcome: StageOutcome,
}

Summary

| Stage | Concern | Key Decision |
| --- | --- | --- |
| AuthResolve | Identity & permissions | Allow / deny message processing |
| HistorySanitize | History loading & cleanup | What context the LLM sees |
| ContextGuard | Token budget enforcement | Prevent context overflow |
| ToolSplit | Tool availability | Which tools the LLM can invoke |
| Execute | LLM inference + tool loop | Produce the response |
| FailoverDecide | Fault tolerance | Retry, failover, or accept |