Skip to main content

zeroclaw_gateway/
sse.rs

1//! Server-Sent Events (SSE) stream for real-time event delivery.
2//!
3//! Wraps the broadcast channel in AppState to deliver events to web dashboard clients.
4
5use super::AppState;
6use axum::{
7    Json,
8    extract::State,
9    http::{HeaderMap, StatusCode, header},
10    response::{
11        IntoResponse,
12        sse::{Event, KeepAlive, Sse},
13    },
14};
15use std::collections::VecDeque;
16use std::convert::Infallible;
17use std::sync::{Arc, Mutex};
18use tokio_stream::StreamExt;
19use tokio_stream::wrappers::BroadcastStream;
20
21/// Thread-safe ring buffer that retains recent events for history replay.
22pub struct EventBuffer {
23    inner: Mutex<VecDeque<serde_json::Value>>,
24    capacity: usize,
25}
26
27impl EventBuffer {
28    pub fn new(capacity: usize) -> Self {
29        Self {
30            inner: Mutex::new(VecDeque::with_capacity(capacity)),
31            capacity,
32        }
33    }
34
35    /// Push an event into the buffer, evicting the oldest if at capacity.
36    pub fn push(&self, event: serde_json::Value) {
37        let mut buf = self.inner.lock().unwrap();
38        if buf.len() == self.capacity {
39            buf.pop_front();
40        }
41        buf.push_back(event);
42    }
43
44    /// Return a snapshot of all buffered events (oldest first).
45    pub fn snapshot(&self) -> Vec<serde_json::Value> {
46        self.inner.lock().unwrap().iter().cloned().collect()
47    }
48}
49
50/// GET /api/events — SSE event stream
51pub async fn handle_sse_events(
52    State(state): State<AppState>,
53    headers: HeaderMap,
54) -> impl IntoResponse {
55    // Auth check
56    if state.pairing.require_pairing() {
57        let token = headers
58            .get(header::AUTHORIZATION)
59            .and_then(|v| v.to_str().ok())
60            .and_then(|auth| auth.strip_prefix("Bearer "))
61            .unwrap_or("");
62
63        if !state.pairing.is_authenticated(token) {
64            return (
65                StatusCode::UNAUTHORIZED,
66                "Unauthorized — provide Authorization: Bearer <token>",
67            )
68                .into_response();
69        }
70    }
71
72    let rx = state.event_tx.subscribe();
73    let stream = BroadcastStream::new(rx).filter_map(
74        |result: Result<
75            serde_json::Value,
76            tokio_stream::wrappers::errors::BroadcastStreamRecvError,
77        >| {
78            match result {
79                Ok(value) if is_public_sse_event(&value) => Some(Ok::<_, Infallible>(
80                    Event::default().data(value.to_string()),
81                )),
82                Ok(_) => None,
83                Err(_) => None, // Skip lagged messages
84            }
85        },
86    );
87
88    Sse::new(stream)
89        .keep_alive(KeepAlive::default())
90        .into_response()
91}
92
93/// GET /api/events/history — return buffered recent events as JSON.
94pub async fn handle_events_history(
95    State(state): State<AppState>,
96    headers: HeaderMap,
97) -> impl IntoResponse {
98    if let Err(e) = super::api::require_auth(&state, &headers) {
99        return e.into_response();
100    }
101    let events: Vec<_> = state
102        .event_buffer
103        .snapshot()
104        .into_iter()
105        .filter(is_public_sse_event)
106        .collect();
107    Json(serde_json::json!({ "events": events })).into_response()
108}
109
110/// Returns true for events that should be visible on the global SSE stream.
111///
112/// Contract: broadcast events must not include `session_id` unless they are
113/// intentionally scoped to that session and hidden from global `/api/events`.
114fn is_public_sse_event(event: &serde_json::Value) -> bool {
115    event
116        .get("session_id")
117        .and_then(serde_json::Value::as_str)
118        .is_none()
119}
120
121/// Broadcast observer that fans events out to SSE subscribers.
122///
123/// Installed as the process-wide broadcast hook by [`crate::run_gateway`] so
124/// that events recorded by *any* observer built through
125/// `observability::create_observer` — including the per-call observer the
126/// agent loop creates inside `process_message` — also reach `/api/events`
127/// clients.
128///
129/// Crate-private: the constructor signature is intentionally not part of any
130/// stable surface, since it is wired directly into `run_gateway`.
131pub(crate) struct BroadcastObserver {
132    tx: tokio::sync::broadcast::Sender<serde_json::Value>,
133    buffer: Arc<EventBuffer>,
134}
135
136impl BroadcastObserver {
137    pub(crate) fn new(
138        tx: tokio::sync::broadcast::Sender<serde_json::Value>,
139        buffer: Arc<EventBuffer>,
140    ) -> Self {
141        Self { tx, buffer }
142    }
143}
144
145impl zeroclaw_runtime::observability::Observer for BroadcastObserver {
146    fn record_event(&self, event: &zeroclaw_runtime::observability::ObserverEvent) {
147        // Recording into the primary observer (logs / Prometheus) is the
148        // responsibility of whoever built the event source; `TeeObserver`
149        // takes care of that fan-out. Here we only translate to JSON and
150        // ship to SSE subscribers.
151        let json = match event {
152            zeroclaw_runtime::observability::ObserverEvent::LlmRequest {
153                model_provider,
154                model,
155                ..
156            } => serde_json::json!({
157                "type": "llm_request",
158                "model_provider": model_provider,
159                "model": model,
160                "timestamp": chrono::Utc::now().to_rfc3339(),
161            }),
162            zeroclaw_runtime::observability::ObserverEvent::ToolCall {
163                tool,
164                duration,
165                success,
166                ..
167            } => serde_json::json!({
168                "type": "tool_call",
169                "tool": tool,
170                "duration_ms": duration.as_millis(),
171                "success": success,
172                "timestamp": chrono::Utc::now().to_rfc3339(),
173            }),
174            zeroclaw_runtime::observability::ObserverEvent::ToolCallStart { tool, .. } => {
175                serde_json::json!({
176                    "type": "tool_call_start",
177                    "tool": tool,
178                    "timestamp": chrono::Utc::now().to_rfc3339(),
179                })
180            }
181            zeroclaw_runtime::observability::ObserverEvent::Error { component, message } => {
182                serde_json::json!({
183                    "type": "error",
184                    "component": component,
185                    "message": message,
186                    "timestamp": chrono::Utc::now().to_rfc3339(),
187                })
188            }
189            zeroclaw_runtime::observability::ObserverEvent::AgentStart {
190                model_provider,
191                model,
192            } => {
193                serde_json::json!({
194                    "type": "agent_start",
195                    "model_provider": model_provider,
196                    "model": model,
197                    "timestamp": chrono::Utc::now().to_rfc3339(),
198                })
199            }
200            zeroclaw_runtime::observability::ObserverEvent::AgentEnd {
201                model_provider,
202                model,
203                duration,
204                tokens_used,
205                cost_usd,
206            } => serde_json::json!({
207                "type": "agent_end",
208                "model_provider": model_provider,
209                "model": model,
210                "duration_ms": duration.as_millis(),
211                "tokens_used": tokens_used,
212                "cost_usd": cost_usd,
213                "timestamp": chrono::Utc::now().to_rfc3339(),
214            }),
215            _ => return, // Skip events we don't broadcast
216        };
217
218        self.buffer.push(json.clone());
219        let _ = self.tx.send(json);
220    }
221
222    fn record_metric(&self, _metric: &zeroclaw_runtime::observability::traits::ObserverMetric) {
223        // Metrics are not broadcast over SSE; the primary observer records them.
224    }
225
226    fn name(&self) -> &str {
227        "broadcast"
228    }
229
230    fn as_any(&self) -> &dyn std::any::Any {
231        self
232    }
233}
234
#[cfg(test)]
mod tests {
    use super::*;
    use zeroclaw_runtime::observability::{Observer, ObserverEvent};

