Skip to main content

Tutorial: Build a Channel

What you'll build

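A fully functional WebHook channel that receives HTTP POST requests and returns the agent's reply in the HTTP response. You'll implement the Channel trait (Layer 0), add the Streaming capability (Layer 1) — implemented here by buffering the stream into a single reply — register the channel, and write tests.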

Prerequisites

Architecture Recap

ClawDesk's channel system is layered. Layer 0 (the Channel trait) is mandatory — every channel must implement it. Layer 1 traits (such as Streaming) are opt-in capabilities.


Step 1: Create the Channel Crate Structure

Inside clawdesk-channels/src/, create a new module for the webhook channel:

clawdesk-channels/src/
├── telegram/
├── discord/
├── slack/
├── webhook/ ← new
│ ├── mod.rs
│ ├── config.rs
│ ├── handler.rs
│ └── tests.rs
└── lib.rs

Register the module in lib.rs:

// clawdesk-channels/src/lib.rs
pub mod telegram;
pub mod discord;
pub mod slack;
pub mod webhook; // Add this line

Step 2: Define Configuration

File: clawdesk-channels/src/webhook/config.rs

use serde::{Deserialize, Serialize};
use std::net::SocketAddr;

/// Configuration for the WebHook channel.
///
/// `max_payload_bytes` and `path` are optional when deserializing; serde
/// fills them from `default_max_payload` and `default_path` respectively.
/// `bind_address` has no serde default and must be present in the input.
///
/// NOTE(review): `Debug` is derived, so formatting this struct with `{:?}`
/// prints `secret` verbatim — avoid logging the whole config.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
/// Address to bind the HTTP listener to.
pub bind_address: SocketAddr,

/// Optional shared secret for HMAC signature verification.
/// `None` means no secret is configured.
pub secret: Option<String>,

/// Maximum payload size in bytes (default: 1 MB).
#[serde(default = "default_max_payload")]
pub max_payload_bytes: usize,

/// Path prefix for the webhook endpoint (default: `/webhook`).
#[serde(default = "default_path")]
pub path: String,
}

/// Serde fallback for `WebhookConfig::max_payload_bytes`: 1 MB (1024 × 1024 bytes).
fn default_max_payload() -> usize {
    1024 * 1024
}

/// Serde fallback for `WebhookConfig::path`: the endpoint lives at `/webhook`.
fn default_path() -> String {
    String::from("/webhook")
}

impl Default for WebhookConfig {
fn default() -> Self {
Self {
bind_address: "127.0.0.1:9090".parse().unwrap(),
secret: None,
max_payload_bytes: default_max_payload(),
path: default_path(),
}
}
}

Step 3: Implement Layer 0 — The Channel Trait

File: clawdesk-channels/src/webhook/mod.rs

The Channel trait defines 5 methods:

/// The core Channel trait (Layer 0) from clawdesk-channel.
///
/// Declares the five methods every channel provides. Implementors must be
/// `Send + Sync + 'static`.
#[async_trait]
pub trait Channel: Send + Sync + 'static {
/// Returns the unique channel identifier.
fn id(&self) -> ChannelId;

/// Returns channel metadata (name, description, capabilities).
fn meta(&self) -> ChannelMeta;

/// Start the channel (bind listeners, connect to APIs).
///
/// `processor` is the shared handle the channel is given for inbound
/// messages. Returns a `ChannelError` if the channel cannot be started.
async fn start(&self, processor: Arc<MessageProcessor>) -> Result<(), ChannelError>;

/// Send an outbound message through this channel.
///
/// Returns a `ChannelError` if the message cannot be delivered.
async fn send(&self, message: OutboundMessage) -> Result<(), ChannelError>;

/// Gracefully stop the channel.
async fn stop(&self) -> Result<(), ChannelError>;
}

Now implement it:

// clawdesk-channels/src/webhook/mod.rs

pub mod config;
pub mod handler;
#[cfg(test)]
mod tests;

use std::sync::Arc;
use async_trait::async_trait;
use axum::{Router, routing::post};
use tokio::net::TcpListener;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;

use clawdesk_channel::{
Channel, ChannelError, ChannelId, ChannelMeta, Capability,
InboundMessage, OutboundMessage, MessageProcessor,
};
use self::config::WebhookConfig;

/// A generic WebHook channel that accepts HTTP POST requests.
pub struct WebhookChannel {
/// Listener settings: bind address, endpoint path, payload limit, optional secret.
config: WebhookConfig,
/// Cancelled by `stop()`; the spawned server uses it as its graceful-shutdown signal.
cancel: CancellationToken,
/// Sender side — used to deliver outbound messages to pending requests.
/// Keyed by the conversation ID as a string; `send()` removes the entry it uses.
response_tx: Arc<tokio::sync::Mutex<
std::collections::HashMap<String, tokio::sync::oneshot::Sender<OutboundMessage>>
>>,
}

impl WebhookChannel {
    /// Creates a channel from `config` with a fresh cancellation token and
    /// an empty table of pending responses. No socket is bound here.
    pub fn new(config: WebhookConfig) -> Self {
        let cancel = CancellationToken::new();
        // `Arc`, the tokio `Mutex` and `HashMap` all implement `Default`,
        // which yields the same empty, unlocked map as spelling it out.
        let response_tx = Arc::default();

        Self {
            config,
            cancel,
            response_tx,
        }
    }
}

#[async_trait]
impl Channel for WebhookChannel {
fn id(&self) -> ChannelId {
ChannelId::Custom("webhook".to_string())
}

fn meta(&self) -> ChannelMeta {
ChannelMeta {
name: "WebHook".to_string(),
description: "Generic HTTP webhook channel".to_string(),
capabilities: vec![
Capability::SendText,
Capability::ReceiveText,
Capability::Streaming,
],
}
}

async fn start(
&self,
processor: Arc<MessageProcessor>,
) -> Result<(), ChannelError> {
let config = self.config.clone();
let cancel = self.cancel.clone();
let response_tx = self.response_tx.clone();

// Build the Axum router
let app = Router::new()
.route(
&config.path,
post(handler::handle_webhook),
)
.with_state(handler::WebhookState {
processor,
config: config.clone(),
response_tx,
});

// Bind the TCP listener
let listener = TcpListener::bind(&config.bind_address)
.await
.map_err(|e| ChannelError::Start(e.to_string()))?;

tracing::info!(
address = %config.bind_address,
path = %config.path,
"WebHook channel started"
);

// Spawn the server with graceful shutdown
tokio::spawn(async move {
axum::serve(listener, app)
.with_graceful_shutdown(cancel.cancelled_owned())
.await
.ok();
});

Ok(())
}

async fn send(&self, message: OutboundMessage) -> Result<(), ChannelError> {
let conv_id = message.conversation_id.to_string();
let mut senders = self.response_tx.lock().await;

if let Some(tx) = senders.remove(&conv_id) {
tx.send(message).map_err(|_| {
ChannelError::Send("Response receiver dropped".to_string())
})?;
} else {
tracing::warn!(
conversation_id = %conv_id,
"No pending request for outbound message"
);
}

Ok(())
}

async fn stop(&self) -> Result<(), ChannelError> {
self.cancel.cancel();
tracing::info!("WebHook channel stopped");
Ok(())
}
}

Step 4: Implement the HTTP Handler

File: clawdesk-channels/src/webhook/handler.rs

use std::sync::Arc;
use axum::{
extract::State,
http::StatusCode,
Json,
};
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;

use clawdesk_channel::{
InboundMessage, OutboundMessage, MessageProcessor, CustomInbound,
};
use super::config::WebhookConfig;

/// Shared state for the webhook handler.
///
/// Installed on the router with `with_state`; `Clone` is derived and every
/// field is either an `Arc` or plain configuration data.
#[derive(Clone)]
pub struct WebhookState {
/// Processor that inbound webhook messages are submitted to.
pub processor: Arc<MessageProcessor>,
/// Channel configuration.
/// NOTE(review): `handle_webhook` does not read this, so `secret` is never verified.
pub config: WebhookConfig,
/// Pending response senders keyed by `request_id`; one entry per in-flight request.
pub response_tx: Arc<tokio::sync::Mutex<
std::collections::HashMap<String, oneshot::Sender<OutboundMessage>>
>>,
}

/// Incoming webhook payload, deserialized from the JSON request body.
#[derive(Debug, Deserialize)]
pub struct WebhookPayload {
/// A unique ID for this request (used to correlate the response).
/// Also becomes the conversation ID of the inbound message.
pub request_id: String,
/// The message text.
pub text: String,
/// Optional sender identifier.
pub sender: Option<String>,
/// Optional metadata as key-value pairs.
/// Accepted as arbitrary JSON and forwarded unchanged.
pub metadata: Option<serde_json::Value>,
}

/// Response payload, serialized as the JSON body of a successful reply.
#[derive(Debug, Serialize)]
pub struct WebhookResponse {
/// The `request_id` of the request being answered.
pub request_id: String,
/// The outbound message content rendered with `to_string()`.
pub text: String,
/// Copied from the outbound message's `metadata.model_used`.
pub model: Option<String>,
/// Copied from the outbound message's `metadata.tokens_used`.
pub tokens_used: Option<u32>,
}

pub async fn handle_webhook(
State(state): State<WebhookState>,
Json(payload): Json<WebhookPayload>,
) -> Result<Json<WebhookResponse>, StatusCode> {
// 1. Create a oneshot channel for the response
let (tx, rx) = oneshot::channel::<OutboundMessage>();

// 2. Register the response sender
{
let mut senders = state.response_tx.lock().await;
senders.insert(payload.request_id.clone(), tx);
}

// 3. Construct the InboundMessage
let inbound = InboundMessage::Custom(CustomInbound {
channel_name: "webhook".to_string(),
conversation_id: payload.request_id.clone(),
sender_id: payload.sender.clone(),
text: Some(payload.text),
metadata: payload.metadata,
});

// 4. Process the message
state.processor.process(inbound).await.map_err(|e| {
tracing::error!(error = %e, "Failed to process webhook message");
StatusCode::INTERNAL_SERVER_ERROR
})?;

// 5. Wait for the response (with timeout)
let response = tokio::time::timeout(
std::time::Duration::from_secs(60),
rx,
)
.await
.map_err(|_| StatusCode::GATEWAY_TIMEOUT)?
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

Ok(Json(WebhookResponse {
request_id: payload.request_id,
text: response.content.to_string(),
model: response.metadata.model_used,
tokens_used: response.metadata.tokens_used,
}))
}

Step 5: Add Streaming (Layer 1)

Now let's opt in to the Streaming capability:

/// The Streaming trait (Layer 1) from clawdesk-channel.
///
/// An opt-in capability layered on top of `Channel`: only types that
/// already implement `Channel` can implement it.
#[async_trait]
pub trait Streaming: Channel {
/// Send a streaming response via Server-Sent Events or chunked transfer.
///
/// `conversation_id` identifies the conversation the chunks belong to and
/// `stream` yields the `StreamChunk` items to deliver. Returns a
/// `ChannelError` if delivery fails.
async fn send_stream(
&self,
conversation_id: &ConversationId,
stream: Pin<Box<dyn Stream<Item = StreamChunk> + Send>>,
) -> Result<(), ChannelError>;
}

Implement the Streaming trait for the webhook channel (this version buffers the stream and replies once; no separate endpoint is added):

// In clawdesk-channels/src/webhook/mod.rs

use clawdesk_channel::{Content, ConversationId, OutboundMetadata, StreamChunk, Streaming};
use futures::Stream;
use std::pin::Pin;

#[async_trait]
impl Streaming for WebhookChannel {
    /// Buffers the whole stream and delivers it as one outbound message.
    ///
    /// Text chunks are appended in arrival order. When the `Done` chunk
    /// arrives, the accumulated text is sent through `Channel::send` with
    /// the metadata carried by that chunk. A more sophisticated
    /// implementation could forward chunks incrementally over SSE.
    ///
    /// # Errors
    /// - `ChannelError::Stream` if the stream yields an `Error` chunk, or
    ///   ends without a `Done` chunk. In the latter case the buffered text
    ///   cannot be delivered, and reporting it lets the caller react instead
    ///   of leaving the HTTP request to time out.
    /// - Any error returned by `send`.
    async fn send_stream(
        &self,
        conversation_id: &ConversationId,
        mut stream: Pin<Box<dyn Stream<Item = StreamChunk> + Send>>,
    ) -> Result<(), ChannelError> {
        use futures::StreamExt;

        let mut full_content = String::new();

        while let Some(chunk) = stream.next().await {
            match chunk {
                StreamChunk::Text(text) => full_content.push_str(&text),
                StreamChunk::Done(metadata) => {
                    // We return right away, so the buffer can be moved
                    // into the message instead of cloned.
                    let outbound = OutboundMessage {
                        conversation_id: conversation_id.clone(),
                        content: Content::Text(full_content),
                        reply_to: None,
                        metadata: OutboundMetadata::from(metadata),
                    };
                    return self.send(outbound).await;
                }
                StreamChunk::Error(e) => {
                    return Err(ChannelError::Stream(e.to_string()));
                }
            }
        }

        Err(ChannelError::Stream(
            "Stream ended without a Done chunk".to_string(),
        ))
    }
}

Step 6: Register in the Channel Registry

Channels must be registered so the gateway knows about them:

// In clawdesk-gateway/src/registry.rs (or wherever channels are wired up)

use clawdesk_channels::webhook::{WebhookChannel, config::WebhookConfig};

/// Wires the configured channels into `registry`.
///
/// The webhook channel is optional: it is registered only when the
/// application config contains a `channels.webhook` section.
///
/// # Errors
/// Propagates any error returned by `registry.register`.
pub fn register_channels(
    registry: &mut ChannelRegistry,
    config: &AppConfig,
) -> Result<(), GatewayError> {
    // ... existing channels ...

    // Register the webhook channel
    let webhook = config
        .channels
        .webhook
        .as_ref()
        .map(|settings| WebhookChannel::new(settings.clone()));

    if let Some(channel) = webhook {
        registry.register(Box::new(channel))?;
        tracing::info!("Registered WebHook channel");
    }

    Ok(())
}

And add configuration support:

# In clawdesk.toml
[channels.webhook]
bind_address = "0.0.0.0:9090"
path = "/webhook"
max_payload_bytes = 1048576
# secret = "your-hmac-secret" # optional

Step 7: Write Tests

File: clawdesk-channels/src/webhook/tests.rs

#[cfg(test)]
mod tests {
use super::*;
use axum::body::Body;
use axum::http::{Request, StatusCode};
use tower::ServiceExt; // for oneshot

/// Helper to create a test webhook channel.
fn test_channel() -> WebhookChannel {
WebhookChannel::new(WebhookConfig {
bind_address: "127.0.0.1:0".parse().unwrap(),
secret: None,
max_payload_bytes: 1024,
path: "/webhook".to_string(),
})
}

#[test]
fn channel_id_is_webhook() {
let ch = test_channel();
assert_eq!(ch.id(), ChannelId::Custom("webhook".to_string()));
}

#[test]
fn meta_includes_streaming() {
let ch = test_channel();
let meta = ch.meta();
assert!(meta.capabilities.contains(&Capability::Streaming));
}

#[tokio::test]
async fn start_and_stop_lifecycle() {
let ch = test_channel();
let processor = Arc::new(MessageProcessor::mock());

ch.start(processor).await.unwrap();
// Channel should be running — verify by checking cancel token
assert!(!ch.cancel.is_cancelled());

ch.stop().await.unwrap();
assert!(ch.cancel.is_cancelled());
}

#[tokio::test]
async fn webhook_roundtrip() {
let ch = Arc::new(test_channel());
let processor = Arc::new(MessageProcessor::echo()); // echoes input

ch.start(processor).await.unwrap();

// Send a webhook request
let client = reqwest::Client::new();
let response = client
.post(&format!(
"http://{}/webhook",
ch.config.bind_address
))
.json(&serde_json::json!({
"request_id": "test-001",
"text": "Hello from test",
"sender": "test-user"
}))
.send()
.await
.unwrap();

assert_eq!(response.status(), 200);

let body: serde_json::Value = response.json().await.unwrap();
assert_eq!(body["request_id"], "test-001");
assert!(body["text"].as_str().unwrap().len() > 0);

ch.stop().await.unwrap();
}

#[tokio::test]
async fn rejects_oversized_payload() {
let ch = Arc::new(WebhookChannel::new(WebhookConfig {
max_payload_bytes: 10, // very small
..Default::default()
}));
let processor = Arc::new(MessageProcessor::mock());
ch.start(processor).await.unwrap();

let client = reqwest::Client::new();
let response = client
.post(&format!(
"http://{}/webhook",
ch.config.bind_address
))
.json(&serde_json::json!({
"request_id": "test-002",
"text": "This payload is definitely longer than 10 bytes",
}))
.send()
.await
.unwrap();

assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE);
ch.stop().await.unwrap();
}
}

Complete Channel Trait Checklist

| Method | Status | Notes |
| --- | --- | --- |
| id() | ✅ | Returns ChannelId::Custom("webhook") |
| meta() | ✅ | Declares SendText, ReceiveText, Streaming |
| start() | ✅ | Binds TCP, spawns Axum server |
| send() | ✅ | Delivers via oneshot to waiting request |
| stop() | ✅ | Cancels via CancellationToken |
| Streaming::send_stream() | ✅ | Collects stream, sends as single response |

What's Next?