1use super::AppState;
6use axum::{
7 Json,
8 extract::State,
9 http::{HeaderMap, StatusCode, header},
10 response::{
11 IntoResponse,
12 sse::{Event, KeepAlive, Sse},
13 },
14};
15use std::collections::VecDeque;
16use std::convert::Infallible;
17use std::sync::{Arc, Mutex};
18use tokio_stream::StreamExt;
19use tokio_stream::wrappers::BroadcastStream;
20
21pub struct EventBuffer {
23 inner: Mutex<VecDeque<serde_json::Value>>,
24 capacity: usize,
25}
26
27impl EventBuffer {
28 pub fn new(capacity: usize) -> Self {
29 Self {
30 inner: Mutex::new(VecDeque::with_capacity(capacity)),
31 capacity,
32 }
33 }
34
35 pub fn push(&self, event: serde_json::Value) {
37 let mut buf = self.inner.lock().unwrap();
38 if buf.len() == self.capacity {
39 buf.pop_front();
40 }
41 buf.push_back(event);
42 }
43
44 pub fn snapshot(&self) -> Vec<serde_json::Value> {
46 self.inner.lock().unwrap().iter().cloned().collect()
47 }
48}
49
50pub async fn handle_sse_events(
52 State(state): State<AppState>,
53 headers: HeaderMap,
54) -> impl IntoResponse {
55 if state.pairing.require_pairing() {
57 let token = headers
58 .get(header::AUTHORIZATION)
59 .and_then(|v| v.to_str().ok())
60 .and_then(|auth| auth.strip_prefix("Bearer "))
61 .unwrap_or("");
62
63 if !state.pairing.is_authenticated(token) {
64 return (
65 StatusCode::UNAUTHORIZED,
66 "Unauthorized — provide Authorization: Bearer <token>",
67 )
68 .into_response();
69 }
70 }
71
72 let rx = state.event_tx.subscribe();
73 let stream = BroadcastStream::new(rx).filter_map(
74 |result: Result<
75 serde_json::Value,
76 tokio_stream::wrappers::errors::BroadcastStreamRecvError,
77 >| {
78 match result {
79 Ok(value) if is_public_sse_event(&value) => Some(Ok::<_, Infallible>(
80 Event::default().data(value.to_string()),
81 )),
82 Ok(_) => None,
83 Err(_) => None, }
85 },
86 );
87
88 Sse::new(stream)
89 .keep_alive(KeepAlive::default())
90 .into_response()
91}
92
93pub async fn handle_events_history(
95 State(state): State<AppState>,
96 headers: HeaderMap,
97) -> impl IntoResponse {
98 if let Err(e) = super::api::require_auth(&state, &headers) {
99 return e.into_response();
100 }
101 let events: Vec<_> = state
102 .event_buffer
103 .snapshot()
104 .into_iter()
105 .filter(is_public_sse_event)
106 .collect();
107 Json(serde_json::json!({ "events": events })).into_response()
108}
109
110fn is_public_sse_event(event: &serde_json::Value) -> bool {
118 if event.get("source").and_then(serde_json::Value::as_str) == Some("observability") {
119 return true;
120 }
121 event
122 .get("session_id")
123 .and_then(serde_json::Value::as_str)
124 .is_none()
125}
126
127pub(crate) struct BroadcastObserver {
138 tx: tokio::sync::broadcast::Sender<serde_json::Value>,
139 buffer: Arc<EventBuffer>,
140}
141
142impl BroadcastObserver {
143 pub(crate) fn new(
144 tx: tokio::sync::broadcast::Sender<serde_json::Value>,
145 buffer: Arc<EventBuffer>,
146 ) -> Self {
147 Self { tx, buffer }
148 }
149}
150
151impl zeroclaw_runtime::observability::Observer for BroadcastObserver {
152 fn record_event(&self, event: &zeroclaw_runtime::observability::ObserverEvent) {
153 let json = match event {
158 zeroclaw_runtime::observability::ObserverEvent::LlmRequest {
159 model_provider,
160 model,
161 ..
162 } => serde_json::json!({
163 "type": "llm_request",
164 "source": "observability",
165 "model_provider": model_provider,
166 "model": model,
167 "timestamp": chrono::Utc::now().to_rfc3339(),
168 }),
169 zeroclaw_runtime::observability::ObserverEvent::ToolCall {
170 tool,
171 duration,
172 success,
173 ..
174 } => serde_json::json!({
175 "type": "tool_call",
176 "source": "observability",
177 "tool": tool,
178 "duration_ms": duration.as_millis(),
179 "success": success,
180 "timestamp": chrono::Utc::now().to_rfc3339(),
181 }),
182 zeroclaw_runtime::observability::ObserverEvent::ToolCallStart { tool, .. } => {
183 serde_json::json!({
184 "type": "tool_call_start",
185 "source": "observability",
186 "tool": tool,
187 "timestamp": chrono::Utc::now().to_rfc3339(),
188 })
189 }
190 zeroclaw_runtime::observability::ObserverEvent::Error { component, message } => {
191 serde_json::json!({
192 "type": "error",
193 "source": "observability",
194 "component": component,
195 "message": message,
196 "timestamp": chrono::Utc::now().to_rfc3339(),
197 })
198 }
199 zeroclaw_runtime::observability::ObserverEvent::AgentStart {
200 model_provider,
201 model,
202 } => {
203 serde_json::json!({
204 "type": "agent_start",
205 "source": "observability",
206 "model_provider": model_provider,
207 "model": model,
208 "timestamp": chrono::Utc::now().to_rfc3339(),
209 })
210 }
211 zeroclaw_runtime::observability::ObserverEvent::AgentEnd {
212 model_provider,
213 model,
214 duration,
215 tokens_used,
216 cost_usd,
217 } => serde_json::json!({
218 "type": "agent_end",
219 "source": "observability",
220 "model_provider": model_provider,
221 "model": model,
222 "duration_ms": duration.as_millis(),
223 "tokens_used": tokens_used,
224 "cost_usd": cost_usd,
225 "timestamp": chrono::Utc::now().to_rfc3339(),
226 }),
227 _ => return, };
229
230 self.buffer.push(json.clone());
231 let _ = self.tx.send(json);
232 }
233
234 fn record_metric(&self, _metric: &zeroclaw_runtime::observability::traits::ObserverMetric) {
235 }
237
238 fn name(&self) -> &str {
239 "broadcast"
240 }
241
242 fn as_any(&self) -> &dyn std::any::Any {
243 self
244 }
245}
246
247#[cfg(test)]
248mod tests {
249 use super::*;
250 use zeroclaw_runtime::observability::{Observer, ObserverEvent};
251
252 fn make_broadcast() -> (
253 Arc<BroadcastObserver>,
254 tokio::sync::broadcast::Receiver<serde_json::Value>,
255 Arc<EventBuffer>,
256 ) {
257 let (tx, rx) = tokio::sync::broadcast::channel(16);
258 let buffer = Arc::new(EventBuffer::new(16));
259 let obs = Arc::new(BroadcastObserver::new(tx, buffer.clone()));
260 (obs, rx, buffer)
261 }
262
263 #[test]
264 fn tool_call_event_is_broadcast_and_buffered() {
265 let (obs, mut rx, buffer) = make_broadcast();
266
267 obs.record_event(&ObserverEvent::ToolCall {
268 tool: "shell".into(),
269 tool_call_id: None,
270 duration: std::time::Duration::from_millis(42),
271 success: true,
272 arguments: None,
273 result: None,
274 });
275
276 let value = rx.try_recv().expect("event should be broadcast");
277 assert_eq!(value["type"], "tool_call");
278 assert_eq!(value["tool"], "shell");
279 assert_eq!(value["success"], true);
280
281 let snap = buffer.snapshot();
282 assert_eq!(snap.len(), 1);
283 assert_eq!(snap[0]["type"], "tool_call");
284 }
285
286 #[test]
287 fn tool_call_start_event_is_broadcast() {
288 let (obs, mut rx, _buffer) = make_broadcast();
289
290 obs.record_event(&ObserverEvent::ToolCallStart {
291 tool: "mcp_filesystem__read_file".into(),
292 tool_call_id: None,
293 arguments: None,
294 });
295
296 let value = rx.try_recv().expect("event should be broadcast");
297 assert_eq!(value["type"], "tool_call_start");
298 assert_eq!(value["tool"], "mcp_filesystem__read_file");
299 }
300
301 #[test]
302 fn unmapped_events_are_skipped() {
303 let (obs, mut rx, buffer) = make_broadcast();
304
305 obs.record_event(&ObserverEvent::HeartbeatTick);
306
307 assert!(rx.try_recv().is_err(), "heartbeat should not broadcast");
308 assert!(buffer.snapshot().is_empty());
309 }
310
311 #[test]
312 fn session_scoped_events_are_not_public_sse_events() {
313 let session_event = serde_json::json!({
314 "type": "message",
315 "session_id": "operator-1",
316 "content": "private session notification"
317 });
318 let global_event = serde_json::json!({
319 "type": "tool_call",
320 "tool": "shell"
321 });
322
323 assert!(!is_public_sse_event(&session_event));
324 assert!(is_public_sse_event(&global_event));
325 }
326
327 #[test]
328 fn observability_tagged_events_are_public_even_without_session_id() {
329 let obs = serde_json::json!({
332 "type": "tool_call",
333 "source": "observability",
334 "tool": "shell",
335 });
336 assert!(is_public_sse_event(&obs));
337 }
338
339 #[test]
340 fn broadcast_observer_tags_every_event_with_observability_source() {
341 let (obs, mut rx, _buffer) = make_broadcast();
345
346 let cases: Vec<ObserverEvent> = vec![
347 ObserverEvent::LlmRequest {
348 model_provider: "p".into(),
349 model: "m".into(),
350 messages_count: 0,
351 },
352 ObserverEvent::ToolCall {
353 tool: "shell".into(),
354 tool_call_id: None,
355 duration: std::time::Duration::from_millis(1),
356 success: true,
357 arguments: None,
358 result: None,
359 },
360 ObserverEvent::ToolCallStart {
361 tool: "shell".into(),
362 tool_call_id: None,
363 arguments: None,
364 },
365 ObserverEvent::Error {
366 component: "any".into(),
367 message: "boom".into(),
368 },
369 ObserverEvent::AgentStart {
370 model_provider: "p".into(),
371 model: "m".into(),
372 },
373 ObserverEvent::AgentEnd {
374 model_provider: "p".into(),
375 model: "m".into(),
376 duration: std::time::Duration::from_millis(1),
377 tokens_used: None,
378 cost_usd: None,
379 },
380 ];
381 for ev in cases {
382 obs.record_event(&ev);
383 let v = rx.try_recv().expect("event must broadcast");
384 assert_eq!(
385 v["source"], "observability",
386 "every BroadcastObserver event must be tagged source=observability: {v}"
387 );
388 }
389 }
390
391 #[test]
399 fn factory_observer_events_reach_broadcast_hook() {
400 static HOOK_TEST_LOCK: parking_lot::Mutex<()> = parking_lot::Mutex::new(());
403 let _guard = HOOK_TEST_LOCK.lock();
404
405 zeroclaw_runtime::observability::clear_broadcast_hook();
406
407 let (tx, mut rx) = tokio::sync::broadcast::channel(16);
408 let buffer = Arc::new(EventBuffer::new(16));
409 let bo: Arc<dyn Observer> = Arc::new(BroadcastObserver::new(tx, buffer.clone()));
410 zeroclaw_runtime::observability::set_broadcast_hook(bo);
411
412 let cfg = zeroclaw_config::schema::ObservabilityConfig {
414 backend: "noop".into(),
415 ..Default::default()
416 };
417 let observer = zeroclaw_runtime::observability::create_observer(&cfg);
418
419 observer.record_event(&ObserverEvent::ToolCall {
420 tool: "shell".into(),
421 tool_call_id: None,
422 duration: std::time::Duration::from_millis(7),
423 success: true,
424 arguments: None,
425 result: None,
426 });
427
428 let value = rx
429 .try_recv()
430 .expect("factory-built observer event must reach the SSE broadcast channel");
431 assert_eq!(value["type"], "tool_call");
432 assert_eq!(value["tool"], "shell");
433 assert_eq!(value["success"], true);
434
435 let snap = buffer.snapshot();
436 assert_eq!(
437 snap.len(),
438 1,
439 "broadcast events must also land in the buffer"
440 );
441
442 zeroclaw_runtime::observability::clear_broadcast_hook();
443 }
444}