Tutorial: Build a Channel
In this tutorial you'll build a fully functional WebHook channel that receives HTTP POST requests and streams responses back. You'll implement the Channel trait (Layer 0), add Streaming (Layer 1), register the channel, and write tests.
Prerequisites
- Completed Message Flow Tutorial
- Rust toolchain installed
- ClawDesk repo cloned and building
Architecture Recap
ClawDesk's channel system is layered:
Layer 0 is mandatory — every channel must implement it. Layer 1 traits are opt-in capabilities.
Step 1: Create the Channel Crate Structure
Inside clawdesk-channels/src/, create a new module for the webhook channel:
clawdesk-channels/src/
├── telegram/
├── discord/
├── slack/
├── webhook/ ← new
│ ├── mod.rs
│ ├── config.rs
│ ├── handler.rs
│ └── tests.rs
└── lib.rs
Register the module in lib.rs:
// clawdesk-channels/src/lib.rs
pub mod telegram;
pub mod discord;
pub mod slack;
pub mod webhook; // Add this line
Step 2: Define Configuration
File: clawdesk-channels/src/webhook/config.rs
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
/// Configuration for the WebHook channel.
///
/// The struct derives `Serialize`/`Deserialize`. `max_payload_bytes` and
/// `path` carry `#[serde(default = ...)]`, so both may be omitted from the
/// input; `bind_address` has no serde default and must be supplied.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
/// Address to bind the HTTP listener to.
pub bind_address: SocketAddr,
/// Optional shared secret for HMAC signature verification.
///
/// NOTE(review): the handler shown in this tutorial never reads this
/// field, so signature verification still has to be implemented.
pub secret: Option<String>,
/// Maximum payload size in bytes (default: 1 MB).
///
/// Filled by `default_max_payload` when the key is absent.
#[serde(default = "default_max_payload")]
pub max_payload_bytes: usize,
/// Path prefix for the webhook endpoint.
///
/// Filled by `default_path` (`"/webhook"`) when the key is absent.
#[serde(default = "default_path")]
pub path: String,
}
/// Serde fallback for `WebhookConfig::max_payload_bytes`: one mebibyte.
fn default_max_payload() -> usize {
    const ONE_MIB: usize = 1 << 20; // 1_048_576 bytes
    ONE_MIB
}
/// Serde fallback for `WebhookConfig::path`.
fn default_path() -> String {
    String::from("/webhook")
}
impl Default for WebhookConfig {
fn default() -> Self {
Self {
bind_address: "127.0.0.1:9090".parse().unwrap(),
secret: None,
max_payload_bytes: default_max_payload(),
path: default_path(),
}
}
}
Step 3: Implement Layer 0 — The Channel Trait
File: clawdesk-channels/src/webhook/mod.rs
The Channel trait defines 5 methods:
/// The core Channel trait (Layer 0) from clawdesk-channel
///
/// Implementors must be `Send + Sync + 'static`. `id` and `meta` are
/// synchronous; `start`, `send` and `stop` are async (via `#[async_trait]`)
/// and report failure as `ChannelError`.
#[async_trait]
pub trait Channel: Send + Sync + 'static {
/// Returns the unique channel identifier.
fn id(&self) -> ChannelId;
/// Returns channel metadata (name, description, capabilities).
fn meta(&self) -> ChannelMeta;
/// Start the channel (bind listeners, connect to APIs).
///
/// `processor` is the shared `MessageProcessor` handed to the channel.
async fn start(&self, processor: Arc<MessageProcessor>) -> Result<(), ChannelError>;
/// Send an outbound message through this channel.
async fn send(&self, message: OutboundMessage) -> Result<(), ChannelError>;
/// Gracefully stop the channel.
async fn stop(&self) -> Result<(), ChannelError>;
}
Now implement it:
// clawdesk-channels/src/webhook/mod.rs
pub mod config;
pub mod handler;
#[cfg(test)]
mod tests;
use std::sync::Arc;
use async_trait::async_trait;
use axum::{Router, routing::post};
use tokio::net::TcpListener;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use clawdesk_channel::{
Channel, ChannelError, ChannelId, ChannelMeta, Capability,
InboundMessage, OutboundMessage, MessageProcessor,
};
use self::config::WebhookConfig;
/// A generic WebHook channel that accepts HTTP POST requests.
pub struct WebhookChannel {
/// Listener address, endpoint path and limits for this channel.
config: WebhookConfig,
/// Cancelled by `stop()`; `start()` hands a clone to the server as its
/// graceful-shutdown signal.
cancel: CancellationToken,
/// Sender side — used to deliver outbound messages to pending requests.
///
/// Maps a conversation id (the webhook's `request_id`) to the oneshot
/// sender of the HTTP request waiting for that reply. The HTTP handler
/// inserts entries and `send()` removes them.
response_tx: Arc<tokio::sync::Mutex<
std::collections::HashMap<String, tokio::sync::oneshot::Sender<OutboundMessage>>
>>,
}
impl WebhookChannel {
    /// Builds a channel from `config` with a fresh cancellation token and an
    /// empty table of pending responses. Nothing is bound until `start()`.
    pub fn new(config: WebhookConfig) -> Self {
        let pending = tokio::sync::Mutex::new(std::collections::HashMap::new());
        let cancel = CancellationToken::new();

        Self {
            config,
            cancel,
            response_tx: Arc::new(pending),
        }
    }
}
#[async_trait]
impl Channel for WebhookChannel {
fn id(&self) -> ChannelId {
ChannelId::Custom("webhook".to_string())
}
fn meta(&self) -> ChannelMeta {
ChannelMeta {
name: "WebHook".to_string(),
description: "Generic HTTP webhook channel".to_string(),
capabilities: vec![
Capability::SendText,
Capability::ReceiveText,
Capability::Streaming,
],
}
}
async fn start(
&self,
processor: Arc<MessageProcessor>,
) -> Result<(), ChannelError> {
let config = self.config.clone();
let cancel = self.cancel.clone();
let response_tx = self.response_tx.clone();
// Build the Axum router
let app = Router::new()
.route(
&config.path,
post(handler::handle_webhook),
)
.with_state(handler::WebhookState {
processor,
config: config.clone(),
response_tx,
});
// Bind the TCP listener
let listener = TcpListener::bind(&config.bind_address)
.await
.map_err(|e| ChannelError::Start(e.to_string()))?;
tracing::info!(
address = %config.bind_address,
path = %config.path,
"WebHook channel started"
);
// Spawn the server with graceful shutdown
tokio::spawn(async move {
axum::serve(listener, app)
.with_graceful_shutdown(cancel.cancelled_owned())
.await
.ok();
});
Ok(())
}
async fn send(&self, message: OutboundMessage) -> Result<(), ChannelError> {
let conv_id = message.conversation_id.to_string();
let mut senders = self.response_tx.lock().await;
if let Some(tx) = senders.remove(&conv_id) {
tx.send(message).map_err(|_| {
ChannelError::Send("Response receiver dropped".to_string())
})?;
} else {
tracing::warn!(
conversation_id = %conv_id,
"No pending request for outbound message"
);
}
Ok(())
}
async fn stop(&self) -> Result<(), ChannelError> {
self.cancel.cancel();
tracing::info!("WebHook channel stopped");
Ok(())
}
}
Step 4: Implement the HTTP Handler
File: clawdesk-channels/src/webhook/handler.rs
use std::sync::Arc;
use axum::{
extract::State,
http::StatusCode,
Json,
};
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use clawdesk_channel::{
InboundMessage, OutboundMessage, MessageProcessor, CustomInbound,
};
use super::config::WebhookConfig;
/// Shared state for the webhook handler.
///
/// Derives `Clone`; `processor` and `response_tx` are `Arc`s, so a clone
/// shares them rather than copying them.
#[derive(Clone)]
pub struct WebhookState {
/// Processor that inbound webhook messages are handed to.
pub processor: Arc<MessageProcessor>,
/// Copy of the channel configuration.
pub config: WebhookConfig,
/// Pending responses keyed by `request_id`; the handler inserts a oneshot
/// sender here before processing the message.
pub response_tx: Arc<tokio::sync::Mutex<
std::collections::HashMap<String, oneshot::Sender<OutboundMessage>>
>>,
}
/// Incoming webhook payload.
///
/// Deserialized from the JSON request body. `request_id` and `text` are
/// required; `sender` and `metadata` are `Option`s and may be absent.
#[derive(Debug, Deserialize)]
pub struct WebhookPayload {
/// A unique ID for this request (used to correlate the response).
///
/// The handler uses it both as the key of the pending-response table and
/// as the conversation id of the inbound message.
pub request_id: String,
/// The message text.
pub text: String,
/// Optional sender identifier.
pub sender: Option<String>,
/// Optional metadata as key-value pairs.
///
/// Passed through to the inbound message unchanged.
pub metadata: Option<serde_json::Value>,
}
/// Response payload.
///
/// Serialized to JSON as the body of a successful webhook reply.
#[derive(Debug, Serialize)]
pub struct WebhookResponse {
/// Echo of the `request_id` from the incoming payload.
pub request_id: String,
/// The outbound message content rendered with `to_string()`.
pub text: String,
/// Copied from the outbound message's `metadata.model_used`.
pub model: Option<String>,
/// Copied from the outbound message's `metadata.tokens_used`.
pub tokens_used: Option<u32>,
}
pub async fn handle_webhook(
State(state): State<WebhookState>,
Json(payload): Json<WebhookPayload>,
) -> Result<Json<WebhookResponse>, StatusCode> {
// 1. Create a oneshot channel for the response
let (tx, rx) = oneshot::channel::<OutboundMessage>();
// 2. Register the response sender
{
let mut senders = state.response_tx.lock().await;
senders.insert(payload.request_id.clone(), tx);
}
// 3. Construct the InboundMessage
let inbound = InboundMessage::Custom(CustomInbound {
channel_name: "webhook".to_string(),
conversation_id: payload.request_id.clone(),
sender_id: payload.sender.clone(),
text: Some(payload.text),
metadata: payload.metadata,
});
// 4. Process the message
state.processor.process(inbound).await.map_err(|e| {
tracing::error!(error = %e, "Failed to process webhook message");
StatusCode::INTERNAL_SERVER_ERROR
})?;
// 5. Wait for the response (with timeout)
let response = tokio::time::timeout(
std::time::Duration::from_secs(60),
rx,
)
.await
.map_err(|_| StatusCode::GATEWAY_TIMEOUT)?
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(WebhookResponse {
request_id: payload.request_id,
text: response.content.to_string(),
model: response.metadata.model_used,
tokens_used: response.metadata.tokens_used,
}))
}
Step 5: Add Streaming (Layer 1)
Now let's opt in to the Streaming capability:
/// The Streaming trait (Layer 1) from clawdesk-channel
///
/// `Channel` is a supertrait, so only types that already implement Layer 0
/// can implement this trait.
#[async_trait]
pub trait Streaming: Channel {
/// Send a streaming response via Server-Sent Events or chunked transfer.
///
/// `stream` is a boxed, pinned, `Send` stream of `StreamChunk` items
/// belonging to `conversation_id`. Failures are reported as `ChannelError`.
async fn send_stream(
&self,
conversation_id: &ConversationId,
stream: Pin<Box<dyn Stream<Item = StreamChunk> + Send>>,
) -> Result<(), ChannelError>;
}
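Implement Streaming for the webhook channel. This first version buffers the stream and replies with a single message rather than adding a separate streaming endpoint: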
// In clawdesk-channels/src/webhook/mod.rs
use clawdesk_channel::Streaming;
use futures::Stream;
use std::pin::Pin;
#[async_trait]
impl Streaming for WebhookChannel {
    /// Buffers the whole stream and delivers it as one outbound message.
    ///
    /// Text chunks are concatenated in arrival order. The `Done` chunk
    /// supplies the metadata and triggers delivery through `Channel::send`.
    ///
    /// # Errors
    /// Returns `ChannelError::Stream` when the stream yields an error chunk
    /// or ends before a `Done` chunk arrives. In the second case the buffered
    /// text cannot be delivered, because the metadata needed to build the
    /// message never came. Errors from `send` are passed through unchanged.
    async fn send_stream(
        &self,
        conversation_id: &ConversationId,
        stream: Pin<Box<dyn Stream<Item = StreamChunk> + Send>>,
    ) -> Result<(), ChannelError> {
        // For webhooks, we collect the stream and send as a single response.
        // A more sophisticated implementation could use SSE.
        use futures::StreamExt;

        let mut full_content = String::new();
        let mut chunks = stream;

        while let Some(chunk) = chunks.next().await {
            match chunk {
                StreamChunk::Text(text) => full_content.push_str(&text),
                StreamChunk::Done(metadata) => {
                    let outbound = OutboundMessage {
                        conversation_id: conversation_id.clone(),
                        // This arm returns, so the buffer can be moved
                        // instead of cloned.
                        content: Content::Text(full_content),
                        reply_to: None,
                        metadata: OutboundMetadata::from(metadata),
                    };
                    return self.send(outbound).await;
                }
                StreamChunk::Error(e) => {
                    return Err(ChannelError::Stream(e.to_string()));
                }
            }
        }

        // Falling out of the loop means the producer stopped without
        // signalling completion. Report it instead of returning `Ok(())`.
        Err(ChannelError::Stream(
            "stream ended before a Done chunk was received".to_string(),
        ))
    }
}
Step 6: Register in the Channel Registry
Channels must be registered so the gateway knows about them:
// In clawdesk-gateway/src/registry.rs (or wherever channels are wired up)
use clawdesk_channels::webhook::{WebhookChannel, config::WebhookConfig};
/// Wires the configured channels into `registry`.
///
/// The webhook channel is registered only when `config.channels.webhook` is
/// present; an error from the registry is returned to the caller.
pub fn register_channels(
    registry: &mut ChannelRegistry,
    config: &AppConfig,
) -> Result<(), GatewayError> {
    // ... existing channels ...

    // Register the webhook channel
    let webhook = config
        .channels
        .webhook
        .as_ref()
        .map(|webhook_config| WebhookChannel::new(webhook_config.clone()));

    if let Some(channel) = webhook {
        registry.register(Box::new(channel))?;
        tracing::info!("Registered WebHook channel");
    }

    Ok(())
}
And add configuration support:
# In clawdesk.toml
[channels.webhook]
bind_address = "0.0.0.0:9090"
path = "/webhook"
max_payload_bytes = 1048576
# secret = "your-hmac-secret" # optional
Step 7: Write Tests
File: clawdesk-channels/src/webhook/tests.rs
#[cfg(test)]
mod tests {
    // `tests.rs` is already mounted as `webhook::tests`, so the channel's
    // items live two levels above this nested module.
    use super::super::*;
    use axum::http::StatusCode;

    /// Reserves a free loopback port and returns its address.
    ///
    /// Binding to port 0 and posting to `config.bind_address` cannot work,
    /// because the config never learns which port the OS picked. Probing
    /// first gives the test a concrete address. The probe socket is closed
    /// before the channel binds, so a tiny reuse race exists; that is
    /// acceptable in tests.
    fn free_local_addr() -> std::net::SocketAddr {
        std::net::TcpListener::bind("127.0.0.1:0")
            .and_then(|probe| probe.local_addr())
            .expect("loopback interface should offer a free port")
    }

    /// Helper to create a test webhook channel.
    fn test_channel() -> WebhookChannel {
        WebhookChannel::new(WebhookConfig {
            bind_address: free_local_addr(),
            secret: None,
            max_payload_bytes: 1024,
            path: "/webhook".to_string(),
        })
    }

    /// URL of the webhook endpoint served by `ch`.
    fn endpoint(ch: &WebhookChannel) -> String {
        format!("http://{}/webhook", ch.config.bind_address)
    }

    #[test]
    fn channel_id_is_webhook() {
        let ch = test_channel();
        assert_eq!(ch.id(), ChannelId::Custom("webhook".to_string()));
    }

    #[test]
    fn meta_includes_streaming() {
        let ch = test_channel();
        let meta = ch.meta();
        assert!(meta.capabilities.contains(&Capability::Streaming));
    }

    #[tokio::test]
    async fn start_and_stop_lifecycle() {
        let ch = test_channel();
        let processor = Arc::new(MessageProcessor::mock());

        ch.start(processor).await.unwrap();
        // Channel should be running — verify by checking cancel token
        assert!(!ch.cancel.is_cancelled());

        ch.stop().await.unwrap();
        assert!(ch.cancel.is_cancelled());
    }

    #[tokio::test]
    async fn webhook_roundtrip() {
        let ch = Arc::new(test_channel());
        let processor = Arc::new(MessageProcessor::echo()); // echoes input
        ch.start(processor).await.unwrap();

        // Send a webhook request
        let client = reqwest::Client::new();
        let response = client
            .post(endpoint(&ch))
            .json(&serde_json::json!({
                "request_id": "test-001",
                "text": "Hello from test",
                "sender": "test-user"
            }))
            .send()
            .await
            .unwrap();

        assert_eq!(response.status(), 200);
        let body: serde_json::Value = response.json().await.unwrap();
        assert_eq!(body["request_id"], "test-001");
        assert!(!body["text"].as_str().unwrap().is_empty());

        ch.stop().await.unwrap();
    }

    #[tokio::test]
    async fn rejects_oversized_payload() {
        let ch = Arc::new(WebhookChannel::new(WebhookConfig {
            // A dedicated free port instead of the fixed default port.
            bind_address: free_local_addr(),
            max_payload_bytes: 10, // very small
            ..Default::default()
        }));
        let processor = Arc::new(MessageProcessor::mock());
        ch.start(processor).await.unwrap();

        let client = reqwest::Client::new();
        let response = client
            .post(endpoint(&ch))
            .json(&serde_json::json!({
                "request_id": "test-002",
                "text": "This payload is definitely longer than 10 bytes",
            }))
            .send()
            .await
            .unwrap();

        assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE);

        ch.stop().await.unwrap();
    }
}
Complete Channel Trait Checklist
| Method | Status | Notes |
|---|---|---|
| `id()` | ✅ | Returns `ChannelId::Custom("webhook")` |
| `meta()` | ✅ | Declares SendText, ReceiveText, Streaming |
| `start()` | ✅ | Binds TCP, spawns Axum server |
| `send()` | ✅ | Delivers via oneshot to waiting request |
| `stop()` | ✅ | Cancels via CancellationToken |
| `Streaming::send_stream()` | ✅ | Collects stream, sends as single response |
What's Next?
- Build a Provider — implement the other side of the pipeline
- Adding a Channel (Contributing Guide) — ClawDesk-specific conventions and review checklist
- Channel Traits Architecture — all Layer 0 and Layer 1 traits