Skip to main content

zeroclaw_runtime/sop/
dispatch.rs

1//! Unified SOP event dispatch helpers.
2//!
3//! All event sources (MQTT, webhook, cron, peripheral) route through
4//! `dispatch_sop_event` so that locking, audit, and health bookkeeping
5//! happen in exactly one place.
6
7use std::sync::{Arc, Mutex};
8
9use super::audit::SopAuditLogger;
10use super::engine::{SopEngine, now_iso8601};
11use super::types::{SopEvent, SopRun, SopRunAction, SopTriggerSource};
12
13// ── Dispatch result ─────────────────────────────────────────────
14
/// Outcome of attempting to dispatch an event to the SOP engine.
///
/// `dispatch_sop_event` yields one `Started` or `Skipped` entry per SOP that
/// matched the event, or a single `NoMatch` when nothing matched.
#[derive(Debug, Clone)]
pub enum DispatchResult {
    /// A new SOP run was started. `action` carries the next step the runtime
    /// must execute (or wait for approval on). Callers that cannot act on the
    /// action (e.g. headless fan-in) must still audit/log it — never silently
    /// drop.
    Started {
        // Run identifier, taken from the action returned by the engine.
        run_id: String,
        // Name of the SOP the run belongs to.
        sop_name: String,
        // Boxed so the enum stays small regardless of the action's size.
        action: Box<SopRunAction>,
    },
    /// A matching SOP was found but could not start (cooldown / concurrency).
    /// `reason` is the stringified error returned by the engine's `start_run`.
    Skipped { sop_name: String, reason: String },
    /// No loaded SOP matched the event.
    NoMatch,
}
32
33// ── Action helpers ──────────────────────────────────────────────
34
35/// Extract the `run_id` from any `SopRunAction` variant.
36fn extract_run_id_from_action(action: &SopRunAction) -> &str {
37    match action {
38        SopRunAction::ExecuteStep { run_id, .. }
39        | SopRunAction::WaitApproval { run_id, .. }
40        | SopRunAction::DeterministicStep { run_id, .. }
41        | SopRunAction::CheckpointWait { run_id, .. }
42        | SopRunAction::Completed { run_id, .. }
43        | SopRunAction::Failed { run_id, .. } => run_id,
44    }
45}
46
47/// Short label for logging which action was returned.
48fn action_label(action: &SopRunAction) -> &'static str {
49    match action {
50        SopRunAction::ExecuteStep { .. } => "ExecuteStep",
51        SopRunAction::WaitApproval { .. } => "WaitApproval",
52        SopRunAction::DeterministicStep { .. } => "DeterministicStep",
53        SopRunAction::CheckpointWait { .. } => "CheckpointWait",
54        SopRunAction::Completed { .. } => "Completed",
55        SopRunAction::Failed { .. } => "Failed",
56    }
57}
58
59// ── Core dispatch ───────────────────────────────────────────────
60
61/// Dispatch an incoming event to the SOP engine.
62///
63/// Pattern (batch lock — exactly 2 acquisitions):
64/// 1. Lock → `match_trigger` → collect SOP names → drop lock
65/// 2. Lock → for each name: `start_run` → collect results → drop lock
66/// 3. Async (no lock): audit each started run
67pub async fn dispatch_sop_event(
68    engine: &Arc<Mutex<SopEngine>>,
69    audit: &SopAuditLogger,
70    event: SopEvent,
71) -> Vec<DispatchResult> {
72    // Phase 1: match
73    let matched_names: Vec<String> = match engine.lock() {
74        Ok(eng) => eng
75            .match_trigger(&event)
76            .iter()
77            .map(|s| s.name.clone())
78            .collect(),
79        Err(e) => {
80            crate::health::mark_component_error("sop_dispatch", format!("lock poisoned: {e}"));
81            ::zeroclaw_log::record!(
82                WARN,
83                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
84                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
85                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
86                "SOP dispatch: engine lock poisoned during match phase"
87            );
88            return vec![];
89        }
90    };
91
92    if matched_names.is_empty() {
93        ::zeroclaw_log::record!(
94            DEBUG,
95            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
96            "SOP dispatch: no match for event"
97        );
98        return vec![DispatchResult::NoMatch];
99    }
100
101    ::zeroclaw_log::record!(
102        INFO,
103        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
104        &format!(
105            "SOP dispatch: {} SOP(s) matched: {:?}",
106            matched_names.len(),
107            matched_names
108        )
109    );
110
111    // Phase 2: start runs
112    let mut results = Vec::new();
113    let mut started_runs: Vec<SopRun> = Vec::new();
114
115    {
116        let mut eng = match engine.lock() {
117            Ok(e) => e,
118            Err(e) => {
119                crate::health::mark_component_error("sop_dispatch", format!("lock poisoned: {e}"));
120                ::zeroclaw_log::record!(
121                    WARN,
122                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
123                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
124                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
125                    "SOP dispatch: engine lock poisoned during start phase"
126                );
127                return vec![];
128            }
129        };
130
131        for sop_name in &matched_names {
132            match eng.start_run(sop_name, event.clone()) {
133                Ok(action) => {
134                    // Extract run_id from the action (authoritative source)
135                    let run_id = extract_run_id_from_action(&action).to_string();
136                    // Snapshot the run for audit (must be done under lock)
137                    if let Some(run) = eng.active_runs().get(&run_id) {
138                        started_runs.push(run.clone());
139                    }
140                    ::zeroclaw_log::record!(
141                        INFO,
142                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
143                        &format!(
144                            "SOP dispatch: started '{}' run {run_id} (action: {})",
145                            sop_name,
146                            action_label(&action)
147                        )
148                    );
149                    results.push(DispatchResult::Started {
150                        run_id,
151                        sop_name: sop_name.clone(),
152                        action: Box::new(action),
153                    });
154                }
155                Err(e) => {
156                    ::zeroclaw_log::record!(
157                        INFO,
158                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
159                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
160                        &format!("SOP dispatch: skipped '{}'", sop_name)
161                    );
162                    results.push(DispatchResult::Skipped {
163                        sop_name: sop_name.clone(),
164                        reason: e.to_string(),
165                    });
166                }
167            }
168        }
169    } // lock dropped
170
171    // Phase 3: audit (async, no lock)
172    use zeroclaw_log::Instrument;
173    for run in &started_runs {
174        let span = zeroclaw_log::attribution_span!(run);
175        let run_id = run.run_id.clone();
176        if let Err(e) = zeroclaw_log::scope!(
177            session_key: run_id,
178            =>
179            audit.log_run_start(run)
180        )
181        .instrument(span)
182        .await
183        {
184            ::zeroclaw_log::record!(
185                WARN,
186                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
187                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
188                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
189                &format!("SOP dispatch: audit log failed for run {}", run.run_id)
190            );
191        }
192    }
193
194    crate::health::mark_component_ok("sop_dispatch");
195    results
196}
197
198// ── Headless result processing ──────────────────────────────────
199
200/// Process dispatch results in headless (non-agent-loop) callers.
201///
202/// This handles audit and logging for fan-in callers (MQTT, webhook, cron)
203/// that cannot execute SOP steps interactively. For `WaitApproval` actions,
204/// approval timeout polling in the scheduler handles progression.
205/// For `ExecuteStep` actions, the run is started in the engine but steps
206/// cannot be executed without an agent loop — this is logged as a warning.
207pub fn process_headless_results(results: &[DispatchResult]) {
208    for result in results {
209        match result {
210            DispatchResult::Started {
211                run_id,
212                sop_name,
213                action,
214            } => match action.as_ref() {
215                SopRunAction::ExecuteStep { step, .. } => {
216                    ::zeroclaw_log::record!(
217                        WARN,
218                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
219                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
220                        &format!(
221                            "SOP headless dispatch: run {run_id} ('{sop_name}') ready for step {} \
222                         '{}' but no agent loop available to execute",
223                            step.number, step.title
224                        )
225                    );
226                }
227                SopRunAction::WaitApproval { step, .. } => {
228                    ::zeroclaw_log::record!(
229                        INFO,
230                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
231                        &format!(
232                            "SOP headless dispatch: run {run_id} ('{sop_name}') waiting for approval \
233                         on step {} '{}'. Timeout polling will handle progression",
234                            step.number, step.title
235                        )
236                    );
237                }
238                SopRunAction::DeterministicStep { step, .. } => {
239                    ::zeroclaw_log::record!(
240                        INFO,
241                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
242                        &format!(
243                            "SOP headless dispatch: run {run_id} ('{sop_name}') deterministic step {} \
244                         '{}'",
245                            step.number, step.title
246                        )
247                    );
248                }
249                SopRunAction::CheckpointWait {
250                    step, state_file, ..
251                } => {
252                    ::zeroclaw_log::record!(
253                        INFO,
254                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
255                        &format!(
256                            "SOP headless dispatch: run {run_id} ('{sop_name}') checkpoint at step {} \
257                         '{}', state persisted to {}",
258                            step.number,
259                            step.title,
260                            state_file.display().to_string()
261                        )
262                    );
263                }
264                SopRunAction::Completed { .. } => {
265                    ::zeroclaw_log::record!(
266                        INFO,
267                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
268                            .with_attrs(
269                                ::serde_json::json!({"run_id": run_id, "sop_name": sop_name})
270                            ),
271                        "SOP headless dispatch: run  ('') completed immediately"
272                    );
273                }
274                SopRunAction::Failed { reason, .. } => {
275                    ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"run_id": run_id, "sop_name": sop_name, "reason": reason.to_string()})), "SOP headless dispatch: run  ('') failed: ");
276                }
277            },
278            DispatchResult::Skipped { sop_name, reason } => {
279                ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"sop_name": sop_name, "reason": reason.to_string()})), "SOP headless dispatch: skipped '': ");
280            }
281            DispatchResult::NoMatch => {}
282        }
283    }
284}
285
286// ── Peripheral signal helper ────────────────────────────────────
287
288/// Convenience wrapper for peripheral hardware callbacks.
289///
290/// Builds a `SopEvent` with source `Peripheral` and topic `"{board}/{signal}"`
291/// then dispatches it through the standard path.
292pub async fn dispatch_peripheral_signal(
293    engine: &Arc<Mutex<SopEngine>>,
294    audit: &SopAuditLogger,
295    board: &str,
296    signal: &str,
297    payload: Option<&str>,
298) -> Vec<DispatchResult> {
299    let event = SopEvent {
300        source: SopTriggerSource::Peripheral,
301        topic: Some(format!("{board}/{signal}")),
302        payload: payload.map(String::from),
303        timestamp: now_iso8601(),
304    };
305    dispatch_sop_event(engine, audit, event).await
306}
307
308// ── Cron SOP cache + check ──────────────────────────────────────
309
/// Pre-parsed cron schedules for SOP triggers.
///
/// Built once at daemon startup to avoid re-parsing cron expressions
/// on every scheduler tick.
///
/// There is one entry per cron trigger that parsed successfully, so a SOP
/// with several cron triggers contributes several entries.
#[derive(Clone)]
pub struct SopCronCache {
    /// (sop_name, raw_expression, parsed_schedule)
    ///
    /// `raw_expression` is the expression exactly as written in the SOP
    /// trigger; `parsed_schedule` is parsed from its normalized form.
    schedules: Vec<(String, String, cron::Schedule)>,
}
319
320impl SopCronCache {
321    /// Build cache from the current engine state.
322    ///
323    /// Locks the engine once, iterates SOPs, parses Cron trigger expressions.
324    /// Invalid expressions are logged and skipped (fail-closed).
325    pub fn from_engine(engine: &Arc<Mutex<SopEngine>>) -> Self {
326        let mut schedules = Vec::new();
327        let eng = match engine.lock() {
328            Ok(e) => e,
329            Err(e) => {
330                ::zeroclaw_log::record!(
331                    WARN,
332                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
333                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
334                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
335                    "SopCronCache: engine lock poisoned"
336                );
337                return Self { schedules };
338            }
339        };
340
341        for sop in eng.sops() {
342            for trigger in &sop.triggers {
343                if let super::types::SopTrigger::Cron { expression } = trigger {
344                    // Normalize 5-field crontab to 6-field (prepend seconds)
345                    let normalized = match crate::cron::normalize_expression(expression) {
346                        Ok(n) => n,
347                        Err(e) => {
348                            ::zeroclaw_log::record!(
349                                WARN,
350                                ::zeroclaw_log::Event::new(
351                                    module_path!(),
352                                    ::zeroclaw_log::Action::Note
353                                )
354                                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
355                                &format!(
356                                    "SopCronCache: invalid cron expression '{}' in SOP '{}': {e}",
357                                    expression, sop.name
358                                )
359                            );
360                            continue;
361                        }
362                    };
363                    match normalized.parse::<cron::Schedule>() {
364                        Ok(schedule) => {
365                            schedules.push((sop.name.clone(), expression.clone(), schedule));
366                        }
367                        Err(e) => {
368                            ::zeroclaw_log::record!(
369                                WARN,
370                                ::zeroclaw_log::Event::new(
371                                    module_path!(),
372                                    ::zeroclaw_log::Action::Note
373                                )
374                                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
375                                &format!(
376                                    "SopCronCache: failed to parse cron schedule '{}' for SOP '{}': {e}",
377                                    normalized, sop.name
378                                )
379                            );
380                        }
381                    }
382                }
383            }
384        }
385
386        ::zeroclaw_log::record!(
387            INFO,
388            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
389            &format!("SopCronCache: cached {} cron schedule(s)", schedules.len())
390        );
391        Self { schedules }
392    }
393
394    /// Return the cached schedules (for testing).
395    #[cfg(test)]
396    pub fn schedules(&self) -> &[(String, String, cron::Schedule)] {
397        &self.schedules
398    }
399}
400
401/// Check all cached cron SOP triggers for firings in the window
402/// `(last_check, now]` and dispatch events for each.
403///
404/// Uses window-based evaluation so ticks between polls are never missed.
405pub async fn check_sop_cron_triggers(
406    engine: &Arc<Mutex<SopEngine>>,
407    audit: &SopAuditLogger,
408    cache: &SopCronCache,
409    last_check: &mut chrono::DateTime<chrono::Utc>,
410) -> Vec<DispatchResult> {
411    let now = chrono::Utc::now();
412    let mut all_results = Vec::new();
413
414    for (_sop_name, expression, schedule) in &cache.schedules {
415        // Check if any occurrence fell in the window (last_check, now].
416        // At-most-once semantics: even if multiple ticks of the same expression
417        // fell in the window (e.g., scheduler delayed), we fire only once.
418        // This is intentional — SOP triggers should not retroactively batch-fire.
419        let mut upcoming = schedule.after(last_check);
420        if let Some(next) = upcoming.next()
421            && next <= now
422        {
423            // This expression fired in the window
424            let event = SopEvent {
425                source: SopTriggerSource::Cron,
426                topic: Some(expression.clone()),
427                payload: None,
428                timestamp: now_iso8601(),
429            };
430            let results = dispatch_sop_event(engine, audit, event).await;
431            all_results.extend(results);
432        }
433    }
434
435    *last_check = now;
436    all_results
437}
438
439// ── Tests ───────────────────────────────────────────────────────
440
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sop::types::{
        Sop, SopExecutionMode, SopPriority, SopRunAction, SopStep, SopTrigger, SopTriggerSource,
    };
    use zeroclaw_config::schema::{MemoryConfig, SopConfig};
    use zeroclaw_memory::traits::Memory;

    // ── Fixtures ────────────────────────────────────────────────

    /// A single-step, auto-mode SOP with no cooldown and room for two
    /// concurrent runs.
    fn test_sop(name: &str, triggers: Vec<SopTrigger>) -> Sop {
        let only_step = SopStep {
            number: 1,
            title: "Step one".into(),
            body: "Do step one".into(),
            suggested_tools: vec![],
            requires_confirmation: false,
            kind: crate::sop::SopStepKind::default(),
            schema: None,
        };
        Sop {
            name: name.into(),
            description: format!("Test SOP: {name}"),
            version: "1.0.0".into(),
            priority: SopPriority::Normal,
            execution_mode: SopExecutionMode::Auto,
            triggers,
            steps: vec![only_step],
            cooldown_secs: 0,
            max_concurrent: 2,
            location: None,
            deterministic: false,
        }
    }

    /// A test SOP whose only trigger is the given cron expression.
    fn cron_sop(name: &str, expression: &str) -> Sop {
        test_sop(
            name,
            vec![SopTrigger::Cron {
                expression: expression.into(),
            }],
        )
    }

    /// An engine with default config, pre-loaded with `sops`.
    fn test_engine(sops: Vec<Sop>) -> Arc<Mutex<SopEngine>> {
        let mut engine = SopEngine::new(SopConfig::default());
        engine.set_sops_for_test(sops);
        Arc::new(Mutex::new(engine))
    }

    /// An audit logger backed by a sqlite memory in a fresh temp directory.
    fn test_audit() -> SopAuditLogger {
        let mem_cfg = MemoryConfig {
            backend: "sqlite".into(),
            ..MemoryConfig::default()
        };
        let tmp = tempfile::tempdir().unwrap();
        let memory: Arc<dyn Memory> =
            Arc::from(zeroclaw_memory::create_memory(&mem_cfg, tmp.path(), None).unwrap());
        // Leak the tempdir so it lives for the test
        std::mem::forget(tmp);
        SopAuditLogger::new(memory)
    }

    /// An event stamped with the current time.
    fn make_event(
        source: SopTriggerSource,
        topic: Option<&str>,
        payload: Option<&str>,
    ) -> SopEvent {
        SopEvent {
            source,
            topic: topic.map(String::from),
            payload: payload.map(String::from),
            timestamp: now_iso8601(),
        }
    }

    /// How many results are `Started`.
    fn started_count(results: &[DispatchResult]) -> usize {
        results
            .iter()
            .filter(|r| matches!(r, DispatchResult::Started { .. }))
            .count()
    }

    /// The current time minus `minutes`.
    fn minutes_ago(minutes: i64) -> chrono::DateTime<chrono::Utc> {
        chrono::Utc::now() - chrono::Duration::minutes(minutes)
    }

    // ── Event dispatch ──────────────────────────────────────────

    #[tokio::test]
    async fn dispatch_starts_matching_sop() {
        let engine = test_engine(vec![test_sop(
            "mqtt-sop",
            vec![SopTrigger::Mqtt {
                topic: "sensors/temp".into(),
                condition: None,
            }],
        )]);
        let audit = test_audit();
        let event = make_event(
            SopTriggerSource::Mqtt,
            Some("sensors/temp"),
            Some(r#"{"value": 42}"#),
        );

        let results = dispatch_sop_event(&engine, &audit, event).await;

        assert_eq!(results.len(), 1);
        assert!(
            matches!(&results[0], DispatchResult::Started { sop_name, action, .. } if sop_name == "mqtt-sop" && matches!(action.as_ref(), SopRunAction::ExecuteStep { .. }))
        );
    }

    #[tokio::test]
    async fn dispatch_skips_when_cooldown_active() {
        let mut sop = test_sop("cooldown-sop", vec![SopTrigger::Manual]);
        sop.cooldown_secs = 3600;
        sop.max_concurrent = 1;
        let engine = test_engine(vec![sop]);
        let audit = test_audit();

        // Start a run manually so that completing it will trigger cooldown
        {
            let mut eng = engine.lock().unwrap();
            let _action = eng
                .start_run(
                    "cooldown-sop",
                    make_event(SopTriggerSource::Manual, None, None),
                )
                .unwrap();
            // Complete the run
            let run_id = eng.active_runs().keys().next().unwrap().clone();
            let finished_step = crate::sop::types::SopStepResult {
                step_number: 1,
                status: crate::sop::types::SopStepStatus::Completed,
                output: "done".into(),
                started_at: now_iso8601(),
                completed_at: Some(now_iso8601()),
            };
            eng.advance_step(&run_id, finished_step).unwrap();
        }

        // Now dispatch — should skip due to cooldown
        let event = make_event(SopTriggerSource::Manual, None, None);
        let results = dispatch_sop_event(&engine, &audit, event).await;

        assert_eq!(results.len(), 1);
        assert!(
            matches!(&results[0], DispatchResult::Skipped { sop_name, .. } if sop_name == "cooldown-sop")
        );
    }

    #[tokio::test]
    async fn dispatch_returns_no_match_for_unknown_event() {
        let engine = test_engine(vec![test_sop("manual-sop", vec![SopTrigger::Manual])]);
        let audit = test_audit();

        // Send an MQTT event — the SOP only has a Manual trigger
        let event = make_event(SopTriggerSource::Mqtt, Some("some/topic"), None);
        let results = dispatch_sop_event(&engine, &audit, event).await;

        assert_eq!(results.len(), 1);
        assert!(matches!(&results[0], DispatchResult::NoMatch));
    }

    #[tokio::test]
    async fn dispatch_batch_lock_starts_multiple_sops() {
        let webhook_sop = |name: &str| {
            test_sop(
                name,
                vec![SopTrigger::Webhook {
                    path: "/api/deploy".into(),
                }],
            )
        };
        let engine = test_engine(vec![
            webhook_sop("webhook-sop-1"),
            webhook_sop("webhook-sop-2"),
        ]);
        let audit = test_audit();
        let event = make_event(SopTriggerSource::Webhook, Some("/api/deploy"), None);

        let results = dispatch_sop_event(&engine, &audit, event).await;

        assert_eq!(started_count(&results), 2);
    }

    /// B1 DoD: prove that the action returned by `start_run` is captured in
    /// `DispatchResult::Started` — not silently dropped.
    #[tokio::test]
    async fn dispatch_captures_action_for_wait_approval() {
        // Supervised mode → WaitApproval on step 1
        let mut sop = test_sop(
            "supervised-sop",
            vec![SopTrigger::Mqtt {
                topic: "alert".into(),
                condition: None,
            }],
        );
        sop.execution_mode = SopExecutionMode::Supervised;
        let engine = test_engine(vec![sop]);
        let audit = test_audit();
        let event = make_event(SopTriggerSource::Mqtt, Some("alert"), None);

        let results = dispatch_sop_event(&engine, &audit, event).await;

        assert_eq!(results.len(), 1);
        match &results[0] {
            DispatchResult::Started {
                run_id,
                sop_name,
                action,
            } => {
                assert_eq!(sop_name, "supervised-sop");
                assert!(!run_id.is_empty());
                assert!(
                    matches!(action.as_ref(), SopRunAction::WaitApproval { .. }),
                    "Supervised SOP must return WaitApproval, got {:?}",
                    action
                );
            }
            other => panic!("Expected Started, got {other:?}"),
        }
    }

    /// B1 DoD: Auto-mode SOP returns ExecuteStep action in dispatch result.
    #[tokio::test]
    async fn dispatch_captures_action_for_execute_step() {
        let engine = test_engine(vec![test_sop("auto-sop", vec![SopTrigger::Manual])]);
        let audit = test_audit();
        let event = make_event(SopTriggerSource::Manual, None, None);

        let results = dispatch_sop_event(&engine, &audit, event).await;

        assert_eq!(results.len(), 1);
        match &results[0] {
            DispatchResult::Started { action, .. } => {
                assert!(
                    matches!(action.as_ref(), SopRunAction::ExecuteStep { .. }),
                    "Auto SOP must return ExecuteStep, got {:?}",
                    action
                );
            }
            other => panic!("Expected Started, got {other:?}"),
        }
    }

    // ── Peripheral signals ──────────────────────────────────────

    /// An engine holding one SOP triggered by `nucleo/pin_3`.
    fn gpio_engine() -> Arc<Mutex<SopEngine>> {
        test_engine(vec![test_sop(
            "gpio-sop",
            vec![SopTrigger::Peripheral {
                board: "nucleo".into(),
                signal: "pin_3".into(),
                condition: None,
            }],
        )])
    }

    #[tokio::test]
    async fn peripheral_signal_dispatches_to_matching_sop() {
        let engine = gpio_engine();
        let audit = test_audit();

        let results =
            dispatch_peripheral_signal(&engine, &audit, "nucleo", "pin_3", Some("1")).await;

        assert_eq!(results.len(), 1);
        assert!(
            matches!(&results[0], DispatchResult::Started { sop_name, .. } if sop_name == "gpio-sop" )
        );
    }

    #[tokio::test]
    async fn peripheral_signal_no_match_returns_empty() {
        let engine = gpio_engine();
        let audit = test_audit();

        let results = dispatch_peripheral_signal(&engine, &audit, "rpi", "gpio_5", None).await;

        assert_eq!(results.len(), 1);
        assert!(matches!(&results[0], DispatchResult::NoMatch));
    }

    // ── Cron cache ──────────────────────────────────────────────

    #[test]
    fn cron_cache_skips_invalid_expression() {
        let engine = test_engine(vec![cron_sop("bad-cron", "not a valid cron")]);

        let cache = SopCronCache::from_engine(&engine);

        assert!(cache.schedules().is_empty());
    }

    #[test]
    fn cron_cache_parses_valid_expression() {
        let engine = test_engine(vec![cron_sop("valid-cron", "0 */5 * * *")]);

        let cache = SopCronCache::from_engine(&engine);

        assert_eq!(cache.schedules().len(), 1);
        let (sop_name, raw_expression, _schedule) = &cache.schedules()[0];
        assert_eq!(sop_name, "valid-cron");
        assert_eq!(raw_expression, "0 */5 * * *");
    }

    // ── Cron trigger checks ─────────────────────────────────────

    #[tokio::test]
    async fn cron_sop_trigger_fires_on_schedule() {
        let engine = test_engine(vec![cron_sop("cron-sop", "* * * * *")]);
        let audit = test_audit();
        let cache = SopCronCache::from_engine(&engine);

        // Set last_check to 2 minutes ago so the window contains a tick
        let mut last_check = minutes_ago(2);
        let results = check_sop_cron_triggers(&engine, &audit, &cache, &mut last_check).await;

        assert!(
            started_count(&results) >= 1,
            "Expected at least 1 started SOP from cron"
        );
    }

    #[tokio::test]
    async fn cron_sop_only_matching_expression_fires() {
        // "0 0 1 1 *" = midnight Jan 1, which won't fire in a 2-minute window
        // from now.
        let engine = test_engine(vec![
            cron_sop("every-min", "* * * * *"),
            cron_sop("yearly", "0 0 1 1 *"),
        ]);
        let audit = test_audit();
        let cache = SopCronCache::from_engine(&engine);

        let mut last_check = minutes_ago(2);
        let results = check_sop_cron_triggers(&engine, &audit, &cache, &mut last_check).await;

        // Only "every-min" should have fired
        let started_names: Vec<&str> = results
            .iter()
            .filter_map(|r| match r {
                DispatchResult::Started { sop_name, .. } => Some(sop_name.as_str()),
                _ => None,
            })
            .collect();
        assert!(started_names.contains(&"every-min"));
        assert!(!started_names.contains(&"yearly"));
    }

    #[tokio::test]
    async fn cron_sop_window_check_does_not_miss_tick() {
        let engine = test_engine(vec![cron_sop("every-min", "* * * * *")]);
        let audit = test_audit();
        let cache = SopCronCache::from_engine(&engine);

        // Simulate: last_check was 5 minutes ago, poll just now
        let mut last_check = minutes_ago(5);
        let results = check_sop_cron_triggers(&engine, &audit, &cache, &mut last_check).await;

        // At least one tick should have been caught
        assert!(
            started_count(&results) >= 1,
            "Window-based check should catch ticks from 5 minutes ago"
        );

        // last_check should be updated to approximately now
        let now = chrono::Utc::now();
        assert!(
            (now - last_check).num_seconds() < 2,
            "last_check should be updated to now"
        );
    }
}
844}