
Gateway Server

The clawdesk-gateway crate implements ClawDesk's HTTP and WebSocket server using Axum. It is the point where all subsystems converge, exposing 24+ routes for message processing, administration, OpenAI-compatible APIs, and real-time streaming.

Server Architecture

Server Bootstrap

```rust
// crates/clawdesk-gateway/src/bootstrap.rs

pub async fn start_server(
    config: ValidatedConfig,
    token: CancellationToken,
) -> Result<(), ClawDeskError> {
    // 1. Build application state
    let state = bootstrap_state(config.clone()).await?;
    let shared_state = Arc::new(ArcSwap::new(Arc::new(state)));

    // 2. Build the router
    let app = build_router(shared_state.clone());

    // 3. Bind to address
    let addr = config.server.bind_address(); // default: 127.0.0.1:18789
    let listener = TcpListener::bind(addr).await?;

    tracing::info!(%addr, "gateway server listening");

    // 4. Start the config watcher for hot-reload
    let watcher_state = shared_state.clone();
    let watcher_token = token.child_token();
    tokio::spawn(async move {
        config_watcher(watcher_state, watcher_token).await;
    });

    // 5. Serve with graceful shutdown
    axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            token.cancelled().await;
            tracing::info!("gateway shutting down");
        })
        .await?;

    Ok(())
}
```
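The CancellationToken plus `with_graceful_shutdown` gives cooperative shutdown: in-flight work observes the signal and finishes cleanly before the process exits. The same idea can be approximated outside Tokio with std primitives — a minimal sketch where a hypothetical `ShutdownToken` (an `AtomicBool`, not the tokio-util type) stands in for the token:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// A minimal stand-in for a cancellation token: a cloneable, checkable flag.
#[derive(Clone)]
struct ShutdownToken(Arc<AtomicBool>);

impl ShutdownToken {
    fn new() -> Self {
        ShutdownToken(Arc::new(AtomicBool::new(false)))
    }
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// A worker that checks the token between units of work, mirroring how the
/// server drains in-flight requests before exiting instead of aborting them.
fn run_worker(token: ShutdownToken) -> usize {
    let mut completed = 0;
    loop {
        thread::sleep(Duration::from_millis(5)); // one unit of "work"
        completed += 1;
        if token.is_cancelled() {
            break; // finish the current unit, then exit cleanly
        }
    }
    completed
}

fn main() {
    let token = ShutdownToken::new();
    let worker_token = token.clone();
    let handle = thread::spawn(move || run_worker(worker_token));

    thread::sleep(Duration::from_millis(30));
    token.cancel(); // analogous to `token.cancelled()` resolving

    let completed = handle.join().unwrap();
    assert!(completed >= 1); // at least one unit completed before shutdown
    println!("worker finished {completed} units, then shut down cleanly");
}
```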

Route Table

The gateway exposes 24+ routes organized into 5 groups:

Core API Routes (/api/v1/)

| Method | Path | Handler | Description |
|--------|------|---------|-------------|
| POST | `/api/v1/messages` | `handle_message` | Process an inbound message |
| POST | `/api/v1/messages/stream` | `handle_message_stream` | Process with SSE streaming |
| GET | `/api/v1/sessions` | `list_sessions` | List active sessions |
| GET | `/api/v1/sessions/:id` | `get_session` | Get session details |
| DELETE | `/api/v1/sessions/:id` | `delete_session` | Terminate a session |
| GET | `/api/v1/sessions/:id/history` | `get_history` | Get conversation history |
| GET | `/api/v1/channels` | `list_channels` | List configured channels |
| GET | `/api/v1/providers` | `list_providers` | List available providers |
| GET | `/api/v1/models` | `list_models` | List available models |

OpenAI-Compatible Routes (/v1/)

| Method | Path | Handler | Description |
|--------|------|---------|-------------|
| POST | `/v1/chat/completions` | `openai_chat` | Chat completions (OpenAI format) |
| POST | `/v1/responses` | `responses_create` | Responses API |
| GET | `/v1/models` | `openai_models` | List models (OpenAI format) |

Admin Routes (/admin/)

| Method | Path | Handler | Description |
|--------|------|---------|-------------|
| GET | `/admin/config` | `get_config` | Get current configuration |
| PUT | `/admin/config` | `update_config` | Update configuration (hot-reload) |
| POST | `/admin/config/reload` | `reload_config` | Force config reload |
| GET | `/admin/channels` | `admin_channels` | Channel management |
| POST | `/admin/channels/:id/restart` | `restart_channel` | Restart a channel |
| GET | `/admin/plugins` | `list_plugins` | Plugin management |
| GET | `/admin/skills` | `list_skills` | Skill management |
| POST | `/admin/skills` | `create_skill` | Create a new skill |

WebSocket Routes

| Method | Path | Handler | Description |
|--------|------|---------|-------------|
| GET | `/ws` | `ws_handler` | WebSocket upgrade for streaming |
| GET | `/ws/events` | `ws_events` | Real-time event stream |

Health & Observability

| Method | Path | Handler | Description |
|--------|------|---------|-------------|
| GET | `/health` | `health_check` | Health check endpoint |
| GET | `/health/ready` | `readiness_check` | Readiness probe |
| GET | `/metrics` | `metrics_handler` | Prometheus metrics |

Router Construction

```rust
// crates/clawdesk-gateway/src/routes.rs

pub fn build_router(state: SharedState) -> Router {
    Router::new()
        // Core API
        .nest("/api/v1", api_v1_routes())
        // OpenAI-compatible
        .nest("/v1", openai_compat_routes())
        // Admin
        .nest("/admin", admin_routes())
        // WebSocket
        .route("/ws", get(ws::ws_handler))
        .route("/ws/events", get(ws::ws_events))
        // Health & metrics
        .route("/health", get(health::health_check))
        .route("/health/ready", get(health::readiness_check))
        .route("/metrics", get(health::metrics_handler))
        // Middleware stack (the layer added last is outermost)
        .layer(CompressionLayer::new())
        .layer(
            TraceLayer::new_for_http().make_span_with(|request: &Request<Body>| {
                tracing::info_span!(
                    "http_request",
                    method = %request.method(),
                    uri = %request.uri(),
                    request_id = %Uuid::new_v4(),
                )
            }),
        )
        .layer(CorsLayer::permissive())
        // Handlers extract SharedState via the `State` extractor, so the
        // state is attached with `with_state`, not an `Extension` layer.
        .with_state(state)
}

fn api_v1_routes() -> Router<SharedState> {
    Router::new()
        .route("/messages", post(handlers::handle_message))
        .route("/messages/stream", post(handlers::handle_message_stream))
        .route("/sessions", get(handlers::list_sessions))
        .route(
            "/sessions/:id",
            get(handlers::get_session).delete(handlers::delete_session),
        )
        .route("/sessions/:id/history", get(handlers::get_history))
        .route("/channels", get(handlers::list_channels))
        .route("/providers", get(handlers::list_providers))
        .route("/models", get(handlers::list_models))
}
```

Middleware Stack

Middleware is applied in a defined order: each request passes through the stack top-to-bottom, and each response returns bottom-to-top. With chained `Router::layer` calls, the layer added last is outermost, so it sees the request first and the response last.
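This onion ordering can be demonstrated with plain function composition: each "layer" wraps the next handler, so the outermost layer touches the request first and the response last. A std-only sketch (not axum's actual `Layer` trait — the names here are illustrative):

```rust
// A handler maps a request string to a response string.
type Handler = Box<dyn Fn(&str) -> String>;

/// Wrap `inner` so this layer runs around it, recording traversal order
/// in the string itself.
fn layer(name: &'static str, inner: Handler) -> Handler {
    Box::new(move |req| {
        let response = inner(&format!("{req} ->{name}")); // request path (inward)
        format!("{response} <-{name}") // response path (outward)
    })
}

fn main() {
    let handler: Handler = Box::new(|req| format!("{req} [handler]"));
    // Innermost layer first, like the first .layer() call in build_router:
    let stack = layer("cors", layer("trace", layer("compression", handler)));

    let out = stack("start");
    // Request passes cors -> trace -> compression -> handler;
    // the response returns compression -> trace -> cors.
    assert_eq!(
        out,
        "start ->cors ->trace ->compression [handler] <-compression <-trace <-cors"
    );
    println!("{out}");
}
```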

Rate Limiter

```rust
// crates/clawdesk-gateway/src/rate_limiter.rs

pub struct RateLimiter {
    /// Per-IP rate limits
    ip_limits: DashMap<IpAddr, TokenBucket>,

    /// Per-API-key rate limits
    key_limits: DashMap<String, TokenBucket>,

    /// Configuration
    config: RateLimitConfig,
}

#[derive(Debug, Clone)]
pub struct RateLimitConfig {
    /// Requests per minute per IP
    pub ip_rpm: u32, // default: 60

    /// Requests per minute per API key
    pub key_rpm: u32, // default: 120

    /// Burst allowance (multiplier)
    pub burst_factor: f32, // default: 2.0

    /// Cleanup interval for stale entries
    pub cleanup_interval: Duration, // default: 5 min
}

impl RateLimiter {
    pub fn check(&self, ip: IpAddr, api_key: Option<&str>) -> RateLimitResult {
        // Check IP limit
        let mut ip_bucket = self
            .ip_limits
            .entry(ip)
            .or_insert_with(|| TokenBucket::new(self.config.ip_rpm, self.config.burst_factor));

        if !ip_bucket.try_consume() {
            return RateLimitResult::Limited {
                retry_after: ip_bucket.time_until_refill(),
            };
        }

        // Check API key limit (if provided)
        if let Some(key) = api_key {
            let mut key_bucket = self
                .key_limits
                .entry(key.to_string())
                .or_insert_with(|| TokenBucket::new(self.config.key_rpm, self.config.burst_factor));

            if !key_bucket.try_consume() {
                return RateLimitResult::Limited {
                    retry_after: key_bucket.time_until_refill(),
                };
            }
        }

        RateLimitResult::Allowed
    }
}
```
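The `TokenBucket` used above isn't shown in this section. A minimal std-only sketch — assuming capacity = rpm × burst_factor and a continuous refill of rpm tokens per minute, which matches the constructor arguments but is otherwise an assumption about the internals:

```rust
use std::time::{Duration, Instant};

/// A token bucket: holds up to rpm * burst_factor tokens, refilled
/// continuously at `rpm` tokens per minute. Method names follow the
/// usage in RateLimiter::check.
pub struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill: Instant,
}

impl TokenBucket {
    pub fn new(rpm: u32, burst_factor: f32) -> Self {
        let capacity = rpm as f64 * burst_factor as f64;
        TokenBucket {
            capacity,
            tokens: capacity, // start full so bursts are allowed immediately
            refill_per_sec: rpm as f64 / 60.0,
            last_refill: Instant::now(),
        }
    }

    /// Credit tokens for elapsed time, capped at capacity.
    fn refill(&mut self) {
        let elapsed = self.last_refill.elapsed().as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill = Instant::now();
    }

    /// Take one token if available; `false` means the caller is rate-limited.
    pub fn try_consume(&mut self) -> bool {
        self.refill();
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// How long until at least one token is available again.
    pub fn time_until_refill(&self) -> Duration {
        let deficit = (1.0 - self.tokens).max(0.0);
        Duration::from_secs_f64(deficit / self.refill_per_sec)
    }
}

fn main() {
    // 60 rpm with burst_factor 2.0 => the bucket holds 120 tokens.
    let mut bucket = TokenBucket::new(60, 2.0);
    let allowed = (0..121).filter(|_| bucket.try_consume()).count();
    assert_eq!(allowed, 120); // the 121st request in the burst is rejected
    assert!(bucket.time_until_refill() > Duration::ZERO);
    println!("burst allowed {allowed} requests");
}
```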

Application State: ArcSwap

The gateway uses ArcSwap for lock-free, atomic configuration updates:

```rust
// crates/clawdesk-gateway/src/state.rs

use arc_swap::ArcSwap;

/// Shared application state accessible from all route handlers.
pub struct AppState {
    /// Hot-swappable configuration
    pub config: ArcSwap<ValidatedConfig>,

    /// Agent pipeline (immutable after construction)
    pub pipeline: Arc<AgentPipeline>,

    /// Channel registry
    pub channels: Arc<ChannelRegistry>,

    /// Storage ports
    pub session_store: Arc<dyn SessionStore>,
    pub conversation_store: Arc<dyn ConversationStore>,
    pub vector_store: Arc<dyn VectorStore>,

    /// Security layer
    pub security: Arc<SecurityLayer>,

    /// Observability
    pub metrics: Arc<MetricsRegistry>,
}

pub type SharedState = Arc<ArcSwap<AppState>>;
```

Why ArcSwap?

| Approach | Read Cost | Write Cost | Blocking? |
|----------|-----------|------------|-----------|
| `RwLock<Config>` | Lock acquisition | Write lock (blocks all readers) | Yes |
| `Mutex<Config>` | Lock acquisition | Lock acquisition | Yes |
| `ArcSwap<Config>` | Single atomic load | Single atomic swap | No |
```rust
// Reading config (lock-free, ~1 atomic load)
let config = state.config.load();
let port = config.server.port;

// Updating config (atomic swap, no blocking)
let new_config = Arc::new(updated_config);
state.config.store(new_config);
// All subsequent reads immediately see the new config
```
Performance

ArcSwap::load() performs a single atomic operation with no contention, making it suitable for the hot path where every request reads the config. Under load with 10K concurrent requests, RwLock would create significant contention; ArcSwap remains O(1).
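`arc_swap` is an external crate, but its snapshot semantics can be demonstrated with std alone. The sketch below implements the table's `RwLock` row — an `Arc<Config>` behind a briefly held lock, cloned out on every read — which is exactly the per-read cost `ArcSwap` eliminates. The `HotConfig` type and its fields are illustrative, not ClawDesk code:

```rust
use std::sync::{Arc, RwLock};

#[derive(Debug)]
struct Config {
    port: u16,
}

/// std baseline for hot-swappable config: an Arc snapshot behind a RwLock.
/// ArcSwap replaces the lock with a single atomic load/store, but the
/// snapshot semantics shown here are the same.
struct HotConfig {
    current: RwLock<Arc<Config>>,
}

impl HotConfig {
    fn load(&self) -> Arc<Config> {
        // Readers clone the Arc out, so the lock is held only momentarily,
        // and old snapshots stay valid for whoever already loaded them.
        Arc::clone(&self.current.read().unwrap())
    }

    fn store(&self, new: Arc<Config>) {
        *self.current.write().unwrap() = new;
    }
}

fn main() {
    let cfg = HotConfig {
        current: RwLock::new(Arc::new(Config { port: 18789 })),
    };

    let before = cfg.load(); // snapshot taken before the swap
    cfg.store(Arc::new(Config { port: 9000 }));

    assert_eq!(before.port, 18789); // in-flight readers keep their snapshot
    assert_eq!(cfg.load().port, 9000); // new reads see the new config
    println!("old snapshot: {}, current: {}", before.port, cfg.load().port);
}
```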

Hot-Reload via Config Watcher

```rust
// crates/clawdesk-gateway/src/watcher.rs

pub async fn config_watcher(state: SharedState, token: CancellationToken) {
    let config_path = state.load().config.load().config_file_path.clone();

    let (tx, mut rx) = mpsc::channel(1);
    let mut watcher = notify::recommended_watcher(move |event| {
        let _ = tx.blocking_send(event);
    })
    .expect("failed to create file watcher");

    watcher
        .watch(&config_path, RecursiveMode::NonRecursive)
        .expect("failed to watch config file");

    loop {
        tokio::select! {
            Some(Ok(event)) = rx.recv() => {
                if event.kind.is_modify() {
                    match reload_config(&config_path).await {
                        Ok(new_config) => {
                            let current = state.load();
                            let new_state = Arc::new(AppState {
                                config: ArcSwap::new(Arc::new(new_config)),
                                // Reuse existing immutable components
                                pipeline: current.pipeline.clone(),
                                channels: current.channels.clone(),
                                session_store: current.session_store.clone(),
                                conversation_store: current.conversation_store.clone(),
                                vector_store: current.vector_store.clone(),
                                security: current.security.clone(),
                                metrics: current.metrics.clone(),
                            });
                            state.store(new_state);
                            tracing::info!("config hot-reloaded");
                        }
                        Err(e) => {
                            tracing::error!("config reload failed: {e}");
                            // Keep the old config on failure
                        }
                    }
                }
            }
            _ = token.cancelled() => {
                tracing::info!("config watcher shutting down");
                break;
            }
        }
    }
}
```
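The `Err` branch above is what makes hot-reload safe: a bad edit never replaces a working config. That invariant can be shown in isolation with a std-only sketch, where a hypothetical `parse_config` stands in for `reload_config` and its validation step:

```rust
/// Stand-in for ValidatedConfig in this sketch.
#[derive(Clone, Debug, PartialEq)]
struct Config {
    port: u16,
}

/// Hypothetical stand-in for reload_config: validates the file contents
/// and fails on anything that is not a well-formed port number.
fn parse_config(contents: &str) -> Result<Config, String> {
    contents
        .trim()
        .parse::<u16>()
        .map(|port| Config { port })
        .map_err(|e| format!("invalid port: {e}"))
}

/// Apply a reload, keeping the current config when parsing fails —
/// the same policy as the watcher's Err branch.
fn reload(current: &mut Config, new_contents: &str) {
    match parse_config(new_contents) {
        Ok(new_config) => *current = new_config,
        Err(e) => eprintln!("config reload failed: {e} (keeping old config)"),
    }
}

fn main() {
    let mut active = Config { port: 18789 };

    reload(&mut active, "not-a-port"); // bad edit: the old config survives
    assert_eq!(active.port, 18789);

    reload(&mut active, "9000"); // good edit: the swap takes effect
    assert_eq!(active.port, 9000);
    println!("active config: {active:?}");
}
```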

WebSocket Streaming

The gateway supports real-time streaming via WebSocket:

```rust
// crates/clawdesk-gateway/src/ws.rs

pub async fn ws_handler(
    ws: WebSocketUpgrade,
    State(state): State<SharedState>,
) -> impl IntoResponse {
    ws.on_upgrade(|socket| handle_ws_connection(socket, state))
}

async fn handle_ws_connection(socket: WebSocket, state: SharedState) {
    let (mut sender, mut receiver) = socket.split();
    let (tx, mut rx) = mpsc::channel::<WsMessage>(64);

    // Outbound task: send messages to the client
    let send_task = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            if sender.send(msg.into()).await.is_err() {
                break;
            }
        }
    });

    // Inbound loop: receive messages from the client
    while let Some(Ok(msg)) = receiver.next().await {
        match msg {
            Message::Text(text) => {
                let request: WsRequest = match serde_json::from_str(&text) {
                    Ok(r) => r,
                    Err(e) => {
                        let _ = tx.send(WsMessage::error(e.to_string())).await;
                        continue;
                    }
                };

                // Process through the pipeline with streaming
                let tx_clone = tx.clone();
                let state_clone = state.clone();
                tokio::spawn(async move {
                    process_ws_request(request, state_clone, tx_clone).await;
                });
            }
            Message::Close(_) => break,
            _ => {}
        }
    }

    send_task.abort();
}
```
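The `mpsc::channel::<WsMessage>(64)` above is what bounds per-connection memory: if a client stops reading, producers block (or fail fast with `try_send`) instead of buffering without limit. The same backpressure behavior, shown with std's bounded channel and a deliberately tiny capacity:

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // A bounded channel of capacity 2, standing in for the per-connection
    // outbound queue of 64 WsMessages.
    let (tx, rx) = sync_channel::<String>(2);

    tx.try_send("chunk-1".into()).unwrap();
    tx.try_send("chunk-2".into()).unwrap();

    // The queue is full: a slow client exerts backpressure on producers
    // rather than growing memory without bound.
    match tx.try_send("chunk-3".into()) {
        Err(TrySendError::Full(_)) => println!("queue full: backpressure applied"),
        other => panic!("expected Full, got {other:?}"),
    }

    // Once the consumer drains a message, capacity frees up again.
    assert_eq!(rx.recv().unwrap(), "chunk-1");
    tx.try_send("chunk-3".into()).unwrap();
}
```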

OpenAI-Compatible API

The gateway provides an OpenAI-compatible API for drop-in replacement:

```rust
// crates/clawdesk-gateway/src/openai_compat.rs

/// POST /v1/chat/completions
pub async fn openai_chat(
    State(state): State<SharedState>,
    Json(request): Json<OpenAiChatRequest>,
) -> Result<Json<OpenAiChatResponse>, ApiError> {
    let app = state.load();

    // Convert OpenAI format to ClawDesk's internal format
    let provider_request = ProviderRequest::from_openai(&request)?;

    // Route to the appropriate provider
    let provider = app.pipeline.provider_registry().get_by_model(&request.model)?;

    if request.stream.unwrap_or(false) {
        // Streaming is handled separately via SSE
        return Err(ApiError::UseStreamEndpoint);
    }

    let response = provider.chat(&provider_request).await?;

    // Convert back to OpenAI format
    Ok(Json(OpenAiChatResponse::from_internal(response)))
}
```
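The shape of `from_openai` can be sketched with plain structs. The request fields (`model`, `messages`, `temperature`, `stream`) follow the OpenAI chat format; the fields of `ProviderRequest` here are an assumption for illustration, since the real type lives elsewhere in the codebase:

```rust
/// Minimal slice of the OpenAI chat request format.
struct OpenAiChatRequest {
    model: String,
    messages: Vec<(String, String)>, // (role, content)
    temperature: Option<f32>,
    stream: Option<bool>,
}

/// Hypothetical internal request shape; the real ProviderRequest
/// may differ.
#[derive(Debug, PartialEq)]
struct ProviderRequest {
    model: String,
    prompt_turns: Vec<(String, String)>,
    temperature: f32,
}

impl ProviderRequest {
    /// Convert, applying a default temperature and rejecting empty chats —
    /// the kind of validation a from_openai conversion would perform.
    fn from_openai(req: &OpenAiChatRequest) -> Result<Self, String> {
        if req.messages.is_empty() {
            return Err("messages must not be empty".into());
        }
        Ok(ProviderRequest {
            model: req.model.clone(),
            prompt_turns: req.messages.clone(),
            temperature: req.temperature.unwrap_or(1.0),
        })
    }
}

fn main() {
    let req = OpenAiChatRequest {
        model: "clawdesk-chat".into(),
        messages: vec![("user".into(), "hello".into())],
        temperature: None,
        stream: Some(false),
    };

    let internal = ProviderRequest::from_openai(&req).unwrap();
    assert_eq!(internal.model, "clawdesk-chat");
    assert_eq!(internal.temperature, 1.0); // OpenAI default applied
    println!("converted: {internal:?}");
}
```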

Error Handling

All route handlers return structured errors:

```rust
/// API error response format.
#[derive(Debug, Serialize)]
pub struct ApiError {
    pub error: ApiErrorBody,
}

#[derive(Debug, Serialize)]
pub struct ApiErrorBody {
    pub code: String,
    pub message: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details: Option<serde_json::Value>,
}

impl IntoResponse for ApiError {
    fn into_response(self) -> Response {
        let status = match self.error.code.as_str() {
            "not_found" => StatusCode::NOT_FOUND,
            "unauthorized" => StatusCode::UNAUTHORIZED,
            "forbidden" => StatusCode::FORBIDDEN,
            "rate_limited" => StatusCode::TOO_MANY_REQUESTS,
            "validation_error" => StatusCode::BAD_REQUEST,
            "context_overflow" => StatusCode::PAYLOAD_TOO_LARGE,
            _ => StatusCode::INTERNAL_SERVER_ERROR,
        };

        (status, Json(self)).into_response()
    }
}
```
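Concretely, a rate-limited request yields HTTP 429 with a body of the form `{"error":{"code":"rate_limited",...}}`. The code-to-status mapping and the wire shape can be checked without the axum types (the `details: None` case is shown, where serde skips the field entirely):

```rust
/// The status mapping from IntoResponse, as a plain function.
fn status_for(code: &str) -> u16 {
    match code {
        "not_found" => 404,
        "unauthorized" => 401,
        "forbidden" => 403,
        "rate_limited" => 429,
        "validation_error" => 400,
        "context_overflow" => 413,
        _ => 500,
    }
}

/// Hand-rolled rendering of the error body, matching what serde emits
/// for ApiError when `details` is None (the field is skipped).
fn render_error(code: &str, message: &str) -> String {
    format!(r#"{{"error":{{"code":"{code}","message":"{message}"}}}}"#)
}

fn main() {
    let code = "rate_limited";
    let body = render_error(code, "retry after 30s");

    assert_eq!(status_for(code), 429);
    assert_eq!(
        body,
        r#"{"error":{"code":"rate_limited","message":"retry after 30s"}}"#
    );
    println!("HTTP {} -> {}", status_for(code), body);
}
```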

Performance Characteristics

| Metric | Target | Mechanism |
|--------|--------|-----------|
| Request latency (p50) | < 2ms (excl. LLM) | Zero-copy routing, pre-parsed state |
| Request latency (p99) | < 10ms (excl. LLM) | Bounded middleware stack |
| Concurrent connections | 10K+ | Tokio, bounded per-connection memory |
| Config reload | < 100ms | ArcSwap atomic swap |
| WebSocket messages/sec | 50K+ | Tokio, bounded channel fan-out |

Module Map

```text
clawdesk-gateway/
├── src/
│   ├── admin.rs          # Admin route handlers
│   ├── bootstrap.rs      # Server initialization
│   ├── middleware.rs     # Custom middleware
│   ├── openai_compat.rs  # OpenAI-compatible API
│   ├── rate_limiter.rs   # Token bucket rate limiter
│   ├── responses_api.rs  # Responses API handlers
│   ├── routes.rs         # Router construction
│   ├── rpc.rs            # Internal RPC handlers
│   ├── skills_admin.rs   # Skill management API
│   ├── state.rs          # AppState definition
│   ├── watcher.rs        # Config file watcher
│   └── ws.rs             # WebSocket handlers
```