Observability
ClawDesk implements full-stack observability through two dedicated crates — clawdesk-observability for OpenTelemetry integration and clawdesk-telemetry for OTLP export and structured events. Together they provide distributed tracing, metrics collection, and structured logging across all 27 crates.
Observability Architecture
Three Pillars
ClawDesk implements all three observability pillars:
| Pillar | Technology | Crate | Export Format |
|---|---|---|---|
| Tracing | OpenTelemetry + tracing crate | clawdesk-observability | OTLP gRPC/HTTP |
| Metrics | OpenTelemetry Metrics | clawdesk-observability | Prometheus / OTLP |
| Logging | tracing structured events | clawdesk-telemetry | JSON / OTLP Logs |
Tracing
Span Hierarchy
ClawDesk creates a hierarchical span tree for every request: a root span for the inbound HTTP request (http_request), a child span for the agent pipeline (pipeline.process), and one child span per pipeline stage (stage.auth_resolve, stage.history_sanitize, stage.context_guard, and so on).
Span Instrumentation
All major operations are instrumented with tracing spans:
use tracing::{instrument, info_span, Instrument};

// Declarative instrumentation with #[instrument]
#[instrument(
    name = "pipeline.process",
    skip(self, message),
    fields(
        session_key = %message.session_key,
        channel_id = %message.channel_id,
        message_id = %message.id,
    )
)]
pub async fn process(
    &self,
    message: NormalizedMessage,
) -> Result<FinalResponse, PipelineError> {
    // Stage 1
    let auth = self.auth_resolve
        .execute(&message)
        .instrument(info_span!("stage.auth_resolve"))
        .await?;

    // Stage 2
    let history = self.history_sanitize
        .execute(&message, &auth)
        .instrument(info_span!("stage.history_sanitize"))
        .await?;

    // Stage 3
    let context = self.context_guard
        .execute(&message, &auth, &history)
        .instrument(info_span!(
            "stage.context_guard",
            total_tokens = tracing::field::Empty,
        ))
        .await?;

    // Record computed fields
    tracing::Span::current()
        .record("total_tokens", context.total_tokens);

    // Stages 4-6...
    Ok(response)
}
Provider Call Tracing
Every LLM provider call is traced with detailed metadata:
use std::time::Instant;
use tracing::instrument;

#[instrument(
    name = "provider.chat",
    skip(self, request),
    fields(
        provider = %self.name(),
        model = %request.model,
        input_tokens = tracing::field::Empty,
        output_tokens = tracing::field::Empty,
        duration_ms = tracing::field::Empty,
        tool_calls = tracing::field::Empty,
    )
)]
pub async fn chat(
    &self,
    request: &ProviderRequest,
) -> Result<ProviderResponse, ProviderError> {
    let start = Instant::now();
    let response = self.inner_chat(request).await?;

    let span = tracing::Span::current();
    span.record("input_tokens", response.usage.input_tokens);
    span.record("output_tokens", response.usage.output_tokens);
    span.record("duration_ms", start.elapsed().as_millis() as u64);
    span.record("tool_calls", response.tool_calls.as_ref().map_or(0, |tc| tc.len()));

    Ok(response)
}
Metrics
Metric Types
ClawDesk collects four types of metrics:
| Type | Description | Example |
|---|---|---|
| Counter | Monotonically increasing count | clawdesk_messages_total |
| Histogram | Distribution of values | clawdesk_pipeline_duration_seconds |
| Gauge | Point-in-time value | clawdesk_active_sessions |
| UpDownCounter | Can increase or decrease | clawdesk_active_ws_connections |
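The bucket mechanics behind the histogram type can be sketched in plain Rust. This is an illustration of the semantics only; MiniHistogram is a hypothetical type, not part of ClawDesk or the OpenTelemetry API:

```rust
/// Cumulative histogram over fixed `le` bucket bounds (Prometheus-style).
pub struct MiniHistogram {
    bounds: Vec<f64>, // upper bounds, ascending; +Inf is implicit
    counts: Vec<u64>, // one slot per bound, plus one for +Inf
    pub sum: f64,
    pub count: u64,
}

impl MiniHistogram {
    pub fn new(bounds: Vec<f64>) -> Self {
        let n = bounds.len() + 1;
        Self { bounds, counts: vec![0; n], sum: 0.0, count: 0 }
    }

    /// Record one observation into the first bucket whose bound contains it.
    pub fn record(&mut self, value: f64) {
        let idx = self
            .bounds
            .iter()
            .position(|&b| value <= b)
            .unwrap_or(self.bounds.len()); // falls into +Inf
        self.counts[idx] += 1;
        self.sum += value;
        self.count += 1;
    }

    /// Cumulative count of observations with value <= bounds[i],
    /// which is what the `_bucket{le=...}` series export.
    pub fn cumulative(&self, i: usize) -> u64 {
        self.counts[..=i].iter().sum()
    }
}

fn main() {
    let mut h = MiniHistogram::new(vec![0.01, 0.05, 0.1, 0.5, 1.0]);
    for v in [0.004, 0.02, 0.03, 0.07, 0.3] {
        h.record(v);
    }
    println!("count={} sum={:.3}", h.count, h.sum);
}
```

A counter, by contrast, is just the `count` field (monotonic add), a gauge is a single value overwritten on each set, and an up/down counter allows both positive and negative deltas.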
Metric Definitions
// crates/clawdesk-observability/src/metrics.rs
use opentelemetry::metrics::{Counter, Histogram, UpDownCounter, Meter};

pub struct MetricsRegistry {
    // Message metrics
    pub messages_received: Counter<u64>,
    pub messages_processed: Counter<u64>,
    pub messages_failed: Counter<u64>,

    // Pipeline metrics
    pub pipeline_duration: Histogram<f64>,
    pub pipeline_stage_duration: Histogram<f64>,

    // Provider metrics
    pub provider_requests: Counter<u64>,
    pub provider_errors: Counter<u64>,
    pub provider_latency: Histogram<f64>,
    pub provider_tokens_input: Counter<u64>,
    pub provider_tokens_output: Counter<u64>,

    // Session metrics
    pub active_sessions: UpDownCounter<i64>,
    pub session_duration: Histogram<f64>,

    // Channel metrics
    pub channel_messages_in: Counter<u64>,
    pub channel_messages_out: Counter<u64>,
    pub active_channels: UpDownCounter<i64>,

    // Storage metrics
    pub storage_operations: Counter<u64>,
    pub storage_latency: Histogram<f64>,
    pub vector_search_latency: Histogram<f64>,

    // Security metrics
    pub security_denials: Counter<u64>,
    pub content_scan_duration: Histogram<f64>,
    pub quarantined_messages: Counter<u64>,

    // System metrics
    pub active_ws_connections: UpDownCounter<i64>,
    pub memory_usage_bytes: Histogram<f64>,
}

impl MetricsRegistry {
    pub fn new(meter: &Meter) -> Self {
        Self {
            messages_received: meter
                .u64_counter("clawdesk_messages_received_total")
                .with_description("Total messages received across all channels")
                .init(),
            pipeline_duration: meter
                .f64_histogram("clawdesk_pipeline_duration_seconds")
                .with_description("Agent pipeline execution duration")
                .with_unit("s")
                .init(),
            provider_latency: meter
                .f64_histogram("clawdesk_provider_latency_seconds")
                .with_description("LLM provider request latency")
                .with_unit("s")
                .init(),
            // ... remaining metrics
        }
    }
}
Metric Labels (Attributes)
All metrics use consistent labels for filtering and aggregation:
use opentelemetry::KeyValue;

// Recording a metric with labels
metrics.messages_received.add(1, &[
    KeyValue::new("channel_kind", "slack"),
    KeyValue::new("channel_id", channel_id.to_string()),
]);

metrics.provider_latency.record(duration.as_secs_f64(), &[
    KeyValue::new("provider", "anthropic"),
    KeyValue::new("model", "claude-sonnet-4-20250514"),
    KeyValue::new("status", "success"),
]);

metrics.pipeline_stage_duration.record(stage_duration.as_secs_f64(), &[
    KeyValue::new("stage", "context_guard"),
    KeyValue::new("session_key", session_key.to_string()),
]);
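Each distinct label combination produces a distinct time series, so the series identity is the metric name plus its sorted label pairs; this is why a high-cardinality label such as session_key multiplies series counts. A std-only sketch of how that identity is formed (series_key is a hypothetical helper, not ClawDesk code):

```rust
/// Render a metric name plus labels in Prometheus exposition syntax.
/// Labels are sorted so the series identity is order-independent.
pub fn series_key(name: &str, labels: &[(&str, &str)]) -> String {
    let mut pairs: Vec<_> = labels.to_vec();
    pairs.sort_by_key(|&(k, _)| k);
    let body: Vec<String> = pairs
        .iter()
        .map(|(k, v)| format!("{}=\"{}\"", k, v))
        .collect();
    if body.is_empty() {
        name.to_string()
    } else {
        format!("{}{{{}}}", name, body.join(","))
    }
}

fn main() {
    // Two recordings with the same label set hit the same series;
    // a new channel_id value creates a new one.
    println!("{}", series_key(
        "clawdesk_messages_received_total",
        &[("channel_kind", "slack"), ("channel_id", "C01234567")],
    ));
}
```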
Prometheus Endpoint
The gateway exposes a /metrics endpoint in Prometheus exposition format:
// crates/clawdesk-gateway/src/routes.rs
use axum::{extract::State, http::header, response::IntoResponse};
use prometheus::{Encoder, TextEncoder};

pub async fn metrics_handler(
    State(state): State<SharedState>,
) -> impl IntoResponse {
    let app = state.load();
    let encoder = TextEncoder::new();
    let metric_families = app.metrics.prometheus_registry.gather();
    let mut buffer = Vec::new();
    encoder.encode(&metric_families, &mut buffer).unwrap();
    (
        [(header::CONTENT_TYPE, "text/plain; charset=utf-8")],
        String::from_utf8(buffer).unwrap(),
    )
}
Example output:
# HELP clawdesk_messages_received_total Total messages received across all channels
# TYPE clawdesk_messages_received_total counter
clawdesk_messages_received_total{channel_kind="slack"} 15234
clawdesk_messages_received_total{channel_kind="discord"} 8921
clawdesk_messages_received_total{channel_kind="api"} 42156
# HELP clawdesk_pipeline_duration_seconds Agent pipeline execution duration
# TYPE clawdesk_pipeline_duration_seconds histogram
clawdesk_pipeline_duration_seconds_bucket{le="0.01"} 12000
clawdesk_pipeline_duration_seconds_bucket{le="0.05"} 14500
clawdesk_pipeline_duration_seconds_bucket{le="0.1"} 15000
clawdesk_pipeline_duration_seconds_bucket{le="0.5"} 15200
clawdesk_pipeline_duration_seconds_bucket{le="1.0"} 15234
clawdesk_pipeline_duration_seconds_bucket{le="+Inf"} 15234
clawdesk_pipeline_duration_seconds_sum 425.67
clawdesk_pipeline_duration_seconds_count 15234
# HELP clawdesk_active_sessions Current number of active sessions
# TYPE clawdesk_active_sessions gauge
clawdesk_active_sessions 142
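Aggregate statistics fall out of this output directly: average pipeline latency is sum / count (425.67 / 15234 ≈ 28 ms here), and quantiles can be estimated by linear interpolation inside the bucket that crosses the target rank, which is the same estimate PromQL's histogram_quantile produces. A std-only sketch using the sample numbers above (histogram_quantile here is a hypothetical helper, not ClawDesk code):

```rust
/// Estimate quantile `q` from cumulative (`le`) buckets by linear
/// interpolation inside the bucket that crosses the target rank.
pub fn histogram_quantile(q: f64, buckets: &[(f64, u64)]) -> f64 {
    let total = buckets.last().map(|&(_, c)| c).unwrap_or(0);
    let rank = q * total as f64;
    let mut prev_bound = 0.0;
    let mut prev_count = 0u64;
    for &(bound, count) in buckets {
        if count as f64 >= rank {
            let in_bucket = (count - prev_count) as f64;
            if in_bucket == 0.0 {
                return bound;
            }
            let frac = (rank - prev_count as f64) / in_bucket;
            return prev_bound + (bound - prev_bound) * frac;
        }
        prev_bound = bound;
        prev_count = count;
    }
    prev_bound
}

fn main() {
    // (le, cumulative count) pairs from the example exposition output.
    let buckets = [
        (0.01, 12_000),
        (0.05, 14_500),
        (0.1, 15_000),
        (0.5, 15_200),
        (1.0, 15_234),
    ];
    println!("avg = {:.4}s", 425.67 / 15_234.0);
    println!("p50 = {:.4}s", histogram_quantile(0.50, &buckets));
    println!("p99 = {:.4}s", histogram_quantile(0.99, &buckets));
}
```

Note the resolution limit: any quantile landing inside a bucket can only be located to within that bucket's width, so bucket bounds should straddle the latencies you care about.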
Structured Logging
Log Architecture
ClawDesk uses the tracing crate for structured logging, which integrates seamlessly with the distributed tracing system:
// crates/clawdesk-telemetry/src/lib.rs
use opentelemetry::KeyValue;
use opentelemetry_sdk::{trace::{self, Sampler}, Resource};
use tracing_subscriber::{layer::SubscriberExt, EnvFilter};

pub fn init_telemetry(config: &TelemetryConfig) -> Result<TelemetryGuard, TelemetryError> {
    // 1. Create OTLP trace exporter
    let trace_exporter = opentelemetry_otlp::new_exporter()
        .tonic()
        .with_endpoint(&config.otlp_endpoint);

    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(trace_exporter)
        .with_trace_config(
            trace::config()
                .with_sampler(Sampler::TraceIdRatioBased(config.sampling_rate))
                .with_resource(Resource::new(vec![
                    KeyValue::new("service.name", "clawdesk"),
                    KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
                ])),
        )
        .install_batch(opentelemetry_sdk::runtime::Tokio)?;

    // 2. Create tracing subscriber layers
    let otel_layer = tracing_opentelemetry::layer()
        .with_tracer(tracer);

    let fmt_layer = tracing_subscriber::fmt::layer()
        .json()
        .with_target(true)
        .with_span_list(true)
        .with_current_span(true);

    let filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new(&config.log_level));

    // 3. Compose layers
    let subscriber = tracing_subscriber::registry()
        .with(filter)
        .with(otel_layer)
        .with(fmt_layer);

    tracing::subscriber::set_global_default(subscriber)?;

    Ok(TelemetryGuard { tracer })
}
Structured Event Format
All log events include structured fields:
// Structured events throughout the codebase

// Gateway: request received
tracing::info!(
    method = %request.method(),
    uri = %request.uri(),
    request_id = %request_id,
    remote_addr = %remote_addr,
    "request received"
);

// Pipeline: stage completed
tracing::info!(
    stage = "context_guard",
    total_tokens = context.total_tokens,
    skills_selected = context.skills.len(),
    memory_results = context.memory.results.len(),
    duration_ms = stage_duration.as_millis(),
    "context assembly completed"
);

// Provider: LLM call
tracing::info!(
    provider = provider_name,
    model = %request.model,
    input_tokens = response.usage.input_tokens,
    output_tokens = response.usage.output_tokens,
    latency_ms = latency.as_millis(),
    tool_calls = tool_call_count,
    "provider request completed"
);

// Security: content flagged
tracing::warn!(
    scan_stage = "regex",
    pattern = hit.pattern,
    severity = ?hit.severity,
    session_key = %session_key,
    "content flagged by security scanner"
);
JSON format output:
{
  "timestamp": "2026-02-17T10:30:05.123Z",
  "level": "INFO",
  "target": "clawdesk_agents::pipeline",
  "message": "context assembly completed",
  "fields": {
    "stage": "context_guard",
    "total_tokens": 12450,
    "skills_selected": 3,
    "memory_results": 7,
    "duration_ms": 4
  },
  "spans": [
    {
      "name": "http_request",
      "request_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
    },
    {
      "name": "pipeline.process",
      "session_key": "sess_abc123",
      "channel_id": "slack:T01234567:C01234567"
    }
  ]
}
Telemetry Configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelemetryConfig {
    /// Enable/disable telemetry
    pub enabled: bool,
    /// OTLP collector endpoint
    pub otlp_endpoint: String,          // default: "http://localhost:4317"
    /// Trace sampling rate (0.0 - 1.0)
    pub sampling_rate: f64,             // default: 1.0
    /// Log level filter
    pub log_level: String,              // default: "info"
    /// Enable Prometheus metrics endpoint
    pub prometheus_enabled: bool,       // default: true
    /// Prometheus metrics port (on /metrics path)
    pub prometheus_port: u16,           // default: 9090
    /// Export interval for metrics batching
    pub export_interval_seconds: u64,   // default: 15
    /// Service name for traces
    pub service_name: String,           // default: "clawdesk"
}
TOML configuration:
[telemetry]
enabled = true
otlp_endpoint = "http://localhost:4317"
sampling_rate = 0.5 # Sample 50% of traces in production
log_level = "info,clawdesk_agents=debug"
prometheus_enabled = true
prometheus_port = 9090
export_interval_seconds = 15
service_name = "clawdesk-production"
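The sampling_rate above drives head sampling: the keep-or-drop decision is made once per trace and derived deterministically from the trace ID, so every service on the request path agrees on it. A simplified std-only sketch of the idea (should_sample is hypothetical; OpenTelemetry's TraceIdRatioBased sampler typically derives the decision from part of the W3C trace ID):

```rust
/// Keep a trace iff its ID falls in the sampled fraction of the 64-bit
/// space. Deterministic: the same trace ID always yields the same decision.
pub fn should_sample(trace_id_low: u64, rate: f64) -> bool {
    if rate >= 1.0 {
        return true;
    }
    if rate <= 0.0 {
        return false;
    }
    (trace_id_low as f64 / u64::MAX as f64) < rate
}

fn main() {
    // With rate = 0.5, roughly half of uniformly distributed IDs are kept.
    let kept = (0..10_000u64)
        .map(|i| i.wrapping_mul(0x9E37_79B9_7F4A_7C15)) // cheap bit mixing
        .filter(|&id| should_sample(id, 0.5))
        .count();
    println!("kept {kept} of 10000");
}
```

Because the decision is a pure function of the trace ID, no coordination between services is needed: a downstream service recomputes the same answer the gateway did.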
OTLP Export Pipeline
Batch Processing
Telemetry data is batched to reduce network overhead:
| Data Type | Batch Size | Flush Interval | Queue Size |
|---|---|---|---|
| Traces | 512 spans | 5 seconds | 2048 |
| Metrics | N/A | 15 seconds | N/A |
| Logs | 512 events | 5 seconds | 2048 |
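The batching policy in the table can be sketched as a bounded queue that flushes when either the batch size is reached or the interval elapses, and drops (rather than blocks) when full. This is an illustration under assumed names, not the opentelemetry-sdk batch processor itself:

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

pub struct BatchQueue<T> {
    queue: VecDeque<T>,
    max_queue: usize,
    batch_size: usize,
    flush_interval: Duration,
    last_flush: Instant,
    pub dropped: u64,
}

impl<T> BatchQueue<T> {
    pub fn new(max_queue: usize, batch_size: usize, flush_interval: Duration) -> Self {
        Self {
            queue: VecDeque::new(),
            max_queue,
            batch_size,
            flush_interval,
            last_flush: Instant::now(),
            dropped: 0,
        }
    }

    /// Enqueue without blocking the hot path; count drops when full.
    pub fn push(&mut self, item: T) {
        if self.queue.len() >= self.max_queue {
            self.dropped += 1;
        } else {
            self.queue.push_back(item);
        }
    }

    /// Take a batch when the size or interval trigger fires, else None.
    pub fn poll_batch(&mut self) -> Option<Vec<T>> {
        let due = self.last_flush.elapsed() >= self.flush_interval;
        if self.queue.len() >= self.batch_size || (due && !self.queue.is_empty()) {
            self.last_flush = Instant::now();
            let n = self.batch_size.min(self.queue.len());
            return Some(self.queue.drain(..n).collect());
        }
        None
    }
}

fn main() {
    let mut q = BatchQueue::new(2048, 512, Duration::from_secs(5));
    for span in 0..600 {
        q.push(span); // 600 spans queued; first 512 flush immediately
    }
    if let Some(batch) = q.poll_batch() {
        println!("flushed {} spans", batch.len());
    }
}
```

Dropping instead of blocking is the key trade-off: under collector backpressure the pipeline loses telemetry, never request throughput.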
Health Check Integration
The observability system integrates with health checks:
use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};

pub async fn health_check(
    State(state): State<SharedState>,
) -> impl IntoResponse {
    let app = state.load();
    let mut checks = Vec::new();

    // Check each subsystem
    checks.push(HealthCheck {
        name: "storage",
        status: app.session_store.health().await,
    });
    checks.push(HealthCheck {
        name: "channels",
        status: app.channels.health(),
    });
    checks.push(HealthCheck {
        name: "telemetry",
        status: if app.metrics.is_exporting() {
            HealthStatus::Healthy
        } else {
            HealthStatus::Degraded("metrics export paused".into())
        },
    });

    let overall = if checks.iter().all(|c| c.status.is_healthy()) {
        StatusCode::OK
    } else {
        StatusCode::SERVICE_UNAVAILABLE
    };

    (overall, Json(HealthReport { checks }))
}
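The aggregation rule the handler applies can be isolated in a small std-only sketch (the HealthStatus shape is assumed from the snippet above; overall_status is a hypothetical helper): any non-healthy subsystem degrades the whole report to 503.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum HealthStatus {
    Healthy,
    Degraded(String),
}

impl HealthStatus {
    pub fn is_healthy(&self) -> bool {
        matches!(self, HealthStatus::Healthy)
    }
}

/// Fold per-subsystem checks into the HTTP status code for /health.
pub fn overall_status(checks: &[(&str, HealthStatus)]) -> u16 {
    if checks.iter().all(|(_, s)| s.is_healthy()) {
        200 // OK
    } else {
        503 // SERVICE_UNAVAILABLE
    }
}

fn main() {
    let checks = vec![
        ("storage", HealthStatus::Healthy),
        ("channels", HealthStatus::Healthy),
        ("telemetry", HealthStatus::Degraded("metrics export paused".into())),
    ];
    println!("overall = {}", overall_status(&checks));
}
```

Note that a Degraded telemetry subsystem fails the readiness check here; deployments that prefer to keep serving traffic while only telemetry is impaired would treat Degraded as passing instead.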
Key Dashboards
Recommended monitoring dashboards for production:
| Dashboard | Key Panels | Purpose |
|---|---|---|
| Request Overview | RPS, latency percentiles, error rate | Operational health |
| Pipeline Performance | Per-stage latency, token counts, failover rate | Pipeline tuning |
| Provider Usage | Requests per provider, token consumption, error types | Cost monitoring |
| Channel Activity | Messages per channel, active sessions, health | Channel health |
| Security Events | Denials, quarantines, scan types, audit chain | Security posture |
| Resource Usage | Memory, connections, async tasks, storage I/O | Capacity planning |
Summary
| Component | Technology | Export |
|---|---|---|
| Distributed tracing | OpenTelemetry + tracing | OTLP gRPC |
| Metrics | OpenTelemetry Metrics | Prometheus / OTLP |
| Structured logging | tracing JSON subscriber | stdout / OTLP Logs |
| Health checks | Custom /health endpoint | HTTP JSON |
| Span context | W3C Trace Context | HTTP headers |
The observability stack is designed to be:
- Low overhead — Batch exports, sampling, bounded queues
- Consistent — the same tracing crate across all 27 crates
- Flexible — any OTLP-compatible backend (Jaeger, Zipkin, Datadog, etc.)
- Production-ready — graceful degradation if the collector is unavailable