Skip to main content

zeroclaw_log/
observer_bridge.rs

1//! Observer bridge — projects [`crate::LogEvent`]s onto the typed
2//! [`zeroclaw_api::observability_traits::ObserverEvent`] variants when a
3//! bound observer is installed.
4//!
5//! Lets metrics backends (Prometheus, OTel) consume the same single
6//! emission stream as the JSONL log and the SSE broadcast. The
7//! projection is bounded: only the actions that map to a known variant
8//! get forwarded, and only the metric-relevant subset of fields
9//! crosses the boundary (the high-cardinality content like message body
10//! and attributes does not).
11//!
12//! Install via [`set_observer_bridge`]; bridge is invoked once per event
13//! by `writer::record_event`. Missing observer = no-op; unmapped action
14//! = no-op.
15
16use std::sync::{Arc, OnceLock};
17use std::time::Duration;
18
19use parking_lot::RwLock;
20use zeroclaw_api::observability_traits::{Observer, ObserverEvent, TurnTokenUsage};
21
22use crate::event::LogEvent;
23
24static OBSERVER: OnceLock<RwLock<Option<Arc<dyn Observer>>>> = OnceLock::new();
25
26fn slot() -> &'static RwLock<Option<Arc<dyn Observer>>> {
27    OBSERVER.get_or_init(|| RwLock::new(None))
28}
29
30/// Install the bound Observer that the bridge forwards events to.
31/// Calling again replaces the previous binding.
32pub fn set_observer_bridge(observer: Arc<dyn Observer>) {
33    *slot().write() = Some(observer);
34}
35
36/// Remove the Observer binding (tests, orderly shutdown).
37pub fn clear_observer_bridge() {
38    *slot().write() = None;
39}
40
41/// Project a [`LogEvent`] onto an [`ObserverEvent`] variant when the
42/// action is one the typed surface understands, and forward to the
43/// bound observer. No-op when no observer is bound or the action does
44/// not map.
45pub(crate) fn forward(event: &LogEvent) {
46    let Some(observer) = slot().read().clone() else {
47        return;
48    };
49    if let Some(obs_event) = project(event) {
50        observer.record_event(&obs_event);
51    }
52}
53
54fn project(event: &LogEvent) -> Option<ObserverEvent> {
55    use crate::event::type_field;
56    let action = event.event.action.as_str();
57    let attribution = &event.zeroclaw;
58    let model_provider = attribution
59        .get(&type_field("model_provider"))
60        .or_else(|| attribution.get("model_provider"))
61        .unwrap_or_default()
62        .to_string();
63    let model = attribution.get("model").unwrap_or_default().to_string();
64    let tool = attribution.get("tool").unwrap_or_default().to_string();
65    let channel = attribution.get("channel").unwrap_or_default().to_string();
66    let duration = attribution
67        .duration_ms
68        .map(Duration::from_millis)
69        .unwrap_or_default();
70    let success = matches!(event.event.outcome.as_str(), "success");
71
72    let agent_alias = attribution
73        .get("agent_alias")
74        .or_else(|| {
75            event
76                .attributes
77                .get("agent_alias")
78                .and_then(serde_json::Value::as_str)
79        })
80        .unwrap_or_default()
81        .to_string();
82    let turn_id = event
83        .attributes
84        .get("turn_id")
85        .or_else(|| event.attributes.get("trace_id"))
86        .and_then(serde_json::Value::as_str)
87        .unwrap_or_default()
88        .to_string();
89
90    let channel_opt = if channel.is_empty() {
91        None
92    } else {
93        Some(channel.clone())
94    };
95    let agent_alias_opt = if agent_alias.is_empty() {
96        None
97    } else {
98        Some(agent_alias)
99    };
100    let turn_id_opt = if turn_id.is_empty() {
101        None
102    } else {
103        Some(turn_id)
104    };
105
106    match action {
107        "agent_start" => Some(ObserverEvent::AgentStart {
108            model_provider,
109            model,
110            channel: channel_opt,
111            agent_alias: agent_alias_opt,
112            turn_id: turn_id_opt,
113        }),
114        "agent_end" => Some(ObserverEvent::AgentEnd {
115            model_provider,
116            model,
117            duration,
118            tokens_used: {
119                let input = event
120                    .attributes
121                    .get("input_tokens")
122                    .and_then(serde_json::Value::as_u64);
123                let output = event
124                    .attributes
125                    .get("output_tokens")
126                    .and_then(serde_json::Value::as_u64);
127                match (input, output) {
128                    (Some(input_tokens), Some(output_tokens)) => Some(TurnTokenUsage {
129                        input_tokens,
130                        output_tokens,
131                    }),
132                    _ => None,
133                }
134            },
135            cost_usd: event
136                .attributes
137                .get("cost_usd")
138                .and_then(serde_json::Value::as_f64),
139            channel: channel_opt,
140            agent_alias: agent_alias_opt,
141            turn_id: turn_id_opt,
142        }),
143        "llm_request" => Some(ObserverEvent::LlmRequest {
144            model_provider,
145            model,
146            messages_count: event
147                .attributes
148                .get("messages_count")
149                .and_then(serde_json::Value::as_u64)
150                .unwrap_or_default() as usize,
151            channel: channel_opt,
152            agent_alias: agent_alias_opt,
153            turn_id: turn_id_opt,
154        }),
155        "llm_response" => Some(ObserverEvent::LlmResponse {
156            model_provider,
157            model,
158            duration,
159            success,
160            error_message: event
161                .attributes
162                .get("error")
163                .and_then(serde_json::Value::as_str)
164                .map(str::to_string),
165            input_tokens: event
166                .attributes
167                .get("input_tokens")
168                .and_then(serde_json::Value::as_u64),
169            output_tokens: event
170                .attributes
171                .get("output_tokens")
172                .and_then(serde_json::Value::as_u64),
173            channel: channel_opt,
174            agent_alias: agent_alias_opt,
175            turn_id: turn_id_opt,
176        }),
177        "tool_call_start" => Some(ObserverEvent::ToolCallStart {
178            tool,
179            tool_call_id: None,
180            arguments: None,
181            channel: channel_opt,
182            agent_alias: agent_alias_opt,
183            turn_id: turn_id_opt,
184        }),
185        "tool_call" | "tool_call_result" => Some(ObserverEvent::ToolCall {
186            tool,
187            tool_call_id: None,
188            duration,
189            success,
190            arguments: None,
191            result: None,
192            channel: channel_opt,
193            agent_alias: agent_alias_opt,
194            turn_id: turn_id_opt,
195        }),
196        "channel_message_inbound" => Some(ObserverEvent::ChannelMessage {
197            channel,
198            direction: "inbound".to_string(),
199        }),
200        "channel_send" => Some(ObserverEvent::ChannelMessage {
201            channel,
202            direction: "outbound".to_string(),
203        }),
204        "turn_complete" => Some(ObserverEvent::TurnComplete),
205        "heartbeat_tick" => Some(ObserverEvent::HeartbeatTick),
206        "error" => Some(ObserverEvent::Error {
207            component: attribution
208                .get(&type_field("channel"))
209                .unwrap_or("system")
210                .to_string(),
211            message: event.message.clone().unwrap_or_default(),
212        }),
213        _ => None,
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{EventCategory, EventOutcome, Severity};
    use std::any::Any;
    use std::sync::Mutex;
    use zeroclaw_api::observability_traits::ObserverMetric;

    /// Test double that keeps a clone of every event it is handed.
    #[derive(Default)]
    struct CapturingObserver {
        events: Mutex<Vec<ObserverEvent>>,
    }

    impl Observer for CapturingObserver {
        fn record_event(&self, event: &ObserverEvent) {
            self.events.lock().unwrap().push(event.clone());
        }
        fn record_metric(&self, _metric: &ObserverMetric) {}
        fn name(&self) -> &str {
            "capturing"
        }
        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Serialises the tests below: they all rebind the one global slot.
    static BRIDGE_LOCK: parking_lot::Mutex<()> = parking_lot::Mutex::new(());

    /// Binds a fresh [`CapturingObserver`], forwards `event` through the
    /// bridge, unbinds again and returns everything the observer received.
    fn forward_and_capture(event: &LogEvent) -> Vec<ObserverEvent> {
        let _guard = BRIDGE_LOCK.lock();
        clear_observer_bridge();
        let observer = Arc::new(CapturingObserver::default());
        set_observer_bridge(observer.clone());

        forward(event);

        let captured = observer.events.lock().unwrap().clone();
        clear_observer_bridge();
        captured
    }

    #[test]
    fn projects_llm_request() {
        let mut event = LogEvent::new(Severity::Info, "llm_request", EventCategory::Agent);
        event
            .zeroclaw
            .set_composite("model_provider", "anthropic.clamps");
        event.zeroclaw.set("model", "claude-sonnet-4-6");
        event.attributes = serde_json::json!({ "messages_count": 4 });

        let projected = forward_and_capture(&event);

        assert_eq!(projected.len(), 1);
        match &projected[0] {
            ObserverEvent::LlmRequest {
                model_provider,
                model,
                messages_count,
                ..
            } => {
                assert_eq!(model_provider, "anthropic");
                assert_eq!(model, "claude-sonnet-4-6");
                assert_eq!(*messages_count, 4);
            }
            other => panic!("expected LlmRequest, got {other:?}"),
        }
    }

    #[test]
    fn projects_tool_call_success() {
        let mut event = LogEvent::new(Severity::Info, "tool_call", EventCategory::Tool);
        event.zeroclaw.set("tool", "shell");
        event.zeroclaw.duration_ms = Some(120);
        event.set_outcome(EventOutcome::Success);

        let projected = forward_and_capture(&event);

        assert_eq!(projected.len(), 1);
        match &projected[0] {
            ObserverEvent::ToolCall {
                tool,
                duration,
                success,
                ..
            } => {
                assert_eq!(tool, "shell");
                assert_eq!(*duration, Duration::from_millis(120));
                assert!(*success);
            }
            other => panic!("expected ToolCall, got {other:?}"),
        }
    }

    #[test]
    fn unknown_action_is_noop() {
        let event = LogEvent::new(Severity::Info, "totally_made_up", EventCategory::System);

        let projected = forward_and_capture(&event);

        assert!(projected.is_empty());
    }

    #[test]
    fn projects_llm_request_with_turn_metadata() {
        let mut event = LogEvent::new(Severity::Info, "llm_request", EventCategory::Agent);
        event
            .zeroclaw
            .set_composite("model_provider", "anthropic.default");
        event.zeroclaw.set("model", "claude-sonnet-4-6");
        event.zeroclaw.set("agent_alias", "default");
        event.zeroclaw.set_composite("channel", "wss.default");
        event.attributes = serde_json::json!({
            "messages_count": 2,
            "turn_id": "turn-1"
        });

        let projected = forward_and_capture(&event);

        assert_eq!(projected.len(), 1);
        match &projected[0] {
            ObserverEvent::LlmRequest {
                model_provider,
                model,
                messages_count,
                channel,
                agent_alias,
                turn_id,
            } => {
                assert_eq!(model_provider, "anthropic");
                assert_eq!(model, "claude-sonnet-4-6");
                assert_eq!(*messages_count, 2);
                assert_eq!(channel.as_deref(), Some("wss.default"));
                assert_eq!(agent_alias.as_deref(), Some("default"));
                assert_eq!(turn_id.as_deref(), Some("turn-1"));
            }
            other => panic!("expected LlmRequest, got {other:?}"),
        }
    }

    #[test]
    fn projects_agent_end_structured_usage() {
        let mut event = LogEvent::new(Severity::Info, "agent_end", EventCategory::Agent);
        event
            .zeroclaw
            .set_composite("model_provider", "anthropic.default");
        event.zeroclaw.set("model", "claude-sonnet-4-6");
        event.attributes = serde_json::json!({
            "input_tokens": 12,
            "output_tokens": 34,
            "turn_id": "turn-1"
        });

        let projected = forward_and_capture(&event);

        assert_eq!(projected.len(), 1);
        match &projected[0] {
            ObserverEvent::AgentEnd {
                tokens_used,
                turn_id,
                ..
            } => {
                let usage = tokens_used.as_ref().expect("usage should project");
                assert_eq!(usage.input_tokens, 12);
                assert_eq!(usage.output_tokens, 34);
                assert_eq!(turn_id.as_deref(), Some("turn-1"));
            }
            other => panic!("expected AgentEnd, got {other:?}"),
        }
    }
}