zeroclaw_log/broadcast.rs
1//! Process-wide broadcast channel for the canonical log stream.
2//!
3//! The gateway installs a [`tokio::sync::broadcast::Sender<Value>`] here at
4//! startup; every event passing through [`crate::record_event`] is fanned
5//! out to that channel so SSE clients (and any other in-process subscriber)
6//! see the live stream.
7//!
8//! Lives in this crate, not in zeroclaw-runtime, so the dependency graph
9//! stays clean: zeroclaw-api → zeroclaw-config → zeroclaw-log → everything
10//! else.
11
12use std::sync::OnceLock;
13
14use parking_lot::RwLock;
15use serde_json::Value;
16use tokio::sync::broadcast;
17
18/// Type alias for the canonical log broadcast sender.
19pub type LogBroadcastSender = broadcast::Sender<Value>;
20
21static BROADCAST: OnceLock<RwLock<Option<LogBroadcastSender>>> = OnceLock::new();
22
23fn slot() -> &'static RwLock<Option<LogBroadcastSender>> {
24 BROADCAST.get_or_init(|| RwLock::new(None))
25}
26
27/// Install a process-wide broadcast sender. Calling again replaces the
28/// previous one (the old sender will be dropped — its `Receiver`s will
29/// see `RecvError::Closed` on their next read).
30pub fn set_broadcast_hook(sender: LogBroadcastSender) {
31 *slot().write() = Some(sender);
32}
33
34/// Remove the broadcast sender (tests, orderly shutdown).
35pub fn clear_broadcast_hook() {
36 *slot().write() = None;
37}
38
39/// Read the current broadcast sender, if any.
40#[must_use]
41pub fn current_broadcast_hook() -> Option<LogBroadcastSender> {
42 slot().read().clone()
43}
44
45/// Subscribe to the broadcast stream. Returns `None` when no sender has
46/// been installed yet (e.g. when running tests that never wired the
47/// gateway). The receiver yields every event emitted via
48/// [`crate::record_event`] after the subscribe call.
49#[must_use]
50pub fn subscribe() -> Option<broadcast::Receiver<Value>> {
51 slot().read().as_ref().map(|s| s.subscribe())
52}
53
54/// Test-only convenience: ensure a broadcast hook is installed and
55/// return a receiver. If no hook is set yet, install one with a 64K
56/// ring buffer (large enough that parallel workspace tests firing
57/// `record!` into the global hook can't evict the test's own event
58/// during the short window between emit and receive) and subscribe.
59/// Idempotent.
60#[doc(hidden)]
61#[must_use]
62pub fn subscribe_or_install() -> broadcast::Receiver<Value> {
63 {
64 let read = slot().read();
65 if let Some(sender) = read.as_ref() {
66 return sender.subscribe();
67 }
68 }
69 let (tx, rx) = broadcast::channel(65_536);
70 set_broadcast_hook(tx);
71 rx
72}
73
74#[cfg(test)]
75mod tests {
76 use super::*;
77
78 static HOOK_LOCK: parking_lot::Mutex<()> = parking_lot::Mutex::new(());
79
80 #[tokio::test]
81 async fn set_and_subscribe_round_trip() {
82 // Install + emit happen inside this scope so the lock is released
83 // before the await; otherwise clippy flags a sync Mutex held
84 // across an await point.
85 let mut rx = {
86 let _guard = HOOK_LOCK.lock();
87 clear_broadcast_hook();
88 assert!(current_broadcast_hook().is_none());
89
90 let (tx, _rx_keepalive) = broadcast::channel(8);
91 set_broadcast_hook(tx);
92 let rx = subscribe().expect("subscribe after set");
93
94 let hook = current_broadcast_hook().unwrap();
95 let _ = hook.send(serde_json::json!({ "ping": true }));
96 rx
97 };
98
99 let value = rx.recv().await.unwrap();
100 assert_eq!(value["ping"], true);
101
102 let _guard = HOOK_LOCK.lock();
103 clear_broadcast_hook();
104 assert!(current_broadcast_hook().is_none());
105 }
106}