Skip to main content

zeroclaw_runtime/observability/
mod.rs

1pub mod dora;
2pub mod log;
3pub mod multi;
4pub mod noop;
5#[cfg(feature = "observability-otel")]
6pub mod otel;
7#[cfg(feature = "observability-prometheus")]
8pub mod prometheus;
9pub mod runtime_trace;
10pub mod traits;
11pub mod verbose;
12
13#[allow(unused_imports)]
14pub use self::log::LogObserver;
15#[allow(unused_imports)]
16pub use self::multi::MultiObserver;
17pub use noop::NoopObserver;
18#[cfg(feature = "observability-otel")]
19pub use otel::OtelObserver;
20#[cfg(feature = "observability-prometheus")]
21pub use prometheus::PrometheusObserver;
22pub use traits::{Observer, ObserverEvent};
23#[allow(unused_imports)]
24pub use verbose::VerboseObserver;
25
26use std::any::Any;
27use std::sync::{Arc, OnceLock};
28
29use parking_lot::RwLock;
30use traits::ObserverMetric;
31use zeroclaw_config::schema::ObservabilityConfig;
32
33/// Process-wide broadcast hook installed by long-running subsystems (today: the
34/// gateway) so that events emitted by observers built in *other* subsystems —
35/// notably the agent loop's `process_message` — also fan out to the SSE
36/// broadcast channel. Without this, observers created per call site stay
37/// isolated and `/api/events` only sees the gateway's own direct emissions.
38///
39/// Uses `parking_lot::RwLock` so the event-recording path never has to handle
40/// lock poisoning: a panic inside a hook would not silently disable the entire
41/// observability channel on subsequent calls.
42static BROADCAST_HOOK: OnceLock<RwLock<BroadcastHookState>> = OnceLock::new();
43
44struct BroadcastHookEntry {
45    scoped_id: Option<u64>,
46    observer: Arc<dyn Observer>,
47}
48
49#[derive(Default)]
50struct BroadcastHookState {
51    next_scoped_id: u64,
52    entries: Vec<BroadcastHookEntry>,
53}
54
55impl BroadcastHookState {
56    fn current(&self) -> Option<Arc<dyn Observer>> {
57        self.entries.last().map(|entry| entry.observer.clone())
58    }
59}
60
61fn broadcast_hook_slot() -> &'static RwLock<BroadcastHookState> {
62    BROADCAST_HOOK.get_or_init(|| RwLock::new(BroadcastHookState::default()))
63}
64
65/// Install a process-wide observer that will receive every event recorded
66/// through observers built by [`create_observer`]. Calling this again replaces
67/// the previous hook.
68pub fn set_broadcast_hook(observer: Arc<dyn Observer>) {
69    let mut slot = broadcast_hook_slot().write();
70    slot.entries.clear();
71    slot.entries.push(BroadcastHookEntry {
72        scoped_id: None,
73        observer,
74    });
75}
76
/// RAII handle returned by [`set_scoped_broadcast_hook`].
///
/// While the guard is alive, its observer stays in the hook stack. Dropping it
/// removes exactly that observer and nothing else:
///
/// * if a newer scoped hook is still live, that newer hook stays active;
/// * if the dropped hook was the active one, the next still-live hook beneath
///   it becomes active again;
/// * if the stack was wiped in the meantime (by [`set_broadcast_hook`] or
///   [`clear_broadcast_hook`]), the drop changes nothing.
#[must_use = "hold the guard for as long as the broadcast hook should remain installed"]
pub struct BroadcastHookGuard {
    /// Id of the hook-stack entry this guard owns.
    scoped_id: u64,
}
86
87impl Drop for BroadcastHookGuard {
88    fn drop(&mut self) {
89        let mut slot = broadcast_hook_slot().write();
90        slot.entries
91            .retain(|entry| entry.scoped_id != Some(self.scoped_id));
92    }
93}
94
95/// Install a process-wide observer and return a guard that clears it on drop.
96#[must_use = "hold the guard for as long as the broadcast hook should remain installed"]
97pub fn set_scoped_broadcast_hook(observer: Arc<dyn Observer>) -> BroadcastHookGuard {
98    let mut slot = broadcast_hook_slot().write();
99    let scoped_id = slot.next_scoped_id;
100    slot.next_scoped_id = slot.next_scoped_id.wrapping_add(1);
101    slot.entries.push(BroadcastHookEntry {
102        scoped_id: Some(scoped_id),
103        observer,
104    });
105    BroadcastHookGuard { scoped_id }
106}
107
108/// Remove the broadcast hook, if any. Intended for tests and orderly shutdown.
109pub fn clear_broadcast_hook() {
110    broadcast_hook_slot().write().entries.clear();
111}
112
113fn current_broadcast_hook() -> Option<Arc<dyn Observer>> {
114    broadcast_hook_slot().read().current()
115}
116
117/// Wrapper that forwards every event to a primary observer plus the
118/// process-wide broadcast hook (when set). Metrics flow only to the primary.
119struct TeeObserver {
120    primary: Box<dyn Observer>,
121}
122
123impl Observer for TeeObserver {
124    fn record_event(&self, event: &ObserverEvent) {
125        self.primary.record_event(event);
126        if let Some(hook) = current_broadcast_hook() {
127            hook.record_event(event);
128        }
129    }
130
131    fn record_metric(&self, metric: &ObserverMetric) {
132        self.primary.record_metric(metric);
133    }
134
135    fn flush(&self) {
136        self.primary.flush();
137    }
138
139    fn name(&self) -> &str {
140        // Delegate so callers (and tests) see the underlying backend name,
141        // not the internal wrapper.
142        self.primary.name()
143    }
144
145    fn as_any(&self) -> &dyn Any {
146        // Expose the primary so downcasts (e.g. to PrometheusObserver in the
147        // gateway's /metrics handler) keep working transparently.
148        self.primary.as_any()
149    }
150}
151
152/// Factory: create the right observer from config
153pub fn create_observer(config: &ObservabilityConfig) -> Box<dyn Observer> {
154    Box::new(TeeObserver {
155        primary: create_primary_observer(config),
156    })
157}
158
159fn create_primary_observer(config: &ObservabilityConfig) -> Box<dyn Observer> {
160    match config.backend.as_str() {
161        "log" => Box::new(LogObserver::new()),
162        "verbose" => Box::new(VerboseObserver::new()),
163        "prometheus" => {
164            #[cfg(feature = "observability-prometheus")]
165            {
166                Box::new(PrometheusObserver::shared())
167            }
168            #[cfg(not(feature = "observability-prometheus"))]
169            {
170                ::zeroclaw_log::record!(
171                    WARN,
172                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
173                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
174                    "Prometheus backend requested but this build was compiled without `observability-prometheus`; falling back to noop."
175                );
176                Box::new(NoopObserver)
177            }
178        }
179        "otel" | "opentelemetry" | "otlp" => {
180            #[cfg(feature = "observability-otel")]
181            match OtelObserver::new(
182                config.otel_endpoint.as_deref(),
183                config.otel_service_name.as_deref(),
184                config.otel_headers.clone(),
185            ) {
186                Ok(obs) => {
187                    ::zeroclaw_log::record!(
188                        INFO,
189                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
190                            .with_attrs(::serde_json::json!({"endpoint": config
191                            .otel_endpoint
192                            .as_deref()
193                            .unwrap_or("http://localhost:4318")})),
194                        "OpenTelemetry observer initialized"
195                    );
196                    Box::new(obs)
197                }
198                Err(e) => {
199                    ::zeroclaw_log::record!(
200                        ERROR,
201                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
202                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
203                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
204                        "Failed to create OTel observer. Falling back to noop."
205                    );
206                    Box::new(NoopObserver)
207                }
208            }
209            #[cfg(not(feature = "observability-otel"))]
210            {
211                ::zeroclaw_log::record!(
212                    WARN,
213                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
214                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
215                    "OpenTelemetry backend requested but this build was compiled without `observability-otel`; falling back to noop."
216                );
217                Box::new(NoopObserver)
218            }
219        }
220        "none" | "noop" => Box::new(NoopObserver),
221        _ => {
222            ::zeroclaw_log::record!(
223                WARN,
224                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
225                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
226                &format!(
227                    "Unknown observability backend '{}', falling back to noop",
228                    config.backend
229                )
230            );
231            Box::new(NoopObserver)
232        }
233    }
234}
235
#[cfg(test)]
mod tests {
    use super::*;
    use parking_lot::{Mutex as PlMutex, MutexGuard};
    use std::sync::atomic::{AtomicUsize, Ordering};

