Skip to main content

zeroclaw_runtime/sop/
engine.rs

1use std::collections::HashMap;
2use std::fmt::Write as _;
3use std::path::{Path, PathBuf};
4
5use anyhow::{Result, bail};
6
7use super::condition::evaluate_condition;
8use super::load_sops;
9use super::types::{
10    DeterministicRunState, DeterministicSavings, Sop, SopEvent, SopExecutionMode, SopPriority,
11    SopRun, SopRunAction, SopRunStatus, SopStep, SopStepKind, SopStepResult, SopStepStatus,
12    SopTrigger, SopTriggerSource,
13};
14use zeroclaw_config::schema::SopConfig;
15
16/// Central SOP orchestrator: loads SOPs, matches triggers, manages run lifecycle.
17pub struct SopEngine {
18    sops: Vec<Sop>,
19    active_runs: HashMap<String, SopRun>,
20    /// Completed/failed/cancelled runs (kept for status queries).
21    finished_runs: Vec<SopRun>,
22    config: SopConfig,
23    run_counter: u64,
24    /// Cumulative savings from deterministic execution.
25    deterministic_savings: DeterministicSavings,
26}
27
28impl SopEngine {
29    /// Create a new engine with the given config. Call `reload()` to load SOPs.
30    pub fn new(config: SopConfig) -> Self {
31        Self {
32            sops: Vec::new(),
33            active_runs: HashMap::new(),
34            finished_runs: Vec::new(),
35            config,
36            run_counter: 0,
37            deterministic_savings: DeterministicSavings::default(),
38        }
39    }
40
41    /// Load/reload SOPs from the configured directory.
42    pub fn reload(&mut self, workspace_dir: &Path) {
43        self.sops = load_sops(
44            workspace_dir,
45            self.config.sops_dir.as_deref(),
46            super::parse_execution_mode(&self.config.default_execution_mode),
47        );
48        ::zeroclaw_log::record!(
49            INFO,
50            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
51            &format!("SOP engine loaded {} SOPs", self.sops.len())
52        );
53    }
54
55    /// Return all loaded SOP definitions.
56    pub fn sops(&self) -> &[Sop] {
57        &self.sops
58    }
59
60    /// Return all active (in-flight) runs.
61    pub fn active_runs(&self) -> &HashMap<String, SopRun> {
62        &self.active_runs
63    }
64
65    /// Look up a run by ID (active or finished).
66    pub fn get_run(&self, run_id: &str) -> Option<&SopRun> {
67        self.active_runs
68            .get(run_id)
69            .or_else(|| self.finished_runs.iter().find(|r| r.run_id == run_id))
70    }
71
72    /// Look up an SOP by name.
73    pub fn get_sop(&self, name: &str) -> Option<&Sop> {
74        self.sops.iter().find(|s| s.name == name)
75    }
76
77    // ── Trigger matching ────────────────────────────────────────
78
79    /// Match an incoming event against all loaded SOPs and return the names of
80    /// SOPs whose triggers match.
81    pub fn match_trigger(&self, event: &SopEvent) -> Vec<&Sop> {
82        self.sops
83            .iter()
84            .filter(|sop| sop.triggers.iter().any(|t| trigger_matches(t, event)))
85            .collect()
86    }
87
88    // ── Run lifecycle ───────────────────────────────────────────
89
90    /// Check whether a new run can be started for the given SOP
91    /// (respects cooldown and concurrency limits).
92    pub fn can_start(&self, sop_name: &str) -> bool {
93        let sop = match self.get_sop(sop_name) {
94            Some(s) => s,
95            None => return false,
96        };
97
98        // Per-SOP concurrency limit
99        let active_for_sop = self
100            .active_runs
101            .values()
102            .filter(|r| r.sop_name == sop_name)
103            .count();
104        if active_for_sop >= sop.max_concurrent as usize {
105            return false;
106        }
107
108        // Global concurrency limit
109        if self.active_runs.len() >= self.config.max_concurrent_total {
110            return false;
111        }
112
113        // Cooldown: check most recent finished run for this SOP
114        if sop.cooldown_secs > 0
115            && let Some(last) = self.last_finished_run(sop_name)
116            && let Some(ref completed_at) = last.completed_at
117            && !cooldown_elapsed(completed_at, sop.cooldown_secs)
118        {
119            return false;
120        }
121
122        true
123    }
124
125    /// Start a new SOP run. Returns the first action to take.
126    /// Deterministic SOPs are automatically routed to `start_deterministic_run`.
127    pub fn start_run(&mut self, sop_name: &str, event: SopEvent) -> Result<SopRunAction> {
128        // Route deterministic SOPs to dedicated path
129        if self
130            .get_sop(sop_name)
131            .is_some_and(|s| s.execution_mode == SopExecutionMode::Deterministic)
132        {
133            return self.start_deterministic_run(sop_name, event);
134        }
135
136        let sop = self
137            .get_sop(sop_name)
138            .ok_or_else(|| {
139                ::zeroclaw_log::record!(
140                    WARN,
141                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
142                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
143                        .with_attrs(::serde_json::json!({"sop_name": sop_name})),
144                    "SOP engine: sop not found"
145                );
146                anyhow::Error::msg(format!("SOP not found: {sop_name}"))
147            })?
148            .clone();
149
150        if !self.can_start(sop_name) {
151            bail!(
152                "Cannot start SOP '{}': cooldown or concurrency limit reached",
153                sop_name
154            );
155        }
156
157        if sop.steps.is_empty() {
158            bail!("SOP '{}' has no steps defined", sop_name);
159        }
160
161        self.run_counter += 1;
162        let dur = std::time::SystemTime::now()
163            .duration_since(std::time::UNIX_EPOCH)
164            .unwrap_or_default();
165        let epoch_ms = dur.as_secs() * 1000 + u64::from(dur.subsec_millis());
166        let run_id = format!("run-{epoch_ms}-{:04}", self.run_counter);
167        let now = now_iso8601();
168
169        let run = SopRun {
170            run_id: run_id.clone(),
171            sop_name: sop_name.to_string(),
172            trigger_event: event,
173            status: SopRunStatus::Running,
174            current_step: 1,
175            total_steps: u32::try_from(sop.steps.len()).unwrap_or(u32::MAX),
176            started_at: now,
177            completed_at: None,
178            step_results: Vec::new(),
179            waiting_since: None,
180            llm_calls_saved: 0,
181        };
182
183        self.active_runs.insert(run_id.clone(), run);
184
185        ::zeroclaw_log::record!(
186            INFO,
187            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
188            &format!("SOP run {} started for '{}'", run_id, sop_name)
189        );
190
191        // Determine first action based on execution mode
192        let step = sop.steps[0].clone();
193        let context = format_step_context(&sop, &self.active_runs[&run_id], &step);
194        let action = resolve_step_action(&sop, &step, run_id.clone(), context);
195
196        // If the action is WaitApproval, update run status and record timestamp
197        if matches!(action, SopRunAction::WaitApproval { .. })
198            && let Some(run) = self.active_runs.get_mut(&run_id)
199        {
200            run.status = SopRunStatus::WaitingApproval;
201            run.waiting_since = Some(now_iso8601());
202        }
203
204        Ok(action)
205    }
206
207    /// Report the result of the current step and advance the run.
208    /// Returns the next action to take.
209    pub fn advance_step(&mut self, run_id: &str, result: SopStepResult) -> Result<SopRunAction> {
210        let run = self.active_runs.get_mut(run_id).ok_or_else(|| {
211            ::zeroclaw_log::record!(
212                WARN,
213                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
214                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
215                    .with_attrs(::serde_json::json!({"run_id": run_id})),
216                "SOP engine: active run not found"
217            );
218            anyhow::Error::msg(format!("Active run not found: {run_id}"))
219        })?;
220
221        let sop = self
222            .sops
223            .iter()
224            .find(|s| s.name == run.sop_name)
225            .ok_or_else(|| {
226                let sop_name = run.sop_name.clone();
227                ::zeroclaw_log::record!(
228                    WARN,
229                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
230                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
231                        .with_attrs(::serde_json::json!({"sop_name": sop_name})),
232                    "SOP engine: sop no longer loaded (definition removed mid-run)"
233                );
234                anyhow::Error::msg(format!("SOP '{}' no longer loaded", run.sop_name))
235            })?
236            .clone();
237
238        // Record step result
239        run.step_results.push(result.clone());
240
241        // Check if step failed
242        if result.status == SopStepStatus::Failed {
243            let reason = format!("Step {} failed: {}", result.step_number, result.output);
244            ::zeroclaw_log::record!(
245                WARN,
246                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
247                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
248                    .with_attrs(
249                        ::serde_json::json!({"run_id": run_id, "reason": reason.to_string()})
250                    ),
251                "SOP run : "
252            );
253            return Ok(self.finish_run(run_id, SopRunStatus::Failed, Some(reason)));
254        }
255
256        // Advance to next step
257        let next_step_num = run.current_step + 1;
258        if next_step_num > run.total_steps {
259            // All steps completed
260            ::zeroclaw_log::record!(
261                INFO,
262                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
263                    .with_attrs(::serde_json::json!({"run_id": run_id})),
264                "SOP run  completed successfully"
265            );
266            return Ok(self.finish_run(run_id, SopRunStatus::Completed, None));
267        }
268
269        // Update run state
270        let run = self.active_runs.get_mut(run_id).unwrap();
271        run.current_step = next_step_num;
272
273        let step_idx = (next_step_num - 1) as usize;
274        let step = sop.steps[step_idx].clone();
275        let context = format_step_context(&sop, run, &step);
276        let run_id_str = run_id.to_string();
277        let action = resolve_step_action(&sop, &step, run_id_str.clone(), context);
278
279        // If the action is WaitApproval, update run status and record timestamp
280        if matches!(action, SopRunAction::WaitApproval { .. })
281            && let Some(run) = self.active_runs.get_mut(&run_id_str)
282        {
283            run.status = SopRunStatus::WaitingApproval;
284            run.waiting_since = Some(now_iso8601());
285        }
286
287        Ok(action)
288    }
289
290    /// Cancel an active run.
291    pub fn cancel_run(&mut self, run_id: &str) -> Result<()> {
292        if !self.active_runs.contains_key(run_id) {
293            bail!("Active run not found: {run_id}");
294        }
295        self.finish_run(run_id, SopRunStatus::Cancelled, None);
296        ::zeroclaw_log::record!(
297            INFO,
298            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
299                .with_attrs(::serde_json::json!({"run_id": run_id})),
300            "SOP run  cancelled"
301        );
302        Ok(())
303    }
304
305    /// Approve a step that is waiting for approval, transitioning back to Running.
306    pub fn approve_step(&mut self, run_id: &str) -> Result<SopRunAction> {
307        let run = self.active_runs.get_mut(run_id).ok_or_else(|| {
308            ::zeroclaw_log::record!(
309                WARN,
310                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
311                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
312                    .with_attrs(::serde_json::json!({"run_id": run_id})),
313                "SOP engine: active run not found"
314            );
315            anyhow::Error::msg(format!("Active run not found: {run_id}"))
316        })?;
317
318        if run.status != SopRunStatus::WaitingApproval {
319            bail!(
320                "Run {run_id} is not waiting for approval (status: {})",
321                run.status
322            );
323        }
324
325        run.status = SopRunStatus::Running;
326        run.waiting_since = None;
327
328        let sop = self
329            .sops
330            .iter()
331            .find(|s| s.name == run.sop_name)
332            .ok_or_else(|| {
333                let sop_name = run.sop_name.clone();
334                ::zeroclaw_log::record!(
335                    WARN,
336                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
337                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
338                        .with_attrs(::serde_json::json!({"sop_name": sop_name})),
339                    "SOP engine: sop no longer loaded (definition removed mid-run)"
340                );
341                anyhow::Error::msg(format!("SOP '{}' no longer loaded", run.sop_name))
342            })?
343            .clone();
344
345        let step_idx = (run.current_step - 1) as usize;
346        let step = sop.steps[step_idx].clone();
347        let context = format_step_context(&sop, run, &step);
348
349        Ok(SopRunAction::ExecuteStep {
350            run_id: run_id.to_string(),
351            step,
352            context,
353        })
354    }
355
356    /// List finished runs, optionally filtered by SOP name.
357    pub fn finished_runs(&self, sop_name: Option<&str>) -> Vec<&SopRun> {
358        self.finished_runs
359            .iter()
360            .filter(|r| sop_name.is_none_or(|name| r.sop_name == name))
361            .collect()
362    }
363
364    /// Return cumulative deterministic execution savings.
365    pub fn deterministic_savings(&self) -> &DeterministicSavings {
366        &self.deterministic_savings
367    }
368
369    // ── Deterministic execution ─────────────────────────────────
370
371    /// Start a deterministic SOP run. Steps execute sequentially without LLM
372    /// round-trips. Returns the first action (DeterministicStep or CheckpointWait).
373    pub fn start_deterministic_run(
374        &mut self,
375        sop_name: &str,
376        event: SopEvent,
377    ) -> Result<SopRunAction> {
378        let sop = self
379            .get_sop(sop_name)
380            .ok_or_else(|| {
381                ::zeroclaw_log::record!(
382                    WARN,
383                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
384                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
385                        .with_attrs(::serde_json::json!({"sop_name": sop_name})),
386                    "SOP engine: sop not found"
387                );
388                anyhow::Error::msg(format!("SOP not found: {sop_name}"))
389            })?
390            .clone();
391
392        if sop.execution_mode != SopExecutionMode::Deterministic {
393            bail!(
394                "SOP '{}' is not in deterministic mode (mode: {})",
395                sop_name,
396                sop.execution_mode
397            );
398        }
399
400        if !self.can_start(sop_name) {
401            bail!(
402                "Cannot start SOP '{}': cooldown or concurrency limit reached",
403                sop_name
404            );
405        }
406
407        if sop.steps.is_empty() {
408            bail!("SOP '{}' has no steps defined", sop_name);
409        }
410
411        self.run_counter += 1;
412        let dur = std::time::SystemTime::now()
413            .duration_since(std::time::UNIX_EPOCH)
414            .unwrap_or_default();
415        let epoch_ms = dur.as_secs() * 1000 + u64::from(dur.subsec_millis());
416        let run_id = format!("det-{epoch_ms}-{:04}", self.run_counter);
417        let now = now_iso8601();
418
419        let total_steps = u32::try_from(sop.steps.len()).unwrap_or(u32::MAX);
420        let run = SopRun {
421            run_id: run_id.clone(),
422            sop_name: sop_name.to_string(),
423            trigger_event: event,
424            status: SopRunStatus::Running,
425            current_step: 1,
426            total_steps,
427            started_at: now,
428            completed_at: None,
429            step_results: Vec::new(),
430            waiting_since: None,
431            llm_calls_saved: 0,
432        };
433
434        self.active_runs.insert(run_id.clone(), run);
435        ::zeroclaw_log::record!(
436            INFO,
437            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
438            &format!(
439                "Deterministic SOP run {} started for '{}'",
440                run_id, sop_name
441            )
442        );
443
444        // Produce first step action
445        let step = sop.steps[0].clone();
446        let input = serde_json::Value::Null;
447        self.resolve_deterministic_action(&sop, &run_id, &step, input)
448    }
449
450    /// Advance a deterministic run with the output of the current step.
451    /// The output is piped as input to the next step.
452    pub fn advance_deterministic_step(
453        &mut self,
454        run_id: &str,
455        step_output: serde_json::Value,
456    ) -> Result<SopRunAction> {
457        let run = self.active_runs.get_mut(run_id).ok_or_else(|| {
458            ::zeroclaw_log::record!(
459                WARN,
460                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
461                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
462                    .with_attrs(::serde_json::json!({"run_id": run_id})),
463                "SOP engine: active run not found"
464            );
465            anyhow::Error::msg(format!("Active run not found: {run_id}"))
466        })?;
467
468        let sop = self
469            .sops
470            .iter()
471            .find(|s| s.name == run.sop_name)
472            .ok_or_else(|| {
473                let sop_name = run.sop_name.clone();
474                ::zeroclaw_log::record!(
475                    WARN,
476                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
477                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
478                        .with_attrs(::serde_json::json!({"sop_name": sop_name})),
479                    "SOP engine: sop no longer loaded (definition removed mid-run)"
480                );
481                anyhow::Error::msg(format!("SOP '{}' no longer loaded", run.sop_name))
482            })?
483            .clone();
484
485        // Record step result
486        let now = now_iso8601();
487        let step_result = SopStepResult {
488            step_number: run.current_step,
489            status: SopStepStatus::Completed,
490            output: step_output.to_string(),
491            started_at: run.started_at.clone(),
492            completed_at: Some(now),
493        };
494        run.step_results.push(step_result);
495
496        // Each deterministic step saves one LLM call
497        run.llm_calls_saved += 1;
498
499        // Advance to next step
500        let next_step_num = run.current_step + 1;
501        if next_step_num > run.total_steps {
502            ::zeroclaw_log::record!(
503                INFO,
504                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
505                &format!(
506                    "Deterministic SOP run {run_id} completed ({} LLM calls saved)",
507                    run.llm_calls_saved
508                )
509            );
510            let saved = run.llm_calls_saved;
511            self.deterministic_savings.total_llm_calls_saved += saved;
512            self.deterministic_savings.total_runs += 1;
513            return Ok(self.finish_run(run_id, SopRunStatus::Completed, None));
514        }
515
516        let run = self.active_runs.get_mut(run_id).unwrap();
517        run.current_step = next_step_num;
518
519        let step_idx = (next_step_num - 1) as usize;
520        let step = sop.steps[step_idx].clone();
521        let run_id_owned = run_id.to_string();
522
523        self.resolve_deterministic_action(&sop, &run_id_owned, &step, step_output)
524    }
525
526    /// Resume a deterministic run from persisted state.
527    pub fn resume_deterministic_run(
528        &mut self,
529        state: DeterministicRunState,
530    ) -> Result<SopRunAction> {
531        let run = self.active_runs.get_mut(&state.run_id).ok_or_else(|| {
532            let run_id = state.run_id.clone();
533            ::zeroclaw_log::record!(
534                WARN,
535                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
536                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
537                    .with_attrs(::serde_json::json!({"run_id": run_id})),
538                "SOP engine: active run not found"
539            );
540            anyhow::Error::msg(format!("Active run not found: {}", state.run_id))
541        })?;
542
543        if run.status != SopRunStatus::PausedCheckpoint {
544            bail!(
545                "Run {} is not paused at checkpoint (status: {})",
546                state.run_id,
547                run.status
548            );
549        }
550
551        let sop = self
552            .sops
553            .iter()
554            .find(|s| s.name == run.sop_name)
555            .ok_or_else(|| {
556                let sop_name = run.sop_name.clone();
557                ::zeroclaw_log::record!(
558                    WARN,
559                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
560                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
561                        .with_attrs(::serde_json::json!({"sop_name": sop_name})),
562                    "SOP engine: sop no longer loaded (definition removed mid-run)"
563                );
564                anyhow::Error::msg(format!("SOP '{}' no longer loaded", run.sop_name))
565            })?
566            .clone();
567
568        run.status = SopRunStatus::Running;
569        run.waiting_since = None;
570        run.llm_calls_saved = state.llm_calls_saved;
571
572        // Resume from the step after the last completed one
573        let next_step_num = state.last_completed_step + 1;
574        if next_step_num > state.total_steps {
575            ::zeroclaw_log::record!(
576                INFO,
577                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
578                &format!(
579                    "Deterministic SOP run {} completed on resume ({} LLM calls saved)",
580                    state.run_id, state.llm_calls_saved
581                )
582            );
583            self.deterministic_savings.total_llm_calls_saved += state.llm_calls_saved;
584            self.deterministic_savings.total_runs += 1;
585            return Ok(self.finish_run(&state.run_id, SopRunStatus::Completed, None));
586        }
587
588        let run = self.active_runs.get_mut(&state.run_id).unwrap();
589        run.current_step = next_step_num;
590
591        let step_idx = (next_step_num - 1) as usize;
592        let step = sop.steps[step_idx].clone();
593
594        // Use last step's output as input, or Null
595        let last_output = state
596            .step_outputs
597            .get(&state.last_completed_step)
598            .cloned()
599            .unwrap_or(serde_json::Value::Null);
600
601        let run_id = state.run_id.clone();
602        self.resolve_deterministic_action(&sop, &run_id, &step, last_output)
603    }
604
605    /// Resolve the action for a deterministic step (execute or checkpoint).
606    fn resolve_deterministic_action(
607        &mut self,
608        sop: &Sop,
609        run_id: &str,
610        step: &SopStep,
611        input: serde_json::Value,
612    ) -> Result<SopRunAction> {
613        if step.kind == SopStepKind::Checkpoint {
614            // Pause at checkpoint — persist state and wait for approval
615            if let Some(run) = self.active_runs.get_mut(run_id) {
616                run.status = SopRunStatus::PausedCheckpoint;
617                run.waiting_since = Some(now_iso8601());
618            }
619
620            let state_file = self.persist_deterministic_state(run_id, sop)?;
621
622            ::zeroclaw_log::record!(
623                INFO,
624                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
625                &format!(
626                    "Deterministic SOP run {run_id}: checkpoint at step {} '{}', state persisted to {}",
627                    step.number,
628                    step.title,
629                    state_file.display().to_string()
630                )
631            );
632
633            Ok(SopRunAction::CheckpointWait {
634                run_id: run_id.to_string(),
635                step: step.clone(),
636                state_file,
637            })
638        } else {
639            Ok(SopRunAction::DeterministicStep {
640                run_id: run_id.to_string(),
641                step: step.clone(),
642                input,
643            })
644        }
645    }
646
647    /// Persist the current deterministic run state to a JSON file.
648    fn persist_deterministic_state(&self, run_id: &str, sop: &Sop) -> Result<PathBuf> {
649        let run = self.active_runs.get(run_id).ok_or_else(|| {
650            ::zeroclaw_log::record!(
651                WARN,
652                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
653                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
654                    .with_attrs(::serde_json::json!({"run_id": run_id})),
655                "SOP engine: run not found in history"
656            );
657            anyhow::Error::msg(format!("Run not found: {run_id}"))
658        })?;
659
660        let mut step_outputs = HashMap::new();
661        for result in &run.step_results {
662            // Try to parse output as JSON, fall back to string value
663            let value = serde_json::from_str(&result.output)
664                .unwrap_or_else(|_| serde_json::Value::String(result.output.clone()));
665            step_outputs.insert(result.step_number, value);
666        }
667
668        let state = DeterministicRunState {
669            run_id: run_id.to_string(),
670            sop_name: run.sop_name.clone(),
671            last_completed_step: run.current_step.saturating_sub(1),
672            total_steps: run.total_steps,
673            step_outputs,
674            persisted_at: now_iso8601(),
675            llm_calls_saved: run.llm_calls_saved,
676            paused_at_checkpoint: run.status == SopRunStatus::PausedCheckpoint,
677        };
678
679        // Write to SOP location directory, or system temp dir
680        let temp_dir = std::env::temp_dir();
681        let dir = sop.location.as_deref().unwrap_or(temp_dir.as_path());
682        let state_file = dir.join(format!("{run_id}.state.json"));
683        let json = serde_json::to_string_pretty(&state)?;
684        std::fs::write(&state_file, json)?;
685
686        Ok(state_file)
687    }
688
689    /// Load a persisted deterministic run state from a JSON file.
690    pub fn load_deterministic_state(path: &Path) -> Result<DeterministicRunState> {
691        let content = std::fs::read_to_string(path)?;
692        let state: DeterministicRunState = serde_json::from_str(&content)?;
693        Ok(state)
694    }
695
696    // ── Approval timeout ──────────────────────────────────────────
697
698    /// Check all WaitingApproval runs for timeout. For Critical/High-priority SOPs,
699    /// auto-approve and return the resulting actions. Non-critical SOPs stay
700    /// in WaitingApproval indefinitely (or until explicitly approved/cancelled).
701    pub fn check_approval_timeouts(&mut self) -> Vec<SopRunAction> {
702        let timeout_secs = self.config.approval_timeout_secs;
703        if timeout_secs == 0 {
704            return Vec::new();
705        }
706
707        // Collect timed-out runs with their priority classification
708        // cooldown_elapsed(ts, secs) returns true when (now - ts) >= secs
709        let timed_out: Vec<(String, bool)> = self
710            .active_runs
711            .values()
712            .filter(|r| r.status == SopRunStatus::WaitingApproval)
713            .filter(|r| {
714                r.waiting_since
715                    .as_deref()
716                    .is_some_and(|ts| cooldown_elapsed(ts, timeout_secs))
717            })
718            .map(|r| {
719                let is_critical =
720                    self.sops
721                        .iter()
722                        .find(|s| s.name == r.sop_name)
723                        .is_some_and(|s| {
724                            matches!(s.priority, SopPriority::Critical | SopPriority::High)
725                        });
726                (r.run_id.clone(), is_critical)
727            })
728            .collect();
729
730        let mut actions = Vec::new();
731        for (run_id, is_critical) in timed_out {
732            if is_critical {
733                // Auto-approve: Critical/High priority SOPs fall back to Auto on timeout
734                ::zeroclaw_log::record!(
735                    INFO,
736                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
737                        .with_attrs(::serde_json::json!({"run_id": run_id})),
738                    "SOP run : approval timeout — auto-approving (critical/high priority)"
739                );
740                match self.approve_step(&run_id) {
741                    Ok(action) => actions.push(action),
742                    Err(e) => ::zeroclaw_log::record!(
743                        WARN,
744                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
745                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
746                            .with_attrs(
747                                ::serde_json::json!({"error": format!("{}", e), "run_id": run_id})
748                            ),
749                        "SOP run : auto-approve failed"
750                    ),
751                }
752            } else {
753                ::zeroclaw_log::record!(
754                    INFO,
755                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
756                        .with_attrs(::serde_json::json!({"run_id": run_id})),
757                    "SOP run : approval timeout — waiting indefinitely (non-critical)"
758                );
759            }
760        }
761
762        actions
763    }
764
765    // ── Test helpers ──────────────────────────────────────────────
766
767    /// Replace loaded SOPs (for testing from other modules).
768    // Available for cross-crate testing
769    pub fn set_sops_for_test(&mut self, sops: Vec<Sop>) {
770        self.sops = sops;
771    }
772
773    // ── Internal helpers ────────────────────────────────────────
774
775    pub fn last_finished_run(&self, sop_name: &str) -> Option<&SopRun> {
776        self.finished_runs
777            .iter()
778            .rev()
779            .find(|r| r.sop_name == sop_name)
780    }
781
782    pub fn finish_run(
783        &mut self,
784        run_id: &str,
785        status: SopRunStatus,
786        reason: Option<String>,
787    ) -> SopRunAction {
788        let mut run = self.active_runs.remove(run_id).unwrap();
789        run.status = status;
790        run.completed_at = Some(now_iso8601());
791        let sop_name = run.sop_name.clone();
792        let run_id_owned = run.run_id.clone();
793        self.finished_runs.push(run);
794
795        // Evict oldest finished runs when over capacity
796        let max = self.config.max_finished_runs;
797        if max > 0 && self.finished_runs.len() > max {
798            let excess = self.finished_runs.len() - max;
799            self.finished_runs.drain(..excess);
800        }
801
802        match status {
803            SopRunStatus::Failed => SopRunAction::Failed {
804                run_id: run_id_owned,
805                sop_name,
806                reason: reason.unwrap_or_default(),
807            },
808            _ => SopRunAction::Completed {
809                run_id: run_id_owned,
810                sop_name,
811            },
812        }
813    }
814}
815
816// ── Trigger matching ────────────────────────────────────────────
817
818/// Check whether a single trigger definition matches an incoming event.
819fn trigger_matches(trigger: &SopTrigger, event: &SopEvent) -> bool {
820    match (trigger, event.source) {
821        (SopTrigger::Mqtt { topic, condition }, SopTriggerSource::Mqtt) => {
822            let topic_match = event
823                .topic
824                .as_deref()
825                .is_some_and(|t| mqtt_topic_matches(topic, t));
826            if !topic_match {
827                return false;
828            }
829            // Evaluate condition against payload (None condition = unconditional)
830            match condition {
831                Some(cond) => evaluate_condition(cond, event.payload.as_deref()),
832                None => true,
833            }
834        }
835
836        (SopTrigger::Webhook { path }, SopTriggerSource::Webhook) => {
837            event.topic.as_deref().is_some_and(|t| t == path)
838        }
839
840        (
841            SopTrigger::Peripheral {
842                board,
843                signal,
844                condition,
845            },
846            SopTriggerSource::Peripheral,
847        ) => {
848            let topic_match = event.topic.as_deref().is_some_and(|t| {
849                let expected = format!("{board}/{signal}");
850                t == expected
851            });
852            if !topic_match {
853                return false;
854            }
855            // Evaluate condition against payload (None condition = unconditional)
856            match condition {
857                Some(cond) => evaluate_condition(cond, event.payload.as_deref()),
858                None => true,
859            }
860        }
861
862        (SopTrigger::Cron { expression }, SopTriggerSource::Cron) => {
863            event.topic.as_deref().is_some_and(|t| t == expression)
864        }
865
866        (SopTrigger::Manual, SopTriggerSource::Manual) => true,
867
868        _ => false,
869    }
870}
871
/// Match an MQTT topic name against a topic filter with `+` and `#` wildcards.
///
/// Both strings are compared level by level (levels are separated by `/`):
/// - `+` matches exactly one level, whatever its content;
/// - `#` matches the remainder of the topic, *including the parent level itself*,
///   so `a/#` matches `a`, `a/b` and `a/b/c` (MQTT spec, "multi-level wildcard");
/// - any other level must be equal to the corresponding topic level.
///
/// Without a `#`, filter and topic must have the same number of levels. `#` is
/// treated as terminal wherever it appears; anything following it in the filter is
/// ignored.
fn mqtt_topic_matches(pattern: &str, topic: &str) -> bool {
    let mut filter_levels = pattern.split('/');
    let mut topic_levels = topic.split('/');

    loop {
        match (filter_levels.next(), topic_levels.next()) {
            // `#` swallows whatever is left, even when nothing is left (parent level).
            (Some("#"), _) => return true,
            // `+` needs a level to consume but does not care what it contains.
            (Some("+"), Some(_)) => {}
            (Some(expected), Some(actual)) if expected == actual => {}
            // Both sides ran out together: every level matched.
            (None, None) => return true,
            // Literal mismatch, or one side is longer than the other.
            _ => return false,
        }
    }
}
901
902// ── Execution mode resolution ───────────────────────────────────
903
904/// Determine the action for a step based on SOP execution mode.
905fn resolve_step_action(sop: &Sop, step: &SopStep, run_id: String, context: String) -> SopRunAction {
906    // Steps with requires_confirmation always need approval
907    if step.requires_confirmation {
908        return SopRunAction::WaitApproval {
909            run_id,
910            step: step.clone(),
911            context,
912        };
913    }
914
915    let needs_approval = match sop.execution_mode {
916        // Deterministic mode is handled via start_deterministic_run;
917        // if we reach here via the standard path, treat as Auto.
918        SopExecutionMode::Auto | SopExecutionMode::Deterministic => false,
919        SopExecutionMode::Supervised => {
920            // Supervised: approval only before the first step
921            step.number == 1
922        }
923        SopExecutionMode::StepByStep => true,
924        SopExecutionMode::PriorityBased => match sop.priority {
925            SopPriority::Critical | SopPriority::High => false,
926            SopPriority::Normal | SopPriority::Low => {
927                // Supervised behavior for normal/low
928                step.number == 1
929            }
930        },
931    };
932
933    if needs_approval {
934        SopRunAction::WaitApproval {
935            run_id,
936            step: step.clone(),
937            context,
938        }
939    } else {
940        SopRunAction::ExecuteStep {
941            run_id,
942            step: step.clone(),
943            context,
944        }
945    }
946}
947
948// ── Step context formatting ─────────────────────────────────────
949
950/// Build the structured context message that gets injected into the agent.
951fn format_step_context(sop: &Sop, run: &SopRun, step: &SopStep) -> String {
952    let mut ctx = format!(
953        "[SOP: {} (run {}) — Step {} of {}]\n\n",
954        sop.name, run.run_id, step.number, run.total_steps
955    );
956
957    let _ = writeln!(
958        ctx,
959        "Trigger: {} {}",
960        run.trigger_event.source,
961        run.trigger_event.topic.as_deref().unwrap_or("(no topic)")
962    );
963
964    if let Some(ref payload) = run.trigger_event.payload {
965        let _ = writeln!(ctx, "Payload: {payload}");
966    }
967
968    // Previous step summary
969    if let Some(prev) = run.step_results.last() {
970        let _ = writeln!(
971            ctx,
972            "Previous: Step {} {} — {}",
973            prev.step_number, prev.status, prev.output
974        );
975    }
976
977    let _ = write!(ctx, "\nCurrent step: **{}**\n{}\n", step.title, step.body);
978
979    if !step.suggested_tools.is_empty() {
980        let _ = write!(
981            ctx,
982            "\nSuggested tools: {}\n",
983            step.suggested_tools.join(", ")
984        );
985    }
986
987    ctx.push_str("\nWhen done, report your result.\n");
988
989    ctx
990}
991
992// ── Utilities ───────────────────────────────────────────────────
993
994pub fn now_iso8601() -> String {
995    // Use chrono if available, otherwise fallback to SystemTime
996    let now = std::time::SystemTime::now()
997        .duration_since(std::time::UNIX_EPOCH)
998        .unwrap_or_default();
999    // Simple UTC timestamp without chrono dependency
1000    let secs = now.as_secs();
1001    let days = secs / 86400;
1002    let time_secs = secs % 86400;
1003    let hours = time_secs / 3600;
1004    let minutes = (time_secs % 3600) / 60;
1005    let seconds = time_secs % 60;
1006
1007    // Days since epoch to Y-M-D (simplified — good enough for run IDs)
1008    let (year, month, day) = days_to_ymd(days);
1009    format!("{year:04}-{month:02}-{day:02}T{hours:02}:{minutes:02}:{seconds:02}Z")
1010}
1011
/// Turn a day count since 1970-01-01 into a Gregorian `(year, month, day)` triple.
///
/// This is `civil_from_days` from Howard Hinnant's date algorithms
/// (<https://howardhinnant.github.io/date_algorithms.html>), restricted to
/// non-negative day counts. Months and days are 1-based.
fn days_to_ymd(days: u64) -> (u64, u64, u64) {
    // One full 400-year Gregorian cycle.
    const DAYS_PER_ERA: u64 = 146_097;
    // Days between 0000-03-01 (the algorithm's internal epoch) and 1970-01-01.
    const EPOCH_SHIFT: u64 = 719_468;

    let shifted = days + EPOCH_SHIFT;
    let era = shifted / DAYS_PER_ERA;
    let day_of_era = shifted % DAYS_PER_ERA;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March, 11 = February).
    let m_idx = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * m_idx + 2) / 5 + 1;
    let month = if m_idx < 10 { m_idx + 3 } else { m_idx - 9 };

    // Internal years start in March, so January/February belong to the next civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);
    (year, month, day)
}
1027
1028/// Check if enough time has elapsed since a timestamp string.
1029fn cooldown_elapsed(completed_at: &str, cooldown_secs: u64) -> bool {
1030    // Parse the ISO-8601 timestamp we generate
1031    let completed = parse_iso8601_secs(completed_at);
1032    let now = std::time::SystemTime::now()
1033        .duration_since(std::time::UNIX_EPOCH)
1034        .unwrap_or_default()
1035        .as_secs();
1036
1037    match completed {
1038        Some(ts) => now.saturating_sub(ts) >= cooldown_secs,
1039        None => true, // Can't parse timestamp; allow start
1040    }
1041}
1042
/// Minimal ISO-8601 parser returning seconds since the Unix epoch.
///
/// Accepts `YYYY-MM-DDTHH:MM:SS` with an optional trailing `Z`; the value is always
/// interpreted as UTC. Field widths are not enforced, but every field must consist
/// of ASCII digits only.
///
/// Returns `None` (and never panics) when:
/// - the text does not have exactly three date fields and three time fields;
/// - a field is empty, non-numeric or does not fit in a `u64`;
/// - month is outside `1..=12`, day outside `1..=31`, hour above 23, or minute or
///   second above 59;
/// - the instant lies before 1970-01-01T00:00:00Z or the result overflows a `u64`.
///
/// Days are not checked against the month length: `02-31` is normalised the way the
/// underlying day-count formula does (it lands in early March).
fn parse_iso8601_secs(input: &str) -> Option<u64> {
    let input = input.strip_suffix('Z').unwrap_or(input);
    let (date, time) = input.split_once('T')?;
    let [year, month, day] = parse_numeric_triplet(date, '-')?;
    let [hour, min, sec] = parse_numeric_triplet(time, ':')?;

    let date_ok = (1..=12).contains(&month) && (1..=31).contains(&day);
    let time_ok = hour < 24 && min < 60 && sec < 60;
    if !(date_ok && time_ok) {
        return None;
    }

    // Reverse of days_to_ymd: compute days since epoch. Years are counted from
    // March so the leap day falls at the end of the internal year.
    let year_adj = if month <= 2 {
        year.checked_sub(1)?
    } else {
        year
    };
    let month_adj = if month > 2 { month - 3 } else { month + 9 };
    let era = year_adj / 400;
    let yoe = year_adj % 400;
    let doy = (153 * month_adj + 2) / 5 + day - 1;
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;

    // Checked arithmetic: absurdly large years overflow, pre-1970 dates underflow.
    let days = era
        .checked_mul(146_097)?
        .checked_add(doe)?
        .checked_sub(719_468)?;

    days.checked_mul(86_400)?
        .checked_add(hour * 3600 + min * 60 + sec)
}

