1use std::sync::{Arc, Mutex};
8
9use super::audit::SopAuditLogger;
10use super::engine::{SopEngine, now_iso8601};
11use super::types::{SopEvent, SopRun, SopRunAction, SopTriggerSource};
12
13#[derive(Debug, Clone)]
17pub enum DispatchResult {
18 Started {
23 run_id: String,
24 sop_name: String,
25 action: Box<SopRunAction>,
26 },
27 Skipped { sop_name: String, reason: String },
29 NoMatch,
31}
32
33fn extract_run_id_from_action(action: &SopRunAction) -> &str {
37 match action {
38 SopRunAction::ExecuteStep { run_id, .. }
39 | SopRunAction::WaitApproval { run_id, .. }
40 | SopRunAction::DeterministicStep { run_id, .. }
41 | SopRunAction::CheckpointWait { run_id, .. }
42 | SopRunAction::Completed { run_id, .. }
43 | SopRunAction::Failed { run_id, .. } => run_id,
44 }
45}
46
47fn action_label(action: &SopRunAction) -> &'static str {
49 match action {
50 SopRunAction::ExecuteStep { .. } => "ExecuteStep",
51 SopRunAction::WaitApproval { .. } => "WaitApproval",
52 SopRunAction::DeterministicStep { .. } => "DeterministicStep",
53 SopRunAction::CheckpointWait { .. } => "CheckpointWait",
54 SopRunAction::Completed { .. } => "Completed",
55 SopRunAction::Failed { .. } => "Failed",
56 }
57}
58
59pub async fn dispatch_sop_event(
68 engine: &Arc<Mutex<SopEngine>>,
69 audit: &SopAuditLogger,
70 event: SopEvent,
71) -> Vec<DispatchResult> {
72 let matched_names: Vec<String> = match engine.lock() {
74 Ok(eng) => eng
75 .match_trigger(&event)
76 .iter()
77 .map(|s| s.name.clone())
78 .collect(),
79 Err(e) => {
80 crate::health::mark_component_error("sop_dispatch", format!("lock poisoned: {e}"));
81 ::zeroclaw_log::record!(
82 WARN,
83 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
84 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
85 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
86 "SOP dispatch: engine lock poisoned during match phase"
87 );
88 return vec![];
89 }
90 };
91
92 if matched_names.is_empty() {
93 ::zeroclaw_log::record!(
94 DEBUG,
95 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
96 "SOP dispatch: no match for event"
97 );
98 return vec![DispatchResult::NoMatch];
99 }
100
101 ::zeroclaw_log::record!(
102 INFO,
103 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
104 &format!(
105 "SOP dispatch: {} SOP(s) matched: {:?}",
106 matched_names.len(),
107 matched_names
108 )
109 );
110
111 let mut results = Vec::new();
113 let mut started_runs: Vec<SopRun> = Vec::new();
114
115 {
116 let mut eng = match engine.lock() {
117 Ok(e) => e,
118 Err(e) => {
119 crate::health::mark_component_error("sop_dispatch", format!("lock poisoned: {e}"));
120 ::zeroclaw_log::record!(
121 WARN,
122 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
123 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
124 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
125 "SOP dispatch: engine lock poisoned during start phase"
126 );
127 return vec![];
128 }
129 };
130
131 for sop_name in &matched_names {
132 match eng.start_run(sop_name, event.clone()) {
133 Ok(action) => {
134 let run_id = extract_run_id_from_action(&action).to_string();
136 if let Some(run) = eng.active_runs().get(&run_id) {
138 started_runs.push(run.clone());
139 }
140 ::zeroclaw_log::record!(
141 INFO,
142 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
143 &format!(
144 "SOP dispatch: started '{}' run {run_id} (action: {})",
145 sop_name,
146 action_label(&action)
147 )
148 );
149 results.push(DispatchResult::Started {
150 run_id,
151 sop_name: sop_name.clone(),
152 action: Box::new(action),
153 });
154 }
155 Err(e) => {
156 ::zeroclaw_log::record!(
157 INFO,
158 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
159 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
160 &format!("SOP dispatch: skipped '{}'", sop_name)
161 );
162 results.push(DispatchResult::Skipped {
163 sop_name: sop_name.clone(),
164 reason: e.to_string(),
165 });
166 }
167 }
168 }
169 } use zeroclaw_log::Instrument;
173 for run in &started_runs {
174 let span = zeroclaw_log::attribution_span!(run);
175 let run_id = run.run_id.clone();
176 if let Err(e) = zeroclaw_log::scope!(
177 session_key: run_id,
178 =>
179 audit.log_run_start(run)
180 )
181 .instrument(span)
182 .await
183 {
184 ::zeroclaw_log::record!(
185 WARN,
186 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
187 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
188 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
189 &format!("SOP dispatch: audit log failed for run {}", run.run_id)
190 );
191 }
192 }
193
194 crate::health::mark_component_ok("sop_dispatch");
195 results
196}
197
198pub fn process_headless_results(results: &[DispatchResult]) {
208 for result in results {
209 match result {
210 DispatchResult::Started {
211 run_id,
212 sop_name,
213 action,
214 } => match action.as_ref() {
215 SopRunAction::ExecuteStep { step, .. } => {
216 ::zeroclaw_log::record!(
217 WARN,
218 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
219 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
220 &format!(
221 "SOP headless dispatch: run {run_id} ('{sop_name}') ready for step {} \
222 '{}' but no agent loop available to execute",
223 step.number, step.title
224 )
225 );
226 }
227 SopRunAction::WaitApproval { step, .. } => {
228 ::zeroclaw_log::record!(
229 INFO,
230 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
231 &format!(
232 "SOP headless dispatch: run {run_id} ('{sop_name}') waiting for approval \
233 on step {} '{}'. Timeout polling will handle progression",
234 step.number, step.title
235 )
236 );
237 }
238 SopRunAction::DeterministicStep { step, .. } => {
239 ::zeroclaw_log::record!(
240 INFO,
241 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
242 &format!(
243 "SOP headless dispatch: run {run_id} ('{sop_name}') deterministic step {} \
244 '{}'",
245 step.number, step.title
246 )
247 );
248 }
249 SopRunAction::CheckpointWait {
250 step, state_file, ..
251 } => {
252 ::zeroclaw_log::record!(
253 INFO,
254 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
255 &format!(
256 "SOP headless dispatch: run {run_id} ('{sop_name}') checkpoint at step {} \
257 '{}', state persisted to {}",
258 step.number,
259 step.title,
260 state_file.display().to_string()
261 )
262 );
263 }
264 SopRunAction::Completed { .. } => {
265 ::zeroclaw_log::record!(
266 INFO,
267 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
268 .with_attrs(
269 ::serde_json::json!({"run_id": run_id, "sop_name": sop_name})
270 ),
271 "SOP headless dispatch: run ('') completed immediately"
272 );
273 }
274 SopRunAction::Failed { reason, .. } => {
275 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"run_id": run_id, "sop_name": sop_name, "reason": reason.to_string()})), "SOP headless dispatch: run ('') failed: ");
276 }
277 },
278 DispatchResult::Skipped { sop_name, reason } => {
279 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"sop_name": sop_name, "reason": reason.to_string()})), "SOP headless dispatch: skipped '': ");
280 }
281 DispatchResult::NoMatch => {}
282 }
283 }
284}
285
286pub async fn dispatch_peripheral_signal(
293 engine: &Arc<Mutex<SopEngine>>,
294 audit: &SopAuditLogger,
295 board: &str,
296 signal: &str,
297 payload: Option<&str>,
298) -> Vec<DispatchResult> {
299 let event = SopEvent {
300 source: SopTriggerSource::Peripheral,
301 topic: Some(format!("{board}/{signal}")),
302 payload: payload.map(String::from),
303 timestamp: now_iso8601(),
304 };
305 dispatch_sop_event(engine, audit, event).await
306}
307
308#[derive(Clone)]
315pub struct SopCronCache {
316 schedules: Vec<(String, String, cron::Schedule)>,
318}
319
320impl SopCronCache {
321 pub fn from_engine(engine: &Arc<Mutex<SopEngine>>) -> Self {
326 let mut schedules = Vec::new();
327 let eng = match engine.lock() {
328 Ok(e) => e,
329 Err(e) => {
330 ::zeroclaw_log::record!(
331 WARN,
332 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
333 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
334 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
335 "SopCronCache: engine lock poisoned"
336 );
337 return Self { schedules };
338 }
339 };
340
341 for sop in eng.sops() {
342 for trigger in &sop.triggers {
343 if let super::types::SopTrigger::Cron { expression } = trigger {
344 let normalized = match crate::cron::normalize_expression(expression) {
346 Ok(n) => n,
347 Err(e) => {
348 ::zeroclaw_log::record!(
349 WARN,
350 ::zeroclaw_log::Event::new(
351 module_path!(),
352 ::zeroclaw_log::Action::Note
353 )
354 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
355 &format!(
356 "SopCronCache: invalid cron expression '{}' in SOP '{}': {e}",
357 expression, sop.name
358 )
359 );
360 continue;
361 }
362 };
363 match normalized.parse::<cron::Schedule>() {
364 Ok(schedule) => {
365 schedules.push((sop.name.clone(), expression.clone(), schedule));
366 }
367 Err(e) => {
368 ::zeroclaw_log::record!(
369 WARN,
370 ::zeroclaw_log::Event::new(
371 module_path!(),
372 ::zeroclaw_log::Action::Note
373 )
374 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
375 &format!(
376 "SopCronCache: failed to parse cron schedule '{}' for SOP '{}': {e}",
377 normalized, sop.name
378 )
379 );
380 }
381 }
382 }
383 }
384 }
385
386 ::zeroclaw_log::record!(
387 INFO,
388 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
389 &format!("SopCronCache: cached {} cron schedule(s)", schedules.len())
390 );
391 Self { schedules }
392 }
393
394 #[cfg(test)]
396 pub fn schedules(&self) -> &[(String, String, cron::Schedule)] {
397 &self.schedules
398 }
399}
400
401pub async fn check_sop_cron_triggers(
406 engine: &Arc<Mutex<SopEngine>>,
407 audit: &SopAuditLogger,
408 cache: &SopCronCache,
409 last_check: &mut chrono::DateTime<chrono::Utc>,
410) -> Vec<DispatchResult> {
411 let now = chrono::Utc::now();
412 let mut all_results = Vec::new();
413
414 for (_sop_name, expression, schedule) in &cache.schedules {
415 let mut upcoming = schedule.after(last_check);
420 if let Some(next) = upcoming.next()
421 && next <= now
422 {
423 let event = SopEvent {
425 source: SopTriggerSource::Cron,
426 topic: Some(expression.clone()),
427 payload: None,
428 timestamp: now_iso8601(),
429 };
430 let results = dispatch_sop_event(engine, audit, event).await;
431 all_results.extend(results);
432 }
433 }
434
435 *last_check = now;
436 all_results
437}
438
439#[cfg(test)]
442mod tests {
443 use super::*;
444 use crate::sop::types::{
445 Sop, SopExecutionMode, SopPriority, SopRunAction, SopStep, SopTrigger, SopTriggerSource,
446 };
447 use zeroclaw_config::schema::{MemoryConfig, SopConfig};
448 use zeroclaw_memory::traits::Memory;
449
450 fn test_sop(name: &str, triggers: Vec<SopTrigger>) -> Sop {
451 Sop {
452 name: name.into(),
453 description: format!("Test SOP: {name}"),
454 version: "1.0.0".into(),
455 priority: SopPriority::Normal,
456 execution_mode: SopExecutionMode::Auto,
457 triggers,
458 steps: vec![SopStep {
459 number: 1,
460 title: "Step one".into(),
461 body: "Do step one".into(),
462 suggested_tools: vec![],
463 requires_confirmation: false,
464 kind: crate::sop::SopStepKind::default(),
465 schema: None,
466 }],
467 cooldown_secs: 0,
468 max_concurrent: 2,
469 location: None,
470 deterministic: false,
471 }
472 }
473
474 fn test_engine(sops: Vec<Sop>) -> Arc<Mutex<SopEngine>> {
475 let mut engine = SopEngine::new(SopConfig::default());
476 engine.set_sops_for_test(sops);
477 Arc::new(Mutex::new(engine))
478 }
479
480 fn test_audit() -> SopAuditLogger {
481 let mem_cfg = MemoryConfig {
482 backend: "sqlite".into(),
483 ..MemoryConfig::default()
484 };
485 let tmp = tempfile::tempdir().unwrap();
486 let memory: Arc<dyn Memory> =
487 Arc::from(zeroclaw_memory::create_memory(&mem_cfg, tmp.path(), None).unwrap());
488 std::mem::forget(tmp);
490 SopAuditLogger::new(memory)
491 }
492
493 #[tokio::test]
494 async fn dispatch_starts_matching_sop() {
495 let engine = test_engine(vec![test_sop(
496 "mqtt-sop",
497 vec![SopTrigger::Mqtt {
498 topic: "sensors/temp".into(),
499 condition: None,
500 }],
501 )]);
502 let audit = test_audit();
503
504 let event = SopEvent {
505 source: SopTriggerSource::Mqtt,
506 topic: Some("sensors/temp".into()),
507 payload: Some(r#"{"value": 42}"#.into()),
508 timestamp: now_iso8601(),
509 };
510
511 let results = dispatch_sop_event(&engine, &audit, event).await;
512 assert_eq!(results.len(), 1);
513 assert!(
514 matches!(&results[0], DispatchResult::Started { sop_name, action, .. } if sop_name == "mqtt-sop" && matches!(action.as_ref(), SopRunAction::ExecuteStep { .. }))
515 );
516 }
517
518 #[tokio::test]
519 async fn dispatch_skips_when_cooldown_active() {
520 let mut sop = test_sop("cooldown-sop", vec![SopTrigger::Manual]);
521 sop.cooldown_secs = 3600;
522 sop.max_concurrent = 1;
523 let engine = test_engine(vec![sop]);
524 let audit = test_audit();
525
526 {
528 let mut eng = engine.lock().unwrap();
529 let _action = eng
530 .start_run(
531 "cooldown-sop",
532 SopEvent {
533 source: SopTriggerSource::Manual,
534 topic: None,
535 payload: None,
536 timestamp: now_iso8601(),
537 },
538 )
539 .unwrap();
540 let run_id = eng.active_runs().keys().next().unwrap().clone();
542 eng.advance_step(
543 &run_id,
544 crate::sop::types::SopStepResult {
545 step_number: 1,
546 status: crate::sop::types::SopStepStatus::Completed,
547 output: "done".into(),
548 started_at: now_iso8601(),
549 completed_at: Some(now_iso8601()),
550 },
551 )
552 .unwrap();
553 }
554
555 let event = SopEvent {
557 source: SopTriggerSource::Manual,
558 topic: None,
559 payload: None,
560 timestamp: now_iso8601(),
561 };
562 let results = dispatch_sop_event(&engine, &audit, event).await;
563 assert_eq!(results.len(), 1);
564 assert!(
565 matches!(&results[0], DispatchResult::Skipped { sop_name, .. } if sop_name == "cooldown-sop")
566 );
567 }
568
569 #[tokio::test]
570 async fn dispatch_returns_no_match_for_unknown_event() {
571 let engine = test_engine(vec![test_sop("manual-sop", vec![SopTrigger::Manual])]);
572 let audit = test_audit();
573
574 let event = SopEvent {
576 source: SopTriggerSource::Mqtt,
577 topic: Some("some/topic".into()),
578 payload: None,
579 timestamp: now_iso8601(),
580 };
581 let results = dispatch_sop_event(&engine, &audit, event).await;
582 assert_eq!(results.len(), 1);
583 assert!(matches!(&results[0], DispatchResult::NoMatch));
584 }
585
586 #[tokio::test]
587 async fn dispatch_batch_lock_starts_multiple_sops() {
588 let sop1 = test_sop(
589 "webhook-sop-1",
590 vec![SopTrigger::Webhook {
591 path: "/api/deploy".into(),
592 }],
593 );
594 let sop2 = test_sop(
595 "webhook-sop-2",
596 vec![SopTrigger::Webhook {
597 path: "/api/deploy".into(),
598 }],
599 );
600 let engine = test_engine(vec![sop1, sop2]);
601 let audit = test_audit();
602
603 let event = SopEvent {
604 source: SopTriggerSource::Webhook,
605 topic: Some("/api/deploy".into()),
606 payload: None,
607 timestamp: now_iso8601(),
608 };
609
610 let results = dispatch_sop_event(&engine, &audit, event).await;
611 let started_count = results
612 .iter()
613 .filter(|r| matches!(r, DispatchResult::Started { .. }))
614 .count();
615 assert_eq!(started_count, 2);
616 }
617
618 #[tokio::test]
621 async fn dispatch_captures_action_for_wait_approval() {
622 let mut sop = test_sop(
624 "supervised-sop",
625 vec![SopTrigger::Mqtt {
626 topic: "alert".into(),
627 condition: None,
628 }],
629 );
630 sop.execution_mode = SopExecutionMode::Supervised;
631 let engine = test_engine(vec![sop]);
632 let audit = test_audit();
633
634 let event = SopEvent {
635 source: SopTriggerSource::Mqtt,
636 topic: Some("alert".into()),
637 payload: None,
638 timestamp: now_iso8601(),
639 };
640
641 let results = dispatch_sop_event(&engine, &audit, event).await;
642 assert_eq!(results.len(), 1);
643 match &results[0] {
644 DispatchResult::Started {
645 run_id,
646 sop_name,
647 action,
648 } => {
649 assert_eq!(sop_name, "supervised-sop");
650 assert!(!run_id.is_empty());
651 assert!(
652 matches!(action.as_ref(), SopRunAction::WaitApproval { .. }),
653 "Supervised SOP must return WaitApproval, got {:?}",
654 action
655 );
656 }
657 other => panic!("Expected Started, got {other:?}"),
658 }
659 }
660
661 #[tokio::test]
663 async fn dispatch_captures_action_for_execute_step() {
664 let engine = test_engine(vec![test_sop("auto-sop", vec![SopTrigger::Manual])]);
665 let audit = test_audit();
666
667 let event = SopEvent {
668 source: SopTriggerSource::Manual,
669 topic: None,
670 payload: None,
671 timestamp: now_iso8601(),
672 };
673
674 let results = dispatch_sop_event(&engine, &audit, event).await;
675 assert_eq!(results.len(), 1);
676 match &results[0] {
677 DispatchResult::Started { action, .. } => {
678 assert!(
679 matches!(action.as_ref(), SopRunAction::ExecuteStep { .. }),
680 "Auto SOP must return ExecuteStep, got {:?}",
681 action
682 );
683 }
684 other => panic!("Expected Started, got {other:?}"),
685 }
686 }
687
688 #[tokio::test]
689 async fn peripheral_signal_dispatches_to_matching_sop() {
690 let engine = test_engine(vec![test_sop(
691 "gpio-sop",
692 vec![SopTrigger::Peripheral {
693 board: "nucleo".into(),
694 signal: "pin_3".into(),
695 condition: None,
696 }],
697 )]);
698 let audit = test_audit();
699
700 let results =
701 dispatch_peripheral_signal(&engine, &audit, "nucleo", "pin_3", Some("1")).await;
702 assert_eq!(results.len(), 1);
703 assert!(
704 matches!(&results[0], DispatchResult::Started { sop_name, .. } if sop_name == "gpio-sop" )
705 );
706 }
707
708 #[tokio::test]
709 async fn peripheral_signal_no_match_returns_empty() {
710 let engine = test_engine(vec![test_sop(
711 "gpio-sop",
712 vec![SopTrigger::Peripheral {
713 board: "nucleo".into(),
714 signal: "pin_3".into(),
715 condition: None,
716 }],
717 )]);
718 let audit = test_audit();
719
720 let results = dispatch_peripheral_signal(&engine, &audit, "rpi", "gpio_5", None).await;
721 assert_eq!(results.len(), 1);
722 assert!(matches!(&results[0], DispatchResult::NoMatch));
723 }
724
725 #[test]
726 fn cron_cache_skips_invalid_expression() {
727 let sop = test_sop(
728 "bad-cron",
729 vec![SopTrigger::Cron {
730 expression: "not a valid cron".into(),
731 }],
732 );
733 let engine = test_engine(vec![sop]);
734 let cache = SopCronCache::from_engine(&engine);
735 assert!(cache.schedules().is_empty());
736 }
737
738 #[test]
739 fn cron_cache_parses_valid_expression() {
740 let sop = test_sop(
741 "valid-cron",
742 vec![SopTrigger::Cron {
743 expression: "0 */5 * * *".into(),
744 }],
745 );
746 let engine = test_engine(vec![sop]);
747 let cache = SopCronCache::from_engine(&engine);
748 assert_eq!(cache.schedules().len(), 1);
749 assert_eq!(cache.schedules()[0].0, "valid-cron");
750 assert_eq!(cache.schedules()[0].1, "0 */5 * * *");
751 }
752
753 #[tokio::test]
754 async fn cron_sop_trigger_fires_on_schedule() {
755 let sop = test_sop(
756 "cron-sop",
757 vec![SopTrigger::Cron {
758 expression: "* * * * *".into(),
759 }],
760 );
761 let engine = test_engine(vec![sop]);
762 let audit = test_audit();
763 let cache = SopCronCache::from_engine(&engine);
764
765 let mut last_check = chrono::Utc::now() - chrono::Duration::minutes(2);
767 let results = check_sop_cron_triggers(&engine, &audit, &cache, &mut last_check).await;
768
769 let started = results
770 .iter()
771 .filter(|r| matches!(r, DispatchResult::Started { .. }))
772 .count();
773 assert!(started >= 1, "Expected at least 1 started SOP from cron");
774 }
775
776 #[tokio::test]
777 async fn cron_sop_only_matching_expression_fires() {
778 let sop1 = test_sop(
779 "every-min",
780 vec![SopTrigger::Cron {
781 expression: "* * * * *".into(),
782 }],
783 );
784 let sop2 = test_sop(
787 "yearly",
788 vec![SopTrigger::Cron {
789 expression: "0 0 1 1 *".into(),
790 }],
791 );
792 let engine = test_engine(vec![sop1, sop2]);
793 let audit = test_audit();
794 let cache = SopCronCache::from_engine(&engine);
795
796 let mut last_check = chrono::Utc::now() - chrono::Duration::minutes(2);
797 let results = check_sop_cron_triggers(&engine, &audit, &cache, &mut last_check).await;
798
799 let started_names: Vec<&str> = results
801 .iter()
802 .filter_map(|r| match r {
803 DispatchResult::Started { sop_name, .. } => Some(sop_name.as_str()),
804 _ => None,
805 })
806 .collect();
807 assert!(started_names.contains(&"every-min"));
808 assert!(!started_names.contains(&"yearly"));
809 }
810
811 #[tokio::test]
812 async fn cron_sop_window_check_does_not_miss_tick() {
813 let sop = test_sop(
814 "every-min",
815 vec![SopTrigger::Cron {
816 expression: "* * * * *".into(),
817 }],
818 );
819 let engine = test_engine(vec![sop]);
820 let audit = test_audit();
821 let cache = SopCronCache::from_engine(&engine);
822
823 let mut last_check = chrono::Utc::now() - chrono::Duration::minutes(5);
825 let results = check_sop_cron_triggers(&engine, &audit, &cache, &mut last_check).await;
826
827 let started = results
829 .iter()
830 .filter(|r| matches!(r, DispatchResult::Started { .. }))
831 .count();
832 assert!(
833 started >= 1,
834 "Window-based check should catch ticks from 5 minutes ago"
835 );
836
837 let now = chrono::Utc::now();
839 assert!(
840 (now - last_check).num_seconds() < 2,
841 "last_check should be updated to now"
842 );
843 }
844}