    // ── Helpers ─────────────────────────────────────────────────────────

    /// Config selecting `backend`, with every other field at its default.
    fn config_for(backend: &str) -> ObservabilityConfig {
        ObservabilityConfig {
            backend: backend.into(),
            ..ObservabilityConfig::default()
        }
    }

    /// Config selecting an OTel-family `backend`, with an explicit local
    /// endpoint and service name.
    fn otel_config_for(backend: &str) -> ObservabilityConfig {
        ObservabilityConfig {
            backend: backend.into(),
            otel_endpoint: Some("http://127.0.0.1:19999".into()),
            otel_service_name: Some("test".into()),
            ..ObservabilityConfig::default()
        }
    }

    /// Name the factory reports for a feature-gated backend: `name` when the
    /// feature is compiled in, otherwise the `"noop"` fallback.
    fn gated_name(feature_enabled: bool, name: &'static str) -> &'static str {
        if feature_enabled { name } else { "noop" }
    }

    /// Test observer that counts events and metrics. It is used to verify the
    /// broadcast hook fan-out and that downcasts pass through `TeeObserver`.
    #[derive(Default)]
    struct CountingObserver {
        events: AtomicUsize,
        metrics: AtomicUsize,
    }

    impl Observer for CountingObserver {
        fn record_event(&self, _event: &ObserverEvent) {
            self.events.fetch_add(1, Ordering::SeqCst);
        }

        fn record_metric(&self, _metric: &ObserverMetric) {
            self.metrics.fetch_add(1, Ordering::SeqCst);
        }

        fn name(&self) -> &str {
            "counting"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Serializes tests that touch the process-wide broadcast hook, so they
    /// don't observe each other's installations.
    static HOOK_TEST_LOCK: PlMutex<()> = PlMutex::new(());

    /// Holds `HOOK_TEST_LOCK` for the duration of a test and clears the
    /// broadcast hook both on entry and on exit. Clearing in `Drop` means a
    /// failing assertion cannot leak an installed hook into later tests.
    struct HookFixture {
        _lock: MutexGuard<'static, ()>,
    }

    impl HookFixture {
        fn acquire() -> Self {
            let lock = HOOK_TEST_LOCK.lock();
            clear_broadcast_hook();
            Self { _lock: lock }
        }
    }

    impl Drop for HookFixture {
        fn drop(&mut self) {
            // `Drop::drop` runs before the `_lock` field is released, so the
            // hook is cleared while the test still holds the lock.
            clear_broadcast_hook();
        }
    }

    // ── Factory backend selection ───────────────────────────────────────

    #[test]
    fn factory_none_returns_noop() {
        assert_eq!(create_observer(&config_for("none")).name(), "noop");
    }

    #[test]
    fn factory_noop_returns_noop() {
        assert_eq!(create_observer(&config_for("noop")).name(), "noop");
    }

    #[test]
    fn factory_log_returns_log() {
        assert_eq!(create_observer(&config_for("log")).name(), "log");
    }

    #[test]
    fn factory_verbose_returns_verbose() {
        assert_eq!(create_observer(&config_for("verbose")).name(), "verbose");
    }

    #[test]
    fn factory_prometheus_returns_prometheus() {
        let expected = gated_name(cfg!(feature = "observability-prometheus"), "prometheus");
        assert_eq!(create_observer(&config_for("prometheus")).name(), expected);
    }

    #[test]
    fn factory_otel_returns_otel() {
        let expected = gated_name(cfg!(feature = "observability-otel"), "otel");
        assert_eq!(create_observer(&otel_config_for("otel")).name(), expected);
    }

    #[test]
    fn factory_opentelemetry_alias() {
        let expected = gated_name(cfg!(feature = "observability-otel"), "otel");
        assert_eq!(
            create_observer(&otel_config_for("opentelemetry")).name(),
            expected
        );
    }

    #[test]
    fn factory_otlp_alias() {
        let expected = gated_name(cfg!(feature = "observability-otel"), "otel");
        assert_eq!(create_observer(&otel_config_for("otlp")).name(), expected);
    }

    #[test]
    fn factory_unknown_falls_back_to_noop() {
        assert_eq!(create_observer(&config_for("xyzzy_unknown")).name(), "noop");
    }

    #[test]
    fn factory_empty_string_falls_back_to_noop() {
        assert_eq!(create_observer(&config_for("")).name(), "noop");
    }

    #[test]
    fn factory_garbage_falls_back_to_noop() {
        assert_eq!(create_observer(&config_for("xyzzy_garbage_123")).name(), "noop");
    }

    // ── Broadcast hook fan-out ──────────────────────────────────────────

    #[test]
    fn broadcast_hook_receives_events_from_factory_observer() {
        let _fixture = HookFixture::acquire();
        let hook = Arc::new(CountingObserver::default());
        set_broadcast_hook(hook.clone());

        let observer = create_observer(&config_for("noop"));
        observer.record_event(&ObserverEvent::HeartbeatTick);
        observer.record_event(&ObserverEvent::Error {
            component: "x".into(),
            message: "y".into(),
        });

        assert_eq!(hook.events.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn broadcast_hook_does_not_receive_metrics() {
        let _fixture = HookFixture::acquire();
        let hook = Arc::new(CountingObserver::default());
        set_broadcast_hook(hook.clone());

        let observer = create_observer(&config_for("noop"));
        observer.record_metric(&ObserverMetric::TokensUsed(10));
        observer.record_metric(&ObserverMetric::TokensUsed(20));

        assert_eq!(hook.events.load(Ordering::SeqCst), 0);
        assert_eq!(hook.metrics.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn broadcast_hook_unset_means_only_primary_runs() {
        let _fixture = HookFixture::acquire();
        let observer = create_observer(&config_for("noop"));

        // No hook installed; recording must not panic and must be a no-op.
        observer.record_event(&ObserverEvent::HeartbeatTick);
        observer.record_metric(&ObserverMetric::TokensUsed(1));
    }

    #[test]
    fn tee_sends_events_to_primary_and_hook_but_metrics_only_to_primary() {
        let _fixture = HookFixture::acquire();
        let hook = Arc::new(CountingObserver::default());
        set_broadcast_hook(hook.clone());

        let tee = TeeObserver {
            primary: Box::new(CountingObserver::default()),
        };
        tee.record_event(&ObserverEvent::HeartbeatTick);
        tee.record_metric(&ObserverMetric::TokensUsed(5));

        let primary = tee
            .as_any()
            .downcast_ref::<CountingObserver>()
            .expect("TeeObserver::as_any must expose the primary observer");
        assert_eq!(primary.events.load(Ordering::SeqCst), 1);
        assert_eq!(primary.metrics.load(Ordering::SeqCst), 1);
        assert_eq!(hook.events.load(Ordering::SeqCst), 1);
        assert_eq!(hook.metrics.load(Ordering::SeqCst), 0);
        assert_eq!(tee.name(), "counting");
    }

    // ── Scoped hooks and guards ─────────────────────────────────────────

    #[test]
    fn scoped_broadcast_hook_guard_clears_installed_hook_on_drop() {
        let _fixture = HookFixture::acquire();
        let hook = Arc::new(CountingObserver::default());
        let broadcast_guard = set_scoped_broadcast_hook(hook.clone());

        let observer = create_observer(&config_for("noop"));
        observer.record_event(&ObserverEvent::HeartbeatTick);
        assert_eq!(hook.events.load(Ordering::SeqCst), 1);

        drop(broadcast_guard);
        observer.record_event(&ObserverEvent::HeartbeatTick);
        assert_eq!(hook.events.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn scoped_broadcast_hook_guard_preserves_replacement_hook() {
        let _fixture = HookFixture::acquire();
        let old_hook = Arc::new(CountingObserver::default());
        let old_guard = set_scoped_broadcast_hook(old_hook.clone());

        let new_hook = Arc::new(CountingObserver::default());
        set_broadcast_hook(new_hook.clone());
        drop(old_guard);

        let observer = create_observer(&config_for("noop"));
        observer.record_event(&ObserverEvent::HeartbeatTick);

        assert_eq!(old_hook.events.load(Ordering::SeqCst), 0);
        assert_eq!(new_hook.events.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn dropping_newer_scoped_broadcast_hook_restores_older_live_hook() {
        let _fixture = HookFixture::acquire();
        let old_hook = Arc::new(CountingObserver::default());
        let old_guard = set_scoped_broadcast_hook(old_hook.clone());
        let new_hook = Arc::new(CountingObserver::default());
        let new_guard = set_scoped_broadcast_hook(new_hook.clone());

        let observer = create_observer(&config_for("noop"));
        observer.record_event(&ObserverEvent::HeartbeatTick);
        assert_eq!(old_hook.events.load(Ordering::SeqCst), 0);
        assert_eq!(new_hook.events.load(Ordering::SeqCst), 1);

        drop(new_guard);
        observer.record_event(&ObserverEvent::HeartbeatTick);
        assert_eq!(old_hook.events.load(Ordering::SeqCst), 1);
        assert_eq!(new_hook.events.load(Ordering::SeqCst), 1);

        drop(old_guard);
        observer.record_event(&ObserverEvent::HeartbeatTick);
        assert_eq!(old_hook.events.load(Ordering::SeqCst), 1);
        assert_eq!(new_hook.events.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn dropping_older_scoped_broadcast_hook_keeps_newer_hook_active() {
        let _fixture = HookFixture::acquire();
        let old_hook = Arc::new(CountingObserver::default());
        let old_guard = set_scoped_broadcast_hook(old_hook.clone());
        let new_hook = Arc::new(CountingObserver::default());
        let new_guard = set_scoped_broadcast_hook(new_hook.clone());

        let observer = create_observer(&config_for("noop"));
        drop(old_guard);
        observer.record_event(&ObserverEvent::HeartbeatTick);
        assert_eq!(old_hook.events.load(Ordering::SeqCst), 0);
        assert_eq!(new_hook.events.load(Ordering::SeqCst), 1);

        drop(new_guard);
        observer.record_event(&ObserverEvent::HeartbeatTick);
        assert_eq!(old_hook.events.load(Ordering::SeqCst), 0);
        assert_eq!(new_hook.events.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn clear_broadcast_hook_detaches_scoped_hooks() {
        let _fixture = HookFixture::acquire();
        let stale_hook = Arc::new(CountingObserver::default());
        let stale_guard = set_scoped_broadcast_hook(stale_hook.clone());

        clear_broadcast_hook();
        let observer = create_observer(&config_for("noop"));
        observer.record_event(&ObserverEvent::HeartbeatTick);
        assert_eq!(stale_hook.events.load(Ordering::SeqCst), 0);

        // A guard whose entry was already cleared must not remove a hook that
        // was installed afterwards.
        let fresh_hook = Arc::new(CountingObserver::default());
        let _fresh_guard = set_scoped_broadcast_hook(fresh_hook.clone());
        drop(stale_guard);
        observer.record_event(&ObserverEvent::HeartbeatTick);
        assert_eq!(stale_hook.events.load(Ordering::SeqCst), 0);
        assert_eq!(fresh_hook.events.load(Ordering::SeqCst), 1);
    }

    // ── Downcasting ─────────────────────────────────────────────────────

    #[test]
    fn factory_observer_downcasts_through_tee() {
        let _fixture = HookFixture::acquire();
        let observer = create_observer(&config_for("log"));

        // `as_any` must surface the primary observer so existing downcasts
        // (e.g. PrometheusObserver in /metrics) keep working through the tee.
        assert!(observer.as_any().downcast_ref::<LogObserver>().is_some());
    }
}