
Tutorial: Message Flow

What you'll learn

Trace a Telegram message from raw webhook JSON through every crate it touches, across the agent pipeline, out to an LLM provider, and back as a reply. By the end you'll understand ClawDesk's core data flow.

Overview

Every message in ClawDesk follows this path:

$$ \text{Channel} \xrightarrow{\text{receive}} \text{InboundMessage} \xrightarrow{\text{normalize}} \text{NormalizedMessage} \xrightarrow{\text{pipeline}} \text{OutboundMessage} \xrightarrow{\text{send}} \text{Channel} $$

The journey touches 8 crates and involves 6 pipeline stages. Let's walk through each step.
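The chain above can be sketched as plain function composition. This is an illustrative stand-in, not ClawDesk's real signatures: the real stages are async and fallible, but the shape is the same, and it shows why a message cannot skip a step.

```rust
// Illustrative stand-ins for the real types (hypothetical, simplified).
struct InboundMessage(String);
struct NormalizedMessage(String);
struct OutboundMessage(String);

fn receive(raw: &str) -> InboundMessage {
    InboundMessage(raw.to_string())
}

fn normalize(msg: InboundMessage) -> NormalizedMessage {
    NormalizedMessage(msg.0)
}

fn pipeline(msg: NormalizedMessage) -> OutboundMessage {
    OutboundMessage(format!("reply to: {}", msg.0))
}

// Each stage's output type is the next stage's input type, so the
// compiler enforces the ordering of the flow.
fn handle(raw: &str) -> OutboundMessage {
    pipeline(normalize(receive(raw)))
}

fn main() {
    assert_eq!(handle("Hello").0, "reply to: Hello");
}
```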

The Full Sequence


Step 1: Channel Receives the Webhook

When a Telegram user sends "Hello", Telegram delivers a JSON payload to ClawDesk's webhook endpoint.

Crate: clawdesk-channels (the Telegram channel implementation)

```rust
/// In clawdesk-channels/src/telegram/webhook.rs
pub async fn handle_webhook(
    State(state): State<TelegramState>,
    Json(update): Json<TelegramUpdate>,
) -> Result<StatusCode, ChannelError> {
    // Parse the Telegram-specific update structure
    let message = update.message.ok_or(ChannelError::NoMessage)?;

    // Extract media while `message` is still fully intact (borrowing it
    // after fields have been moved out would not compile)
    let media = extract_media(&message);

    // Construct the channel-specific InboundMessage variant
    let inbound = InboundMessage::Telegram(TelegramInbound {
        chat_id: message.chat.id,
        from: message.from.map(|u| u.into()),
        text: message.text.clone(),
        reply_to_message_id: message.reply_to_message.map(|r| r.message_id),
        media,
        // Only Telegram-specific fields — no Option<_> waste
    });

    // Forward to the channel trait's processing pipeline
    state.processor.process(inbound).await?;
    Ok(StatusCode::OK)
}
```
Type Safety

InboundMessage::Telegram carries only Telegram-specific fields. There is no Option<discord_guild_id> or Option<slack_thread_ts>. This is the type algebra at work — see Type Algebra Deep Dive.


Step 2: The InboundMessage Sum Type

Crate: clawdesk-channel

InboundMessage is a 13-variant enum defined in clawdesk-channel. Each variant is a dedicated struct:

```rust
/// In clawdesk-channel/src/types.rs
pub enum InboundMessage {
    Telegram(TelegramInbound),
    Discord(DiscordInbound),
    Slack(SlackInbound),
    WhatsApp(WhatsAppInbound),
    Signal(SignalInbound),
    IMessage(IMessageInbound),
    Matrix(MatrixInbound),
    WebChat(WebChatInbound),
    Email(EmailInbound),
    Sms(SmsInbound),
    Cli(CliInbound),
    Irc(IrcInbound),
    Custom(CustomInbound),
}
```

The compiler ensures every match is exhaustive. Adding a 14th variant turns each unhandled match into a compile error, so nothing can be silently ignored at runtime.
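A minimal sketch of why this matters, using a two-variant stand-in rather than the real enum:

```rust
// Two-variant stand-in for the real 13-variant enum.
enum Inbound {
    Telegram(String),
    Discord(String),
}

fn channel_name(msg: &Inbound) -> &'static str {
    // Exhaustive match: adding a third variant to `Inbound` makes this
    // match a compile error until an arm for it is written.
    match msg {
        Inbound::Telegram(_) => "telegram",
        Inbound::Discord(_) => "discord",
    }
}

fn main() {
    assert_eq!(channel_name(&Inbound::Telegram("hi".into())), "telegram");
    assert_eq!(channel_name(&Inbound::Discord("hi".into())), "discord");
}
```

There is no `_ => ...` catch-all arm, which is exactly what preserves the compile-time guarantee.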


Step 3: Normalization

Crate: clawdesk-domain

The normalize() function converts any InboundMessage variant into a single NormalizedMessage:

```rust
/// In clawdesk-domain/src/normalize.rs
pub fn normalize(inbound: InboundMessage) -> Result<NormalizedMessage, DomainError> {
    match inbound {
        InboundMessage::Telegram(tg) => {
            // Capture channel-specific metadata before fields move out of `tg`
            let metadata = Metadata::telegram_specific(&tg);
            Ok(NormalizedMessage {
                channel_id: ChannelId::Telegram,
                conversation_id: ConversationId::from(tg.chat_id),
                sender: tg.from.map(|u| Sender {
                    id: u.id.to_string(),
                    display_name: u.display_name(),
                }),
                content: Content::from_text_and_media(tg.text, tg.media),
                thread_id: tg.reply_to_message_id.map(ThreadId::from),
                timestamp: Utc::now(),
                metadata,
            })
        }
        InboundMessage::Discord(dc) => {
            // Discord-specific normalization...
            Ok(NormalizedMessage { /* ... */ })
        }
        // ... all 13 variants handled exhaustively
    }
}
```

After normalization, the rest of the system works with a single unified type. This is the canonical form:

```rust
pub struct NormalizedMessage {
    pub channel_id: ChannelId,
    pub conversation_id: ConversationId,
    pub sender: Option<Sender>,
    pub content: Content,
    pub thread_id: Option<ThreadId>,
    pub timestamp: DateTime<Utc>,
    pub metadata: Metadata,
}
```
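The payoff is that downstream code needs no channel-specific knowledge. A simplified sketch with stand-in types (the real structs carry more fields):

```rust
// Simplified stand-ins for the real types.
#[derive(Debug)]
enum ChannelId {
    Telegram,
    Discord,
}

struct NormalizedMessage {
    channel_id: ChannelId,
    text: String,
}

// Works identically for any channel: normalization already erased the
// platform differences.
fn log_line(msg: &NormalizedMessage) -> String {
    format!("[{:?}] {}", msg.channel_id, msg.text)
}

fn main() {
    let m = NormalizedMessage {
        channel_id: ChannelId::Telegram,
        text: "Hello".to_string(),
    };
    assert_eq!(log_line(&m), "[Telegram] Hello");
}
```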

Step 4: The Agent Pipeline (6 Stages)

Crate: clawdesk-agents

The AgentPipeline processes the NormalizedMessage through 6 sequential stages:

Stage 1: AuthResolve

Resolves the sender's identity against the security layer and determines permissions.

```rust
/// In clawdesk-agents/src/pipeline/auth_resolve.rs
pub async fn auth_resolve(
    msg: &NormalizedMessage,
    security: &SecurityService,
) -> Result<AuthContext, PipelineError> {
    let identity = security.resolve_identity(&msg.sender).await?;
    let permissions = security.get_permissions(&identity).await?;

    Ok(AuthContext {
        identity,
        permissions,
        rate_limit: security.check_rate_limit(&identity).await?,
    })
}
```

Stage 2: HistorySanitize

Fetches conversation history from SochDB and sanitizes it (removes expired entries, enforces retention policies).

```rust
/// In clawdesk-agents/src/pipeline/history_sanitize.rs
pub async fn history_sanitize(
    msg: &NormalizedMessage,
    memory: &MemoryService,
    auth: &AuthContext,
) -> Result<Vec<HistoryEntry>, PipelineError> {
    let raw_history = memory
        .get_history(&msg.conversation_id, auth.permissions.history_depth)
        .await?;

    // Remove messages beyond retention window
    let sanitized = raw_history
        .into_iter()
        .filter(|entry| entry.timestamp > auth.permissions.retention_cutoff)
        .collect();

    Ok(sanitized)
}
```

Stage 3: ContextGuard

Enforces the token budget and content policy before sending to the LLM.

```rust
/// In clawdesk-agents/src/pipeline/context_guard.rs
pub async fn context_guard(
    msg: &NormalizedMessage,
    history: &[HistoryEntry],
    auth: &AuthContext,
) -> Result<GuardedContext, PipelineError> {
    let total_tokens = estimate_tokens(msg, history);

    if total_tokens > auth.permissions.max_tokens {
        // Trim history from oldest, preserving system prompt
        let trimmed = trim_to_budget(history, auth.permissions.max_tokens);
        return Ok(GuardedContext::Trimmed(trimmed));
    }

    Ok(GuardedContext::Full(history.to_vec()))
}
```
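`trim_to_budget` itself isn't shown here. A plausible sketch, assuming entries are ordered oldest-first and carry a precomputed token count (both assumptions, not confirmed by the source):

```rust
// Hypothetical sketch of trim_to_budget. Field names are assumed.
#[derive(Clone, Debug, PartialEq)]
struct HistoryEntry {
    text: String,
    tokens: usize,
}

fn trim_to_budget(history: &[HistoryEntry], max_tokens: usize) -> Vec<HistoryEntry> {
    let mut total: usize = history.iter().map(|e| e.tokens).sum();
    let mut start = 0;
    // Entries are oldest-first; advance `start` until the tail fits.
    while total > max_tokens && start < history.len() {
        total -= history[start].tokens;
        start += 1;
    }
    history[start..].to_vec()
}

fn main() {
    let history = vec![
        HistoryEntry { text: "old".into(), tokens: 50 },
        HistoryEntry { text: "mid".into(), tokens: 30 },
        HistoryEntry { text: "new".into(), tokens: 20 },
    ];
    // 100 tokens total; dropping the oldest entry (50) fits a 60-token budget.
    let trimmed = trim_to_budget(&history, 60);
    assert_eq!(trimmed.len(), 2);
}
```

Dropping from the front keeps the most recent turns, which is usually what matters for conversational context.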

Stage 4: ToolSplit

Crate: clawdesk-skills (called from clawdesk-agents)

Selects the skills applicable to the current message, greedily packing them under the remaining token budget (a weighted-knapsack-style problem):

```rust
/// In clawdesk-skills/src/selector.rs
pub fn select_skills(
    available: &[Skill],
    context: &NormalizedMessage,
    remaining_tokens: usize,
) -> Vec<SelectedSkill> {
    // 1. Topological sort by dependencies
    let sorted = topological_sort(available);

    // 2. Greedy packing over the dependency-sorted list
    let mut budget = remaining_tokens;
    let mut selected = Vec::new();

    for skill in sorted {
        if skill.matches(context) && skill.token_cost <= budget {
            budget -= skill.token_cost;
            selected.push(SelectedSkill::from(skill));
        }
    }

    selected
}
```
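A standalone, simplified version of the greedy pass (struct and field names are illustrative, with the `matches(context)` call reduced to a boolean) shows the packing behavior concretely:

```rust
// Simplified stand-in for the greedy packing step.
#[derive(Debug)]
struct Skill {
    name: &'static str,
    token_cost: usize,
    matches: bool, // stand-in for skill.matches(context)
}

fn greedy_pack(skills: &[Skill], mut budget: usize) -> Vec<&'static str> {
    let mut selected = Vec::new();
    for s in skills {
        // Take each applicable skill that still fits the remaining budget.
        if s.matches && s.token_cost <= budget {
            budget -= s.token_cost;
            selected.push(s.name);
        }
    }
    selected
}

fn main() {
    let skills = [
        Skill { name: "search", token_cost: 400, matches: true },
        Skill { name: "calendar", token_cost: 700, matches: true },
        Skill { name: "email", token_cost: 300, matches: false },
        Skill { name: "weather", token_cost: 200, matches: true },
    ];
    // "calendar" no longer fits once "search" has consumed 400 of the
    // 1000 tokens, and "email" doesn't match the message.
    assert_eq!(greedy_pack(&skills, 1000), vec!["search", "weather"]);
}
```

Note that greedy packing is fast but order-dependent; which skills win depends on where the dependency sort places them.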

Stage 5: Execute

Sends the assembled request to the provider registry, which selects a provider and calls the LLM.

```rust
/// In clawdesk-agents/src/pipeline/execute.rs
pub async fn execute(
    request: ProviderRequest,
    registry: &ProviderRegistry,
    cancel: CancellationToken,
) -> Result<ProviderResponse, PipelineError> {
    let provider = registry.select(&request.model_preference)?;

    // Provider handles retries and fallback internally
    let response = provider
        .send(request)
        .await
        .map_err(PipelineError::Provider)?;

    // Check cancellation after potentially long LLM call
    cancel.check()?;

    Ok(response)
}
```

Stage 6: FailoverDecide

Evaluates the response. If the primary provider failed, the fallback FSM decides whether to retry with another provider.

```rust
/// In clawdesk-agents/src/pipeline/failover_decide.rs
pub async fn failover_decide(
    result: Result<ProviderResponse, ProviderError>,
    fsm: &mut FallbackFsm,
    registry: &ProviderRegistry,
    request: &ProviderRequest,
) -> Result<ProviderResponse, PipelineError> {
    match result {
        Ok(response) => {
            fsm.transition(FallbackEvent::Success);
            Ok(response)
        }
        Err(err) if err.is_retryable() => {
            fsm.transition(FallbackEvent::Failure(err.classify()));
            // Try next candidate from the FSM
            let next = fsm.next_candidate(registry)?;
            next.send(request.clone()).await.map_err(PipelineError::Provider)
        }
        Err(err) => {
            fsm.transition(FallbackEvent::Fatal);
            Err(PipelineError::Provider(err))
        }
    }
}
```
Deep Dive

The fallback FSM is a 7-state machine with a formal termination proof. See Fallback FSM Deep Dive.
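To make the transition-table idea concrete, here is a hypothetical three-state reduction. The real ClawDesk FSM has 7 states; these state and event names are invented for illustration and show only the shape, including why termination is easy to argue when failures can only move the state "down":

```rust
// Hypothetical 3-state reduction of a fallback FSM (illustration only).
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Primary,
    Fallback,
    Exhausted,
}

enum Event {
    Success,
    RetryableFailure,
    Fatal,
}

fn transition(state: State, event: Event) -> State {
    match (state, event) {
        // Success keeps the current provider tier.
        (s, Event::Success) => s,
        // Retryable failures step down one tier. The state space is
        // finite and transitions only move downward, so retries terminate.
        (State::Primary, Event::RetryableFailure) => State::Fallback,
        (_, Event::RetryableFailure) => State::Exhausted,
        // Fatal errors short-circuit to the terminal state.
        (_, Event::Fatal) => State::Exhausted,
    }
}

fn main() {
    let s = transition(State::Primary, Event::RetryableFailure);
    assert_eq!(s, State::Fallback);
    assert_eq!(transition(s, Event::RetryableFailure), State::Exhausted);
    assert_eq!(transition(State::Exhausted, Event::Success), State::Exhausted);
}
```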


Step 5: Provider Sends to LLM

Crate: clawdesk-providers

The provider translates the ProviderRequest into the LLM's native API format:

```rust
/// In clawdesk-providers/src/anthropic.rs
#[async_trait]
impl Provider for AnthropicProvider {
    async fn send(
        &self,
        req: ProviderRequest,
    ) -> Result<ProviderResponse, ProviderError> {
        let api_request = self.build_api_request(&req)?;

        let response = self.client
            .post("https://api.anthropic.com/v1/messages")
            .header("x-api-key", &self.api_key)
            .header("anthropic-version", "2023-06-01")
            .json(&api_request)
            .send()
            .await
            .map_err(|e| ProviderError::Network(e.to_string()))?;

        self.parse_response(response).await
    }
}
```

Step 6: Response Returns to Channel

The ProviderResponse is converted to an OutboundMessage and sent back through the channel:

```rust
/// In clawdesk-agents/src/pipeline/mod.rs
let outbound = OutboundMessage {
    conversation_id: msg.conversation_id.clone(),
    content: response.content,
    reply_to: msg.thread_id.clone(),
    metadata: OutboundMetadata {
        model_used: response.model,
        tokens_used: response.usage,
        latency: start.elapsed(),
    },
};

// Dispatch back through the originating channel
channel.send(outbound).await?;
```

The channel implementation translates OutboundMessage back into the platform's native API:

```rust
/// In clawdesk-channels/src/telegram/send.rs
impl TelegramChannel {
    pub async fn send(&self, msg: OutboundMessage) -> Result<(), ChannelError> {
        let chat_id = msg.conversation_id.as_telegram_chat_id()?;

        self.api.send_message(SendMessageRequest {
            chat_id,
            text: msg.content.to_string(),
            reply_to_message_id: msg.reply_to.map(|t| t.as_i64()),
            parse_mode: Some(ParseMode::Markdown),
        }).await?;

        Ok(())
    }
}
```

Crates Touched

| Order | Crate | Role |
|---|---|---|
| 1 | clawdesk-channels | Receives the platform webhook, constructs InboundMessage |
| 2 | clawdesk-channel | Defines the InboundMessage enum and the Channel trait |
| 3 | clawdesk-domain | normalize(): InboundMessage → NormalizedMessage |
| 4 | clawdesk-agents | Runs the 6-stage pipeline |
| 5 | clawdesk-security | AuthResolve stage |
| 6 | clawdesk-memory | HistorySanitize stage, context fetching |
| 7 | clawdesk-skills | ToolSplit stage (skill selection) |
| 8 | clawdesk-providers | Execute stage (LLM API call) |

What's Next?

Now that you understand the full message flow: