Skip to main content

zeroclaw_log/
observer_bridge.rs

1//! Observer bridge — projects [`crate::LogEvent`]s onto the typed
2//! [`zeroclaw_api::observability_traits::ObserverEvent`] variants when a
3//! bound observer is installed.
4//!
5//! Lets metrics backends (Prometheus, OTel) consume the same single
6//! emission stream as the JSONL log and the SSE broadcast. The
7//! projection is bounded: only the actions that map to a known variant
8//! get forwarded, and only the metric-relevant subset of fields
9//! crosses the boundary (the high-cardinality content like message body
10//! and attributes does not).
11//!
12//! Install via [`set_observer_bridge`]; bridge is invoked once per event
13//! by `writer::record_event`. Missing observer = no-op; unmapped action
14//! = no-op.
15
16use std::sync::{Arc, OnceLock};
17use std::time::Duration;
18
19use parking_lot::RwLock;
20use zeroclaw_api::observability_traits::{Observer, ObserverEvent};
21
22use crate::event::LogEvent;
23
24static OBSERVER: OnceLock<RwLock<Option<Arc<dyn Observer>>>> = OnceLock::new();
25
26fn slot() -> &'static RwLock<Option<Arc<dyn Observer>>> {
27    OBSERVER.get_or_init(|| RwLock::new(None))
28}
29
30/// Install the bound Observer that the bridge forwards events to.
31/// Calling again replaces the previous binding.
32pub fn set_observer_bridge(observer: Arc<dyn Observer>) {
33    *slot().write() = Some(observer);
34}
35
36/// Remove the Observer binding (tests, orderly shutdown).
37pub fn clear_observer_bridge() {
38    *slot().write() = None;
39}
40
41/// Project a [`LogEvent`] onto an [`ObserverEvent`] variant when the
42/// action is one the typed surface understands, and forward to the
43/// bound observer. No-op when no observer is bound or the action does
44/// not map.
45pub(crate) fn forward(event: &LogEvent) {
46    let Some(observer) = slot().read().clone() else {
47        return;
48    };
49    if let Some(obs_event) = project(event) {
50        observer.record_event(&obs_event);
51    }
52}
53
54fn project(event: &LogEvent) -> Option<ObserverEvent> {
55    use crate::event::type_field;
56    let action = event.event.action.as_str();
57    let attribution = &event.zeroclaw;
58    let model_provider = attribution
59        .get(&type_field("model_provider"))
60        .or_else(|| attribution.get("model_provider"))
61        .unwrap_or_default()
62        .to_string();
63    let model = attribution.get("model").unwrap_or_default().to_string();
64    let tool = attribution.get("tool").unwrap_or_default().to_string();
65    let channel = attribution.get("channel").unwrap_or_default().to_string();
66    let duration = attribution
67        .duration_ms
68        .map(Duration::from_millis)
69        .unwrap_or_default();
70    let success = matches!(event.event.outcome.as_str(), "success");
71
72    match action {
73        "agent_start" => Some(ObserverEvent::AgentStart {
74            model_provider,
75            model,
76        }),
77        "agent_end" => Some(ObserverEvent::AgentEnd {
78            model_provider,
79            model,
80            duration,
81            tokens_used: event
82                .attributes
83                .get("tokens_used")
84                .and_then(serde_json::Value::as_u64),
85            cost_usd: event
86                .attributes
87                .get("cost_usd")
88                .and_then(serde_json::Value::as_f64),
89        }),
90        "llm_request" => Some(ObserverEvent::LlmRequest {
91            model_provider,
92            model,
93            messages_count: event
94                .attributes
95                .get("messages_count")
96                .and_then(serde_json::Value::as_u64)
97                .unwrap_or_default() as usize,
98        }),
99        "llm_response" => Some(ObserverEvent::LlmResponse {
100            model_provider,
101            model,
102            duration,
103            success,
104            error_message: event
105                .attributes
106                .get("error")
107                .and_then(serde_json::Value::as_str)
108                .map(str::to_string),
109            input_tokens: event
110                .attributes
111                .get("input_tokens")
112                .and_then(serde_json::Value::as_u64),
113            output_tokens: event
114                .attributes
115                .get("output_tokens")
116                .and_then(serde_json::Value::as_u64),
117        }),
118        "tool_call_start" => Some(ObserverEvent::ToolCallStart {
119            tool,
120            tool_call_id: None,
121            arguments: None,
122        }),
123        "tool_call" | "tool_call_result" => Some(ObserverEvent::ToolCall {
124            tool,
125            tool_call_id: None,
126            duration,
127            success,
128            arguments: None,
129            result: None,
130        }),
131        "channel_message_inbound" => Some(ObserverEvent::ChannelMessage {
132            channel,
133            direction: "inbound".to_string(),
134        }),
135        "channel_send" => Some(ObserverEvent::ChannelMessage {
136            channel,
137            direction: "outbound".to_string(),
138        }),
139        "turn_complete" => Some(ObserverEvent::TurnComplete),
140        "heartbeat_tick" => Some(ObserverEvent::HeartbeatTick),
141        "error" => Some(ObserverEvent::Error {
142            component: attribution
143                .get(&type_field("channel"))
144                .unwrap_or("system")
145                .to_string(),
146            message: event.message.clone().unwrap_or_default(),
147        }),
148        _ => None,
149    }
150}
151
152#[cfg(test)]
153mod tests {
154    use super::*;
155    use crate::event::{EventCategory, EventOutcome, Severity};
156    use std::any::Any;
157    use std::sync::Mutex;
158    use zeroclaw_api::observability_traits::ObserverMetric;
159
160    #[derive(Default)]
161    struct CapturingObserver {
162        events: Mutex<Vec<ObserverEvent>>,
163    }
164
165    impl Observer for CapturingObserver {
166        fn record_event(&self, event: &ObserverEvent) {
167            self.events.lock().unwrap().push(event.clone());
168        }
169        fn record_metric(&self, _metric: &ObserverMetric) {}
170        fn name(&self) -> &str {
171            "capturing"
172        }
173        fn as_any(&self) -> &dyn Any {
174            self
175        }
176    }
177
178    static BRIDGE_LOCK: parking_lot::Mutex<()> = parking_lot::Mutex::new(());
179
180    #[test]
181    fn projects_llm_request() {
182        let _guard = BRIDGE_LOCK.lock();
183        clear_observer_bridge();
184        let observer = Arc::new(CapturingObserver::default());
185        set_observer_bridge(observer.clone());
186
187        let mut event = LogEvent::new(Severity::Info, "llm_request", EventCategory::Agent);
188        event
189            .zeroclaw
190            .set_composite("model_provider", "anthropic.clamps");
191        event.zeroclaw.set("model", "claude-sonnet-4-6");
192        event.attributes = serde_json::json!({ "messages_count": 4 });
193
194        forward(&event);
195
196        let projected = observer.events.lock().unwrap();
197        assert_eq!(projected.len(), 1);
198        match &projected[0] {
199            ObserverEvent::LlmRequest {
200                model_provider,
201                model,
202                messages_count,
203            } => {
204                assert_eq!(model_provider, "anthropic");
205                assert_eq!(model, "claude-sonnet-4-6");
206                assert_eq!(*messages_count, 4);
207            }
208            other => panic!("expected LlmRequest, got {other:?}"),
209        }
210
211        clear_observer_bridge();
212    }
213
214    #[test]
215    fn projects_tool_call_success() {
216        let _guard = BRIDGE_LOCK.lock();
217        clear_observer_bridge();
218        let observer = Arc::new(CapturingObserver::default());
219        set_observer_bridge(observer.clone());
220
221        let mut event = LogEvent::new(Severity::Info, "tool_call", EventCategory::Tool);
222        event.zeroclaw.set("tool", "shell");
223        event.zeroclaw.duration_ms = Some(120);
224        event.set_outcome(EventOutcome::Success);
225
226        forward(&event);
227
228        let projected = observer.events.lock().unwrap();
229        assert_eq!(projected.len(), 1);
230        match &projected[0] {
231            ObserverEvent::ToolCall {
232                tool,
233                duration,
234                success,
235                ..
236            } => {
237                assert_eq!(tool, "shell");
238                assert_eq!(*duration, Duration::from_millis(120));
239                assert!(*success);
240            }
241            other => panic!("expected ToolCall, got {other:?}"),
242        }
243
244        clear_observer_bridge();
245    }
246
247    #[test]
248    fn unknown_action_is_noop() {
249        let _guard = BRIDGE_LOCK.lock();
250        clear_observer_bridge();
251        let observer = Arc::new(CapturingObserver::default());
252        set_observer_bridge(observer.clone());
253
254        let event = LogEvent::new(Severity::Info, "totally_made_up", EventCategory::System);
255        forward(&event);
256
257        assert!(observer.events.lock().unwrap().is_empty());
258        clear_observer_bridge();
259    }
260}