zeroclaw_runtime/observability/
mod.rs1pub mod dora;
2pub mod log;
3pub mod multi;
4pub mod noop;
5#[cfg(feature = "observability-otel")]
6pub mod otel;
7#[cfg(feature = "observability-prometheus")]
8pub mod prometheus;
9pub mod runtime_trace;
10pub mod traits;
11pub mod verbose;
12
13#[allow(unused_imports)]
14pub use self::log::LogObserver;
15#[allow(unused_imports)]
16pub use self::multi::MultiObserver;
17pub use noop::NoopObserver;
18#[cfg(feature = "observability-otel")]
19pub use otel::OtelObserver;
20#[cfg(feature = "observability-prometheus")]
21pub use prometheus::PrometheusObserver;
22pub use traits::{Observer, ObserverEvent};
23#[allow(unused_imports)]
24pub use verbose::VerboseObserver;
25
26use std::any::Any;
27use std::sync::{Arc, OnceLock};
28
29use parking_lot::RwLock;
30use traits::ObserverMetric;
31use zeroclaw_config::schema::ObservabilityConfig;
32
33static BROADCAST_HOOK: OnceLock<RwLock<BroadcastHookState>> = OnceLock::new();
43
44struct BroadcastHookEntry {
45 scoped_id: Option<u64>,
46 observer: Arc<dyn Observer>,
47}
48
49#[derive(Default)]
50struct BroadcastHookState {
51 next_scoped_id: u64,
52 entries: Vec<BroadcastHookEntry>,
53}
54
55impl BroadcastHookState {
56 fn current(&self) -> Option<Arc<dyn Observer>> {
57 self.entries.last().map(|entry| entry.observer.clone())
58 }
59}
60
61fn broadcast_hook_slot() -> &'static RwLock<BroadcastHookState> {
62 BROADCAST_HOOK.get_or_init(|| RwLock::new(BroadcastHookState::default()))
63}
64
65pub fn set_broadcast_hook(observer: Arc<dyn Observer>) {
69 let mut slot = broadcast_hook_slot().write();
70 slot.entries.clear();
71 slot.entries.push(BroadcastHookEntry {
72 scoped_id: None,
73 observer,
74 });
75}
76
/// Handle returned by [`set_scoped_broadcast_hook`].
///
/// Dropping it removes exactly the hook it installed and leaves every other
/// installed hook in place, so the newest surviving hook becomes active.
#[must_use = "hold the guard for as long as the broadcast hook should remain installed"]
pub struct BroadcastHookGuard {
    /// Registry id of the entry this guard owns.
    scoped_id: u64,
}
86
87impl Drop for BroadcastHookGuard {
88 fn drop(&mut self) {
89 let mut slot = broadcast_hook_slot().write();
90 slot.entries
91 .retain(|entry| entry.scoped_id != Some(self.scoped_id));
92 }
93}
94
95#[must_use = "hold the guard for as long as the broadcast hook should remain installed"]
97pub fn set_scoped_broadcast_hook(observer: Arc<dyn Observer>) -> BroadcastHookGuard {
98 let mut slot = broadcast_hook_slot().write();
99 let scoped_id = slot.next_scoped_id;
100 slot.next_scoped_id = slot.next_scoped_id.wrapping_add(1);
101 slot.entries.push(BroadcastHookEntry {
102 scoped_id: Some(scoped_id),
103 observer,
104 });
105 BroadcastHookGuard { scoped_id }
106}
107
108pub fn clear_broadcast_hook() {
110 broadcast_hook_slot().write().entries.clear();
111}
112
113fn current_broadcast_hook() -> Option<Arc<dyn Observer>> {
114 broadcast_hook_slot().read().current()
115}
116
117struct TeeObserver {
120 primary: Box<dyn Observer>,
121}
122
123impl Observer for TeeObserver {
124 fn record_event(&self, event: &ObserverEvent) {
125 self.primary.record_event(event);
126 if let Some(hook) = current_broadcast_hook() {
127 hook.record_event(event);
128 }
129 }
130
131 fn record_metric(&self, metric: &ObserverMetric) {
132 self.primary.record_metric(metric);
133 }
134
135 fn flush(&self) {
136 self.primary.flush();
137 }
138
139 fn name(&self) -> &str {
140 self.primary.name()
143 }
144
145 fn as_any(&self) -> &dyn Any {
146 self.primary.as_any()
149 }
150}
151
152pub fn create_observer(config: &ObservabilityConfig) -> Box<dyn Observer> {
154 Box::new(TeeObserver {
155 primary: create_primary_observer(config),
156 })
157}
158
159fn create_primary_observer(config: &ObservabilityConfig) -> Box<dyn Observer> {
160 match config.backend.as_str() {
161 "log" => Box::new(LogObserver::new()),
162 "verbose" => Box::new(VerboseObserver::new()),
163 "prometheus" => {
164 #[cfg(feature = "observability-prometheus")]
165 {
166 Box::new(PrometheusObserver::shared())
167 }
168 #[cfg(not(feature = "observability-prometheus"))]
169 {
170 ::zeroclaw_log::record!(
171 WARN,
172 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
173 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
174 "Prometheus backend requested but this build was compiled without `observability-prometheus`; falling back to noop."
175 );
176 Box::new(NoopObserver)
177 }
178 }
179 "otel" | "opentelemetry" | "otlp" => {
180 #[cfg(feature = "observability-otel")]
181 match OtelObserver::new(
182 config.otel_endpoint.as_deref(),
183 config.otel_service_name.as_deref(),
184 config.otel_headers.clone(),
185 ) {
186 Ok(obs) => {
187 ::zeroclaw_log::record!(
188 INFO,
189 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
190 .with_attrs(::serde_json::json!({"endpoint": config
191 .otel_endpoint
192 .as_deref()
193 .unwrap_or("http://localhost:4318")})),
194 "OpenTelemetry observer initialized"
195 );
196 Box::new(obs)
197 }
198 Err(e) => {
199 ::zeroclaw_log::record!(
200 ERROR,
201 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
202 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
203 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
204 "Failed to create OTel observer. Falling back to noop."
205 );
206 Box::new(NoopObserver)
207 }
208 }
209 #[cfg(not(feature = "observability-otel"))]
210 {
211 ::zeroclaw_log::record!(
212 WARN,
213 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
214 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
215 "OpenTelemetry backend requested but this build was compiled without `observability-otel`; falling back to noop."
216 );
217 Box::new(NoopObserver)
218 }
219 }
220 "none" | "noop" => Box::new(NoopObserver),
221 _ => {
222 ::zeroclaw_log::record!(
223 WARN,
224 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
225 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
226 &format!(
227 "Unknown observability backend '{}', falling back to noop",
228 config.backend
229 )
230 );
231 Box::new(NoopObserver)
232 }
233 }
234}
235
#[cfg(test)]
mod tests {
    use super::*;
    use parking_lot::{Mutex as PlMutex, MutexGuard as PlMutexGuard};
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Config selecting `backend`, with every other field defaulted.
    fn config_for(backend: &str) -> ObservabilityConfig {
        ObservabilityConfig {
            backend: backend.into(),
            ..ObservabilityConfig::default()
        }
    }

