1use std::collections::{HashMap, VecDeque};
2use std::sync::RwLock;
3use std::time::Instant;
4
5use chrono::{DateTime, NaiveDateTime, Utc};
6use serde_json::json;
7
8use super::types::{SopRun, SopRunStatus, SopStepStatus};
9use zeroclaw_memory::traits::{Memory, MemoryCategory};
10
/// Maximum number of run snapshots kept in a single `recent_runs` buffer;
/// once exceeded, the entry at the front of the buffer is dropped.
const MAX_RECENT_RUNS: usize = 1000;

/// Age, in seconds, at which a pending-approval entry is discarded when a
/// run completion is recorded.
const PENDING_EVICT_SECS: u64 = 3600;
18
/// Plain additive counters from which every exposed metric is derived.
///
/// Used both for all-time totals and as the accumulator for windowed
/// aggregation over recent run snapshots.
#[derive(Debug, Default, Clone)]
struct MetricCounters {
    /// Runs recorded with a `Completed` terminal status.
    runs_completed: u64,
    /// Runs recorded with a `Failed` terminal status.
    runs_failed: u64,
    /// Runs recorded with a `Cancelled` terminal status.
    runs_cancelled: u64,
    /// Sum of the number of step results across recorded runs.
    steps_executed: u64,
    /// Sum of `total_steps` across recorded runs.
    steps_defined: u64,
    /// Step results whose status was `Failed`.
    steps_failed: u64,
    /// Step results whose status was `Skipped`.
    steps_skipped: u64,
    /// Number of recorded human approvals.
    human_approvals: u64,
    /// Number of recorded timeout auto-approvals.
    timeout_auto_approvals: u64,
}
36
/// Per-run summary retained in `recent_runs` so that time-windowed metrics
/// can be recomputed on demand.
#[derive(Debug, Clone)]
struct RunSnapshot {
    /// Completion time used for window filtering.
    completed_at: DateTime<Utc>,
    /// Status the run had when it was recorded.
    terminal_status: SopRunStatus,
    /// Number of step results the run carried.
    steps_executed: u64,
    /// The run's `total_steps`.
    steps_defined: u64,
    /// Step results with status `Failed`.
    steps_failed: u64,
    /// Step results with status `Skipped`.
    steps_skipped: u64,
    /// Human approvals attributed to this run.
    human_approval_count: u64,
    /// Timeout auto-approvals attributed to this run.
    timeout_approval_count: u64,
}
54
/// Metrics for one scope (the global scope or a single SOP): cumulative
/// totals plus a bounded buffer of recent run snapshots.
#[derive(Debug, Default)]
struct SopCounters {
    /// Cumulative totals.
    counters: MetricCounters,
    /// Recent run snapshots, capped at `MAX_RECENT_RUNS` entries.
    recent_runs: VecDeque<RunSnapshot>,
}
63
/// All mutable collector state, guarded by a single lock in
/// `SopMetricsCollector`.
#[derive(Debug, Default)]
struct CollectorState {
    /// Metrics aggregated across every SOP.
    global: SopCounters,
    /// Metrics keyed by SOP name.
    per_sop: HashMap<String, SopCounters>,
    /// Human approvals not yet attached to a recorded run, keyed by run id:
    /// `(time of the latest approval, approval count)`.
    pending_approvals: HashMap<String, (Instant, u64)>,
    /// Timeout auto-approvals not yet attached to a recorded run, keyed by
    /// run id: `(time of the latest auto-approval, count)`.
    pending_timeout_approvals: HashMap<String, (Instant, u64)>,
}
75
/// Collector of SOP run and approval metrics.
///
/// All state lives behind one `RwLock`, so the recording and query methods
/// take `&self`.
pub struct SopMetricsCollector {
    /// Lock-protected collector state.
    inner: RwLock<CollectorState>,
}
85
86impl SopMetricsCollector {
87 pub fn new() -> Self {
89 Self {
90 inner: RwLock::new(CollectorState::default()),
91 }
92 }
93
94 pub fn record_run_complete(&self, run: &SopRun) {
100 let Ok(mut state) = self.inner.write() else {
101 ::zeroclaw_log::record!(
102 WARN,
103 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
104 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
105 "SOP metrics collector lock poisoned in record_run_complete"
106 );
107 return;
108 };
109
110 let now = Instant::now();
112 state
113 .pending_approvals
114 .retain(|_, (ts, _)| now.duration_since(*ts).as_secs() < PENDING_EVICT_SECS);
115 state
116 .pending_timeout_approvals
117 .retain(|_, (ts, _)| now.duration_since(*ts).as_secs() < PENDING_EVICT_SECS);
118
119 let human_count = state
120 .pending_approvals
121 .remove(&run.run_id)
122 .map(|(_, c)| c)
123 .unwrap_or(0);
124 let timeout_count = state
125 .pending_timeout_approvals
126 .remove(&run.run_id)
127 .map(|(_, c)| c)
128 .unwrap_or(0);
129
130 let snapshot = build_snapshot(run, human_count, timeout_count);
131 apply_run(&mut state.global, &snapshot);
132 let counters = state.per_sop.entry(run.sop_name.clone()).or_default();
133 apply_run(counters, &snapshot);
134 }
135
136 pub fn record_approval(&self, sop_name: &str, run_id: &str) {
140 let Ok(mut state) = self.inner.write() else {
141 ::zeroclaw_log::record!(
142 WARN,
143 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
144 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
145 "SOP metrics collector lock poisoned in record_approval"
146 );
147 return;
148 };
149 state.global.counters.human_approvals += 1;
150 state
151 .per_sop
152 .entry(sop_name.to_string())
153 .or_default()
154 .counters
155 .human_approvals += 1;
156 let entry = state
157 .pending_approvals
158 .entry(run_id.to_string())
159 .or_insert((Instant::now(), 0));
160 entry.0 = Instant::now();
161 entry.1 += 1;
162 }
163
164 pub fn record_timeout_auto_approve(&self, sop_name: &str, run_id: &str) {
168 let Ok(mut state) = self.inner.write() else {
169 ::zeroclaw_log::record!(
170 WARN,
171 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
172 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
173 "SOP metrics collector lock poisoned in record_timeout_auto_approve"
174 );
175 return;
176 };
177 state.global.counters.timeout_auto_approvals += 1;
178 state
179 .per_sop
180 .entry(sop_name.to_string())
181 .or_default()
182 .counters
183 .timeout_auto_approvals += 1;
184 let entry = state
185 .pending_timeout_approvals
186 .entry(run_id.to_string())
187 .or_insert((Instant::now(), 0));
188 entry.0 = Instant::now();
189 entry.1 += 1;
190 }
191
192 pub async fn rebuild_from_memory(memory: &dyn Memory) -> anyhow::Result<Self> {
204 let category = MemoryCategory::Custom("sop".into());
205 let entries = memory.list(Some(&category), None).await?;
206
207 let mut runs: HashMap<String, SopRun> = HashMap::new();
209 let mut approval_counts: HashMap<String, u64> = HashMap::new();
210 let mut timeout_counts: HashMap<String, u64> = HashMap::new();
211 let mut approval_sop_names: HashMap<String, String> = HashMap::new();
213
214 for entry in &entries {
215 if entry.key.starts_with("sop_run_") {
216 if let Ok(run) = serde_json::from_str::<SopRun>(&entry.content)
217 && matches!(
218 run.status,
219 SopRunStatus::Completed | SopRunStatus::Failed | SopRunStatus::Cancelled
220 )
221 {
222 runs.insert(run.run_id.clone(), run);
223 }
224 } else if entry.key.starts_with("sop_approval_") {
225 if let Ok(run) = serde_json::from_str::<SopRun>(&entry.content) {
226 *approval_counts.entry(run.run_id.clone()).or_default() += 1;
227 approval_sop_names
228 .entry(run.run_id.clone())
229 .or_insert(run.sop_name);
230 }
231 } else if entry.key.starts_with("sop_timeout_approve_")
232 && let Ok(run) = serde_json::from_str::<SopRun>(&entry.content)
233 {
234 *timeout_counts.entry(run.run_id.clone()).or_default() += 1;
235 approval_sop_names
236 .entry(run.run_id.clone())
237 .or_insert(run.sop_name);
238 }
239 }
240
241 let mut state = CollectorState::default();
243 for (run_id, run) in &runs {
244 let human_count = approval_counts.get(run_id).copied().unwrap_or(0);
245 let timeout_count = timeout_counts.get(run_id).copied().unwrap_or(0);
246 let snapshot = build_snapshot(run, human_count, timeout_count);
247 apply_run(&mut state.global, &snapshot);
248 let counters = state.per_sop.entry(run.sop_name.clone()).or_default();
249 apply_run(counters, &snapshot);
250 }
251
252 for (run_id, count) in &approval_counts {
254 state.global.counters.human_approvals += count;
255 if let Some(sop_name) = approval_sop_names.get(run_id) {
256 state
257 .per_sop
258 .entry(sop_name.clone())
259 .or_default()
260 .counters
261 .human_approvals += count;
262 }
263 }
264 for (run_id, count) in &timeout_counts {
265 state.global.counters.timeout_auto_approvals += count;
266 if let Some(sop_name) = approval_sop_names.get(run_id) {
267 state
268 .per_sop
269 .entry(sop_name.clone())
270 .or_default()
271 .counters
272 .timeout_auto_approvals += count;
273 }
274 }
275
276 for (run_id, count) in &approval_counts {
279 if !runs.contains_key(run_id) {
280 state
281 .pending_approvals
282 .insert(run_id.clone(), (Instant::now(), *count));
283 }
284 }
285 for (run_id, count) in &timeout_counts {
286 if !runs.contains_key(run_id) {
287 state
288 .pending_timeout_approvals
289 .insert(run_id.clone(), (Instant::now(), *count));
290 }
291 }
292
293 Ok(Self {
294 inner: RwLock::new(state),
295 })
296 }
297
298 pub fn get_metric_value(&self, name: &str) -> Option<serde_json::Value> {
311 let Ok(state) = self.inner.read() else {
312 return None;
313 };
314
315 let rest = name.strip_prefix("sop.")?;
316
317 if let Some(val) = resolve_metric(&state.global, rest) {
319 return Some(val);
320 }
321
322 let mut best_key: Option<&str> = None;
324 let mut best_len = 0;
325 for key in state.per_sop.keys() {
326 if rest.starts_with(key.as_str()) {
327 let next_char_idx = key.len();
328 if rest.len() > next_char_idx
330 && rest.as_bytes()[next_char_idx] == b'.'
331 && key.len() > best_len
332 {
333 best_key = Some(key.as_str());
334 best_len = key.len();
335 }
336 }
337 }
338
339 if let Some(sop_key) = best_key {
340 let suffix = &rest[sop_key.len() + 1..]; if let Some(counters) = state.per_sop.get(sop_key) {
342 return resolve_metric(counters, suffix);
343 }
344 }
345
346 None
347 }
348
349 pub fn get_metric_value_windowed(
356 &self,
357 name: &str,
358 window: &std::time::Duration,
359 ) -> Option<serde_json::Value> {
360 let state = self.inner.read().ok()?;
361 let rest = name.strip_prefix("sop.")?;
362
363 let (counters, metric_name) = if let Some(dot) = rest.find('.') {
365 let mut best_key: Option<&str> = None;
368 let mut best_len = 0;
369 for key in state.per_sop.keys() {
370 if rest.starts_with(key.as_str()) {
371 let next_char_idx = key.len();
372 if rest.len() > next_char_idx
373 && rest.as_bytes()[next_char_idx] == b'.'
374 && key.len() > best_len
375 {
376 best_key = Some(key.as_str());
377 best_len = key.len();
378 }
379 }
380 }
381 if let Some(sop_key) = best_key {
382 let suffix = &rest[sop_key.len() + 1..];
383 match state.per_sop.get(sop_key) {
384 Some(c) => (c, suffix),
385 None => return None,
386 }
387 } else {
388 let _ = dot; (&state.global, rest)
392 }
393 } else {
394 (&state.global, rest)
396 };
397
398 let cutoff = Utc::now() - chrono::Duration::from_std(*window).ok()?;
399 let wc = aggregate_windowed(&counters.recent_runs, cutoff);
400 resolve_from_counters(&wc, metric_name)
401 }
402
403 pub fn snapshot(&self) -> serde_json::Value {
405 let Ok(state) = self.inner.read() else {
406 return json!({"error": "lock poisoned"});
407 };
408
409 let per_sop: serde_json::Map<String, serde_json::Value> = state
410 .per_sop
411 .iter()
412 .map(|(name, c)| (name.clone(), counters_to_json(c)))
413 .collect();
414
415 json!({
416 "global": counters_to_json(&state.global),
417 "per_sop": per_sop,
418 "pending_approvals": state.pending_approvals.len(),
419 "pending_timeout_approvals": state.pending_timeout_approvals.len(),
420 })
421 }
422}
423
424impl Default for SopMetricsCollector {
425 fn default() -> Self {
426 Self::new()
427 }
428}
429
430fn build_snapshot(run: &SopRun, human_count: u64, timeout_count: u64) -> RunSnapshot {
433 let completed_at = run
434 .completed_at
435 .as_deref()
436 .and_then(parse_completed_at)
437 .unwrap_or_else(Utc::now);
438
439 let steps_executed = run.step_results.len() as u64;
440 let steps_failed = run
441 .step_results
442 .iter()
443 .filter(|s| s.status == SopStepStatus::Failed)
444 .count() as u64;
445 let steps_skipped = run
446 .step_results
447 .iter()
448 .filter(|s| s.status == SopStepStatus::Skipped)
449 .count() as u64;
450
451 RunSnapshot {
452 completed_at,
453 terminal_status: run.status,
454 steps_executed,
455 steps_defined: u64::from(run.total_steps),
456 steps_failed,
457 steps_skipped,
458 human_approval_count: human_count,
459 timeout_approval_count: timeout_count,
460 }
461}
462
463fn apply_run(sop: &mut SopCounters, snap: &RunSnapshot) {
464 let c = &mut sop.counters;
465 match snap.terminal_status {
466 SopRunStatus::Completed => c.runs_completed += 1,
467 SopRunStatus::Failed => c.runs_failed += 1,
468 SopRunStatus::Cancelled => c.runs_cancelled += 1,
469 _ => {}
470 }
471 c.steps_executed += snap.steps_executed;
472 c.steps_defined += snap.steps_defined;
473 c.steps_failed += snap.steps_failed;
474 c.steps_skipped += snap.steps_skipped;
475
476 sop.recent_runs.push_back(snap.clone());
477 if sop.recent_runs.len() > MAX_RECENT_RUNS {
478 sop.recent_runs.pop_front();
479 }
480}
481
482fn parse_completed_at(ts: &str) -> Option<DateTime<Utc>> {
483 if let Ok(dt) = DateTime::parse_from_rfc3339(ts) {
485 return Some(dt.with_timezone(&Utc));
486 }
487 if let Ok(n) = NaiveDateTime::parse_from_str(ts.trim_end_matches('Z'), "%Y-%m-%dT%H:%M:%S") {
489 return Some(n.and_utc());
490 }
491 ::zeroclaw_log::record!(
493 WARN,
494 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
495 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
496 .with_attrs(::serde_json::json!({"ts": ts})),
497 "SOP metrics: could not parse completed_at timestamp: "
498 );
499 None
500}
501
502fn aggregate_windowed(
504 recent_runs: &VecDeque<RunSnapshot>,
505 cutoff: DateTime<Utc>,
506) -> MetricCounters {
507 let mut wc = MetricCounters::default();
508 for snap in recent_runs {
509 if snap.completed_at >= cutoff {
510 match snap.terminal_status {
511 SopRunStatus::Completed => wc.runs_completed += 1,
512 SopRunStatus::Failed => wc.runs_failed += 1,
513 SopRunStatus::Cancelled => wc.runs_cancelled += 1,
514 _ => {}
515 }
516 wc.steps_executed += snap.steps_executed;
517 wc.steps_defined += snap.steps_defined;
518 wc.steps_failed += snap.steps_failed;
519 wc.steps_skipped += snap.steps_skipped;
520 wc.human_approvals += snap.human_approval_count;
521 wc.timeout_auto_approvals += snap.timeout_approval_count;
522 }
523 }
524 wc
525}
526
527fn resolve_metric(sop: &SopCounters, suffix: &str) -> Option<serde_json::Value> {
529 let (base, window_days) = if let Some(base) = suffix.strip_suffix("_7d") {
531 (base, Some(7i64))
532 } else if let Some(base) = suffix.strip_suffix("_30d") {
533 (base, Some(30i64))
534 } else if let Some(base) = suffix.strip_suffix("_90d") {
535 (base, Some(90i64))
536 } else {
537 (suffix, None)
538 };
539
540 if let Some(days) = window_days {
541 let cutoff = Utc::now() - chrono::Duration::days(days);
542 let wc = aggregate_windowed(&sop.recent_runs, cutoff);
543 resolve_from_counters(&wc, base)
544 } else {
545 resolve_from_counters(&sop.counters, base)
546 }
547}
548
549fn resolve_from_counters(c: &MetricCounters, metric: &str) -> Option<serde_json::Value> {
553 match metric {
554 "runs_completed" => Some(json!(c.runs_completed)),
555 "runs_failed" => Some(json!(c.runs_failed)),
556 "runs_cancelled" => Some(json!(c.runs_cancelled)),
557 "deviation_rate" => {
558 if c.steps_executed == 0 {
559 Some(json!(0.0))
560 } else {
561 Some(json!(
562 (c.steps_failed + c.steps_skipped) as f64 / c.steps_executed as f64
563 ))
564 }
565 }
566 "protocol_adherence_rate" => {
567 if c.steps_defined == 0 {
568 Some(json!(0.0))
569 } else {
570 let good = c
571 .steps_executed
572 .saturating_sub(c.steps_failed)
573 .saturating_sub(c.steps_skipped);
574 Some(json!(good as f64 / c.steps_defined as f64))
575 }
576 }
577 "human_intervention_count" => Some(json!(c.human_approvals)),
578 "human_intervention_rate" => Some(json!(
579 c.human_approvals as f64 / c.runs_completed.max(1) as f64
580 )),
581 "timeout_auto_approvals" => Some(json!(c.timeout_auto_approvals)),
582 "timeout_approval_rate" => Some(json!(
583 c.timeout_auto_approvals as f64 / c.runs_completed.max(1) as f64
584 )),
585 "completion_rate" => {
586 let total = c.runs_completed + c.runs_failed + c.runs_cancelled;
587 Some(json!(c.runs_completed as f64 / total.max(1) as f64))
588 }
589 _ => None,
590 }
591}
592
593fn counters_to_json(sop: &SopCounters) -> serde_json::Value {
594 let c = &sop.counters;
595 json!({
596 "runs_completed": c.runs_completed,
597 "runs_failed": c.runs_failed,
598 "runs_cancelled": c.runs_cancelled,
599 "steps_executed": c.steps_executed,
600 "steps_defined": c.steps_defined,
601 "steps_failed": c.steps_failed,
602 "steps_skipped": c.steps_skipped,
603 "human_approvals": c.human_approvals,
604 "timeout_auto_approvals": c.timeout_auto_approvals,
605 "recent_runs_depth": sop.recent_runs.len(),
606 })
607}
608
609#[cfg(test)]
612mod tests {
613 use super::*;
614 use crate::sop::types::{SopEvent, SopStepResult, SopTriggerSource};
615
616 fn make_event() -> SopEvent {
617 SopEvent {
618 source: SopTriggerSource::Manual,
619 topic: None,
620 payload: None,
621 timestamp: "2026-02-19T12:00:00Z".into(),
622 }
623 }
624
625 fn make_run(
626 run_id: &str,
627 sop_name: &str,
628 status: SopRunStatus,
629 total_steps: u32,
630 step_results: Vec<SopStepResult>,
631 ) -> SopRun {
632 let now = Utc::now();
633 let started = (now - chrono::Duration::minutes(5)).to_rfc3339();
634 let completed = now.to_rfc3339();
635 SopRun {
636 run_id: run_id.into(),
637 sop_name: sop_name.into(),
638 trigger_event: make_event(),
639 status,
640 current_step: total_steps,
641 total_steps,
642 started_at: started,
643 completed_at: Some(completed),
644 step_results,
645 waiting_since: None,
646 llm_calls_saved: 0,
647 }
648 }
649
650 fn make_step(number: u32, status: SopStepStatus) -> SopStepResult {
651 SopStepResult {
652 step_number: number,
653 status,
654 output: format!("Step {number}"),
655 started_at: "2026-02-19T12:00:00Z".into(),
656 completed_at: Some("2026-02-19T12:01:00Z".into()),
657 }
658 }
659
660 #[test]
661 fn zero_state_baseline() {
662 let c = SopMetricsCollector::new();
663 assert_eq!(c.get_metric_value("sop.runs_completed"), Some(json!(0u64)));
664 assert_eq!(c.get_metric_value("sop.runs_failed"), Some(json!(0u64)));
665 assert_eq!(c.get_metric_value("sop.runs_cancelled"), Some(json!(0u64)));
666 assert_eq!(c.get_metric_value("sop.deviation_rate"), Some(json!(0.0)));
667 assert_eq!(c.get_metric_value("sop.completion_rate"), Some(json!(0.0)));
668 }
669
670 #[test]
671 fn counter_arithmetic() {
672 let c = SopMetricsCollector::new();
673 let run = make_run(
674 "r1",
675 "test-sop",
676 SopRunStatus::Completed,
677 3,
678 vec![
679 make_step(1, SopStepStatus::Completed),
680 make_step(2, SopStepStatus::Completed),
681 make_step(3, SopStepStatus::Completed),
682 ],
683 );
684 c.record_run_complete(&run);
685
686 assert_eq!(c.get_metric_value("sop.runs_completed"), Some(json!(1u64)));
687 assert_eq!(c.get_metric_value("sop.runs_failed"), Some(json!(0u64)));
688 assert_eq!(c.get_metric_value("sop.deviation_rate"), Some(json!(0.0)));
689 assert_eq!(c.get_metric_value("sop.completion_rate"), Some(json!(1.0)));
690 }
691
692 #[test]
693 fn windowed_filtering() {
694 let c = SopMetricsCollector::new();
695 let run = make_run(
696 "r1",
697 "test-sop",
698 SopRunStatus::Completed,
699 2,
700 vec![
701 make_step(1, SopStepStatus::Completed),
702 make_step(2, SopStepStatus::Completed),
703 ],
704 );
705 c.record_run_complete(&run);
706
707 assert_eq!(
708 c.get_metric_value("sop.runs_completed_7d"),
709 Some(json!(1u64))
710 );
711 assert_eq!(
712 c.get_metric_value("sop.runs_completed_30d"),
713 Some(json!(1u64))
714 );
715 assert_eq!(
716 c.get_metric_value("sop.runs_completed_90d"),
717 Some(json!(1u64))
718 );
719 }
720
721 #[test]
722 fn deviation_rate_zero_steps() {
723 let c = SopMetricsCollector::new();
724 let run = make_run("r1", "test-sop", SopRunStatus::Completed, 0, vec![]);
725 c.record_run_complete(&run);
726 assert_eq!(c.get_metric_value("sop.deviation_rate"), Some(json!(0.0)));
727 }
728
729 #[test]
730 fn protocol_adherence_rate_partial_run() {
731 let c = SopMetricsCollector::new();
732 let run = make_run(
733 "r1",
734 "test-sop",
735 SopRunStatus::Failed,
736 3,
737 vec![
738 make_step(1, SopStepStatus::Completed),
739 make_step(2, SopStepStatus::Failed),
740 ],
741 );
742 c.record_run_complete(&run);
743
744 let val = c
746 .get_metric_value("sop.protocol_adherence_rate")
747 .unwrap()
748 .as_f64()
749 .unwrap();
750 assert!((val - 1.0 / 3.0).abs() < 1e-10);
751 }
752
753 #[test]
754 fn protocol_adherence_rate_full_run() {
755 let c = SopMetricsCollector::new();
756 let run = make_run(
757 "r1",
758 "test-sop",
759 SopRunStatus::Completed,
760 2,
761 vec![
762 make_step(1, SopStepStatus::Completed),
763 make_step(2, SopStepStatus::Completed),
764 ],
765 );
766 c.record_run_complete(&run);
767
768 let val = c
769 .get_metric_value("sop.protocol_adherence_rate")
770 .unwrap()
771 .as_f64()
772 .unwrap();
773 assert!((val - 1.0).abs() < 1e-10);
774 }
775
776 #[test]
777 fn protocol_adherence_rate_failed_run() {
778 let c = SopMetricsCollector::new();
779 let run = make_run(
780 "r1",
781 "test-sop",
782 SopRunStatus::Failed,
783 3,
784 vec![
785 make_step(1, SopStepStatus::Completed),
786 make_step(2, SopStepStatus::Failed),
787 make_step(3, SopStepStatus::Skipped),
788 ],
789 );
790 c.record_run_complete(&run);
791
792 let val = c
794 .get_metric_value("sop.protocol_adherence_rate")
795 .unwrap()
796 .as_f64()
797 .unwrap();
798 assert!((val - 1.0 / 3.0).abs() < 1e-10);
799 }
800
801 #[test]
802 fn derived_rate_metrics() {
803 let c = SopMetricsCollector::new();
804 c.record_approval("test-sop", "r1");
805 c.record_timeout_auto_approve("test-sop", "r2");
806
807 let run1 = make_run(
808 "r1",
809 "test-sop",
810 SopRunStatus::Completed,
811 1,
812 vec![make_step(1, SopStepStatus::Completed)],
813 );
814 let run2 = make_run(
815 "r2",
816 "test-sop",
817 SopRunStatus::Completed,
818 1,
819 vec![make_step(1, SopStepStatus::Completed)],
820 );
821 c.record_run_complete(&run1);
822 c.record_run_complete(&run2);
823
824 let hir = c
826 .get_metric_value("sop.human_intervention_rate")
827 .unwrap()
828 .as_f64()
829 .unwrap();
830 assert!((hir - 0.5).abs() < 1e-10);
831
832 let tar = c
834 .get_metric_value("sop.timeout_approval_rate")
835 .unwrap()
836 .as_f64()
837 .unwrap();
838 assert!((tar - 0.5).abs() < 1e-10);
839
840 assert_eq!(c.get_metric_value("sop.completion_rate"), Some(json!(1.0)));
841 }
842
843 #[test]
844 fn per_sop_lookup() {
845 let c = SopMetricsCollector::new();
846 let run = make_run(
847 "r1",
848 "valve-shutdown",
849 SopRunStatus::Completed,
850 2,
851 vec![
852 make_step(1, SopStepStatus::Completed),
853 make_step(2, SopStepStatus::Completed),
854 ],
855 );
856 c.record_run_complete(&run);
857
858 assert_eq!(
859 c.get_metric_value("sop.valve-shutdown.runs_completed"),
860 Some(json!(1u64))
861 );
862 assert_eq!(
863 c.get_metric_value("sop.valve-shutdown.completion_rate"),
864 Some(json!(1.0))
865 );
866 }
867
868 #[test]
869 fn longest_match_disambiguation() {
870 let c = SopMetricsCollector::new();
871 let r1 = make_run(
872 "r1",
873 "valve",
874 SopRunStatus::Completed,
875 1,
876 vec![make_step(1, SopStepStatus::Completed)],
877 );
878 let r2 = make_run(
879 "r2",
880 "valve-shutdown",
881 SopRunStatus::Failed,
882 2,
883 vec![
884 make_step(1, SopStepStatus::Completed),
885 make_step(2, SopStepStatus::Failed),
886 ],
887 );
888 c.record_run_complete(&r1);
889 c.record_run_complete(&r2);
890
891 assert_eq!(
892 c.get_metric_value("sop.valve-shutdown.runs_failed"),
893 Some(json!(1u64))
894 );
895 assert_eq!(
896 c.get_metric_value("sop.valve.runs_completed"),
897 Some(json!(1u64))
898 );
899 }
900
901 #[test]
902 fn not_found_for_unknown_metric() {
903 let c = SopMetricsCollector::new();
904 assert_eq!(c.get_metric_value("sop.nonexistent"), None);
905 assert_eq!(c.get_metric_value("other.runs_completed"), None);
906 assert_eq!(c.get_metric_value("sop.no-sop.nonexistent"), None);
907 }
908
909 #[test]
910 fn approval_flag_propagation() {
911 let c = SopMetricsCollector::new();
912 c.record_approval("test-sop", "r1");
913
914 let run = make_run(
915 "r1",
916 "test-sop",
917 SopRunStatus::Completed,
918 1,
919 vec![make_step(1, SopStepStatus::Completed)],
920 );
921 c.record_run_complete(&run);
922
923 let snap = c.snapshot();
924 let global = &snap["global"];
925 assert_eq!(global["human_approvals"], json!(1u64));
926 assert_eq!(global["runs_completed"], json!(1u64));
927
928 let hic = c
929 .get_metric_value("sop.human_intervention_count_7d")
930 .unwrap()
931 .as_u64()
932 .unwrap();
933 assert_eq!(hic, 1);
934 }
935
936 #[test]
937 fn pending_approval_stale_eviction() {
938 let c = SopMetricsCollector::new();
939 c.record_approval("test-sop", "orphan-run");
940
941 {
942 let state = c.inner.read().unwrap();
943 assert_eq!(state.pending_approvals.len(), 1);
944 }
945
946 let run = make_run(
947 "r2",
948 "test-sop",
949 SopRunStatus::Completed,
950 1,
951 vec![make_step(1, SopStepStatus::Completed)],
952 );
953 c.record_run_complete(&run);
954
955 {
957 let state = c.inner.read().unwrap();
958 assert_eq!(state.pending_approvals.len(), 1);
959 }
960 }
961
962 #[test]
963 fn snapshot_diagnostic_output() {
964 let c = SopMetricsCollector::new();
965 let run = make_run(
966 "r1",
967 "test-sop",
968 SopRunStatus::Completed,
969 1,
970 vec![make_step(1, SopStepStatus::Completed)],
971 );
972 c.record_run_complete(&run);
973
974 let snap = c.snapshot();
975 assert!(snap["global"].is_object());
976 assert!(snap["per_sop"].is_object());
977 assert_eq!(snap["global"]["runs_completed"], json!(1u64));
978 assert_eq!(snap["global"]["recent_runs_depth"], json!(1));
979 assert!(snap["per_sop"]["test-sop"].is_object());
980 }
981
982 #[test]
983 fn runs_cancelled_tracking() {
984 let c = SopMetricsCollector::new();
985 let run = make_run(
986 "r1",
987 "test-sop",
988 SopRunStatus::Cancelled,
989 2,
990 vec![make_step(1, SopStepStatus::Completed)],
991 );
992 c.record_run_complete(&run);
993
994 assert_eq!(c.get_metric_value("sop.runs_cancelled"), Some(json!(1u64)));
995 let cr = c
996 .get_metric_value("sop.completion_rate")
997 .unwrap()
998 .as_f64()
999 .unwrap();
1000 assert!((cr - 0.0).abs() < 1e-10);
1001 }
1002
1003 #[test]
1006 fn multiple_approvals_per_run_consistent() {
1007 let c = SopMetricsCollector::new();
1008 c.record_approval("test-sop", "r1");
1010 c.record_approval("test-sop", "r1");
1011 c.record_approval("test-sop", "r1");
1012
1013 let run = make_run(
1014 "r1",
1015 "test-sop",
1016 SopRunStatus::Completed,
1017 3,
1018 vec![
1019 make_step(1, SopStepStatus::Completed),
1020 make_step(2, SopStepStatus::Completed),
1021 make_step(3, SopStepStatus::Completed),
1022 ],
1023 );
1024 c.record_run_complete(&run);
1025
1026 assert_eq!(
1028 c.get_metric_value("sop.human_intervention_count"),
1029 Some(json!(3u64))
1030 );
1031 assert_eq!(
1033 c.get_metric_value("sop.human_intervention_count_7d"),
1034 Some(json!(3u64))
1035 );
1036 let rate = c
1038 .get_metric_value("sop.human_intervention_rate")
1039 .unwrap()
1040 .as_f64()
1041 .unwrap();
1042 assert!((rate - 3.0).abs() < 1e-10);
1043 }
1044
1045 #[test]
1048 fn ring_buffer_overflow_cap() {
1049 let c = SopMetricsCollector::new();
1050 for i in 0..1001u64 {
1051 let run = make_run(
1052 &format!("r{i}"),
1053 "test-sop",
1054 SopRunStatus::Completed,
1055 1,
1056 vec![make_step(1, SopStepStatus::Completed)],
1057 );
1058 c.record_run_complete(&run);
1059 }
1060
1061 assert_eq!(
1063 c.get_metric_value("sop.runs_completed"),
1064 Some(json!(1001u64))
1065 );
1066 let snap = c.snapshot();
1068 assert_eq!(snap["global"]["recent_runs_depth"], json!(MAX_RECENT_RUNS));
1069 let w = c
1071 .get_metric_value("sop.runs_completed_7d")
1072 .unwrap()
1073 .as_u64()
1074 .unwrap();
1075 assert_eq!(w, MAX_RECENT_RUNS as u64);
1076 }
1077
1078 #[test]
1081 fn windowed_excludes_old_runs() {
1082 let c = SopMetricsCollector::new();
1083 {
1085 let mut state = c.inner.write().unwrap();
1086 let old_snap = RunSnapshot {
1087 completed_at: Utc::now() - chrono::Duration::days(10),
1088 terminal_status: SopRunStatus::Completed,
1089 steps_executed: 1,
1090 steps_defined: 1,
1091 steps_failed: 0,
1092 steps_skipped: 0,
1093 human_approval_count: 0,
1094 timeout_approval_count: 0,
1095 };
1096 state.global.counters.runs_completed += 1;
1097 state.global.counters.steps_executed += 1;
1098 state.global.counters.steps_defined += 1;
1099 state.global.recent_runs.push_back(old_snap);
1100 }
1101
1102 assert_eq!(c.get_metric_value("sop.runs_completed"), Some(json!(1u64)));
1104 assert_eq!(
1106 c.get_metric_value("sop.runs_completed_7d"),
1107 Some(json!(0u64))
1108 );
1109 assert_eq!(
1111 c.get_metric_value("sop.runs_completed_30d"),
1112 Some(json!(1u64))
1113 );
1114 }
1115
1116 #[test]
1119 fn sop_name_matching_metric_suffix_resolves_global() {
1120 let c = SopMetricsCollector::new();
1121 let run = make_run(
1123 "r1",
1124 "runs_completed",
1125 SopRunStatus::Completed,
1126 1,
1127 vec![make_step(1, SopStepStatus::Completed)],
1128 );
1129 c.record_run_complete(&run);
1130
1131 assert_eq!(c.get_metric_value("sop.runs_completed"), Some(json!(1u64)));
1133 assert_eq!(
1135 c.get_metric_value("sop.runs_completed.runs_completed"),
1136 Some(json!(1u64))
1137 );
1138 }
1139
1140 #[tokio::test]
1143 async fn warm_start_roundtrip() {
1144 let mem_cfg = zeroclaw_config::schema::MemoryConfig {
1145 backend: "sqlite".into(),
1146 ..zeroclaw_config::schema::MemoryConfig::default()
1147 };
1148 let tmp = tempfile::tempdir().unwrap();
1149 let memory: std::sync::Arc<dyn Memory> = std::sync::Arc::from(
1150 zeroclaw_memory::create_memory(&mem_cfg, tmp.path(), None).unwrap(),
1151 );
1152
1153 let audit = crate::sop::SopAuditLogger::new(memory.clone());
1154 let run = make_run(
1155 "r1",
1156 "test-sop",
1157 SopRunStatus::Completed,
1158 2,
1159 vec![
1160 make_step(1, SopStepStatus::Completed),
1161 make_step(2, SopStepStatus::Completed),
1162 ],
1163 );
1164 audit.log_run_start(&run).await.unwrap();
1165 audit.log_run_complete(&run).await.unwrap();
1166 audit.log_approval(&run, 1).await.unwrap();
1167
1168 let collector = SopMetricsCollector::rebuild_from_memory(memory.as_ref())
1169 .await
1170 .unwrap();
1171
1172 assert_eq!(
1173 collector.get_metric_value("sop.runs_completed"),
1174 Some(json!(1u64))
1175 );
1176 assert_eq!(
1177 collector.get_metric_value("sop.human_intervention_count"),
1178 Some(json!(1u64))
1179 );
1180 assert_eq!(
1181 collector.get_metric_value("sop.test-sop.runs_completed"),
1182 Some(json!(1u64))
1183 );
1184 }
1185
1186 #[tokio::test]
1187 async fn warm_start_skips_running_runs() {
1188 let mem_cfg = zeroclaw_config::schema::MemoryConfig {
1189 backend: "sqlite".into(),
1190 ..zeroclaw_config::schema::MemoryConfig::default()
1191 };
1192 let tmp = tempfile::tempdir().unwrap();
1193 let memory: std::sync::Arc<dyn Memory> = std::sync::Arc::from(
1194 zeroclaw_memory::create_memory(&mem_cfg, tmp.path(), None).unwrap(),
1195 );
1196
1197 let audit = crate::sop::SopAuditLogger::new(memory.clone());
1198 let run = SopRun {
1199 run_id: "r1".into(),
1200 sop_name: "test-sop".into(),
1201 trigger_event: make_event(),
1202 status: SopRunStatus::Running,
1203 current_step: 1,
1204 total_steps: 3,
1205 started_at: "2026-02-19T12:00:00Z".into(),
1206 completed_at: None,
1207 step_results: vec![],
1208 waiting_since: None,
1209 llm_calls_saved: 0,
1210 };
1211 audit.log_run_start(&run).await.unwrap();
1212
1213 let collector = SopMetricsCollector::rebuild_from_memory(memory.as_ref())
1214 .await
1215 .unwrap();
1216
1217 assert_eq!(
1218 collector.get_metric_value("sop.runs_completed"),
1219 Some(json!(0u64))
1220 );
1221 }
1222
1223 #[tokio::test]
1224 async fn warm_start_empty_memory() {
1225 let mem_cfg = zeroclaw_config::schema::MemoryConfig {
1226 backend: "sqlite".into(),
1227 ..zeroclaw_config::schema::MemoryConfig::default()
1228 };
1229 let tmp = tempfile::tempdir().unwrap();
1230 let memory: std::sync::Arc<dyn Memory> = std::sync::Arc::from(
1231 zeroclaw_memory::create_memory(&mem_cfg, tmp.path(), None).unwrap(),
1232 );
1233
1234 let collector = SopMetricsCollector::rebuild_from_memory(memory.as_ref())
1235 .await
1236 .unwrap();
1237
1238 assert_eq!(
1239 collector.get_metric_value("sop.runs_completed"),
1240 Some(json!(0u64))
1241 );
1242 }
1243
1244 #[tokio::test]
1245 async fn warm_start_approval_matching() {
1246 let mem_cfg = zeroclaw_config::schema::MemoryConfig {
1247 backend: "sqlite".into(),
1248 ..zeroclaw_config::schema::MemoryConfig::default()
1249 };
1250 let tmp = tempfile::tempdir().unwrap();
1251 let memory: std::sync::Arc<dyn Memory> = std::sync::Arc::from(
1252 zeroclaw_memory::create_memory(&mem_cfg, tmp.path(), None).unwrap(),
1253 );
1254
1255 let audit = crate::sop::SopAuditLogger::new(memory.clone());
1256 let run = make_run(
1257 "r1",
1258 "test-sop",
1259 SopRunStatus::Completed,
1260 1,
1261 vec![make_step(1, SopStepStatus::Completed)],
1262 );
1263 audit.log_run_start(&run).await.unwrap();
1264 audit.log_timeout_auto_approve(&run, 1).await.unwrap();
1265 audit.log_run_complete(&run).await.unwrap();
1266
1267 let collector = SopMetricsCollector::rebuild_from_memory(memory.as_ref())
1268 .await
1269 .unwrap();
1270
1271 assert_eq!(
1272 collector.get_metric_value("sop.timeout_auto_approvals"),
1273 Some(json!(1u64))
1274 );
1275 let ta_7d = collector
1276 .get_metric_value("sop.timeout_auto_approvals_7d")
1277 .unwrap()
1278 .as_u64()
1279 .unwrap();
1280 assert_eq!(ta_7d, 1);
1281 }
1282
1283 #[tokio::test]
1286 async fn warm_start_preserves_pending_for_nonterminal_runs() {
1287 let mem_cfg = zeroclaw_config::schema::MemoryConfig {
1288 backend: "sqlite".into(),
1289 ..zeroclaw_config::schema::MemoryConfig::default()
1290 };
1291 let tmp = tempfile::tempdir().unwrap();
1292 let memory: std::sync::Arc<dyn Memory> = std::sync::Arc::from(
1293 zeroclaw_memory::create_memory(&mem_cfg, tmp.path(), None).unwrap(),
1294 );
1295
1296 let audit = crate::sop::SopAuditLogger::new(memory.clone());
1297
1298 let running_run = SopRun {
1300 run_id: "r1".into(),
1301 sop_name: "test-sop".into(),
1302 trigger_event: make_event(),
1303 status: SopRunStatus::Running,
1304 current_step: 1,
1305 total_steps: 3,
1306 started_at: "2026-02-19T12:00:00Z".into(),
1307 completed_at: None,
1308 step_results: vec![],
1309 waiting_since: None,
1310 llm_calls_saved: 0,
1311 };
1312 audit.log_run_start(&running_run).await.unwrap();
1313 audit.log_approval(&running_run, 1).await.unwrap();
1314
1315 let collector = SopMetricsCollector::rebuild_from_memory(memory.as_ref())
1317 .await
1318 .unwrap();
1319
1320 assert_eq!(
1322 collector.get_metric_value("sop.human_intervention_count"),
1323 Some(json!(1u64))
1324 );
1325 assert_eq!(
1327 collector.get_metric_value("sop.runs_completed"),
1328 Some(json!(0u64))
1329 );
1330
1331 let completed_run = make_run(
1333 "r1",
1334 "test-sop",
1335 SopRunStatus::Completed,
1336 3,
1337 vec![
1338 make_step(1, SopStepStatus::Completed),
1339 make_step(2, SopStepStatus::Completed),
1340 make_step(3, SopStepStatus::Completed),
1341 ],
1342 );
1343 collector.record_run_complete(&completed_run);
1344
1345 let hic_7d = collector
1347 .get_metric_value("sop.human_intervention_count_7d")
1348 .unwrap()
1349 .as_u64()
1350 .unwrap();
1351 assert_eq!(hic_7d, 1);
1352 }
1353
1354 #[test]
1357 fn get_metric_windowed_7d_matches_suffix() {
1358 let c = SopMetricsCollector::new();
1359 let run = make_run(
1360 "r1",
1361 "test-sop",
1362 SopRunStatus::Completed,
1363 2,
1364 vec![
1365 make_step(1, SopStepStatus::Completed),
1366 make_step(2, SopStepStatus::Completed),
1367 ],
1368 );
1369 c.record_run_complete(&run);
1370
1371 let suffix_val = c.get_metric_value("sop.completion_rate_7d");
1372 let windowed_val = c.get_metric_value_windowed(
1373 "sop.completion_rate",
1374 &std::time::Duration::from_secs(7 * 86400),
1375 );
1376 assert_eq!(suffix_val, windowed_val);
1377 }
1378
1379 #[test]
1380 fn get_metric_windowed_custom_duration() {
1381 let c = SopMetricsCollector::new();
1382 let run = make_run(
1384 "r1",
1385 "test-sop",
1386 SopRunStatus::Completed,
1387 1,
1388 vec![make_step(1, SopStepStatus::Completed)],
1389 );
1390 c.record_run_complete(&run);
1391
1392 {
1394 let mut state = c.inner.write().unwrap();
1395 let old_snap = RunSnapshot {
1396 completed_at: Utc::now() - chrono::Duration::days(20),
1397 terminal_status: SopRunStatus::Completed,
1398 steps_executed: 1,
1399 steps_defined: 1,
1400 steps_failed: 0,
1401 steps_skipped: 0,
1402 human_approval_count: 0,
1403 timeout_approval_count: 0,
1404 };
1405 state.global.recent_runs.push_back(old_snap);
1406 }
1407
1408 let val = c
1410 .get_metric_value_windowed(
1411 "sop.runs_completed",
1412 &std::time::Duration::from_secs(14 * 86400),
1413 )
1414 .unwrap()
1415 .as_u64()
1416 .unwrap();
1417 assert_eq!(val, 1);
1418
1419 let val = c
1421 .get_metric_value_windowed(
1422 "sop.runs_completed",
1423 &std::time::Duration::from_secs(30 * 86400),
1424 )
1425 .unwrap()
1426 .as_u64()
1427 .unwrap();
1428 assert_eq!(val, 2);
1429 }
1430}