1use std::sync::{Arc, OnceLock};
17use std::time::Duration;
18
19use parking_lot::RwLock;
20use zeroclaw_api::observability_traits::{Observer, ObserverEvent, TurnTokenUsage};
21
22use crate::event::LogEvent;
23
/// Process-wide slot for the observer that bridged log events are sent to.
///
/// The cell is initialized lazily by `slot()`. It holds `None` until
/// `set_observer_bridge` stores an observer, and again after
/// `clear_observer_bridge` runs.
static OBSERVER: OnceLock<RwLock<Option<Arc<dyn Observer>>>> = OnceLock::new();
25
26fn slot() -> &'static RwLock<Option<Arc<dyn Observer>>> {
27 OBSERVER.get_or_init(|| RwLock::new(None))
28}
29
30pub fn set_observer_bridge(observer: Arc<dyn Observer>) {
33 *slot().write() = Some(observer);
34}
35
36pub fn clear_observer_bridge() {
38 *slot().write() = None;
39}
40
41pub(crate) fn forward(event: &LogEvent) {
46 let Some(observer) = slot().read().clone() else {
47 return;
48 };
49 if let Some(obs_event) = project(event) {
50 observer.record_event(&obs_event);
51 }
52}
53
54fn project(event: &LogEvent) -> Option<ObserverEvent> {
55 use crate::event::type_field;
56 let action = event.event.action.as_str();
57 let attribution = &event.zeroclaw;
58 let model_provider = attribution
59 .get(&type_field("model_provider"))
60 .or_else(|| attribution.get("model_provider"))
61 .unwrap_or_default()
62 .to_string();
63 let model = attribution.get("model").unwrap_or_default().to_string();
64 let tool = attribution.get("tool").unwrap_or_default().to_string();
65 let channel = attribution.get("channel").unwrap_or_default().to_string();
66 let duration = attribution
67 .duration_ms
68 .map(Duration::from_millis)
69 .unwrap_or_default();
70 let success = matches!(event.event.outcome.as_str(), "success");
71
72 let agent_alias = attribution
73 .get("agent_alias")
74 .or_else(|| {
75 event
76 .attributes
77 .get("agent_alias")
78 .and_then(serde_json::Value::as_str)
79 })
80 .unwrap_or_default()
81 .to_string();
82 let turn_id = event
83 .attributes
84 .get("turn_id")
85 .or_else(|| event.attributes.get("trace_id"))
86 .and_then(serde_json::Value::as_str)
87 .unwrap_or_default()
88 .to_string();
89
90 let channel_opt = if channel.is_empty() {
91 None
92 } else {
93 Some(channel.clone())
94 };
95 let agent_alias_opt = if agent_alias.is_empty() {
96 None
97 } else {
98 Some(agent_alias)
99 };
100 let turn_id_opt = if turn_id.is_empty() {
101 None
102 } else {
103 Some(turn_id)
104 };
105
106 match action {
107 "agent_start" => Some(ObserverEvent::AgentStart {
108 model_provider,
109 model,
110 channel: channel_opt,
111 agent_alias: agent_alias_opt,
112 turn_id: turn_id_opt,
113 }),
114 "agent_end" => Some(ObserverEvent::AgentEnd {
115 model_provider,
116 model,
117 duration,
118 tokens_used: {
119 let input = event
120 .attributes
121 .get("input_tokens")
122 .and_then(serde_json::Value::as_u64);
123 let output = event
124 .attributes
125 .get("output_tokens")
126 .and_then(serde_json::Value::as_u64);
127 match (input, output) {
128 (Some(input_tokens), Some(output_tokens)) => Some(TurnTokenUsage {
129 input_tokens,
130 output_tokens,
131 }),
132 _ => None,
133 }
134 },
135 cost_usd: event
136 .attributes
137 .get("cost_usd")
138 .and_then(serde_json::Value::as_f64),
139 channel: channel_opt,
140 agent_alias: agent_alias_opt,
141 turn_id: turn_id_opt,
142 }),
143 "llm_request" => Some(ObserverEvent::LlmRequest {
144 model_provider,
145 model,
146 messages_count: event
147 .attributes
148 .get("messages_count")
149 .and_then(serde_json::Value::as_u64)
150 .unwrap_or_default() as usize,
151 channel: channel_opt,
152 agent_alias: agent_alias_opt,
153 turn_id: turn_id_opt,
154 }),
155 "llm_response" => Some(ObserverEvent::LlmResponse {
156 model_provider,
157 model,
158 duration,
159 success,
160 error_message: event
161 .attributes
162 .get("error")
163 .and_then(serde_json::Value::as_str)
164 .map(str::to_string),
165 input_tokens: event
166 .attributes
167 .get("input_tokens")
168 .and_then(serde_json::Value::as_u64),
169 output_tokens: event
170 .attributes
171 .get("output_tokens")
172 .and_then(serde_json::Value::as_u64),
173 channel: channel_opt,
174 agent_alias: agent_alias_opt,
175 turn_id: turn_id_opt,
176 }),
177 "tool_call_start" => Some(ObserverEvent::ToolCallStart {
178 tool,
179 tool_call_id: None,
180 arguments: None,
181 channel: channel_opt,
182 agent_alias: agent_alias_opt,
183 turn_id: turn_id_opt,
184 }),
185 "tool_call" | "tool_call_result" => Some(ObserverEvent::ToolCall {
186 tool,
187 tool_call_id: None,
188 duration,
189 success,
190 arguments: None,
191 result: None,
192 channel: channel_opt,
193 agent_alias: agent_alias_opt,
194 turn_id: turn_id_opt,
195 }),
196 "channel_message_inbound" => Some(ObserverEvent::ChannelMessage {
197 channel,
198 direction: "inbound".to_string(),
199 }),
200 "channel_send" => Some(ObserverEvent::ChannelMessage {
201 channel,
202 direction: "outbound".to_string(),
203 }),
204 "turn_complete" => Some(ObserverEvent::TurnComplete),
205 "heartbeat_tick" => Some(ObserverEvent::HeartbeatTick),
206 "error" => Some(ObserverEvent::Error {
207 component: attribution
208 .get(&type_field("channel"))
209 .unwrap_or("system")
210 .to_string(),
211 message: event.message.clone().unwrap_or_default(),
212 }),
213 _ => None,
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{EventCategory, EventOutcome, Severity};
    use std::any::Any;
    use std::sync::Mutex;
    use zeroclaw_api::observability_traits::ObserverMetric;

    /// Observer double that keeps a copy of every event it is handed.
    #[derive(Default)]
    struct CapturingObserver {
        events: Mutex<Vec<ObserverEvent>>,
    }

    impl CapturingObserver {
        /// Snapshot of everything recorded so far.
        fn captured(&self) -> Vec<ObserverEvent> {
            self.events.lock().unwrap().clone()
        }
    }

    impl Observer for CapturingObserver {
        fn record_event(&self, event: &ObserverEvent) {
            self.events.lock().unwrap().push(event.clone());
        }
        fn record_metric(&self, _metric: &ObserverMetric) {}
        fn name(&self) -> &str {
            "capturing"
        }
        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    // The bridge is a process-wide global, so tests that install an observer
    // must not overlap.
    static BRIDGE_LOCK: parking_lot::Mutex<()> = parking_lot::Mutex::new(());

    /// Takes the serialization lock, resets the bridge and installs a fresh
    /// capturing observer. Keep the returned guard alive for the whole test.
    fn install_capture() -> (parking_lot::MutexGuard<'static, ()>, Arc<CapturingObserver>) {
        let guard = BRIDGE_LOCK.lock();
        clear_observer_bridge();
        let observer = Arc::new(CapturingObserver::default());
        set_observer_bridge(observer.clone());
        (guard, observer)
    }

    /// Asserts that exactly one event was captured and returns it.
    fn sole_event(observer: &CapturingObserver) -> ObserverEvent {
        let mut captured = observer.captured();
        assert_eq!(captured.len(), 1);
        captured.remove(0)
    }

    #[test]
    fn projects_llm_request() {
        let (_guard, observer) = install_capture();

        let mut event = LogEvent::new(Severity::Info, "llm_request", EventCategory::Agent);
        event
            .zeroclaw
            .set_composite("model_provider", "anthropic.clamps");
        event.zeroclaw.set("model", "claude-sonnet-4-6");
        event.attributes = serde_json::json!({ "messages_count": 4 });

        forward(&event);

        match sole_event(&observer) {
            ObserverEvent::LlmRequest {
                model_provider,
                model,
                messages_count,
                ..
            } => {
                assert_eq!(model_provider, "anthropic");
                assert_eq!(model, "claude-sonnet-4-6");
                assert_eq!(messages_count, 4);
            }
            other => panic!("expected LlmRequest, got {other:?}"),
        }

        clear_observer_bridge();
    }

    #[test]
    fn projects_tool_call_success() {
        let (_guard, observer) = install_capture();

        let mut event = LogEvent::new(Severity::Info, "tool_call", EventCategory::Tool);
        event.zeroclaw.set("tool", "shell");
        event.zeroclaw.duration_ms = Some(120);
        event.set_outcome(EventOutcome::Success);

        forward(&event);

        match sole_event(&observer) {
            ObserverEvent::ToolCall {
                tool,
                duration,
                success,
                ..
            } => {
                assert_eq!(tool, "shell");
                assert_eq!(duration, Duration::from_millis(120));
                assert!(success);
            }
            other => panic!("expected ToolCall, got {other:?}"),
        }

        clear_observer_bridge();
    }

    #[test]
    fn unknown_action_is_noop() {
        let (_guard, observer) = install_capture();

        let event = LogEvent::new(Severity::Info, "totally_made_up", EventCategory::System);
        forward(&event);

        assert!(observer.captured().is_empty());
        clear_observer_bridge();
    }

    #[test]
    fn projects_llm_request_with_turn_metadata() {
        let (_guard, observer) = install_capture();

        let mut event = LogEvent::new(Severity::Info, "llm_request", EventCategory::Agent);
        event
            .zeroclaw
            .set_composite("model_provider", "anthropic.default");
        event.zeroclaw.set("model", "claude-sonnet-4-6");
        event.zeroclaw.set("agent_alias", "default");
        event.zeroclaw.set_composite("channel", "wss.default");
        event.attributes = serde_json::json!({
            "messages_count": 2,
            "turn_id": "turn-1"
        });

        forward(&event);

        match sole_event(&observer) {
            // Deliberately exhaustive: a new field on the variant should make
            // this test fail to compile.
            ObserverEvent::LlmRequest {
                model_provider,
                model,
                messages_count,
                channel,
                agent_alias,
                turn_id,
            } => {
                assert_eq!(model_provider, "anthropic");
                assert_eq!(model, "claude-sonnet-4-6");
                assert_eq!(messages_count, 2);
                assert_eq!(channel.as_deref(), Some("wss.default"));
                assert_eq!(agent_alias.as_deref(), Some("default"));
                assert_eq!(turn_id.as_deref(), Some("turn-1"));
            }
            other => panic!("expected LlmRequest, got {other:?}"),
        }

        clear_observer_bridge();
    }

    #[test]
    fn projects_agent_end_structured_usage() {
        let (_guard, observer) = install_capture();

        let mut event = LogEvent::new(Severity::Info, "agent_end", EventCategory::Agent);
        event
            .zeroclaw
            .set_composite("model_provider", "anthropic.default");
        event.zeroclaw.set("model", "claude-sonnet-4-6");
        event.attributes = serde_json::json!({
            "input_tokens": 12,
            "output_tokens": 34,
            "turn_id": "turn-1"
        });

        forward(&event);

        match sole_event(&observer) {
            ObserverEvent::AgentEnd {
                tokens_used,
                turn_id,
                ..
            } => {
                let usage = tokens_used.expect("usage should project");
                assert_eq!(usage.input_tokens, 12);
                assert_eq!(usage.output_tokens, 34);
                assert_eq!(turn_id.as_deref(), Some("turn-1"));
            }
            other => panic!("expected AgentEnd, got {other:?}"),
        }

        clear_observer_bridge();
    }
}