Skip to main content

zeroclaw_gateway/
sse.rs

1//! Server-Sent Events (SSE) stream for real-time event delivery.
2//!
3//! Wraps the broadcast channel in AppState to deliver events to web dashboard clients.
4
5use super::AppState;
6use axum::{
7    Json,
8    extract::State,
9    http::{HeaderMap, StatusCode, header},
10    response::{
11        IntoResponse,
12        sse::{Event, KeepAlive, Sse},
13    },
14};
15use std::collections::VecDeque;
16use std::convert::Infallible;
17use std::sync::{Arc, Mutex};
18use tokio_stream::StreamExt;
19use tokio_stream::wrappers::BroadcastStream;
20
21/// Thread-safe ring buffer that retains recent events for history replay.
22pub struct EventBuffer {
23    inner: Mutex<VecDeque<serde_json::Value>>,
24    capacity: usize,
25}
26
27impl EventBuffer {
28    pub fn new(capacity: usize) -> Self {
29        Self {
30            inner: Mutex::new(VecDeque::with_capacity(capacity)),
31            capacity,
32        }
33    }
34
35    /// Push an event into the buffer, evicting the oldest if at capacity.
36    pub fn push(&self, event: serde_json::Value) {
37        let mut buf = self.inner.lock().unwrap();
38        if buf.len() == self.capacity {
39            buf.pop_front();
40        }
41        buf.push_back(event);
42    }
43
44    /// Return a snapshot of all buffered events (oldest first).
45    pub fn snapshot(&self) -> Vec<serde_json::Value> {
46        self.inner.lock().unwrap().iter().cloned().collect()
47    }
48}
49
50/// GET /api/events — SSE event stream
51pub async fn handle_sse_events(
52    State(state): State<AppState>,
53    headers: HeaderMap,
54) -> impl IntoResponse {
55    // Auth check
56    if state.pairing.require_pairing() {
57        let token = headers
58            .get(header::AUTHORIZATION)
59            .and_then(|v| v.to_str().ok())
60            .and_then(|auth| auth.strip_prefix("Bearer "))
61            .unwrap_or("");
62
63        if !state.pairing.is_authenticated(token) {
64            return (
65                StatusCode::UNAUTHORIZED,
66                "Unauthorized — provide Authorization: Bearer <token>",
67            )
68                .into_response();
69        }
70    }
71
72    let rx = state.event_tx.subscribe();
73    let stream = BroadcastStream::new(rx).filter_map(
74        |result: Result<
75            serde_json::Value,
76            tokio_stream::wrappers::errors::BroadcastStreamRecvError,
77        >| {
78            match result {
79                Ok(value) if is_public_sse_event(&value) => Some(Ok::<_, Infallible>(
80                    Event::default().data(value.to_string()),
81                )),
82                Ok(_) => None,
83                Err(_) => None, // Skip lagged messages
84            }
85        },
86    );
87
88    Sse::new(stream)
89        .keep_alive(KeepAlive::default())
90        .into_response()
91}
92
93/// GET /api/events/history — return buffered recent events as JSON.
94pub async fn handle_events_history(
95    State(state): State<AppState>,
96    headers: HeaderMap,
97) -> impl IntoResponse {
98    if let Err(e) = super::api::require_auth(&state, &headers) {
99        return e.into_response();
100    }
101    let events: Vec<_> = state
102        .event_buffer
103        .snapshot()
104        .into_iter()
105        .filter(is_public_sse_event)
106        .collect();
107    Json(serde_json::json!({ "events": events })).into_response()
108}
109
110/// Returns true for events that should be visible on the global SSE stream.
111///
112/// Contract: broadcast events must not include `session_id` unless they are
113/// intentionally scoped to that session and hidden from global `/api/events`.
114/// Observability telemetry (events tagged `source: "observability"`) is
115/// explicitly public — it is global monitoring data intended for the
116/// dashboard SSE stream even though it never carries a chat `session_id`.
117fn is_public_sse_event(event: &serde_json::Value) -> bool {
118    if event.get("source").and_then(serde_json::Value::as_str) == Some("observability") {
119        return true;
120    }
121    event
122        .get("session_id")
123        .and_then(serde_json::Value::as_str)
124        .is_none()
125}
126
127/// Broadcast observer that fans events out to SSE subscribers.
128///
129/// Installed as the process-wide broadcast hook by [`crate::run_gateway`] so
130/// that events recorded by *any* observer built through
131/// `observability::create_observer` — including the per-call observer the
132/// agent loop creates inside `process_message` — also reach `/api/events`
133/// clients.
134///
135/// Crate-private: the constructor signature is intentionally not part of any
136/// stable surface, since it is wired directly into `run_gateway`.
137pub(crate) struct BroadcastObserver {
138    tx: tokio::sync::broadcast::Sender<serde_json::Value>,
139    buffer: Arc<EventBuffer>,
140}
141
142impl BroadcastObserver {
143    pub(crate) fn new(
144        tx: tokio::sync::broadcast::Sender<serde_json::Value>,
145        buffer: Arc<EventBuffer>,
146    ) -> Self {
147        Self { tx, buffer }
148    }
149}
150
151impl zeroclaw_runtime::observability::Observer for BroadcastObserver {
152    fn record_event(&self, event: &zeroclaw_runtime::observability::ObserverEvent) {
153        // Recording into the primary observer (logs / Prometheus) is the
154        // responsibility of whoever built the event source; `TeeObserver`
155        // takes care of that fan-out. Here we only translate to JSON and
156        // ship to SSE subscribers.
157        let json = match event {
158            zeroclaw_runtime::observability::ObserverEvent::LlmRequest {
159                model_provider,
160                model,
161                ..
162            } => serde_json::json!({
163                "type": "llm_request",
164                "source": "observability",
165                "model_provider": model_provider,
166                "model": model,
167                "timestamp": chrono::Utc::now().to_rfc3339(),
168            }),
169            zeroclaw_runtime::observability::ObserverEvent::ToolCall {
170                tool,
171                duration,
172                success,
173                ..
174            } => serde_json::json!({
175                "type": "tool_call",
176                "source": "observability",
177                "tool": tool,
178                "duration_ms": duration.as_millis(),
179                "success": success,
180                "timestamp": chrono::Utc::now().to_rfc3339(),
181            }),
182            zeroclaw_runtime::observability::ObserverEvent::ToolCallStart { tool, .. } => {
183                serde_json::json!({
184                    "type": "tool_call_start",
185                    "source": "observability",
186                    "tool": tool,
187                    "timestamp": chrono::Utc::now().to_rfc3339(),
188                })
189            }
190            zeroclaw_runtime::observability::ObserverEvent::Error { component, message } => {
191                serde_json::json!({
192                    "type": "error",
193                    "source": "observability",
194                    "component": component,
195                    "message": message,
196                    "timestamp": chrono::Utc::now().to_rfc3339(),
197                })
198            }
199            zeroclaw_runtime::observability::ObserverEvent::AgentStart {
200                model_provider,
201                model,
202            } => {
203                serde_json::json!({
204                    "type": "agent_start",
205                    "source": "observability",
206                    "model_provider": model_provider,
207                    "model": model,
208                    "timestamp": chrono::Utc::now().to_rfc3339(),
209                })
210            }
211            zeroclaw_runtime::observability::ObserverEvent::AgentEnd {
212                model_provider,
213                model,
214                duration,
215                tokens_used,
216                cost_usd,
217            } => serde_json::json!({
218                "type": "agent_end",
219                "source": "observability",
220                "model_provider": model_provider,
221                "model": model,
222                "duration_ms": duration.as_millis(),
223                "tokens_used": tokens_used,
224                "cost_usd": cost_usd,
225                "timestamp": chrono::Utc::now().to_rfc3339(),
226            }),
227            _ => return, // Skip events we don't broadcast
228        };
229
230        self.buffer.push(json.clone());
231        let _ = self.tx.send(json);
232    }
233
234    fn record_metric(&self, _metric: &zeroclaw_runtime::observability::traits::ObserverMetric) {
235        // Metrics are not broadcast over SSE; the primary observer records them.
236    }
237
238    fn name(&self) -> &str {
239        "broadcast"
240    }
241
242    fn as_any(&self) -> &dyn std::any::Any {
243        self
244    }
245}
246
#[cfg(test)]
mod tests {
    use super::*;
    use zeroclaw_runtime::observability::{Observer, ObserverEvent};

