zeroclaw_log/
observer_bridge.rs1use std::sync::{Arc, OnceLock};
17use std::time::Duration;
18
19use parking_lot::RwLock;
20use zeroclaw_api::observability_traits::{Observer, ObserverEvent};
21
22use crate::event::LogEvent;
23
24static OBSERVER: OnceLock<RwLock<Option<Arc<dyn Observer>>>> = OnceLock::new();
25
26fn slot() -> &'static RwLock<Option<Arc<dyn Observer>>> {
27 OBSERVER.get_or_init(|| RwLock::new(None))
28}
29
30pub fn set_observer_bridge(observer: Arc<dyn Observer>) {
33 *slot().write() = Some(observer);
34}
35
36pub fn clear_observer_bridge() {
38 *slot().write() = None;
39}
40
41pub(crate) fn forward(event: &LogEvent) {
46 let Some(observer) = slot().read().clone() else {
47 return;
48 };
49 if let Some(obs_event) = project(event) {
50 observer.record_event(&obs_event);
51 }
52}
53
54fn project(event: &LogEvent) -> Option<ObserverEvent> {
55 use crate::event::type_field;
56 let action = event.event.action.as_str();
57 let attribution = &event.zeroclaw;
58 let model_provider = attribution
59 .get(&type_field("model_provider"))
60 .or_else(|| attribution.get("model_provider"))
61 .unwrap_or_default()
62 .to_string();
63 let model = attribution.get("model").unwrap_or_default().to_string();
64 let tool = attribution.get("tool").unwrap_or_default().to_string();
65 let channel = attribution.get("channel").unwrap_or_default().to_string();
66 let duration = attribution
67 .duration_ms
68 .map(Duration::from_millis)
69 .unwrap_or_default();
70 let success = matches!(event.event.outcome.as_str(), "success");
71
72 match action {
73 "agent_start" => Some(ObserverEvent::AgentStart {
74 model_provider,
75 model,
76 }),
77 "agent_end" => Some(ObserverEvent::AgentEnd {
78 model_provider,
79 model,
80 duration,
81 tokens_used: event
82 .attributes
83 .get("tokens_used")
84 .and_then(serde_json::Value::as_u64),
85 cost_usd: event
86 .attributes
87 .get("cost_usd")
88 .and_then(serde_json::Value::as_f64),
89 }),
90 "llm_request" => Some(ObserverEvent::LlmRequest {
91 model_provider,
92 model,
93 messages_count: event
94 .attributes
95 .get("messages_count")
96 .and_then(serde_json::Value::as_u64)
97 .unwrap_or_default() as usize,
98 }),
99 "llm_response" => Some(ObserverEvent::LlmResponse {
100 model_provider,
101 model,
102 duration,
103 success,
104 error_message: event
105 .attributes
106 .get("error")
107 .and_then(serde_json::Value::as_str)
108 .map(str::to_string),
109 input_tokens: event
110 .attributes
111 .get("input_tokens")
112 .and_then(serde_json::Value::as_u64),
113 output_tokens: event
114 .attributes
115 .get("output_tokens")
116 .and_then(serde_json::Value::as_u64),
117 }),
118 "tool_call_start" => Some(ObserverEvent::ToolCallStart {
119 tool,
120 tool_call_id: None,
121 arguments: None,
122 }),
123 "tool_call" | "tool_call_result" => Some(ObserverEvent::ToolCall {
124 tool,
125 tool_call_id: None,
126 duration,
127 success,
128 arguments: None,
129 result: None,
130 }),
131 "channel_message_inbound" => Some(ObserverEvent::ChannelMessage {
132 channel,
133 direction: "inbound".to_string(),
134 }),
135 "channel_send" => Some(ObserverEvent::ChannelMessage {
136 channel,
137 direction: "outbound".to_string(),
138 }),
139 "turn_complete" => Some(ObserverEvent::TurnComplete),
140 "heartbeat_tick" => Some(ObserverEvent::HeartbeatTick),
141 "error" => Some(ObserverEvent::Error {
142 component: attribution
143 .get(&type_field("channel"))
144 .unwrap_or("system")
145 .to_string(),
146 message: event.message.clone().unwrap_or_default(),
147 }),
148 _ => None,
149 }
150}
151
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{EventCategory, EventOutcome, Severity};
    use std::any::Any;
    use std::sync::Mutex;
    use zeroclaw_api::observability_traits::ObserverMetric;

    /// Observer test double that keeps a copy of every event it receives.
    #[derive(Default)]
    struct CapturingObserver {
        events: Mutex<Vec<ObserverEvent>>,
    }

    impl Observer for CapturingObserver {
        fn record_event(&self, event: &ObserverEvent) {
            let mut seen = self.events.lock().unwrap();
            seen.push(event.clone());
        }

        fn record_metric(&self, _metric: &ObserverMetric) {}

        fn name(&self) -> &str {
            "capturing"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Serialises the tests: the bridge is a single shared slot, so tests
    /// that install an observer must not overlap.
    static BRIDGE_LOCK: parking_lot::Mutex<()> = parking_lot::Mutex::new(());

    /// Installs a fresh capturing observer, forwards `event` through the
    /// bridge, and returns everything the observer saw. The bridge is left
    /// empty afterwards.
    fn forward_and_capture(event: &LogEvent) -> Vec<ObserverEvent> {
        let _guard = BRIDGE_LOCK.lock();
        clear_observer_bridge();
        let observer = Arc::new(CapturingObserver::default());
        set_observer_bridge(observer.clone());

        forward(event);

        let captured = observer.events.lock().unwrap().clone();
        clear_observer_bridge();
        captured
    }

    #[test]
    fn projects_llm_request() {
        let mut event = LogEvent::new(Severity::Info, "llm_request", EventCategory::Agent);
        event
            .zeroclaw
            .set_composite("model_provider", "anthropic.clamps");
        event.zeroclaw.set("model", "claude-sonnet-4-6");
        event.attributes = serde_json::json!({ "messages_count": 4 });

        let captured = forward_and_capture(&event);

        assert_eq!(captured.len(), 1);
        match &captured[0] {
            ObserverEvent::LlmRequest {
                model_provider,
                model,
                messages_count,
            } => {
                assert_eq!(model_provider, "anthropic");
                assert_eq!(model, "claude-sonnet-4-6");
                assert_eq!(*messages_count, 4);
            }
            other => panic!("expected LlmRequest, got {other:?}"),
        }
    }

    #[test]
    fn projects_tool_call_success() {
        let mut event = LogEvent::new(Severity::Info, "tool_call", EventCategory::Tool);
        event.zeroclaw.set("tool", "shell");
        event.zeroclaw.duration_ms = Some(120);
        event.set_outcome(EventOutcome::Success);

        let captured = forward_and_capture(&event);

        assert_eq!(captured.len(), 1);
        match &captured[0] {
            ObserverEvent::ToolCall {
                tool,
                duration,
                success,
                ..
            } => {
                assert_eq!(tool, "shell");
                assert_eq!(*duration, Duration::from_millis(120));
                assert!(*success);
            }
            other => panic!("expected ToolCall, got {other:?}"),
        }
    }

    #[test]
    fn unknown_action_is_noop() {
        let event = LogEvent::new(Severity::Info, "totally_made_up", EventCategory::System);

        let captured = forward_and_capture(&event);

        assert!(captured.is_empty());
    }
}