1use super::AppState;
6use axum::{
7 Json,
8 extract::State,
9 http::{HeaderMap, StatusCode, header},
10 response::{
11 IntoResponse,
12 sse::{Event, KeepAlive, Sse},
13 },
14};
15use std::collections::VecDeque;
16use std::convert::Infallible;
17use std::sync::{Arc, Mutex};
18use tokio_stream::StreamExt;
19use tokio_stream::wrappers::BroadcastStream;
20
21pub struct EventBuffer {
23 inner: Mutex<VecDeque<serde_json::Value>>,
24 capacity: usize,
25}
26
27impl EventBuffer {
28 pub fn new(capacity: usize) -> Self {
29 Self {
30 inner: Mutex::new(VecDeque::with_capacity(capacity)),
31 capacity,
32 }
33 }
34
35 pub fn push(&self, event: serde_json::Value) {
37 let mut buf = self.inner.lock().unwrap();
38 if buf.len() == self.capacity {
39 buf.pop_front();
40 }
41 buf.push_back(event);
42 }
43
44 pub fn snapshot(&self) -> Vec<serde_json::Value> {
46 self.inner.lock().unwrap().iter().cloned().collect()
47 }
48}
49
50pub async fn handle_sse_events(
52 State(state): State<AppState>,
53 headers: HeaderMap,
54) -> impl IntoResponse {
55 if state.pairing.require_pairing() {
57 let token = headers
58 .get(header::AUTHORIZATION)
59 .and_then(|v| v.to_str().ok())
60 .and_then(|auth| auth.strip_prefix("Bearer "))
61 .unwrap_or("");
62
63 if !state.pairing.is_authenticated(token) {
64 return (
65 StatusCode::UNAUTHORIZED,
66 "Unauthorized — provide Authorization: Bearer <token>",
67 )
68 .into_response();
69 }
70 }
71
72 let rx = state.event_tx.subscribe();
73 let stream = BroadcastStream::new(rx).filter_map(
74 |result: Result<
75 serde_json::Value,
76 tokio_stream::wrappers::errors::BroadcastStreamRecvError,
77 >| {
78 match result {
79 Ok(value) if is_public_sse_event(&value) => Some(Ok::<_, Infallible>(
80 Event::default().data(value.to_string()),
81 )),
82 Ok(_) => None,
83 Err(_) => None, }
85 },
86 );
87
88 Sse::new(stream)
89 .keep_alive(KeepAlive::default())
90 .into_response()
91}
92
93pub async fn handle_events_history(
95 State(state): State<AppState>,
96 headers: HeaderMap,
97) -> impl IntoResponse {
98 if let Err(e) = super::api::require_auth(&state, &headers) {
99 return e.into_response();
100 }
101 let events: Vec<_> = state
102 .event_buffer
103 .snapshot()
104 .into_iter()
105 .filter(is_public_sse_event)
106 .collect();
107 Json(serde_json::json!({ "events": events })).into_response()
108}
109
110fn is_public_sse_event(event: &serde_json::Value) -> bool {
115 event
116 .get("session_id")
117 .and_then(serde_json::Value::as_str)
118 .is_none()
119}
120
121pub(crate) struct BroadcastObserver {
132 tx: tokio::sync::broadcast::Sender<serde_json::Value>,
133 buffer: Arc<EventBuffer>,
134}
135
136impl BroadcastObserver {
137 pub(crate) fn new(
138 tx: tokio::sync::broadcast::Sender<serde_json::Value>,
139 buffer: Arc<EventBuffer>,
140 ) -> Self {
141 Self { tx, buffer }
142 }
143}
144
145impl zeroclaw_runtime::observability::Observer for BroadcastObserver {
146 fn record_event(&self, event: &zeroclaw_runtime::observability::ObserverEvent) {
147 let json = match event {
152 zeroclaw_runtime::observability::ObserverEvent::LlmRequest {
153 model_provider,
154 model,
155 ..
156 } => serde_json::json!({
157 "type": "llm_request",
158 "model_provider": model_provider,
159 "model": model,
160 "timestamp": chrono::Utc::now().to_rfc3339(),
161 }),
162 zeroclaw_runtime::observability::ObserverEvent::ToolCall {
163 tool,
164 duration,
165 success,
166 ..
167 } => serde_json::json!({
168 "type": "tool_call",
169 "tool": tool,
170 "duration_ms": duration.as_millis(),
171 "success": success,
172 "timestamp": chrono::Utc::now().to_rfc3339(),
173 }),
174 zeroclaw_runtime::observability::ObserverEvent::ToolCallStart { tool, .. } => {
175 serde_json::json!({
176 "type": "tool_call_start",
177 "tool": tool,
178 "timestamp": chrono::Utc::now().to_rfc3339(),
179 })
180 }
181 zeroclaw_runtime::observability::ObserverEvent::Error { component, message } => {
182 serde_json::json!({
183 "type": "error",
184 "component": component,
185 "message": message,
186 "timestamp": chrono::Utc::now().to_rfc3339(),
187 })
188 }
189 zeroclaw_runtime::observability::ObserverEvent::AgentStart {
190 model_provider,
191 model,
192 } => {
193 serde_json::json!({
194 "type": "agent_start",
195 "model_provider": model_provider,
196 "model": model,
197 "timestamp": chrono::Utc::now().to_rfc3339(),
198 })
199 }
200 zeroclaw_runtime::observability::ObserverEvent::AgentEnd {
201 model_provider,
202 model,
203 duration,
204 tokens_used,
205 cost_usd,
206 } => serde_json::json!({
207 "type": "agent_end",
208 "model_provider": model_provider,
209 "model": model,
210 "duration_ms": duration.as_millis(),
211 "tokens_used": tokens_used,
212 "cost_usd": cost_usd,
213 "timestamp": chrono::Utc::now().to_rfc3339(),
214 }),
215 _ => return, };
217
218 self.buffer.push(json.clone());
219 let _ = self.tx.send(json);
220 }
221
222 fn record_metric(&self, _metric: &zeroclaw_runtime::observability::traits::ObserverMetric) {
223 }
225
226 fn name(&self) -> &str {
227 "broadcast"
228 }
229
230 fn as_any(&self) -> &dyn std::any::Any {
231 self
232 }
233}
234
235#[cfg(test)]
236mod tests {
237 use super::*;
238 use zeroclaw_runtime::observability::{Observer, ObserverEvent};
239
240 fn make_broadcast() -> (
241 Arc<BroadcastObserver>,
242 tokio::sync::broadcast::Receiver<serde_json::Value>,
243 Arc<EventBuffer>,
244 ) {
245 let (tx, rx) = tokio::sync::broadcast::channel(16);
246 let buffer = Arc::new(EventBuffer::new(16));
247 let obs = Arc::new(BroadcastObserver::new(tx, buffer.clone()));
248 (obs, rx, buffer)
249 }
250
251 #[test]
252 fn tool_call_event_is_broadcast_and_buffered() {
253 let (obs, mut rx, buffer) = make_broadcast();
254
255 obs.record_event(&ObserverEvent::ToolCall {
256 tool: "shell".into(),
257 tool_call_id: None,
258 duration: std::time::Duration::from_millis(42),
259 success: true,
260 arguments: None,
261 result: None,
262 });
263
264 let value = rx.try_recv().expect("event should be broadcast");
265 assert_eq!(value["type"], "tool_call");
266 assert_eq!(value["tool"], "shell");
267 assert_eq!(value["success"], true);
268
269 let snap = buffer.snapshot();
270 assert_eq!(snap.len(), 1);
271 assert_eq!(snap[0]["type"], "tool_call");
272 }
273
274 #[test]
275 fn tool_call_start_event_is_broadcast() {
276 let (obs, mut rx, _buffer) = make_broadcast();
277
278 obs.record_event(&ObserverEvent::ToolCallStart {
279 tool: "mcp_filesystem__read_file".into(),
280 tool_call_id: None,
281 arguments: None,
282 });
283
284 let value = rx.try_recv().expect("event should be broadcast");
285 assert_eq!(value["type"], "tool_call_start");
286 assert_eq!(value["tool"], "mcp_filesystem__read_file");
287 }
288
289 #[test]
290 fn unmapped_events_are_skipped() {
291 let (obs, mut rx, buffer) = make_broadcast();
292
293 obs.record_event(&ObserverEvent::HeartbeatTick);
294
295 assert!(rx.try_recv().is_err(), "heartbeat should not broadcast");
296 assert!(buffer.snapshot().is_empty());
297 }
298
299 #[test]
300 fn session_scoped_events_are_not_public_sse_events() {
301 let session_event = serde_json::json!({
302 "type": "message",
303 "session_id": "operator-1",
304 "content": "private session notification"
305 });
306 let global_event = serde_json::json!({
307 "type": "tool_call",
308 "tool": "shell"
309 });
310
311 assert!(!is_public_sse_event(&session_event));
312 assert!(is_public_sse_event(&global_event));
313 }
314
315 #[test]
323 fn factory_observer_events_reach_broadcast_hook() {
324 static HOOK_TEST_LOCK: parking_lot::Mutex<()> = parking_lot::Mutex::new(());
327 let _guard = HOOK_TEST_LOCK.lock();
328
329 zeroclaw_runtime::observability::clear_broadcast_hook();
330
331 let (tx, mut rx) = tokio::sync::broadcast::channel(16);
332 let buffer = Arc::new(EventBuffer::new(16));
333 let bo: Arc<dyn Observer> = Arc::new(BroadcastObserver::new(tx, buffer.clone()));
334 zeroclaw_runtime::observability::set_broadcast_hook(bo);
335
336 let cfg = zeroclaw_config::schema::ObservabilityConfig {
338 backend: "noop".into(),
339 ..Default::default()
340 };
341 let observer = zeroclaw_runtime::observability::create_observer(&cfg);
342
343 observer.record_event(&ObserverEvent::ToolCall {
344 tool: "shell".into(),
345 tool_call_id: None,
346 duration: std::time::Duration::from_millis(7),
347 success: true,
348 arguments: None,
349 result: None,
350 });
351
352 let value = rx
353 .try_recv()
354 .expect("factory-built observer event must reach the SSE broadcast channel");
355 assert_eq!(value["type"], "tool_call");
356 assert_eq!(value["tool"], "shell");
357 assert_eq!(value["success"], true);
358
359 let snap = buffer.snapshot();
360 assert_eq!(
361 snap.len(),
362 1,
363 "broadcast events must also land in the buffer"
364 );
365
366 zeroclaw_runtime::observability::clear_broadcast_hook();
367 }
368}