Gateway Server
The clawdesk-gateway crate implements ClawDesk's HTTP and WebSocket server using Axum. It serves as the convergence point where all subsystems meet, exposing 24+ routes for message processing, administration, OpenAI-compatible APIs, and real-time streaming.
Server Architecture
Server Bootstrap
```rust
// crates/clawdesk-gateway/src/bootstrap.rs
use std::sync::Arc;

use arc_swap::ArcSwap;
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;

use crate::routes::build_router;
use crate::watcher::config_watcher;

pub async fn start_server(
    config: ValidatedConfig,
    token: CancellationToken,
) -> Result<(), ClawDeskError> {
    // 1. Build application state
    let state = bootstrap_state(config.clone()).await?;
    let shared_state = Arc::new(ArcSwap::new(Arc::new(state)));

    // 2. Build the router
    let app = build_router(shared_state.clone());

    // 3. Bind to address
    let addr = config.server.bind_address(); // default: 127.0.0.1:18789
    let listener = TcpListener::bind(addr).await?;
    tracing::info!(%addr, "gateway server listening");

    // 4. Start the config watcher for hot-reload. It receives a child token,
    //    so cancelling `token` also stops the watcher.
    let watcher_state = shared_state.clone();
    let watcher_token = token.child_token();
    tokio::spawn(async move {
        config_watcher(watcher_state, watcher_token).await;
    });

    // 5. Serve until the token is cancelled, then let in-flight requests finish
    axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            token.cancelled().await;
            tracing::info!("gateway shutting down");
        })
        .await?;

    Ok(())
}
```
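`start_server` hands the watcher `token.child_token()` rather than the token itself, so shutting the server down also stops the watcher, while cancelling the watcher's token would not take the server down. The std-only sketch below models just that parent/child relationship with a hypothetical `Token` type. It is not `tokio_util::sync::CancellationToken`, which additionally lets tasks `.await` cancellation as the shutdown future above does.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a cancellation token (not the tokio-util type).
struct Token {
    node: Arc<Node>,
}

struct Node {
    cancelled: AtomicBool,
    parent: Option<Arc<Node>>,
}

impl Node {
    /// Cancelled if this node or any ancestor has been cancelled.
    fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
            || self.parent.as_ref().map_or(false, |p| p.is_cancelled())
    }
}

impl Token {
    fn new() -> Self {
        Token::with_parent(None)
    }

    fn with_parent(parent: Option<Arc<Node>>) -> Self {
        Token {
            node: Arc::new(Node { cancelled: AtomicBool::new(false), parent }),
        }
    }

    /// A child sees its parent's cancellation; cancelling the child leaves the parent alone.
    fn child_token(&self) -> Token {
        Token::with_parent(Some(Arc::clone(&self.node)))
    }

    fn cancel(&self) {
        self.node.cancelled.store(true, Ordering::SeqCst);
    }

    fn is_cancelled(&self) -> bool {
        self.node.is_cancelled()
    }
}

fn main() {
    let server = Token::new();
    let watcher = server.child_token();
    let sibling = server.child_token();

    // Cancelling one child affects neither the parent nor other children.
    sibling.cancel();
    println!(
        "after child cancel: server={}, watcher={}",
        server.is_cancelled(),
        watcher.is_cancelled()
    );

    // Cancelling the parent reaches every child.
    server.cancel();
    println!("after server cancel: watcher={}", watcher.is_cancelled());
}
```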
Route Table
The gateway exposes 24+ routes organized into 5 groups:
Core API Routes (/api/v1/)
| Method | Path | Handler | Description |
|---|---|---|---|
POST | /api/v1/messages | handle_message | Process an inbound message |
POST | /api/v1/messages/stream | handle_message_stream | Process with SSE streaming |
GET | /api/v1/sessions | list_sessions | List active sessions |
GET | /api/v1/sessions/:id | get_session | Get session details |
DELETE | /api/v1/sessions/:id | delete_session | Terminate a session |
GET | /api/v1/sessions/:id/history | get_history | Get conversation history |
GET | /api/v1/channels | list_channels | List configured channels |
GET | /api/v1/providers | list_providers | List available providers |
GET | /api/v1/models | list_models | List available models |
OpenAI-Compatible Routes (/v1/)
| Method | Path | Handler | Description |
|---|---|---|---|
POST | /v1/chat/completions | openai_chat | Chat completions (OpenAI format) |
POST | /v1/responses | responses_create | Responses API |
GET | /v1/models | openai_models | List models (OpenAI format) |
Admin Routes (/admin/)
| Method | Path | Handler | Description |
|---|---|---|---|
GET | /admin/config | get_config | Get current configuration |
PUT | /admin/config | update_config | Update configuration (hot-reload) |
POST | /admin/config/reload | reload_config | Force config reload |
GET | /admin/channels | admin_channels | Channel management |
POST | /admin/channels/:id/restart | restart_channel | Restart a channel |
GET | /admin/plugins | list_plugins | Plugin management |
GET | /admin/skills | list_skills | Skill management |
POST | /admin/skills | create_skill | Create a new skill |
WebSocket Routes
| Method | Path | Handler | Description |
|---|---|---|---|
GET | /ws | ws_handler | WebSocket upgrade for streaming |
GET | /ws/events | ws_events | Real-time event stream |
Health & Observability
| Method | Path | Handler | Description |
|---|---|---|---|
GET | /health | health_check | Health check endpoint |
GET | /health/ready | readiness_check | Readiness probe |
GET | /metrics | metrics_handler | Prometheus metrics |
Router Construction
```rust
// crates/clawdesk-gateway/src/routes.rs
use axum::{
    body::Body,
    http::Request,
    routing::{get, post},
    Router,
};
use tower_http::{compression::CompressionLayer, cors::CorsLayer, trace::TraceLayer};
use uuid::Uuid;

use crate::state::SharedState;

pub fn build_router(state: SharedState) -> Router {
    Router::new()
        // Core API. `openai_compat_routes()` and `admin_routes()` must likewise
        // return `Router<SharedState>` so they can be nested here.
        .nest("/api/v1", api_v1_routes())
        // OpenAI-compatible
        .nest("/v1", openai_compat_routes())
        // Admin
        .nest("/admin", admin_routes())
        // WebSocket
        .route("/ws", get(ws::ws_handler))
        .route("/ws/events", get(ws::ws_events))
        // Health & metrics
        .route("/health", get(health::health_check))
        .route("/health/ready", get(health::readiness_check))
        .route("/metrics", get(health::metrics_handler))
        // Middleware: each `.layer()` wraps everything above it, so the last
        // layer added (CORS) is outermost and sees the request first.
        .layer(CompressionLayer::new())
        .layer(
            TraceLayer::new_for_http().make_span_with(|request: &Request<Body>| {
                tracing::info_span!(
                    "http_request",
                    method = %request.method(),
                    uri = %request.uri(),
                    request_id = %Uuid::new_v4()
                )
            }),
        )
        .layer(CorsLayer::permissive())
        // Handlers extract `State<SharedState>`, so state is supplied with
        // `with_state`; `Extension(state)` would only serve `Extension<_>` extractors.
        .with_state(state)
}

fn api_v1_routes() -> Router<SharedState> {
    Router::new()
        .route("/messages", post(handlers::handle_message))
        .route("/messages/stream", post(handlers::handle_message_stream))
        .route("/sessions", get(handlers::list_sessions))
        // `:id` is axum 0.7 path syntax; axum 0.8 uses `{id}` instead.
        .route(
            "/sessions/:id",
            get(handlers::get_session).delete(handlers::delete_session),
        )
        .route("/sessions/:id/history", get(handlers::get_history))
        .route("/channels", get(handlers::list_channels))
        .route("/providers", get(handlers::list_providers))
        .route("/models", get(handlers::list_models))
}
```
Middleware Stack
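Middleware order is set by the order of the `.layer()` calls in `build_router`. Each call wraps every route and layer added before it, so the layer added last is outermost: a request passes through the layers from last-added to first-added before reaching the handler, and the response travels back out in the reverse order.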
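To make the ordering concrete, here is a std-only model in which a hypothetical `layer` function wraps the service built so far, applied in the same order as `build_router` (compression, then tracing, then CORS). It only mimics the nesting that axum's `Router::layer` produces; it is not how tower layers are implemented.

```rust
/// A "service" here is just a function that appends to a log.
type Service = Box<dyn Fn(&mut Vec<String>)>;

/// Wrap `inner` so that `name` runs before it (request) and after it (response).
fn layer(name: &'static str, inner: Service) -> Service {
    Box::new(move |log: &mut Vec<String>| {
        log.push(format!("{name}: request"));
        inner(&mut *log);
        log.push(format!("{name}: response"));
    })
}

fn main() {
    let handler: Service = Box::new(|log: &mut Vec<String>| log.push("handler".to_string()));

    // Same order as the `.layer()` calls in `build_router`.
    let app = layer("compression", handler);
    let app = layer("trace", app);
    let app = layer("cors", app);

    let mut log = Vec::new();
    app(&mut log);
    for entry in &log {
        println!("{entry}");
    }
}
```

The printed log starts with `cors: request` and ends with `cors: response`, with `handler` in the middle: the last layer added is the first to see the request and the last to see the response.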
Rate Limiter
```rust
// crates/clawdesk-gateway/src/rate_limiter.rs
use std::net::IpAddr;
use std::time::Duration;

use dashmap::DashMap;

pub struct RateLimiter {
    /// Per-IP rate limits
    ip_limits: DashMap<IpAddr, TokenBucket>,
    /// Per-API-key rate limits
    key_limits: DashMap<String, TokenBucket>,
    /// Configuration
    config: RateLimitConfig,
}

#[derive(Debug, Clone)]
pub struct RateLimitConfig {
    /// Requests per minute per IP
    pub ip_rpm: u32, // default: 60
    /// Requests per minute per API key
    pub key_rpm: u32, // default: 120
    /// Burst allowance (multiplier)
    pub burst_factor: f32, // default: 2.0
    /// Cleanup interval for stale entries
    pub cleanup_interval: Duration, // default: 5 min
}

impl RateLimiter {
    pub fn check(&self, ip: IpAddr, api_key: Option<&str>) -> RateLimitResult {
        // Check the per-IP limit. The DashMap entry guard is `mut`, assuming
        // `try_consume` takes `&mut self` to update the bucket.
        let mut ip_bucket = self
            .ip_limits
            .entry(ip)
            .or_insert_with(|| TokenBucket::new(self.config.ip_rpm, self.config.burst_factor));
        if !ip_bucket.try_consume() {
            return RateLimitResult::Limited {
                retry_after: ip_bucket.time_until_refill(),
            };
        }

        // Check the per-key limit (if a key was provided). The IP token above
        // has already been spent even if this check rejects the request.
        if let Some(key) = api_key {
            let mut key_bucket = self
                .key_limits
                .entry(key.to_string())
                .or_insert_with(|| TokenBucket::new(self.config.key_rpm, self.config.burst_factor));
            if !key_bucket.try_consume() {
                return RateLimitResult::Limited {
                    retry_after: key_bucket.time_until_refill(),
                };
            }
        }

        RateLimitResult::Allowed
    }
}
```
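`TokenBucket` itself is not shown. A minimal sketch consistent with how `check` calls it might look like the following. The refill rule (capacity of `rpm * burst_factor` tokens, refilled at `rpm / 60` tokens per second) is an assumption, as are the `new_at` and `try_consume_at` helpers, which take an explicit `Instant` so the logic can be tested deterministically.

```rust
use std::time::{Duration, Instant};

/// Hypothetical token bucket: holds up to `capacity` tokens and refills
/// continuously at `refill_per_sec`. Each request costs one token.
pub struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill: Instant,
}

impl TokenBucket {
    pub fn new(rpm: u32, burst_factor: f32) -> Self {
        Self::new_at(rpm, burst_factor, Instant::now())
    }

    /// Starts full, so a new client may burst up to `capacity` requests.
    pub fn new_at(rpm: u32, burst_factor: f32, now: Instant) -> Self {
        let capacity = f64::from(rpm) * f64::from(burst_factor);
        TokenBucket {
            capacity,
            tokens: capacity,
            refill_per_sec: f64::from(rpm) / 60.0,
            last_refill: now,
        }
    }

    pub fn try_consume(&mut self) -> bool {
        self.try_consume_at(Instant::now())
    }

    pub fn try_consume_at(&mut self, now: Instant) -> bool {
        // Credit the tokens earned since the last refill, capped at capacity.
        let elapsed = now.saturating_duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill = now;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// Time until one whole token is available again (zero if one is available now).
    pub fn time_until_refill(&self) -> Duration {
        if self.tokens >= 1.0 {
            Duration::ZERO
        } else if self.refill_per_sec == 0.0 {
            Duration::MAX // rpm = 0: the bucket never refills
        } else {
            Duration::from_secs_f64((1.0 - self.tokens) / self.refill_per_sec)
        }
    }
}

fn main() {
    let start = Instant::now();
    // 60 rpm with burst factor 2.0: 120 tokens, refilling at 1 token per second.
    let mut bucket = TokenBucket::new_at(60, 2.0, start);
    let allowed = (0..121).filter(|_| bucket.try_consume_at(start)).count();
    println!("allowed in burst: {allowed}"); // 120
    println!("retry after: {:?}", bucket.time_until_refill()); // 1s
    println!("one second later: {}", bucket.try_consume_at(start + Duration::from_secs(1)));
}
```

Under these assumptions, the defaults (`ip_rpm = 60`, `burst_factor = 2.0`) admit a burst of 120 requests from a new IP and then roughly one request per second.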
Application State: ArcSwap
The gateway uses ArcSwap for lock-free, atomic configuration updates:
```rust
// crates/clawdesk-gateway/src/state.rs
use std::sync::Arc;

use arc_swap::ArcSwap;

/// Shared application state accessible from all route handlers.
pub struct AppState {
    /// Hot-swappable configuration
    pub config: ArcSwap<ValidatedConfig>,
    /// Agent pipeline (immutable after construction)
    pub pipeline: Arc<AgentPipeline>,
    /// Channel registry
    pub channels: Arc<ChannelRegistry>,
    /// Storage ports
    pub session_store: Arc<dyn SessionStore>,
    pub conversation_store: Arc<dyn ConversationStore>,
    pub vector_store: Arc<dyn VectorStore>,
    /// Security layer
    pub security: Arc<SecurityLayer>,
    /// Observability
    pub metrics: Arc<MetricsRegistry>,
}

pub type SharedState = Arc<ArcSwap<AppState>>;
```
Why ArcSwap?
| Approach | Read Cost | Write Cost | Blocking? |
|---|---|---|---|
| `RwLock<Config>` | Shared read lock (atomic update of the lock's state) | Exclusive write lock (blocks all readers) | Yes |
| `Mutex<Config>` | Exclusive lock, even for reads | Exclusive lock | Yes |
| `ArcSwap<Config>` | Lock-free load | Atomic pointer swap | No (readers never wait for writers) |
```rust
// Reading config (lock-free; `state` here is an `AppState`)
let config = state.config.load();
let port = config.server.port;

// Updating config (atomic pointer swap; readers are never blocked)
let new_config = Arc::new(updated_config);
state.config.store(new_config);
// Loads after the store see the new config; guards obtained earlier
// keep pointing at the old config until they are dropped.
```
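`ArcSwap::load()` takes no lock and never waits for a writer, which suits the hot path where every request reads the config. An `RwLock` read, by contrast, performs an atomic write to the lock's shared state on every acquisition, so with thousands of concurrent requests that state becomes a point of contention across cores, and, depending on the platform's lock policy, a waiting writer can also stall new readers.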
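The property hot-reload relies on is snapshot semantics: a request that loaded the config keeps a consistent view for its whole lifetime, while later loads see the new value. The sketch below illustrates only those semantics, using a hypothetical std-only stand-in, `Swappable<T>`, built on `RwLock<Arc<T>>`. Unlike `ArcSwap`, it does take a (briefly held) lock on every load.

```rust
use std::sync::{Arc, RwLock};

#[derive(Debug)]
struct Config {
    port: u16,
}

/// Std-only stand-in for `ArcSwap<T>`: readers clone the current `Arc`,
/// writers replace it. The lock is held only long enough to copy a pointer.
struct Swappable<T> {
    current: RwLock<Arc<T>>,
}

impl<T> Swappable<T> {
    fn new(value: T) -> Self {
        Swappable { current: RwLock::new(Arc::new(value)) }
    }

    fn load(&self) -> Arc<T> {
        Arc::clone(&self.current.read().unwrap())
    }

    fn store(&self, value: Arc<T>) {
        *self.current.write().unwrap() = value;
    }
}

fn main() {
    let config = Swappable::new(Config { port: 18789 });

    // A request takes a snapshot...
    let in_flight = config.load();

    // ...then a reload swaps in a new config.
    config.store(Arc::new(Config { port: 9000 }));

    // The in-flight request still sees the old value; new loads see the new one.
    println!("in-flight: {}, new: {}", in_flight.port, config.load().port);
}
```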
Hot-Reload via Config Watcher
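```rust
// crates/clawdesk-gateway/src/watcher.rs
use std::sync::Arc;

use arc_swap::ArcSwap;
use notify::{RecursiveMode, Watcher};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::state::{AppState, SharedState};

pub async fn config_watcher(state: SharedState, token: CancellationToken) {
    let config_path = state.load().config.load().config_file_path.clone();

    // notify runs this callback on its own thread, outside the Tokio runtime,
    // which is why `blocking_send` is used.
    let (tx, mut rx) = mpsc::channel(1);
    let mut watcher = match notify::recommended_watcher(
        move |event: notify::Result<notify::Event>| {
            let _ = tx.blocking_send(event);
        },
    ) {
        Ok(watcher) => watcher,
        Err(e) => {
            tracing::error!("failed to create file watcher: {e}");
            return;
        }
    };
    if let Err(e) = watcher.watch(&config_path, RecursiveMode::NonRecursive) {
        tracing::error!("failed to watch config file: {e}");
        return;
    }

    loop {
        tokio::select! {
            // Bind the whole `Option`: with a refutable pattern such as
            // `Some(Ok(event))`, one watcher error would disable this branch
            // and the `select!` would then wait only for cancellation.
            maybe_event = rx.recv() => {
                let event = match maybe_event {
                    Some(Ok(event)) => event,
                    Some(Err(e)) => {
                        tracing::warn!("file watcher error: {e}");
                        continue;
                    }
                    None => break, // the watcher's sender was dropped
                };
                if !event.kind.is_modify() {
                    continue;
                }
                match reload_config(&config_path).await {
                    Ok(new_config) => {
                        let current = state.load();
                        let new_state = Arc::new(AppState {
                            config: ArcSwap::new(Arc::new(new_config)),
                            // Reuse existing immutable components
                            pipeline: current.pipeline.clone(),
                            channels: current.channels.clone(),
                            session_store: current.session_store.clone(),
                            conversation_store: current.conversation_store.clone(),
                            vector_store: current.vector_store.clone(),
                            security: current.security.clone(),
                            metrics: current.metrics.clone(),
                        });
                        state.store(new_state);
                        tracing::info!("config hot-reloaded");
                    }
                    Err(e) => {
                        // Keep the old config on failure
                        tracing::error!("config reload failed: {e}");
                    }
                }
            }
            _ = token.cancelled() => {
                tracing::info!("config watcher shutting down");
                break;
            }
        }
    }
}
```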
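The reload step keeps every long-lived component and replaces only the config, and a failed reload leaves the current state untouched. That decision can be isolated into a pure function. The sketch below uses hypothetical, simplified `Config`, `Pipeline`, and `State` types (std only) and checks that the pipeline `Arc` is shared between the old and new state rather than rebuilt.

```rust
use std::sync::Arc;

// Hypothetical, simplified stand-ins for ValidatedConfig, AgentPipeline, AppState.
struct Config {
    model: String,
}

struct Pipeline;

struct State {
    config: Arc<Config>,
    pipeline: Arc<Pipeline>,
}

/// Build the next state from a reload attempt. On success only the config
/// changes and the pipeline `Arc` is shared; on failure the current state is kept.
fn next_state(current: &Arc<State>, reloaded: Result<Config, String>) -> Arc<State> {
    match reloaded {
        Ok(config) => Arc::new(State {
            config: Arc::new(config),
            pipeline: Arc::clone(&current.pipeline),
        }),
        Err(e) => {
            eprintln!("config reload failed: {e}");
            Arc::clone(current)
        }
    }
}

fn main() {
    let v1 = Arc::new(State {
        config: Arc::new(Config { model: "model-a".into() }),
        pipeline: Arc::new(Pipeline),
    });

    let v2 = next_state(&v1, Ok(Config { model: "model-b".into() }));
    println!("new model: {}", v2.config.model);
    println!("pipeline reused: {}", Arc::ptr_eq(&v1.pipeline, &v2.pipeline));

    let v3 = next_state(&v2, Err("parse error".into()));
    println!("state kept after failure: {}", Arc::ptr_eq(&v2, &v3));
}
```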
WebSocket Streaming
The gateway supports real-time streaming via WebSocket:
```rust
// crates/clawdesk-gateway/src/ws.rs
use axum::{
    extract::{
        ws::{Message, WebSocket, WebSocketUpgrade},
        State,
    },
    response::IntoResponse,
};
use futures::{SinkExt, StreamExt};
use tokio::sync::mpsc;

use crate::state::SharedState;

pub async fn ws_handler(
    ws: WebSocketUpgrade,
    State(state): State<SharedState>,
) -> impl IntoResponse {
    ws.on_upgrade(|socket| handle_ws_connection(socket, state))
}

async fn handle_ws_connection(socket: WebSocket, state: SharedState) {
    // Split the socket so reads and writes proceed independently.
    let (mut sender, mut receiver) = socket.split();
    // Bounded queue of outbound messages; request tasks push into it.
    let (tx, mut rx) = mpsc::channel::<WsMessage>(64);

    // Outbound task: the only writer to the socket
    let send_task = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            if sender.send(msg.into()).await.is_err() {
                break; // client went away
            }
        }
    });

    // Inbound loop: receive messages from the client
    while let Some(Ok(msg)) = receiver.next().await {
        match msg {
            Message::Text(text) => {
                let request: WsRequest = match serde_json::from_str(&text) {
                    Ok(r) => r,
                    Err(e) => {
                        let _ = tx.send(WsMessage::error(e.to_string())).await;
                        continue;
                    }
                };
                // Process through the pipeline with streaming; each request
                // gets its own task and its own sender clone.
                let tx_clone = tx.clone();
                let state_clone = state.clone();
                tokio::spawn(async move {
                    process_ws_request(request, state_clone, tx_clone).await;
                });
            }
            Message::Close(_) => break,
            _ => {}
        }
    }

    // Request tasks may still hold sender clones, so the channel would not
    // close on its own; stop the writer explicitly.
    send_task.abort();
}
```
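The connection handler follows a single-writer pattern: only `send_task` touches the socket's sink, and each spawned request task reports back through its own clone of the bounded `mpsc` sender, so concurrent requests on one connection never write to the socket directly. The std-only sketch below reproduces that shape with threads and `std::sync::mpsc::sync_channel` in place of Tokio tasks and the WebSocket; the `run` function and its message strings are made up for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Fan `requests` concurrent workers into a single writer and return
/// everything the writer received, in the order it received it.
fn run(requests: usize, chunks: usize) -> Vec<String> {
    // Bounded channel, playing the role of `mpsc::channel::<WsMessage>(64)`.
    let (tx, rx) = mpsc::sync_channel::<String>(64);

    // Single writer: the only place that would touch the socket.
    let writer = thread::spawn(move || rx.iter().collect::<Vec<String>>());

    // One worker per inbound request, each with its own sender clone.
    let workers: Vec<_> = (0..requests)
        .map(|id| {
            let tx = tx.clone();
            thread::spawn(move || {
                for chunk in 0..chunks {
                    tx.send(format!("request {id} chunk {chunk}")).unwrap();
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
    // Dropping the last sender closes the channel and ends the writer.
    drop(tx);
    writer.join().unwrap()
}

fn main() {
    let sent = run(3, 2);
    println!("writer sent {} messages", sent.len()); // 6
}
```

Messages from different requests interleave in arbitrary order, but each request's own chunks arrive in the order it sent them. Unlike the handler, which aborts `send_task` once the client disconnects, the sketch drains the channel by dropping the last sender.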
OpenAI-Compatible API
The gateway provides an OpenAI-compatible API so that clients written against the OpenAI API can use ClawDesk as a drop-in replacement:
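```rust
// crates/clawdesk-gateway/src/openai_compat.rs
use axum::{extract::State, Json};

use crate::state::SharedState;

/// POST /v1/chat/completions
pub async fn openai_chat(
    State(state): State<SharedState>,
    Json(request): Json<OpenAiChatRequest>,
) -> Result<Json<OpenAiChatResponse>, ApiError> {
    // Streaming is handled separately via SSE, so reject it before doing
    // any conversion or provider lookup.
    if request.stream.unwrap_or(false) {
        return Err(ApiError {
            error: ApiErrorBody {
                code: "validation_error".to_string(),
                message: "stream=true is not supported on this endpoint".to_string(),
                details: None,
            },
        });
    }

    // `load_full` returns an owned `Arc<AppState>`. ArcSwap guards are meant
    // to be short-lived, so avoid holding one across the `.await` below.
    let app = state.load_full();

    // Convert OpenAI format to ClawDesk internal format
    let provider_request = ProviderRequest::from_openai(&request)?;

    // Route to the appropriate provider
    let provider = app.pipeline.provider_registry().get_by_model(&request.model)?;

    let response = provider.chat(&provider_request).await?;

    // Convert back to OpenAI format
    Ok(Json(OpenAiChatResponse::from_internal(response)))
}
```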
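`ProviderRequest::from_openai` is not shown. To illustrate the kind of translation such an adapter performs, the sketch below maps OpenAI-style chat messages (`role` plus `content`) onto a hypothetical internal enum and rejects roles it does not know. The types, the free function `from_openai`, and the error string are all assumptions; only three roles are handled here.

```rust
/// Hypothetical OpenAI-style message as it arrives in the request body.
struct OpenAiMessage {
    role: String,
    content: String,
}

/// Hypothetical internal representation.
#[derive(Debug, PartialEq)]
enum InternalMessage {
    System(String),
    User(String),
    Assistant(String),
}

/// Convert every message, failing on the first unsupported role.
fn from_openai(messages: &[OpenAiMessage]) -> Result<Vec<InternalMessage>, String> {
    messages
        .iter()
        .map(|m| match m.role.as_str() {
            "system" => Ok(InternalMessage::System(m.content.clone())),
            "user" => Ok(InternalMessage::User(m.content.clone())),
            "assistant" => Ok(InternalMessage::Assistant(m.content.clone())),
            other => Err(format!("unsupported role: {other}")),
        })
        .collect()
}

fn main() {
    let request = vec![
        OpenAiMessage { role: "system".into(), content: "Be brief.".into() },
        OpenAiMessage { role: "user".into(), content: "Hello".into() },
    ];
    match from_openai(&request) {
        Ok(converted) => println!("{converted:?}"),
        Err(e) => println!("rejected: {e}"),
    }
}
```

Collecting into `Result<Vec<_>, _>` stops at the first bad message, which maps naturally onto returning a `validation_error` to the client.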
Error Handling
All route handlers return structured errors:
```rust
use axum::{
    http::StatusCode,
    response::{IntoResponse, Response},
    Json,
};
use serde::Serialize;

/// API error response format.
#[derive(Debug, Serialize)]
pub struct ApiError {
    pub error: ApiErrorBody,
}

#[derive(Debug, Serialize)]
pub struct ApiErrorBody {
    pub code: String,
    pub message: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details: Option<serde_json::Value>,
}

impl IntoResponse for ApiError {
    fn into_response(self) -> Response {
        // Map the machine-readable error code to an HTTP status.
        let status = match self.error.code.as_str() {
            "not_found" => StatusCode::NOT_FOUND,
            "unauthorized" => StatusCode::UNAUTHORIZED,
            "forbidden" => StatusCode::FORBIDDEN,
            "rate_limited" => StatusCode::TOO_MANY_REQUESTS,
            "validation_error" => StatusCode::BAD_REQUEST,
            "context_overflow" => StatusCode::PAYLOAD_TOO_LARGE,
            _ => StatusCode::INTERNAL_SERVER_ERROR,
        };
        (status, Json(self)).into_response()
    }
}
```
Performance Characteristics
| Metric | Target | Mechanism |
|---|---|---|
| Request latency (p50) | < 2ms (excl. LLM) | Zero-copy routing, pre-parsed state |
| Request latency (p99) | < 10ms (excl. LLM) | Bounded middleware stack |
| Concurrent connections | 10K+ | Tokio, bounded per-connection memory |
| Config reload | < 100ms | ArcSwap atomic swap |
| WebSocket messages/sec | 50K+ | Tokio, bounded channel fan-out |
Module Map
```text
clawdesk-gateway/
├── src/
│   ├── admin.rs           # Admin route handlers
│   ├── bootstrap.rs       # Server initialization
│   ├── middleware.rs      # Custom middleware
│   ├── openai_compat.rs   # OpenAI-compatible API
│   ├── rate_limiter.rs    # Token bucket rate limiter
│   ├── responses_api.rs   # Responses API handlers
│   ├── routes.rs          # Router construction
│   ├── rpc.rs             # Internal RPC handlers
│   ├── skills_admin.rs    # Skill management API
│   ├── state.rs           # AppState definition
│   ├── watcher.rs         # Config file watcher
│   └── ws.rs              # WebSocket handlers
```