    /// OTel-style config for `backend`, pointing at a fixed local test endpoint.
    fn otel_config_for(backend: &str) -> ObservabilityConfig {
        ObservabilityConfig {
            backend: backend.into(),
            otel_endpoint: Some("http://127.0.0.1:19999".into()),
            otel_service_name: Some("test".into()),
            ..ObservabilityConfig::default()
        }
    }

    /// Name the factory reports for any OTel alias in the current build.
    fn expected_otel_name() -> &'static str {
        if cfg!(feature = "observability-otel") {
            "otel"
        } else {
            "noop"
        }
    }

    #[test]
    fn factory_none_returns_noop() {
        assert_eq!(create_observer(&config_for("none")).name(), "noop");
    }

    #[test]
    fn factory_noop_returns_noop() {
        assert_eq!(create_observer(&config_for("noop")).name(), "noop");
    }

    #[test]
    fn factory_log_returns_log() {
        assert_eq!(create_observer(&config_for("log")).name(), "log");
    }

    #[test]
    fn factory_verbose_returns_verbose() {
        assert_eq!(create_observer(&config_for("verbose")).name(), "verbose");
    }

    #[test]
    fn factory_prometheus_returns_prometheus() {
        let expected = if cfg!(feature = "observability-prometheus") {
            "prometheus"
        } else {
            "noop"
        };
        assert_eq!(create_observer(&config_for("prometheus")).name(), expected);
    }

    #[test]
    fn factory_otel_returns_otel() {
        assert_eq!(
            create_observer(&otel_config_for("otel")).name(),
            expected_otel_name()
        );
    }

    #[test]
    fn factory_opentelemetry_alias() {
        assert_eq!(
            create_observer(&otel_config_for("opentelemetry")).name(),
            expected_otel_name()
        );
    }

    #[test]
    fn factory_otlp_alias() {
        assert_eq!(
            create_observer(&otel_config_for("otlp")).name(),
            expected_otel_name()
        );
    }

    #[test]
    fn factory_unknown_falls_back_to_noop() {
        assert_eq!(create_observer(&config_for("xyzzy_unknown")).name(), "noop");
    }

    #[test]
    fn factory_empty_string_falls_back_to_noop() {
        assert_eq!(create_observer(&config_for("")).name(), "noop");
    }

    #[test]
    fn factory_garbage_falls_back_to_noop() {
        assert_eq!(
            create_observer(&config_for("xyzzy_garbage_123")).name(),
            "noop"
        );
    }

    /// Observer that only counts the events and metrics it receives.
    #[derive(Default)]
    struct CountingObserver {
        events: AtomicUsize,
        metrics: AtomicUsize,
    }

    impl Observer for CountingObserver {
        fn record_event(&self, _event: &ObserverEvent) {
            self.events.fetch_add(1, Ordering::SeqCst);
        }

        fn record_metric(&self, _metric: &ObserverMetric) {
            self.metrics.fetch_add(1, Ordering::SeqCst);
        }

        fn name(&self) -> &str {
            "counting"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Serializes tests that touch the process-global broadcast hook registry.
    static HOOK_TEST_LOCK: PlMutex<()> = PlMutex::new(());

    /// Holds [`HOOK_TEST_LOCK`] and keeps the global registry empty at both the
    /// start and the end of a test. The cleanup runs in `Drop`, so it also
    /// happens when an assertion panics mid-test.
    struct IsolatedHooks {
        _lock: PlMutexGuard<'static, ()>,
    }

    impl IsolatedHooks {
        fn acquire() -> Self {
            let lock = HOOK_TEST_LOCK.lock();
            clear_broadcast_hook();
            Self { _lock: lock }
        }
    }

    impl Drop for IsolatedHooks {
        fn drop(&mut self) {
            // Runs before `_lock` is released, so the next hook test starts clean.
            clear_broadcast_hook();
        }
    }

    #[test]
    fn broadcast_hook_receives_events_from_factory_observer() {
        let _hooks = IsolatedHooks::acquire();

        let hook = Arc::new(CountingObserver::default());
        set_broadcast_hook(hook.clone());

        let observer = create_observer(&config_for("noop"));
        observer.record_event(&ObserverEvent::HeartbeatTick);
        observer.record_event(&ObserverEvent::Error {
            component: "x".into(),
            message: "y".into(),
        });

        assert_eq!(hook.events.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn broadcast_hook_does_not_receive_metrics() {
        let _hooks = IsolatedHooks::acquire();

        let hook = Arc::new(CountingObserver::default());
        set_broadcast_hook(hook.clone());

        let observer = create_observer(&config_for("noop"));
        observer.record_metric(&ObserverMetric::TokensUsed(10));
        observer.record_metric(&ObserverMetric::TokensUsed(20));

        assert_eq!(hook.events.load(Ordering::SeqCst), 0);
        assert_eq!(hook.metrics.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn broadcast_hook_unset_means_only_primary_runs() {
        let _hooks = IsolatedHooks::acquire();

        // A hook that was installed and then cleared must receive nothing.
        let hook = Arc::new(CountingObserver::default());
        set_broadcast_hook(hook.clone());
        clear_broadcast_hook();
        assert!(current_broadcast_hook().is_none());

        let observer = create_observer(&config_for("noop"));
        observer.record_event(&ObserverEvent::HeartbeatTick);
        observer.record_metric(&ObserverMetric::TokensUsed(1));

        assert_eq!(hook.events.load(Ordering::SeqCst), 0);
        assert_eq!(hook.metrics.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn scoped_broadcast_hook_guard_clears_installed_hook_on_drop() {
        let _hooks = IsolatedHooks::acquire();

        let hook = Arc::new(CountingObserver::default());
        let broadcast_guard = set_scoped_broadcast_hook(hook.clone());

        let observer = create_observer(&config_for("noop"));
        observer.record_event(&ObserverEvent::HeartbeatTick);
        assert_eq!(hook.events.load(Ordering::SeqCst), 1);

        drop(broadcast_guard);
        observer.record_event(&ObserverEvent::HeartbeatTick);
        assert_eq!(hook.events.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn scoped_broadcast_hook_guard_preserves_replacement_hook() {
        let _hooks = IsolatedHooks::acquire();

        let old_hook = Arc::new(CountingObserver::default());
        let old_guard = set_scoped_broadcast_hook(old_hook.clone());

        let new_hook = Arc::new(CountingObserver::default());
        set_broadcast_hook(new_hook.clone());
        drop(old_guard);

        let observer = create_observer(&config_for("noop"));
        observer.record_event(&ObserverEvent::HeartbeatTick);

        assert_eq!(old_hook.events.load(Ordering::SeqCst), 0);
        assert_eq!(new_hook.events.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn dropping_newer_scoped_broadcast_hook_restores_older_live_hook() {
        let _hooks = IsolatedHooks::acquire();

        let old_hook = Arc::new(CountingObserver::default());
        let old_guard = set_scoped_broadcast_hook(old_hook.clone());

        let new_hook = Arc::new(CountingObserver::default());
        let new_guard = set_scoped_broadcast_hook(new_hook.clone());

        let observer = create_observer(&config_for("noop"));
        observer.record_event(&ObserverEvent::HeartbeatTick);
        assert_eq!(old_hook.events.load(Ordering::SeqCst), 0);
        assert_eq!(new_hook.events.load(Ordering::SeqCst), 1);

        drop(new_guard);
        observer.record_event(&ObserverEvent::HeartbeatTick);
        assert_eq!(old_hook.events.load(Ordering::SeqCst), 1);
        assert_eq!(new_hook.events.load(Ordering::SeqCst), 1);

        drop(old_guard);
        observer.record_event(&ObserverEvent::HeartbeatTick);
        assert_eq!(old_hook.events.load(Ordering::SeqCst), 1);
        assert_eq!(new_hook.events.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn factory_observer_downcasts_through_tee() {
        let _hooks = IsolatedHooks::acquire();

        let observer = create_observer(&config_for("log"));

        assert!(observer.as_any().downcast_ref::<LogObserver>().is_some());
    }
}