/// Split `text` on `separator` into exactly three unsigned decimal fields.
fn parse_numeric_triplet(text: &str, separator: char) -> Option<[u64; 3]> {
    let mut fields = text.split(separator);
    let mut values = [0_u64; 3];
    for value in &mut values {
        let field = fields.next()?;
        // `str::parse` would also accept a leading `+`; timestamps never carry one.
        if !field.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        *value = field.parse().ok()?;
    }
    // A fourth field means the input is malformed.
    fields.next().is_none().then_some(values)
}
1070
1071#[cfg(test)]
1072mod tests {
1073    use super::*;
1074    use crate::sop::types::SopExecutionMode;
1075
1076    fn manual_event() -> SopEvent {
1077        SopEvent {
1078            source: SopTriggerSource::Manual,
1079            topic: None,
1080            payload: None,
1081            timestamp: now_iso8601(),
1082        }
1083    }
1084
1085    fn mqtt_event(topic: &str, payload: &str) -> SopEvent {
1086        SopEvent {
1087            source: SopTriggerSource::Mqtt,
1088            topic: Some(topic.into()),
1089            payload: Some(payload.into()),
1090            timestamp: now_iso8601(),
1091        }
1092    }
1093
1094    fn test_sop(name: &str, mode: SopExecutionMode, priority: SopPriority) -> Sop {
1095        Sop {
1096            name: name.into(),
1097            description: format!("Test SOP: {name}"),
1098            version: "1.0.0".into(),
1099            priority,
1100            execution_mode: mode,
1101            triggers: vec![SopTrigger::Manual],
1102            steps: vec![
1103                SopStep {
1104                    number: 1,
1105                    title: "Step one".into(),
1106                    body: "Do step one".into(),
1107                    suggested_tools: vec!["shell".into()],
1108                    requires_confirmation: false,
1109                    kind: SopStepKind::default(),
1110                    schema: None,
1111                },
1112                SopStep {
1113                    number: 2,
1114                    title: "Step two".into(),
1115                    body: "Do step two".into(),
1116                    suggested_tools: vec![],
1117                    requires_confirmation: false,
1118                    kind: SopStepKind::default(),
1119                    schema: None,
1120                },
1121            ],
1122            cooldown_secs: 0,
1123            max_concurrent: 1,
1124            location: None,
1125            deterministic: false,
1126        }
1127    }
1128
1129    fn engine_with_sops(sops: Vec<Sop>) -> SopEngine {
1130        let mut engine = SopEngine::new(SopConfig::default());
1131        engine.sops = sops;
1132        engine
1133    }
1134
1135    /// Extract run_id from any SopRunAction variant.
1136    fn extract_run_id(action: &SopRunAction) -> &str {
1137        match action {
1138            SopRunAction::ExecuteStep { run_id, .. }
1139            | SopRunAction::WaitApproval { run_id, .. }
1140            | SopRunAction::DeterministicStep { run_id, .. }
1141            | SopRunAction::CheckpointWait { run_id, .. }
1142            | SopRunAction::Completed { run_id, .. }
1143            | SopRunAction::Failed { run_id, .. } => run_id,
1144        }
1145    }
1146
1147    /// Get the first active run_id from the engine (for tests with a single run).
1148    #[allow(dead_code)]
1149    fn first_active_run_id(engine: &SopEngine) -> String {
1150        engine
1151            .active_runs()
1152            .keys()
1153            .next()
1154            .expect("expected at least one active run")
1155            .clone()
1156    }
1157
1158    // ── Trigger matching ────────────────────────────────
1159
#[test]
fn match_manual_trigger() {
    let sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
    let engine = engine_with_sops(vec![sop]);

    // The fixture's only trigger is `Manual`, so a manual event selects it.
    let matched = engine.match_trigger(&manual_event());

    assert_eq!(matched.len(), 1);
    assert_eq!(matched[0].name, "s1");
}
1171
#[test]
fn no_match_for_wrong_source() {
    let sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
    let engine = engine_with_sops(vec![sop]);

    // The SOP only declares a manual trigger, so an MQTT event must be ignored.
    let unrelated = mqtt_event("sensors/temp", "{}");
    assert!(engine.match_trigger(&unrelated).is_empty());
}
1183
#[test]
fn match_mqtt_trigger_exact() {
    let mut sop = test_sop(
        "pressure-sop",
        SopExecutionMode::Auto,
        SopPriority::Critical,
    );
    sop.triggers = vec![SopTrigger::Mqtt {
        topic: "plant/pump/pressure".into(),
        condition: None,
    }];
    let engine = engine_with_sops(vec![sop]);

    // Identical topic, no condition: the SOP is selected.
    let event = mqtt_event("plant/pump/pressure", "87.3");
    assert_eq!(engine.match_trigger(&event).len(), 1);
}
1201
#[test]
fn match_mqtt_wildcard_plus() {
    let mut sop = test_sop("wildcard-sop", SopExecutionMode::Auto, SopPriority::Normal);
    sop.triggers = vec![SopTrigger::Mqtt {
        topic: "plant/+/pressure".into(),
        condition: None,
    }];
    let engine = engine_with_sops(vec![sop]);

    // `+` stands in for the middle level only.
    let same_leaf = mqtt_event("plant/pump_3/pressure", "87");
    assert_eq!(engine.match_trigger(&same_leaf).len(), 1);

    // The last level is literal and must still agree.
    let other_leaf = mqtt_event("plant/pump_3/temperature", "50");
    assert!(engine.match_trigger(&other_leaf).is_empty());
}
1224
#[test]
fn match_mqtt_wildcard_hash() {
    let mut sop = test_sop("hash-sop", SopExecutionMode::Auto, SopPriority::Normal);
    sop.triggers = vec![SopTrigger::Mqtt {
        topic: "plant/#".into(),
        condition: None,
    }];
    let engine = engine_with_sops(vec![sop]);

    // `#` accepts any number of levels below `plant/`.
    for (topic, payload) in [("plant/pump/pressure", "87"), ("plant/a/b/c/d", "x")] {
        let event = mqtt_event(topic, payload);
        assert_eq!(engine.match_trigger(&event).len(), 1);
    }
}
1248
#[test]
fn mqtt_topic_matching_edge_cases() {
    // (filter, topic, expected result)
    let cases = [
        ("a/b/c", "a/b/c", true),
        ("a/b/c", "a/b/d", false),
        ("a/b/c", "a/b", false),
        ("a/b", "a/b/c", false),
        ("+/+/+", "a/b/c", true),
        ("+/+", "a/b/c", false),
        ("#", "a/b/c", true),
        ("a/#", "a/b/c", true),
        ("b/#", "a/b/c", false),
    ];

    for (pattern, topic, expected) in cases {
        if expected {
            assert!(mqtt_topic_matches(pattern, topic));
        } else {
            assert!(!mqtt_topic_matches(pattern, topic));
        }
    }
}
1261
1262    // ── Webhook trigger matching ─────────────────────
1263
#[test]
fn webhook_trigger_matches_exact_path() {
    let mut sop = test_sop("webhook-sop", SopExecutionMode::Auto, SopPriority::Normal);
    sop.triggers = vec![SopTrigger::Webhook {
        path: "/webhook".into(),
    }];
    let engine = engine_with_sops(vec![sop]);

    // An event whose topic equals the configured path selects the SOP.
    let request = SopEvent {
        source: SopTriggerSource::Webhook,
        topic: Some("/webhook".into()),
        payload: None,
        timestamp: now_iso8601(),
    };
    let matched = engine.match_trigger(&request);
    assert_eq!(matched.len(), 1);
}
1283
#[test]
fn webhook_trigger_rejects_different_path() {
    let mut sop = test_sop("deploy-sop", SopExecutionMode::Auto, SopPriority::Normal);
    sop.triggers = vec![SopTrigger::Webhook {
        path: "/sop/deploy".into(),
    }];
    let engine = engine_with_sops(vec![sop]);

    let webhook_event = |path: &str| SopEvent {
        source: SopTriggerSource::Webhook,
        topic: Some(path.into()),
        payload: None,
        timestamp: now_iso8601(),
    };

    // Path /webhook does NOT match /sop/deploy
    let other_path = webhook_event("/webhook");
    assert!(engine.match_trigger(&other_path).is_empty());

    // But /sop/deploy matches /sop/deploy
    let same_path = webhook_event("/sop/deploy");
    assert_eq!(engine.match_trigger(&same_path).len(), 1);
}
1312
1313    // ── Cron trigger matching ─────────────────────────
1314
#[test]
fn cron_trigger_matches_only_matching_expression() {
    let mut sop = test_sop("cron-sop", SopExecutionMode::Auto, SopPriority::Normal);
    sop.triggers = vec![SopTrigger::Cron {
        expression: "0 */5 * * *".into(),
    }];
    let engine = engine_with_sops(vec![sop]);

    // Cron events carry the schedule expression in the topic slot.
    let cron_event = |expression: Option<&str>| SopEvent {
        source: SopTriggerSource::Cron,
        topic: expression.map(Into::into),
        payload: None,
        timestamp: now_iso8601(),
    };

    // Matching expression
    let same = cron_event(Some("0 */5 * * *"));
    assert_eq!(engine.match_trigger(&same).len(), 1);

    // Different expression — should NOT match
    let different = cron_event(Some("0 */10 * * *"));
    assert!(engine.match_trigger(&different).is_empty());

    // No topic — should NOT match
    let missing = cron_event(None);
    assert!(engine.match_trigger(&missing).is_empty());
}
1352
1353    // ── Condition-based trigger matching ────────────────
1354
#[test]
fn mqtt_condition_filters_by_payload() {
    let mut sop = test_sop("cond-sop", SopExecutionMode::Auto, SopPriority::Critical);
    sop.triggers = vec![SopTrigger::Mqtt {
        topic: "sensors/pressure".into(),
        condition: Some("$.value > 85".into()),
    }];
    let engine = engine_with_sops(vec![sop]);

    // Payload meets condition
    let above = mqtt_event("sensors/pressure", r#"{"value": 90}"#);
    assert_eq!(engine.match_trigger(&above).len(), 1);

    // Payload does not meet condition
    let below = mqtt_event("sensors/pressure", r#"{"value": 50}"#);
    assert!(engine.match_trigger(&below).is_empty());
}
1374
#[test]
fn mqtt_no_condition_matches_any_payload() {
    let mut sop = test_sop("no-cond", SopExecutionMode::Auto, SopPriority::Normal);
    sop.triggers = vec![SopTrigger::Mqtt {
        topic: "sensors/temp".into(),
        condition: None,
    }];
    let engine = engine_with_sops(vec![sop]);

    // Without a condition the payload content is irrelevant.
    let event = mqtt_event("sensors/temp", "anything");
    assert_eq!(engine.match_trigger(&event).len(), 1);
}
1389
#[test]
fn mqtt_condition_no_payload_fails_closed() {
    let mut sop = test_sop("no-payload", SopExecutionMode::Auto, SopPriority::Normal);
    sop.triggers = vec![SopTrigger::Mqtt {
        topic: "sensors/temp".into(),
        condition: Some("$.value > 0".into()),
    }];
    let engine = engine_with_sops(vec![sop]);

    // Right topic, but nothing to evaluate the condition against.
    let without_payload = SopEvent {
        source: SopTriggerSource::Mqtt,
        topic: Some("sensors/temp".into()),
        payload: None,
        timestamp: now_iso8601(),
    };

    let matched = engine.match_trigger(&without_payload);
    assert!(matched.is_empty());
}
1410
#[test]
fn peripheral_condition_filters_by_payload() {
    let mut sop = test_sop("periph-cond", SopExecutionMode::Auto, SopPriority::High);
    sop.triggers = vec![SopTrigger::Peripheral {
        board: "nucleo".into(),
        signal: "pin_3".into(),
        condition: Some("> 0".into()),
    }];
    let engine = engine_with_sops(vec![sop]);

    // Peripheral events address a signal as "<board>/<signal>".
    let pin_event = |level: &str| SopEvent {
        source: SopTriggerSource::Peripheral,
        topic: Some("nucleo/pin_3".into()),
        payload: Some(level.into()),
        timestamp: now_iso8601(),
    };

    // Positive signal
    let high = pin_event("1");
    assert_eq!(engine.match_trigger(&high).len(), 1);

    // Zero signal — does not meet condition
    let low = pin_event("0");
    assert!(engine.match_trigger(&low).is_empty());
}
1441
#[test]
fn peripheral_no_condition_matches_any() {
    let mut sop = test_sop("periph-nocond", SopExecutionMode::Auto, SopPriority::Normal);
    sop.triggers = vec![SopTrigger::Peripheral {
        board: "rpi".into(),
        signal: "gpio_5".into(),
        condition: None,
    }];
    let engine = engine_with_sops(vec![sop]);

    // No condition: even a "0" reading on the right signal selects the SOP.
    let reading = SopEvent {
        source: SopTriggerSource::Peripheral,
        topic: Some("rpi/gpio_5".into()),
        payload: Some("0".into()),
        timestamp: now_iso8601(),
    };

    let matched = engine.match_trigger(&reading);
    assert_eq!(matched.len(), 1);
}
1462
1463    // ── Run lifecycle ───────────────────────────────────
1464
#[test]
fn start_run_returns_first_step() {
    let sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
    let mut engine = engine_with_sops(vec![sop]);

    let action = engine.start_run("s1", manual_event()).unwrap();

    // Run ids are expected to carry the "run-" prefix.
    assert!(extract_run_id(&action).starts_with("run-"));
    // Auto mode hands out the first step without an approval gate.
    assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
    assert_eq!(engine.active_runs().len(), 1);
}
1478
#[test]
fn start_run_unknown_sop_fails() {
    // No SOPs loaded at all, so any name is unknown.
    let mut engine = engine_with_sops(Vec::new());
    let outcome = engine.start_run("nonexistent", manual_event());
    assert!(outcome.is_err());
}
1484
#[test]
fn advance_step_to_completion() {
    let sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
    let mut engine = engine_with_sops(vec![sop]);

    let started = engine.start_run("s1", manual_event()).unwrap();
    let run_id = extract_run_id(&started).to_string();

    // Successful result for the given step number.
    let completed = |step_number| SopStepResult {
        step_number,
        status: SopStepStatus::Completed,
        output: "done".into(),
        started_at: now_iso8601(),
        completed_at: Some(now_iso8601()),
    };

    // Complete step 1: the engine should hand out step 2.
    let after_first = engine.advance_step(&run_id, completed(1)).unwrap();
    assert!(matches!(after_first, SopRunAction::ExecuteStep { .. }));

    // Complete step 2: the run is over and moves to the finished list.
    let after_second = engine.advance_step(&run_id, completed(2)).unwrap();
    assert!(matches!(after_second, SopRunAction::Completed { .. }));
    assert!(engine.active_runs().is_empty());
    assert_eq!(engine.finished_runs(None).len(), 1);
}
1530
#[test]
fn step_failure_ends_run() {
    let sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
    let mut engine = engine_with_sops(vec![sop]);

    let started = engine.start_run("s1", manual_event()).unwrap();
    let run_id = extract_run_id(&started).to_string();

    let failed_step = SopStepResult {
        step_number: 1,
        status: SopStepStatus::Failed,
        output: "valve stuck".into(),
        started_at: now_iso8601(),
        completed_at: Some(now_iso8601()),
    };
    let action = engine.advance_step(&run_id, failed_step).unwrap();

    // The step's output must surface in the failure reason.
    let reports_cause = match &action {
        SopRunAction::Failed { reason, .. } => reason.contains("valve stuck"),
        _ => false,
    };
    assert!(reports_cause);
    assert!(engine.active_runs().is_empty());
}
1559
#[test]
fn cancel_run() {
    let sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
    let mut engine = engine_with_sops(vec![sop]);

    let started = engine.start_run("s1", manual_event()).unwrap();
    let run_id = extract_run_id(&started).to_string();

    engine.cancel_run(&run_id).unwrap();

    // The run leaves the active set and is recorded as cancelled.
    assert!(engine.active_runs().is_empty());
    let history = engine.finished_runs(None);
    assert_eq!(history[0].status, SopRunStatus::Cancelled);
}
1574
#[test]
fn cancel_unknown_run_fails() {
    // Nothing has been started, so there is no run to cancel.
    let mut engine = engine_with_sops(Vec::new());
    let outcome = engine.cancel_run("nonexistent");
    assert!(outcome.is_err());
}
1580
1581    // ── Concurrency ─────────────────────────────────────
1582
#[test]
fn per_sop_concurrency_limit() {
    let sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
    let mut engine = engine_with_sops(vec![sop]);

    // The fixture sets max_concurrent = 1, so one active run saturates the SOP.
    let _running = engine.start_run("s1", manual_event()).unwrap();

    assert!(!engine.can_start("s1"));
    let second_attempt = engine.start_run("s1", manual_event());
    assert!(second_attempt.is_err());
}
1595
#[test]
fn global_concurrency_limit() {
    // Engine-wide cap of a single active run, regardless of which SOP owns it.
    let config = SopConfig {
        max_concurrent_total: 1,
        ..SopConfig::default()
    };
    let mut engine = SopEngine::new(config);
    engine.set_sops_for_test(vec![
        test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal),
        test_sop("s2", SopExecutionMode::Auto, SopPriority::Normal),
    ]);

    let _running = engine.start_run("s1", manual_event()).unwrap();

    // "s2" itself is idle, but the global budget is already spent.
    assert!(!engine.can_start("s2"));
}
1611
1612    // ── Cooldown ────────────────────────────────────────
1613
#[test]
fn cooldown_blocks_immediate_restart() {
    let sop = Sop {
        cooldown_secs: 3600, // 1 hour
        ..test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal)
    };
    let mut engine = engine_with_sops(vec![sop]);

    let started = engine.start_run("s1", manual_event()).unwrap();
    let run_id = extract_run_id(&started).to_string();

    // Complete both steps so the run lands in the finished history.
    for step_number in [1, 2] {
        let result = SopStepResult {
            step_number,
            status: SopStepStatus::Completed,
            output: "ok".into(),
            started_at: now_iso8601(),
            completed_at: Some(now_iso8601()),
        };
        engine.advance_step(&run_id, result).unwrap();
    }

    // Cooldown not elapsed — should block
    assert!(!engine.can_start("s1"));
}
1651
1652    // ── Execution modes ─────────────────────────────────
1653
1654    #[test]
1655    fn auto_mode_executes_immediately() {
1656        let mut engine = engine_with_sops(vec![test_sop(
1657            "s1",
1658            SopExecutionMode::Auto,
1659            SopPriority::Normal,
1660        )]);
1661        let action = engine.start_run("s1", manual_event()).unwrap();
1662        assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1663    }
1664
1665    #[test]
1666    fn supervised_mode_waits_on_first_step() {
1667        let mut engine = engine_with_sops(vec![test_sop(
1668            "s1",
1669            SopExecutionMode::Supervised,
1670            SopPriority::Normal,
1671        )]);
1672        let action = engine.start_run("s1", manual_event()).unwrap();
1673        assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1674    }
1675
1676    #[test]
1677    fn step_by_step_waits_on_every_step() {
1678        let mut engine = engine_with_sops(vec![test_sop(
1679            "s1",
1680            SopExecutionMode::StepByStep,
1681            SopPriority::Normal,
1682        )]);
1683
1684        // Step 1: WaitApproval
1685        let action = engine.start_run("s1", manual_event()).unwrap();
1686        let run_id = extract_run_id(&action).to_string();
1687        assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1688
1689        // Approve step 1
1690        let action = engine.approve_step(&run_id).unwrap();
1691        assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1692
1693        // Complete step 1, step 2 should also WaitApproval
1694        let action = engine
1695            .advance_step(
1696                &run_id,
1697                SopStepResult {
1698                    step_number: 1,
1699                    status: SopStepStatus::Completed,
1700                    output: "ok".into(),
1701                    started_at: now_iso8601(),
1702                    completed_at: Some(now_iso8601()),
1703                },
1704            )
1705            .unwrap();
1706        assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1707    }
1708
1709    #[test]
1710    fn priority_based_critical_auto() {
1711        let mut engine = engine_with_sops(vec![test_sop(
1712            "s1",
1713            SopExecutionMode::PriorityBased,
1714            SopPriority::Critical,
1715        )]);
1716        let action = engine.start_run("s1", manual_event()).unwrap();
1717        assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1718    }
1719
1720    #[test]
1721    fn priority_based_normal_supervised() {
1722        let mut engine = engine_with_sops(vec![test_sop(
1723            "s1",
1724            SopExecutionMode::PriorityBased,
1725            SopPriority::Normal,
1726        )]);
1727        let action = engine.start_run("s1", manual_event()).unwrap();
1728        // Normal + PriorityBased → Supervised → WaitApproval on step 1
1729        assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1730    }
1731
1732    #[test]
1733    fn requires_confirmation_overrides_auto() {
1734        let mut sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Critical);
1735        sop.steps[0].requires_confirmation = true;
1736        let mut engine = engine_with_sops(vec![sop]);
1737        let action = engine.start_run("s1", manual_event()).unwrap();
1738        // Even in Auto mode, requires_confirmation forces WaitApproval
1739        assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1740    }
1741
1742    // ── Approve ─────────────────────────────────────────
1743
1744    #[test]
1745    fn approve_transitions_to_execute() {
1746        let mut engine = engine_with_sops(vec![test_sop(
1747            "s1",
1748            SopExecutionMode::Supervised,
1749            SopPriority::Normal,
1750        )]);
1751        let action = engine.start_run("s1", manual_event()).unwrap();
1752        let run_id = extract_run_id(&action).to_string();
1753
1754        // Run should be WaitingApproval
1755        let run = engine.active_runs().get(&run_id).unwrap();
1756        assert_eq!(run.status, SopRunStatus::WaitingApproval);
1757
1758        // Approve
1759        let action = engine.approve_step(&run_id).unwrap();
1760        assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1761
1762        let run = engine.active_runs().get(&run_id).unwrap();
1763        assert_eq!(run.status, SopRunStatus::Running);
1764    }
1765
1766    #[test]
1767    fn approve_non_waiting_fails() {
1768        let mut engine = engine_with_sops(vec![test_sop(
1769            "s1",
1770            SopExecutionMode::Auto,
1771            SopPriority::Normal,
1772        )]);
1773        let action = engine.start_run("s1", manual_event()).unwrap();
1774        let run_id = extract_run_id(&action).to_string();
1775        assert!(engine.approve_step(&run_id).is_err());
1776    }
1777
1778    // ── Context formatting ──────────────────────────────
1779
1780    #[test]
1781    fn step_context_includes_sop_name_and_step() {
1782        let sop = test_sop(
1783            "pump-shutdown",
1784            SopExecutionMode::Auto,
1785            SopPriority::Critical,
1786        );
1787        let run = SopRun {
1788            run_id: "run-001".into(),
1789            sop_name: "pump-shutdown".into(),
1790            trigger_event: manual_event(),
1791            status: SopRunStatus::Running,
1792            current_step: 1,
1793            total_steps: 2,
1794            started_at: now_iso8601(),
1795            completed_at: None,
1796            step_results: Vec::new(),
1797            waiting_since: None,
1798            llm_calls_saved: 0,
1799        };
1800        let ctx = format_step_context(&sop, &run, &sop.steps[0]);
1801        assert!(ctx.contains("pump-shutdown"));
1802        assert!(ctx.contains("Step 1 of 2"));
1803        assert!(ctx.contains("Step one"));
1804    }
1805
1806    // ── Get run (active + finished) ─────────────────────
1807
1808    #[test]
1809    fn get_run_finds_active_and_finished() {
1810        let mut engine = engine_with_sops(vec![test_sop(
1811            "s1",
1812            SopExecutionMode::Auto,
1813            SopPriority::Normal,
1814        )]);
1815        let action = engine.start_run("s1", manual_event()).unwrap();
1816        let run_id = extract_run_id(&action).to_string();
1817
1818        // Active
1819        assert!(engine.get_run(&run_id).is_some());
1820        assert_eq!(
1821            engine.get_run(&run_id).unwrap().status,
1822            SopRunStatus::Running
1823        );
1824
1825        // Complete
1826        engine
1827            .advance_step(
1828                &run_id,
1829                SopStepResult {
1830                    step_number: 1,
1831                    status: SopStepStatus::Completed,
1832                    output: "ok".into(),
1833                    started_at: now_iso8601(),
1834                    completed_at: Some(now_iso8601()),
1835                },
1836            )
1837            .unwrap();
1838        engine
1839            .advance_step(
1840                &run_id,
1841                SopStepResult {
1842                    step_number: 2,
1843                    status: SopStepStatus::Completed,
1844                    output: "ok".into(),
1845                    started_at: now_iso8601(),
1846                    completed_at: Some(now_iso8601()),
1847                },
1848            )
1849            .unwrap();
1850
1851        // Now finished — still findable
1852        assert!(engine.get_run(&run_id).is_some());
1853        assert_eq!(
1854            engine.get_run(&run_id).unwrap().status,
1855            SopRunStatus::Completed
1856        );
1857
1858        // Unknown
1859        assert!(engine.get_run("nonexistent").is_none());
1860    }
1861
1862    // ── ISO-8601 helpers ────────────────────────────────
1863
1864    #[test]
1865    fn iso8601_roundtrip() {
1866        let ts = now_iso8601();
1867        let secs = parse_iso8601_secs(&ts);
1868        assert!(secs.is_some());
1869        // Should be close to current time
1870        let now = std::time::SystemTime::now()
1871            .duration_since(std::time::UNIX_EPOCH)
1872            .unwrap()
1873            .as_secs();
1874        assert!(now.abs_diff(secs.unwrap()) < 2);
1875    }
1876
1877    #[test]
1878    fn parse_known_timestamp() {
1879        // 2026-01-01T00:00:00Z
1880        let secs = parse_iso8601_secs("2026-01-01T00:00:00Z").unwrap();
1881        // Jan 1 2026 = 20454 days since epoch * 86400
1882        assert_eq!(secs, 20454 * 86400);
1883    }
1884
1885    // ── Approval timeout ─────────────────────────────────
1886
1887    #[test]
1888    fn timeout_auto_approves_critical() {
1889        let mut engine = SopEngine::new(SopConfig {
1890            approval_timeout_secs: 1, // 1 second for test
1891            ..SopConfig::default()
1892        });
1893        let mut sop = test_sop("s1", SopExecutionMode::Supervised, SopPriority::Critical);
1894        // PriorityBased would auto-execute critical, so use Supervised to force WaitApproval
1895        sop.execution_mode = SopExecutionMode::Supervised;
1896        engine.set_sops_for_test(vec![sop]);
1897
1898        let action = engine.start_run("s1", manual_event()).unwrap();
1899        let run_id = extract_run_id(&action).to_string();
1900        assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1901
1902        // Manually backdate waiting_since to simulate timeout
1903        let run = engine.active_runs.get_mut(&run_id).unwrap();
1904        run.waiting_since = Some("2020-01-01T00:00:00Z".into());
1905
1906        let actions = engine.check_approval_timeouts();
1907        assert_eq!(actions.len(), 1);
1908        assert!(matches!(actions[0], SopRunAction::ExecuteStep { .. }));
1909    }
1910
1911    #[test]
1912    fn timeout_does_not_auto_approve_normal() {
1913        let mut engine = SopEngine::new(SopConfig {
1914            approval_timeout_secs: 1,
1915            ..SopConfig::default()
1916        });
1917        engine.set_sops_for_test(vec![test_sop(
1918            "s1",
1919            SopExecutionMode::Supervised,
1920            SopPriority::Normal,
1921        )]);
1922
1923        let action = engine.start_run("s1", manual_event()).unwrap();
1924        let run_id = extract_run_id(&action).to_string();
1925
1926        // Backdate waiting_since
1927        let run = engine.active_runs.get_mut(&run_id).unwrap();
1928        run.waiting_since = Some("2020-01-01T00:00:00Z".into());
1929
1930        // Normal priority → no auto-approve
1931        let actions = engine.check_approval_timeouts();
1932        assert!(actions.is_empty());
1933        // Run should still be WaitingApproval
1934        assert_eq!(
1935            engine.get_run(&run_id).unwrap().status,
1936            SopRunStatus::WaitingApproval
1937        );
1938    }
1939
1940    #[test]
1941    fn timeout_zero_disables_check() {
1942        let mut engine = SopEngine::new(SopConfig {
1943            approval_timeout_secs: 0,
1944            ..SopConfig::default()
1945        });
1946        engine.set_sops_for_test(vec![test_sop(
1947            "s1",
1948            SopExecutionMode::Supervised,
1949            SopPriority::Critical,
1950        )]);
1951        let action = engine.start_run("s1", manual_event()).unwrap();
1952        let run_id = extract_run_id(&action).to_string();
1953
1954        let run = engine.active_runs.get_mut(&run_id).unwrap();
1955        run.waiting_since = Some("2020-01-01T00:00:00Z".into());
1956
1957        let actions = engine.check_approval_timeouts();
1958        assert!(actions.is_empty());
1959    }
1960
1961    #[test]
1962    fn waiting_since_set_on_wait_approval() {
1963        let mut engine = engine_with_sops(vec![test_sop(
1964            "s1",
1965            SopExecutionMode::Supervised,
1966            SopPriority::Normal,
1967        )]);
1968        let action = engine.start_run("s1", manual_event()).unwrap();
1969        let run_id = extract_run_id(&action).to_string();
1970
1971        let run = engine.get_run(&run_id).unwrap();
1972        assert_eq!(run.status, SopRunStatus::WaitingApproval);
1973        assert!(run.waiting_since.is_some());
1974    }
1975
1976    // ── Eviction ──────────────────────────────────────
1977
1978    #[test]
1979    fn max_finished_runs_evicts_oldest() {
1980        let mut engine = SopEngine::new(SopConfig {
1981            max_finished_runs: 2,
1982            ..SopConfig::default()
1983        });
1984        // SOP with 1 step so each run completes in one advance
1985        let mut sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
1986        sop.steps = vec![sop.steps[0].clone()];
1987        sop.max_concurrent = 10;
1988        engine.sops = vec![sop];
1989
1990        // Complete 3 runs
1991        let mut finished_ids = Vec::new();
1992        for _ in 0..3 {
1993            let action = engine.start_run("s1", manual_event()).unwrap();
1994            let rid = extract_run_id(&action).to_string();
1995            engine
1996                .advance_step(
1997                    &rid,
1998                    SopStepResult {
1999                        step_number: 1,
2000                        status: SopStepStatus::Completed,
2001                        output: "ok".into(),
2002                        started_at: now_iso8601(),
2003                        completed_at: Some(now_iso8601()),
2004                    },
2005                )
2006                .unwrap();
2007            finished_ids.push(rid);
2008        }
2009
2010        // Only 2 should be kept (max_finished_runs=2)
2011        let finished = engine.finished_runs(None);
2012        assert_eq!(
2013            finished.len(),
2014            2,
2015            "eviction should cap at max_finished_runs"
2016        );
2017        // Oldest (first) run should be evicted, newest two remain
2018        assert_eq!(finished[0].run_id, finished_ids[1]);
2019        assert_eq!(finished[1].run_id, finished_ids[2]);
2020    }
2021
2022    #[test]
2023    fn max_finished_runs_zero_means_unlimited() {
2024        let mut engine = SopEngine::new(SopConfig {
2025            max_finished_runs: 0,
2026            ..SopConfig::default()
2027        });
2028        let mut sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
2029        sop.steps = vec![sop.steps[0].clone()];
2030        sop.max_concurrent = 10;
2031        engine.sops = vec![sop];
2032
2033        for _ in 0..5 {
2034            let action = engine.start_run("s1", manual_event()).unwrap();
2035            let rid = extract_run_id(&action).to_string();
2036            engine
2037                .advance_step(
2038                    &rid,
2039                    SopStepResult {
2040                        step_number: 1,
2041                        status: SopStepStatus::Completed,
2042                        output: "ok".into(),
2043                        started_at: now_iso8601(),
2044                        completed_at: Some(now_iso8601()),
2045                    },
2046                )
2047                .unwrap();
2048        }
2049
2050        assert_eq!(engine.finished_runs(None).len(), 5, "zero means unlimited");
2051    }
2052
2053    #[test]
2054    fn waiting_since_cleared_on_approve() {
2055        let mut engine = engine_with_sops(vec![test_sop(
2056            "s1",
2057            SopExecutionMode::Supervised,
2058            SopPriority::Normal,
2059        )]);
2060        let action = engine.start_run("s1", manual_event()).unwrap();
2061        let run_id = extract_run_id(&action).to_string();
2062        engine.approve_step(&run_id).unwrap();
2063
2064        let run = engine.get_run(&run_id).unwrap();
2065        assert_eq!(run.status, SopRunStatus::Running);
2066        assert!(run.waiting_since.is_none());
2067    }
2068
2069    // ── Deterministic execution ─────────────────────────
2070
2071    fn deterministic_sop(name: &str) -> Sop {
2072        Sop {
2073            name: name.into(),
2074            description: format!("Deterministic SOP: {name}"),
2075            version: "1.0.0".into(),
2076            priority: SopPriority::Normal,
2077            execution_mode: SopExecutionMode::Deterministic,
2078            triggers: vec![SopTrigger::Manual],
2079            steps: vec![
2080                SopStep {
2081                    number: 1,
2082                    title: "Step one".into(),
2083                    body: "Do step one".into(),
2084                    suggested_tools: vec![],
2085                    requires_confirmation: false,
2086                    kind: SopStepKind::Execute,
2087                    schema: None,
2088                },
2089                SopStep {
2090                    number: 2,
2091                    title: "Checkpoint".into(),
2092                    body: "Pause for approval".into(),
2093                    suggested_tools: vec![],
2094                    requires_confirmation: false,
2095                    kind: SopStepKind::Checkpoint,
2096                    schema: None,
2097                },
2098                SopStep {
2099                    number: 3,
2100                    title: "Step three".into(),
2101                    body: "Final step".into(),
2102                    suggested_tools: vec![],
2103                    requires_confirmation: false,
2104                    kind: SopStepKind::Execute,
2105                    schema: None,
2106                },
2107            ],
2108            cooldown_secs: 0,
2109            max_concurrent: 1,
2110            location: None,
2111            deterministic: true,
2112        }
2113    }
2114
2115    #[test]
2116    fn deterministic_start_returns_deterministic_step() {
2117        let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]);
2118        let action = engine.start_run("det-sop", manual_event()).unwrap();
2119        assert!(
2120            matches!(action, SopRunAction::DeterministicStep { ref step, .. } if step.number == 1),
2121            "First action should be DeterministicStep for step 1"
2122        );
2123        let run_id = extract_run_id(&action).to_string();
2124        assert!(run_id.starts_with("det-"));
2125    }
2126
2127    #[test]
2128    fn deterministic_start_routes_through_start_run() {
2129        let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]);
2130        // start_run should auto-route to start_deterministic_run
2131        let action = engine.start_run("det-sop", manual_event()).unwrap();
2132        assert!(matches!(action, SopRunAction::DeterministicStep { .. }));
2133    }
2134
2135    #[test]
2136    fn deterministic_advance_pipes_output() {
2137        let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]);
2138        let action = engine.start_run("det-sop", manual_event()).unwrap();
2139        let run_id = extract_run_id(&action).to_string();
2140
2141        // Advance step 1 with output
2142        let output = serde_json::json!({"result": "step1_done"});
2143        let action = engine
2144            .advance_deterministic_step(&run_id, output.clone())
2145            .unwrap();
2146
2147        // Step 2 is a checkpoint — should pause
2148        assert!(
2149            matches!(action, SopRunAction::CheckpointWait { ref step, .. } if step.number == 2),
2150            "Step 2 (checkpoint) should return CheckpointWait"
2151        );
2152    }
2153
2154    #[test]
2155    fn deterministic_checkpoint_pauses_run() {
2156        let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]);
2157        let action = engine.start_run("det-sop", manual_event()).unwrap();
2158        let run_id = extract_run_id(&action).to_string();
2159
2160        // Complete step 1
2161        let action = engine
2162            .advance_deterministic_step(&run_id, serde_json::json!({"ok": true}))
2163            .unwrap();
2164
2165        // Should be at checkpoint
2166        assert!(matches!(action, SopRunAction::CheckpointWait { .. }));
2167
2168        // Run should be PausedCheckpoint
2169        let run = engine.get_run(&run_id).unwrap();
2170        assert_eq!(run.status, SopRunStatus::PausedCheckpoint);
2171        assert!(run.waiting_since.is_some());
2172    }
2173
2174    #[test]
2175    fn deterministic_completion_tracks_savings() {
2176        let mut sop = deterministic_sop("det-sop");
2177        // Simplify: 2 execute steps, no checkpoint
2178        sop.steps = vec![
2179            SopStep {
2180                number: 1,
2181                title: "Step one".into(),
2182                body: "Do it".into(),
2183                suggested_tools: vec![],
2184                requires_confirmation: false,
2185                kind: SopStepKind::Execute,
2186                schema: None,
2187            },
2188            SopStep {
2189                number: 2,
2190                title: "Step two".into(),
2191                body: "Do it too".into(),
2192                suggested_tools: vec![],
2193                requires_confirmation: false,
2194                kind: SopStepKind::Execute,
2195                schema: None,
2196            },
2197        ];
2198        let mut engine = engine_with_sops(vec![sop]);
2199
2200        let action = engine.start_run("det-sop", manual_event()).unwrap();
2201        let run_id = extract_run_id(&action).to_string();
2202
2203        // Complete step 1
2204        let action = engine
2205            .advance_deterministic_step(&run_id, serde_json::json!("s1"))
2206            .unwrap();
2207        assert!(matches!(action, SopRunAction::DeterministicStep { .. }));
2208
2209        // Complete step 2
2210        let action = engine
2211            .advance_deterministic_step(&run_id, serde_json::json!("s2"))
2212            .unwrap();
2213        assert!(matches!(action, SopRunAction::Completed { .. }));
2214
2215        // Check savings
2216        let savings = engine.deterministic_savings();
2217        assert_eq!(savings.total_runs, 1);
2218        assert_eq!(savings.total_llm_calls_saved, 2);
2219    }
2220
2221    #[test]
2222    fn deterministic_non_deterministic_sop_rejected() {
2223        let mut engine = engine_with_sops(vec![test_sop(
2224            "s1",
2225            SopExecutionMode::Auto,
2226            SopPriority::Normal,
2227        )]);
2228        let result = engine.start_deterministic_run("s1", manual_event());
2229        assert!(result.is_err());
2230        assert!(
2231            result
2232                .unwrap_err()
2233                .to_string()
2234                .contains("not in deterministic mode")
2235        );
2236    }
2237
2238    #[test]
2239    fn new_engine_without_sops_dir_stays_empty() {
2240        let config = SopConfig {
2241            sops_dir: None,
2242            ..Default::default()
2243        };
2244        let engine = SopEngine::new(config);
2245        assert!(
2246            engine.sops().is_empty(),
2247            "engine without sops_dir must have no SOPs"
2248        );
2249    }
2250
2251    #[test]
2252    fn reload_loads_sops_when_sops_dir_is_configured() {
2253        let tmp = tempfile::tempdir().unwrap();
2254        let sops_dir = tmp.path().join("my_sops");
2255        let sop_subdir = sops_dir.join("test-sop");
2256        std::fs::create_dir_all(&sop_subdir).unwrap();
2257
2258        std::fs::write(
2259            sop_subdir.join("SOP.toml"),
2260            r#"
2261[sop]
2262name = "test-sop"
2263description = "A test SOP"
2264version = "1.0.0"
2265
2266[[triggers]]
2267type = "manual"
2268"#,
2269        )
2270        .unwrap();
2271
2272        let config = SopConfig {
2273            sops_dir: Some(sops_dir.to_string_lossy().into_owned()),
2274            ..Default::default()
2275        };
2276        let mut engine = SopEngine::new(config);
2277        engine.reload(tmp.path());
2278        assert_eq!(
2279            engine.sops().len(),
2280            1,
2281            "reload must populate SOPs from disk"
2282        );
2283        assert_eq!(engine.sops()[0].name, "test-sop");
2284    }
2285}