# Tutorial: Message Flow
Trace a Telegram message from raw webhook JSON through every crate it touches, across the agent pipeline, out to an LLM provider, and back as a reply. By the end you'll understand ClawDesk's core data flow.
## Overview
Every message in ClawDesk follows this path:
$$ \text{Channel} \xrightarrow{\text{receive}} \text{InboundMessage} \xrightarrow{\text{normalize}} \text{NormalizedMessage} \xrightarrow{\text{pipeline}} \text{OutboundMessage} \xrightarrow{\text{send}} \text{Channel} $$
The journey touches 8 crates and involves 6 pipeline stages. Let's walk through each step.
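In type terms, the path above is just function composition: each hop is a function from one message type to the next. A toy sketch with simplified stand-in types (the real ones carry far more structure):

```rust
// Illustrative stand-ins: the real types carry much more structure.
struct InboundMessage(String);
struct NormalizedMessage(String);
struct OutboundMessage(String);

// Each arrow in the diagram corresponds to a plain function.
fn normalize(i: InboundMessage) -> NormalizedMessage {
    NormalizedMessage(i.0)
}

fn pipeline(n: NormalizedMessage) -> OutboundMessage {
    OutboundMessage(format!("reply to: {}", n.0))
}

fn main() {
    let out = pipeline(normalize(InboundMessage("Hello".into())));
    assert_eq!(out.0, "reply to: Hello");
}
```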
## The Full Sequence

### Step 1: Channel Receives the Webhook
When a Telegram user sends "Hello", Telegram delivers a JSON payload to ClawDesk's webhook endpoint.
**Crate:** `clawdesk-channels` (the Telegram channel implementation)

```rust
// In clawdesk-channels/src/telegram/webhook.rs
pub async fn handle_webhook(
    State(state): State<TelegramState>,
    Json(update): Json<TelegramUpdate>,
) -> Result<StatusCode, ChannelError> {
    // Parse the Telegram-specific update structure
    let message = update.message.ok_or(ChannelError::NoMessage)?;

    // Extract media while `message` is still fully intact — the struct
    // literal below moves fields out of it, after which `&message` is invalid.
    let media = extract_media(&message);

    // Construct the channel-specific InboundMessage variant
    let inbound = InboundMessage::Telegram(TelegramInbound {
        chat_id: message.chat.id,
        from: message.from.map(|u| u.into()),
        text: message.text.clone(),
        reply_to_message_id: message.reply_to_message.map(|r| r.message_id),
        media,
        // Only Telegram-specific fields — no Option<_> waste
    });

    // Forward to the channel trait's processing pipeline
    state.processor.process(inbound).await?;
    Ok(StatusCode::OK)
}
```
`InboundMessage::Telegram` carries only Telegram-specific fields. There is no `Option<discord_guild_id>` or `Option<slack_thread_ts>`. This is the type algebra at work — see Type Algebra Deep Dive.
### Step 2: The InboundMessage Sum Type

**Crate:** `clawdesk-channel`

`InboundMessage` is a 13-variant enum defined in `clawdesk-channel`. Each variant wraps a dedicated struct:

```rust
// In clawdesk-channel/src/types.rs
pub enum InboundMessage {
    Telegram(TelegramInbound),
    Discord(DiscordInbound),
    Slack(SlackInbound),
    WhatsApp(WhatsAppInbound),
    Signal(SignalInbound),
    IMessage(IMessageInbound),
    Matrix(MatrixInbound),
    WebChat(WebChatInbound),
    Email(EmailInbound),
    Sms(SmsInbound),
    Cli(CliInbound),
    Irc(IrcInbound),
    Custom(CustomInbound),
}
```
The compiler ensures every match is exhaustive. Adding a 14th variant forces updates everywhere — no runtime surprises.
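The guarantee is easy to see on a scaled-down example. The three-variant `Inbound` enum below is illustrative, not ClawDesk's real type, but it shows the mechanism: without a wildcard arm, a new variant turns every `match` into a compile error until it is handled.

```rust
// Illustrative three-variant stand-in for InboundMessage.
enum Inbound {
    Telegram { chat_id: i64 },
    Slack { channel: String },
    Cli { line: String },
}

// Because there is no `_` arm, adding a fourth variant to `Inbound`
// makes this match a compile error until the new case is covered.
fn describe(msg: &Inbound) -> String {
    match msg {
        Inbound::Telegram { chat_id } => format!("telegram:{chat_id}"),
        Inbound::Slack { channel } => format!("slack:{channel}"),
        Inbound::Cli { line } => format!("cli:{line}"),
    }
}

fn main() {
    assert_eq!(describe(&Inbound::Telegram { chat_id: 42 }), "telegram:42");
    assert_eq!(describe(&Inbound::Cli { line: "hello".into() }), "cli:hello");
}
```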
### Step 3: Normalization

**Crate:** `clawdesk-domain`

The `normalize()` function converts any `InboundMessage` variant into a single `NormalizedMessage`:

```rust
// In clawdesk-domain/src/normalize.rs
pub fn normalize(inbound: InboundMessage) -> Result<NormalizedMessage, DomainError> {
    match inbound {
        InboundMessage::Telegram(tg) => {
            // Build the metadata first: the struct literal below moves
            // fields out of `tg`, after which `&tg` can no longer be borrowed.
            let metadata = Metadata::telegram_specific(&tg);
            Ok(NormalizedMessage {
                channel_id: ChannelId::Telegram,
                conversation_id: ConversationId::from(tg.chat_id),
                sender: tg.from.map(|u| Sender {
                    id: u.id.to_string(),
                    display_name: u.display_name(),
                }),
                content: Content::from_text_and_media(tg.text, tg.media),
                thread_id: tg.reply_to_message_id.map(ThreadId::from),
                timestamp: Utc::now(),
                metadata,
            })
        }
        InboundMessage::Discord(dc) => {
            // Discord-specific normalization...
            Ok(NormalizedMessage { /* ... */ })
        }
        // ... all 13 variants handled exhaustively
    }
}
```
After normalization, the rest of the system works with a single unified type. This is the canonical form:

```rust
pub struct NormalizedMessage {
    pub channel_id: ChannelId,
    pub conversation_id: ConversationId,
    pub sender: Option<Sender>,
    pub content: Content,
    pub thread_id: Option<ThreadId>,
    pub timestamp: DateTime<Utc>,
    pub metadata: Metadata,
}
```
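The payoff shows up in every downstream consumer. A toy sketch (simplified stand-in struct, hypothetical `log_line` helper) of code written once against the canonical form, with no channel-specific branching:

```rust
// Simplified stand-ins for the domain types; the field names follow the
// tutorial's NormalizedMessage, everything else is illustrative.
struct NormalizedMessage {
    channel_id: String,
    conversation_id: String,
    sender: Option<String>,
    text: String,
}

// Downstream code is written once against the canonical form —
// it never needs to know which of the 13 channels produced the message.
fn log_line(msg: &NormalizedMessage) -> String {
    let who = msg.sender.as_deref().unwrap_or("<anonymous>");
    format!("[{}/{}] {}: {}", msg.channel_id, msg.conversation_id, who, msg.text)
}

fn main() {
    let msg = NormalizedMessage {
        channel_id: "telegram".into(),
        conversation_id: "12345".into(),
        sender: Some("alice".into()),
        text: "Hello".into(),
    };
    assert_eq!(log_line(&msg), "[telegram/12345] alice: Hello");
}
```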
### Step 4: The Agent Pipeline (6 Stages)

**Crate:** `clawdesk-agents`

The `AgentPipeline` processes the `NormalizedMessage` through 6 sequential stages.

#### Stage 1: AuthResolve

Resolves the sender's identity against the security layer and determines permissions.

```rust
// In clawdesk-agents/src/pipeline/auth_resolve.rs
pub async fn auth_resolve(
    msg: &NormalizedMessage,
    security: &SecurityService,
) -> Result<AuthContext, PipelineError> {
    let identity = security.resolve_identity(&msg.sender).await?;
    let permissions = security.get_permissions(&identity).await?;
    // Check the rate limit before moving `identity` into the struct below.
    let rate_limit = security.check_rate_limit(&identity).await?;
    Ok(AuthContext {
        identity,
        permissions,
        rate_limit,
    })
}
```
#### Stage 2: HistorySanitize

Fetches conversation history from SochDB and sanitizes it (removes expired entries, enforces retention policies).

```rust
// In clawdesk-agents/src/pipeline/history_sanitize.rs
pub async fn history_sanitize(
    msg: &NormalizedMessage,
    memory: &MemoryService,
    auth: &AuthContext,
) -> Result<Vec<HistoryEntry>, PipelineError> {
    let raw_history = memory
        .get_history(&msg.conversation_id, auth.permissions.history_depth)
        .await?;

    // Drop messages that fall outside the retention window
    let sanitized = raw_history
        .into_iter()
        .filter(|entry| entry.timestamp > auth.permissions.retention_cutoff)
        .collect();
    Ok(sanitized)
}
```
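The retention filter reduces to a timestamp cutoff. A self-contained sketch of the same shape, using plain integers in place of `chrono` timestamps:

```rust
// Timestamps are plain integers here to keep the sketch dependency-free;
// the real code compares chrono DateTimes against a retention cutoff.
struct HistoryEntry {
    timestamp: i64,
    text: String,
}

// Keep only entries newer than the cutoff, consuming the input vector.
fn sanitize(history: Vec<HistoryEntry>, retention_cutoff: i64) -> Vec<HistoryEntry> {
    history
        .into_iter()
        .filter(|e| e.timestamp > retention_cutoff)
        .collect()
}

fn main() {
    let history = vec![
        HistoryEntry { timestamp: 100, text: "old".into() },
        HistoryEntry { timestamp: 200, text: "recent".into() },
    ];
    let kept = sanitize(history, 150);
    assert_eq!(kept.len(), 1);
    assert_eq!(kept[0].text, "recent");
}
```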
#### Stage 3: ContextGuard

Enforces the token budget and content policy before the request goes to the LLM.

```rust
// In clawdesk-agents/src/pipeline/context_guard.rs
pub async fn context_guard(
    msg: &NormalizedMessage,
    history: &[HistoryEntry],
    auth: &AuthContext,
) -> Result<GuardedContext, PipelineError> {
    let total_tokens = estimate_tokens(msg, history);
    if total_tokens > auth.permissions.max_tokens {
        // Trim history from the oldest end, preserving the system prompt
        let trimmed = trim_to_budget(history, auth.permissions.max_tokens);
        return Ok(GuardedContext::Trimmed(trimmed));
    }
    Ok(GuardedContext::Full(history.to_vec()))
}
```
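`trim_to_budget` itself is not shown above. One plausible sketch, assuming each entry knows its own token cost, is to drop entries from the oldest end until the remainder fits the budget:

```rust
struct HistoryEntry {
    tokens: usize,
    text: String,
}

// Hypothetical sketch of trim_to_budget: drop entries from the oldest end
// (front of the slice) until what remains fits inside max_tokens.
fn trim_to_budget(history: &[HistoryEntry], max_tokens: usize) -> Vec<&HistoryEntry> {
    let mut total: usize = history.iter().map(|e| e.tokens).sum();
    let mut start = 0;
    while total > max_tokens && start < history.len() {
        total -= history[start].tokens;
        start += 1;
    }
    history[start..].iter().collect()
}

fn main() {
    let history = vec![
        HistoryEntry { tokens: 50, text: "oldest".into() },
        HistoryEntry { tokens: 30, text: "middle".into() },
        HistoryEntry { tokens: 20, text: "newest".into() },
    ];
    // 100 tokens total; dropping the oldest (50) leaves 50 <= 60.
    let kept = trim_to_budget(&history, 60);
    assert_eq!(kept.len(), 2);
    assert_eq!(kept[0].text, "middle");
}
```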
#### Stage 4: ToolSplit

**Crate:** `clawdesk-skills` (called from `clawdesk-agents`)

Selects the skills applicable to the current message, packing them greedily under the remaining token budget (a weighted-knapsack-style selection):

```rust
// In clawdesk-skills/src/selector.rs
pub fn select_skills(
    available: &[Skill],
    context: &NormalizedMessage,
    remaining_tokens: usize,
) -> Vec<SelectedSkill> {
    // 1. Topological sort by dependencies
    let sorted = topological_sort(available);

    // 2. Greedy packing — O(k log k)
    let mut budget = remaining_tokens;
    let mut selected = Vec::new();
    for skill in sorted {
        if skill.matches(context) && skill.token_cost <= budget {
            budget -= skill.token_cost;
            selected.push(SelectedSkill::from(skill));
        }
    }
    selected
}
```
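The greedy packing step can be exercised in isolation. A toy sketch (dependency sorting omitted, `matches` reduced to a precomputed boolean) showing which skills survive a given budget:

```rust
// Minimal stand-in: a skill with a fixed token cost and a match flag.
struct Skill {
    name: &'static str,
    token_cost: usize,
    matches: bool,
}

// Greedy packing in isolation: take each matching skill in order
// while it still fits the remaining token budget.
fn select(skills: &[Skill], mut budget: usize) -> Vec<&'static str> {
    let mut selected = Vec::new();
    for s in skills {
        if s.matches && s.token_cost <= budget {
            budget -= s.token_cost;
            selected.push(s.name);
        }
    }
    selected
}

fn main() {
    let skills = [
        Skill { name: "search", token_cost: 400, matches: true },
        Skill { name: "calendar", token_cost: 700, matches: true },
        Skill { name: "weather", token_cost: 300, matches: false },
        Skill { name: "notes", token_cost: 500, matches: true },
    ];
    // Budget 1000: search (400) fits, calendar (700) no longer does,
    // weather doesn't match, notes (500) fits the remaining 600.
    assert_eq!(select(&skills, 1000), vec!["search", "notes"]);
}
```

Note that greedy packing is an approximation: it does not always maximize total usefulness per token, but it is cheap and predictable, which matters on the hot path.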
#### Stage 5: Execute

Sends the assembled request to the provider registry, which selects a provider and calls the LLM.

```rust
// In clawdesk-agents/src/pipeline/execute.rs
pub async fn execute(
    request: ProviderRequest,
    registry: &ProviderRegistry,
    cancel: CancellationToken,
) -> Result<ProviderResponse, PipelineError> {
    let provider = registry.select(&request.model_preference)?;

    // The provider handles retries and fallback internally
    let response = provider
        .send(request)
        .await
        .map_err(PipelineError::Provider)?;

    // Check cancellation after the potentially long LLM call
    cancel.check()?;
    Ok(response)
}
```
#### Stage 6: FailoverDecide

Evaluates the response. If the primary provider failed, the fallback FSM decides whether to retry with another provider.

```rust
// In clawdesk-agents/src/pipeline/failover_decide.rs
pub async fn failover_decide(
    result: Result<ProviderResponse, ProviderError>,
    fsm: &mut FallbackFsm,
    registry: &ProviderRegistry,
    request: &ProviderRequest,
) -> Result<ProviderResponse, PipelineError> {
    match result {
        Ok(response) => {
            fsm.transition(FallbackEvent::Success);
            Ok(response)
        }
        Err(err) if err.is_retryable() => {
            fsm.transition(FallbackEvent::Failure(err.classify()));
            // Try the next candidate from the FSM
            let next = fsm.next_candidate(registry)?;
            next.send(request.clone()).await.map_err(PipelineError::Provider)
        }
        Err(err) => {
            fsm.transition(FallbackEvent::Fatal);
            Err(PipelineError::Provider(err))
        }
    }
}
```
The fallback FSM is a 7-state machine with a formal termination proof. See Fallback FSM Deep Dive.
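The deep dive covers the real machine; the sketch below only illustrates the transition-function pattern with hypothetical state and event names (the actual `FallbackFsm` has 7 states). The shape of the termination argument is visible even here: every retryable failure moves strictly toward an absorbing `Exhausted` state, so retries cannot loop forever.

```rust
// Hypothetical state and event names — the real FallbackFsm has 7 states;
// this sketch only illustrates the transition-function pattern.
#[derive(Debug, PartialEq, Clone, Copy)]
enum State {
    Primary,
    Fallback,
    Exhausted,
}

enum Event {
    Success,
    RetryableFailure,
    Fatal,
}

fn transition(state: State, event: Event) -> State {
    match (state, event) {
        (State::Exhausted, _) => State::Exhausted, // terminal: no further retries
        (_, Event::Fatal) => State::Exhausted,
        (_, Event::Success) => State::Primary,     // any success resets to primary
        (State::Primary, Event::RetryableFailure) => State::Fallback,
        (State::Fallback, Event::RetryableFailure) => State::Exhausted,
    }
}

fn main() {
    let s = transition(State::Primary, Event::RetryableFailure);
    assert_eq!(s, State::Fallback);
    assert_eq!(transition(s, Event::RetryableFailure), State::Exhausted);
    // Exhausted is absorbing, even on a late success.
    assert_eq!(transition(State::Exhausted, Event::Success), State::Exhausted);
}
```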
### Step 5: Provider Sends to LLM

**Crate:** `clawdesk-providers`

The provider translates the `ProviderRequest` into the LLM's native API format:

```rust
// In clawdesk-providers/src/anthropic.rs
#[async_trait]
impl Provider for AnthropicProvider {
    async fn send(
        &self,
        req: ProviderRequest,
    ) -> Result<ProviderResponse, ProviderError> {
        let api_request = self.build_api_request(&req)?;
        let response = self
            .client
            .post("https://api.anthropic.com/v1/messages")
            .header("x-api-key", &self.api_key)
            .header("anthropic-version", "2023-06-01")
            .json(&api_request)
            .send()
            .await
            .map_err(|e| ProviderError::Network(e.to_string()))?;
        self.parse_response(response).await
    }
}
```
### Step 6: Response Returns to Channel

The `ProviderResponse` is converted to an `OutboundMessage` and sent back through the channel:

```rust
// In clawdesk-agents/src/pipeline/mod.rs
let outbound = OutboundMessage {
    conversation_id: msg.conversation_id.clone(),
    content: response.content,
    reply_to: msg.thread_id.clone(),
    metadata: OutboundMetadata {
        model_used: response.model,
        tokens_used: response.usage,
        latency: start.elapsed(),
    },
};

// Dispatch back through the originating channel
channel.send(outbound).await?;
```
The channel implementation translates `OutboundMessage` back into the platform's native API:

```rust
// In clawdesk-channels/src/telegram/send.rs
impl TelegramChannel {
    pub async fn send(&self, msg: OutboundMessage) -> Result<(), ChannelError> {
        let chat_id = msg.conversation_id.as_telegram_chat_id()?;
        self.api
            .send_message(SendMessageRequest {
                chat_id,
                text: msg.content.to_string(),
                reply_to_message_id: msg.reply_to.map(|t| t.as_i64()),
                parse_mode: Some(ParseMode::Markdown),
            })
            .await?;
        Ok(())
    }
}
```
## Crates Touched
| Order | Crate | Role |
|---|---|---|
| 1 | clawdesk-channels | Receives platform webhook, constructs InboundMessage |
| 2 | clawdesk-channel | Defines InboundMessage enum and Channel trait |
| 3 | clawdesk-domain | normalize() → NormalizedMessage |
| 4 | clawdesk-agents | Runs 6-stage pipeline |
| 5 | clawdesk-security | AuthResolve stage |
| 6 | clawdesk-memory | HistorySanitize, context fetching |
| 7 | clawdesk-skills | ToolSplit stage (skill selection) |
| 8 | clawdesk-providers | Execute stage (LLM API call) |
## What's Next?
Now that you understand the full message flow:
- Build a Channel — implement your own channel that plugs into this flow
- Type Algebra Deep Dive — understand why `InboundMessage` is designed this way
- Fallback FSM Deep Dive — the formal model behind Stage 6