    /// Serializes every test in this module that installs or clears the
    /// process-wide broadcast hook, so such tests cannot observe each other's
    /// state. It lives at module scope so any new hook-touching test can share it.
    static HOOK_TEST_LOCK: parking_lot::Mutex<()> = parking_lot::Mutex::new(());

    /// Clears the process-wide broadcast hook when dropped. A failing
    /// assertion therefore unwinds through the cleanup instead of leaving a
    /// stale hook installed for later tests in the same binary.
    struct ClearHookOnDrop;

    impl Drop for ClearHookOnDrop {
        fn drop(&mut self) {
            zeroclaw_runtime::observability::clear_broadcast_hook();
        }
    }

    /// Build a `BroadcastObserver` wired to a fresh channel and history
    /// buffer, both with room for 16 events.
    fn make_broadcast() -> (
        Arc<BroadcastObserver>,
        tokio::sync::broadcast::Receiver<serde_json::Value>,
        Arc<EventBuffer>,
    ) {
        let (tx, rx) = tokio::sync::broadcast::channel(16);
        let buffer = Arc::new(EventBuffer::new(16));
        let obs = Arc::new(BroadcastObserver::new(tx, buffer.clone()));
        (obs, rx, buffer)
    }

    #[test]
    fn tool_call_event_is_broadcast_and_buffered() {
        let (obs, mut rx, buffer) = make_broadcast();

        obs.record_event(&ObserverEvent::ToolCall {
            tool: "shell".into(),
            tool_call_id: None,
            duration: std::time::Duration::from_millis(42),
            success: true,
            arguments: None,
            result: None,
        });

        let value = rx.try_recv().expect("event should be broadcast");
        assert_eq!(value["type"], "tool_call");
        assert_eq!(value["tool"], "shell");
        assert_eq!(value["success"], true);

        let snap = buffer.snapshot();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0]["type"], "tool_call");
    }

    #[test]
    fn tool_call_start_event_is_broadcast() {
        let (obs, mut rx, _buffer) = make_broadcast();

        obs.record_event(&ObserverEvent::ToolCallStart {
            tool: "mcp_filesystem__read_file".into(),
            tool_call_id: None,
            arguments: None,
        });

        let value = rx.try_recv().expect("event should be broadcast");
        assert_eq!(value["type"], "tool_call_start");
        assert_eq!(value["tool"], "mcp_filesystem__read_file");
    }

    #[test]
    fn unmapped_events_are_skipped() {
        let (obs, mut rx, buffer) = make_broadcast();

        obs.record_event(&ObserverEvent::HeartbeatTick);

        assert!(rx.try_recv().is_err(), "heartbeat should not broadcast");
        assert!(buffer.snapshot().is_empty());
    }

    #[test]
    fn event_buffer_evicts_oldest_when_full() {
        let buffer = EventBuffer::new(2);
        for seq in 0..3 {
            buffer.push(serde_json::json!({ "seq": seq }));
        }

        let snap = buffer.snapshot();
        assert_eq!(snap.len(), 2, "buffer must stay within its capacity");
        assert_eq!(snap[0]["seq"], 1, "oldest event must be evicted first");
        assert_eq!(snap[1]["seq"], 2);
    }

    #[test]
    fn session_scoped_events_are_not_public_sse_events() {
        let session_event = serde_json::json!({
            "type": "message",
            "session_id": "operator-1",
            "content": "private session notification"
        });
        let global_event = serde_json::json!({
            "type": "tool_call",
            "tool": "shell"
        });

        assert!(!is_public_sse_event(&session_event));
        assert!(is_public_sse_event(&global_event));
    }

    /// End-to-end coverage of the wiring `run_gateway` relies on at startup.
    /// Once `BroadcastObserver` is installed as the process-wide broadcast
    /// hook, an observer built through `create_observer` (the path the agent
    /// loop takes inside `process_message`) must surface events on the SSE
    /// broadcast channel and in the history buffer.
    ///
    /// NOTE(review): the hook is installed here with `set_broadcast_hook`
    /// directly, and `run_gateway` itself is never invoked. This pins the
    /// hook → factory contract that `run_gateway`'s `set_scoped_broadcast_hook`
    /// call depends on. It does not by itself catch a reordering inside
    /// `run_gateway`; confirm coverage there separately.
    #[test]
    fn factory_observer_events_reach_broadcast_hook() {
        // The broadcast hook is process-wide; hold the shared lock for the whole test.
        let _lock = HOOK_TEST_LOCK.lock();

        zeroclaw_runtime::observability::clear_broadcast_hook();
        // Declared after `_lock`, so it is dropped first: the hook is cleared
        // while the lock is still held, on both the success and panic paths.
        let _reset = ClearHookOnDrop;

        let (tx, mut rx) = tokio::sync::broadcast::channel(16);
        let buffer = Arc::new(EventBuffer::new(16));
        let bo: Arc<dyn Observer> = Arc::new(BroadcastObserver::new(tx, buffer.clone()));
        zeroclaw_runtime::observability::set_broadcast_hook(bo);

        // Same factory call site as `process_message` in the agent loop.
        let cfg = zeroclaw_config::schema::ObservabilityConfig {
            backend: "noop".into(),
            ..Default::default()
        };
        let observer = zeroclaw_runtime::observability::create_observer(&cfg);

        observer.record_event(&ObserverEvent::ToolCall {
            tool: "shell".into(),
            tool_call_id: None,
            duration: std::time::Duration::from_millis(7),
            success: true,
            arguments: None,
            result: None,
        });

        let value = rx
            .try_recv()
            .expect("factory-built observer event must reach the SSE broadcast channel");
        assert_eq!(value["type"], "tool_call");
        assert_eq!(value["tool"], "shell");
        assert_eq!(value["success"], true);

        let snap = buffer.snapshot();
        assert_eq!(
            snap.len(),
            1,
            "broadcast events must also land in the buffer"
        );
    }
}