    /// The broadcast hook is process-wide. Every test in this module that
    /// installs or clears it must hold this lock, so tests running in
    /// parallel within this binary don't observe each other's hook.
    static HOOK_TEST_LOCK: parking_lot::Mutex<()> = parking_lot::Mutex::new(());

    /// Clears the process-wide broadcast hook when dropped, so a failing
    /// assertion cannot leak a test's hook into later tests.
    struct ClearHookOnDrop;

    impl Drop for ClearHookOnDrop {
        fn drop(&mut self) {
            zeroclaw_runtime::observability::clear_broadcast_hook();
        }
    }

    /// Build an observer wired to a fresh 16-slot channel and 16-event buffer.
    fn make_broadcast() -> (
        Arc<BroadcastObserver>,
        tokio::sync::broadcast::Receiver<serde_json::Value>,
        Arc<EventBuffer>,
    ) {
        let (tx, rx) = tokio::sync::broadcast::channel(16);
        let buffer = Arc::new(EventBuffer::new(16));
        let obs = Arc::new(BroadcastObserver::new(tx, buffer.clone()));
        (obs, rx, buffer)
    }

    #[test]
    fn tool_call_event_is_broadcast_and_buffered() {
        let (obs, mut rx, buffer) = make_broadcast();

        obs.record_event(&ObserverEvent::ToolCall {
            tool: "shell".into(),
            tool_call_id: None,
            duration: std::time::Duration::from_millis(42),
            success: true,
            arguments: None,
            result: None,
        });

        let value = rx.try_recv().expect("event should be broadcast");
        assert_eq!(value["type"], "tool_call");
        assert_eq!(value["tool"], "shell");
        assert_eq!(value["success"], true);

        let snap = buffer.snapshot();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0]["type"], "tool_call");
    }

    #[test]
    fn tool_call_start_event_is_broadcast() {
        let (obs, mut rx, _buffer) = make_broadcast();

        obs.record_event(&ObserverEvent::ToolCallStart {
            tool: "mcp_filesystem__read_file".into(),
            tool_call_id: None,
            arguments: None,
        });

        let value = rx.try_recv().expect("event should be broadcast");
        assert_eq!(value["type"], "tool_call_start");
        assert_eq!(value["tool"], "mcp_filesystem__read_file");
    }

    #[test]
    fn unmapped_events_are_skipped() {
        let (obs, mut rx, buffer) = make_broadcast();

        obs.record_event(&ObserverEvent::HeartbeatTick);

        assert!(rx.try_recv().is_err(), "heartbeat should not broadcast");
        assert!(buffer.snapshot().is_empty());
    }

    #[test]
    fn session_scoped_events_are_not_public_sse_events() {
        let session_event = serde_json::json!({
            "type": "message",
            "session_id": "operator-1",
            "content": "private session notification"
        });
        let global_event = serde_json::json!({
            "type": "tool_call",
            "tool": "shell"
        });

        assert!(!is_public_sse_event(&session_event));
        assert!(is_public_sse_event(&global_event));
    }

    #[test]
    fn observability_tagged_events_are_public_even_without_session_id() {
        // After #7151, observability frames keep the SSE pathway open even
        // though they would not otherwise carry a session_id discriminator.
        let obs = serde_json::json!({
            "type": "tool_call",
            "source": "observability",
            "tool": "shell",
        });
        assert!(is_public_sse_event(&obs));

        // The case above would pass on the session_id check alone. This one
        // pins the `source` short-circuit itself: observability telemetry
        // stays public even if a session_id is attached.
        let obs_with_session = serde_json::json!({
            "type": "tool_call",
            "source": "observability",
            "session_id": "operator-1",
            "tool": "shell",
        });
        assert!(is_public_sse_event(&obs_with_session));
    }

    #[test]
    fn broadcast_observer_tags_every_event_with_observability_source() {
        // The chat-WS filter relies on this tag as a defense-in-depth check
        // (any future emitter that forgets to set session_id still gets
        // routed correctly). Cover every variant the observer broadcasts.
        let (obs, mut rx, _buffer) = make_broadcast();

        let cases: Vec<ObserverEvent> = vec![
            ObserverEvent::LlmRequest {
                model_provider: "p".into(),
                model: "m".into(),
                messages_count: 0,
            },
            ObserverEvent::ToolCall {
                tool: "shell".into(),
                tool_call_id: None,
                duration: std::time::Duration::from_millis(1),
                success: true,
                arguments: None,
                result: None,
            },
            ObserverEvent::ToolCallStart {
                tool: "shell".into(),
                tool_call_id: None,
                arguments: None,
            },
            ObserverEvent::Error {
                component: "any".into(),
                message: "boom".into(),
            },
            ObserverEvent::AgentStart {
                model_provider: "p".into(),
                model: "m".into(),
            },
            ObserverEvent::AgentEnd {
                model_provider: "p".into(),
                model: "m".into(),
                duration: std::time::Duration::from_millis(1),
                tokens_used: None,
                cost_usd: None,
            },
        ];
        for ev in cases {
            obs.record_event(&ev);
            let v = rx.try_recv().expect("event must broadcast");
            assert_eq!(
                v["source"], "observability",
                "every BroadcastObserver event must be tagged source=observability: {v}"
            );
        }
    }

    /// End-to-end coverage of the wiring `run_gateway` performs at startup.
    ///
    /// Install `BroadcastObserver` as the process-wide broadcast hook, then
    /// build an observer through `create_observer` (the path the agent loop
    /// takes inside `process_message`). Events recorded on that observer must
    /// surface on the SSE broadcast channel. This codifies the load-bearing
    /// ordering, so reordering or dropping the broadcast-hook installation in
    /// `run_gateway` is caught by `cargo test` rather than by a silent
    /// regression in production.
    #[test]
    fn factory_observer_events_reach_broadcast_hook() {
        let _lock = HOOK_TEST_LOCK.lock();

        zeroclaw_runtime::observability::clear_broadcast_hook();
        // Declared after `_lock`, so it drops first: the hook is cleared while
        // the lock is still held, even if an assertion below panics.
        let _reset = ClearHookOnDrop;

        let (tx, mut rx) = tokio::sync::broadcast::channel(16);
        let buffer = Arc::new(EventBuffer::new(16));
        let bo: Arc<dyn Observer> = Arc::new(BroadcastObserver::new(tx, buffer.clone()));
        zeroclaw_runtime::observability::set_broadcast_hook(bo);

        // Same factory call site as `process_message` in the agent loop.
        let cfg = zeroclaw_config::schema::ObservabilityConfig {
            backend: "noop".into(),
            ..Default::default()
        };
        let observer = zeroclaw_runtime::observability::create_observer(&cfg);

        observer.record_event(&ObserverEvent::ToolCall {
            tool: "shell".into(),
            tool_call_id: None,
            duration: std::time::Duration::from_millis(7),
            success: true,
            arguments: None,
            result: None,
        });

        let value = rx
            .try_recv()
            .expect("factory-built observer event must reach the SSE broadcast channel");
        assert_eq!(value["type"], "tool_call");
        assert_eq!(value["tool"], "shell");
        assert_eq!(value["success"], true);

        let snap = buffer.snapshot();
        assert_eq!(
            snap.len(),
            1,
            "broadcast events must also land in the buffer"
        );
    }
}