1use std::collections::HashMap;
2use std::fmt::Write as _;
3use std::path::{Path, PathBuf};
4
5use anyhow::{Result, bail};
6
7use super::condition::evaluate_condition;
8use super::load_sops;
9use super::types::{
10 DeterministicRunState, DeterministicSavings, Sop, SopEvent, SopExecutionMode, SopPriority,
11 SopRun, SopRunAction, SopRunStatus, SopStep, SopStepKind, SopStepResult, SopStepStatus,
12 SopTrigger, SopTriggerSource,
13};
14use zeroclaw_config::schema::SopConfig;
15
/// In-memory engine that owns the loaded SOP definitions and tracks their runs.
pub struct SopEngine {
    /// SOP definitions currently loaded; replaced wholesale by `reload`.
    sops: Vec<Sop>,
    /// Runs that have not finished yet, keyed by run id.
    active_runs: HashMap<String, SopRun>,
    /// Finished runs in completion order (oldest first); trimmed by `finish_run`.
    finished_runs: Vec<SopRun>,
    /// Engine configuration supplied at construction time.
    config: SopConfig,
    /// Counter incremented for every started run; used as the run-id suffix.
    run_counter: u64,
    /// Aggregate counters updated whenever a deterministic run completes.
    deterministic_savings: DeterministicSavings,
}
27
28impl SopEngine {
29 pub fn new(config: SopConfig) -> Self {
31 Self {
32 sops: Vec::new(),
33 active_runs: HashMap::new(),
34 finished_runs: Vec::new(),
35 config,
36 run_counter: 0,
37 deterministic_savings: DeterministicSavings::default(),
38 }
39 }
40
41 pub fn reload(&mut self, workspace_dir: &Path) {
43 self.sops = load_sops(
44 workspace_dir,
45 self.config.sops_dir.as_deref(),
46 super::parse_execution_mode(&self.config.default_execution_mode),
47 );
48 ::zeroclaw_log::record!(
49 INFO,
50 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
51 &format!("SOP engine loaded {} SOPs", self.sops.len())
52 );
53 }
54
55 pub fn sops(&self) -> &[Sop] {
57 &self.sops
58 }
59
60 pub fn active_runs(&self) -> &HashMap<String, SopRun> {
62 &self.active_runs
63 }
64
65 pub fn get_run(&self, run_id: &str) -> Option<&SopRun> {
67 self.active_runs
68 .get(run_id)
69 .or_else(|| self.finished_runs.iter().find(|r| r.run_id == run_id))
70 }
71
72 pub fn get_sop(&self, name: &str) -> Option<&Sop> {
74 self.sops.iter().find(|s| s.name == name)
75 }
76
77 pub fn match_trigger(&self, event: &SopEvent) -> Vec<&Sop> {
82 self.sops
83 .iter()
84 .filter(|sop| sop.triggers.iter().any(|t| trigger_matches(t, event)))
85 .collect()
86 }
87
88 pub fn can_start(&self, sop_name: &str) -> bool {
93 let sop = match self.get_sop(sop_name) {
94 Some(s) => s,
95 None => return false,
96 };
97
98 let active_for_sop = self
100 .active_runs
101 .values()
102 .filter(|r| r.sop_name == sop_name)
103 .count();
104 if active_for_sop >= sop.max_concurrent as usize {
105 return false;
106 }
107
108 if self.active_runs.len() >= self.config.max_concurrent_total {
110 return false;
111 }
112
113 if sop.cooldown_secs > 0
115 && let Some(last) = self.last_finished_run(sop_name)
116 && let Some(ref completed_at) = last.completed_at
117 && !cooldown_elapsed(completed_at, sop.cooldown_secs)
118 {
119 return false;
120 }
121
122 true
123 }
124
125 pub fn start_run(&mut self, sop_name: &str, event: SopEvent) -> Result<SopRunAction> {
128 if self
130 .get_sop(sop_name)
131 .is_some_and(|s| s.execution_mode == SopExecutionMode::Deterministic)
132 {
133 return self.start_deterministic_run(sop_name, event);
134 }
135
136 let sop = self
137 .get_sop(sop_name)
138 .ok_or_else(|| {
139 ::zeroclaw_log::record!(
140 WARN,
141 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
142 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
143 .with_attrs(::serde_json::json!({"sop_name": sop_name})),
144 "SOP engine: sop not found"
145 );
146 anyhow::Error::msg(format!("SOP not found: {sop_name}"))
147 })?
148 .clone();
149
150 if !self.can_start(sop_name) {
151 bail!(
152 "Cannot start SOP '{}': cooldown or concurrency limit reached",
153 sop_name
154 );
155 }
156
157 if sop.steps.is_empty() {
158 bail!("SOP '{}' has no steps defined", sop_name);
159 }
160
161 self.run_counter += 1;
162 let dur = std::time::SystemTime::now()
163 .duration_since(std::time::UNIX_EPOCH)
164 .unwrap_or_default();
165 let epoch_ms = dur.as_secs() * 1000 + u64::from(dur.subsec_millis());
166 let run_id = format!("run-{epoch_ms}-{:04}", self.run_counter);
167 let now = now_iso8601();
168
169 let run = SopRun {
170 run_id: run_id.clone(),
171 sop_name: sop_name.to_string(),
172 trigger_event: event,
173 status: SopRunStatus::Running,
174 current_step: 1,
175 total_steps: u32::try_from(sop.steps.len()).unwrap_or(u32::MAX),
176 started_at: now,
177 completed_at: None,
178 step_results: Vec::new(),
179 waiting_since: None,
180 llm_calls_saved: 0,
181 };
182
183 self.active_runs.insert(run_id.clone(), run);
184
185 ::zeroclaw_log::record!(
186 INFO,
187 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
188 &format!("SOP run {} started for '{}'", run_id, sop_name)
189 );
190
191 let step = sop.steps[0].clone();
193 let context = format_step_context(&sop, &self.active_runs[&run_id], &step);
194 let action = resolve_step_action(&sop, &step, run_id.clone(), context);
195
196 if matches!(action, SopRunAction::WaitApproval { .. })
198 && let Some(run) = self.active_runs.get_mut(&run_id)
199 {
200 run.status = SopRunStatus::WaitingApproval;
201 run.waiting_since = Some(now_iso8601());
202 }
203
204 Ok(action)
205 }
206
207 pub fn advance_step(&mut self, run_id: &str, result: SopStepResult) -> Result<SopRunAction> {
210 let run = self.active_runs.get_mut(run_id).ok_or_else(|| {
211 ::zeroclaw_log::record!(
212 WARN,
213 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
214 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
215 .with_attrs(::serde_json::json!({"run_id": run_id})),
216 "SOP engine: active run not found"
217 );
218 anyhow::Error::msg(format!("Active run not found: {run_id}"))
219 })?;
220
221 let sop = self
222 .sops
223 .iter()
224 .find(|s| s.name == run.sop_name)
225 .ok_or_else(|| {
226 let sop_name = run.sop_name.clone();
227 ::zeroclaw_log::record!(
228 WARN,
229 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
230 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
231 .with_attrs(::serde_json::json!({"sop_name": sop_name})),
232 "SOP engine: sop no longer loaded (definition removed mid-run)"
233 );
234 anyhow::Error::msg(format!("SOP '{}' no longer loaded", run.sop_name))
235 })?
236 .clone();
237
238 if sop.execution_mode == SopExecutionMode::Deterministic {
242 if result.status == SopStepStatus::Failed {
243 let reason = format!("Step {} failed: {}", result.step_number, result.output);
244 return Ok(self.finish_run(run_id, SopRunStatus::Failed, Some(reason)));
245 }
246 let piped = serde_json::Value::String(result.output.clone());
247 return self.advance_deterministic_step(
248 run_id,
249 piped,
250 Some((result.started_at.clone(), result.completed_at.clone())),
251 );
252 }
253
254 run.step_results.push(result.clone());
256
257 if result.status == SopStepStatus::Failed {
259 let reason = format!("Step {} failed: {}", result.step_number, result.output);
260 ::zeroclaw_log::record!(
261 WARN,
262 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
263 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
264 .with_attrs(
265 ::serde_json::json!({"run_id": run_id, "reason": reason.to_string()})
266 ),
267 "SOP run : "
268 );
269 return Ok(self.finish_run(run_id, SopRunStatus::Failed, Some(reason)));
270 }
271
272 let next_step_num = run.current_step + 1;
274 if next_step_num > run.total_steps {
275 ::zeroclaw_log::record!(
277 INFO,
278 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
279 .with_attrs(::serde_json::json!({"run_id": run_id})),
280 "SOP run completed successfully"
281 );
282 return Ok(self.finish_run(run_id, SopRunStatus::Completed, None));
283 }
284
285 let run = self.active_runs.get_mut(run_id).unwrap();
287 run.current_step = next_step_num;
288
289 let step_idx = (next_step_num - 1) as usize;
290 let step = sop.steps[step_idx].clone();
291 let context = format_step_context(&sop, run, &step);
292 let run_id_str = run_id.to_string();
293 let action = resolve_step_action(&sop, &step, run_id_str.clone(), context);
294
295 if matches!(action, SopRunAction::WaitApproval { .. })
297 && let Some(run) = self.active_runs.get_mut(&run_id_str)
298 {
299 run.status = SopRunStatus::WaitingApproval;
300 run.waiting_since = Some(now_iso8601());
301 }
302
303 Ok(action)
304 }
305
306 pub fn cancel_run(&mut self, run_id: &str) -> Result<()> {
308 if !self.active_runs.contains_key(run_id) {
309 bail!("Active run not found: {run_id}");
310 }
311 self.finish_run(run_id, SopRunStatus::Cancelled, None);
312 ::zeroclaw_log::record!(
313 INFO,
314 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
315 .with_attrs(::serde_json::json!({"run_id": run_id})),
316 "SOP run cancelled"
317 );
318 Ok(())
319 }
320
321 pub fn approve_step(&mut self, run_id: &str) -> Result<SopRunAction> {
323 let run = self.active_runs.get_mut(run_id).ok_or_else(|| {
324 ::zeroclaw_log::record!(
325 WARN,
326 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
327 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
328 .with_attrs(::serde_json::json!({"run_id": run_id})),
329 "SOP engine: active run not found"
330 );
331 anyhow::Error::msg(format!("Active run not found: {run_id}"))
332 })?;
333
334 if run.status == SopRunStatus::PausedCheckpoint {
338 let piped = run
339 .step_results
340 .last()
341 .map(|r| serde_json::Value::String(r.output.clone()))
342 .unwrap_or(serde_json::Value::Null);
343 run.status = SopRunStatus::Running;
344 run.waiting_since = None;
345 return self.advance_deterministic_step(run_id, piped, None);
346 }
347
348 if run.status != SopRunStatus::WaitingApproval {
349 bail!(
350 "Run {run_id} is not waiting for approval or paused at a checkpoint (status: {})",
351 run.status
352 );
353 }
354
355 run.status = SopRunStatus::Running;
356 run.waiting_since = None;
357
358 let sop = self
359 .sops
360 .iter()
361 .find(|s| s.name == run.sop_name)
362 .ok_or_else(|| {
363 let sop_name = run.sop_name.clone();
364 ::zeroclaw_log::record!(
365 WARN,
366 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
367 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
368 .with_attrs(::serde_json::json!({"sop_name": sop_name})),
369 "SOP engine: sop no longer loaded (definition removed mid-run)"
370 );
371 anyhow::Error::msg(format!("SOP '{}' no longer loaded", run.sop_name))
372 })?
373 .clone();
374
375 let step_idx = (run.current_step - 1) as usize;
376 let step = sop.steps[step_idx].clone();
377 let context = format_step_context(&sop, run, &step);
378
379 Ok(SopRunAction::ExecuteStep {
380 run_id: run_id.to_string(),
381 step,
382 context,
383 })
384 }
385
386 pub fn finished_runs(&self, sop_name: Option<&str>) -> Vec<&SopRun> {
388 self.finished_runs
389 .iter()
390 .filter(|r| sop_name.is_none_or(|name| r.sop_name == name))
391 .collect()
392 }
393
394 pub fn deterministic_savings(&self) -> &DeterministicSavings {
396 &self.deterministic_savings
397 }
398
399 pub fn start_deterministic_run(
404 &mut self,
405 sop_name: &str,
406 event: SopEvent,
407 ) -> Result<SopRunAction> {
408 let sop = self
409 .get_sop(sop_name)
410 .ok_or_else(|| {
411 ::zeroclaw_log::record!(
412 WARN,
413 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
414 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
415 .with_attrs(::serde_json::json!({"sop_name": sop_name})),
416 "SOP engine: sop not found"
417 );
418 anyhow::Error::msg(format!("SOP not found: {sop_name}"))
419 })?
420 .clone();
421
422 if sop.execution_mode != SopExecutionMode::Deterministic {
423 bail!(
424 "SOP '{}' is not in deterministic mode (mode: {})",
425 sop_name,
426 sop.execution_mode
427 );
428 }
429
430 if !self.can_start(sop_name) {
431 bail!(
432 "Cannot start SOP '{}': cooldown or concurrency limit reached",
433 sop_name
434 );
435 }
436
437 if sop.steps.is_empty() {
438 bail!("SOP '{}' has no steps defined", sop_name);
439 }
440
441 self.run_counter += 1;
442 let dur = std::time::SystemTime::now()
443 .duration_since(std::time::UNIX_EPOCH)
444 .unwrap_or_default();
445 let epoch_ms = dur.as_secs() * 1000 + u64::from(dur.subsec_millis());
446 let run_id = format!("det-{epoch_ms}-{:04}", self.run_counter);
447 let now = now_iso8601();
448
449 let total_steps = u32::try_from(sop.steps.len()).unwrap_or(u32::MAX);
450 let run = SopRun {
451 run_id: run_id.clone(),
452 sop_name: sop_name.to_string(),
453 trigger_event: event,
454 status: SopRunStatus::Running,
455 current_step: 1,
456 total_steps,
457 started_at: now,
458 completed_at: None,
459 step_results: Vec::new(),
460 waiting_since: None,
461 llm_calls_saved: 0,
462 };
463
464 self.active_runs.insert(run_id.clone(), run);
465 ::zeroclaw_log::record!(
466 INFO,
467 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
468 &format!(
469 "Deterministic SOP run {} started for '{}'",
470 run_id, sop_name
471 )
472 );
473
474 let step = sop.steps[0].clone();
476 let input = serde_json::Value::Null;
477 self.resolve_deterministic_action(&sop, &run_id, &step, input)
478 }
479
480 pub fn advance_deterministic_step(
483 &mut self,
484 run_id: &str,
485 step_output: serde_json::Value,
486 step_timestamps: Option<(String, Option<String>)>,
487 ) -> Result<SopRunAction> {
488 let run = self.active_runs.get_mut(run_id).ok_or_else(|| {
489 ::zeroclaw_log::record!(
490 WARN,
491 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
492 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
493 .with_attrs(::serde_json::json!({"run_id": run_id})),
494 "SOP engine: active run not found"
495 );
496 anyhow::Error::msg(format!("Active run not found: {run_id}"))
497 })?;
498
499 let sop = self
500 .sops
501 .iter()
502 .find(|s| s.name == run.sop_name)
503 .ok_or_else(|| {
504 let sop_name = run.sop_name.clone();
505 ::zeroclaw_log::record!(
506 WARN,
507 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
508 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
509 .with_attrs(::serde_json::json!({"sop_name": sop_name})),
510 "SOP engine: sop no longer loaded (definition removed mid-run)"
511 );
512 anyhow::Error::msg(format!("SOP '{}' no longer loaded", run.sop_name))
513 })?
514 .clone();
515
516 let (started_at, completed_at) = match step_timestamps {
518 Some((started, completed)) => (started, completed),
519 None => (run.started_at.clone(), Some(now_iso8601())),
520 };
521 let step_result = SopStepResult {
522 step_number: run.current_step,
523 status: SopStepStatus::Completed,
524 output: step_output.to_string(),
525 started_at,
526 completed_at,
527 };
528 run.step_results.push(step_result);
529
530 run.llm_calls_saved += 1;
532
533 let next_step_num = run.current_step + 1;
535 if next_step_num > run.total_steps {
536 ::zeroclaw_log::record!(
537 INFO,
538 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
539 &format!(
540 "Deterministic SOP run {run_id} completed ({} LLM calls saved)",
541 run.llm_calls_saved
542 )
543 );
544 let saved = run.llm_calls_saved;
545 self.deterministic_savings.total_llm_calls_saved += saved;
546 self.deterministic_savings.total_runs += 1;
547 return Ok(self.finish_run(run_id, SopRunStatus::Completed, None));
548 }
549
550 let run = self.active_runs.get_mut(run_id).unwrap();
551 run.current_step = next_step_num;
552
553 let step_idx = (next_step_num - 1) as usize;
554 let step = sop.steps[step_idx].clone();
555 let run_id_owned = run_id.to_string();
556
557 self.resolve_deterministic_action(&sop, &run_id_owned, &step, step_output)
558 }
559
560 pub fn resume_deterministic_run(
562 &mut self,
563 state: DeterministicRunState,
564 ) -> Result<SopRunAction> {
565 let run = self.active_runs.get_mut(&state.run_id).ok_or_else(|| {
566 let run_id = state.run_id.clone();
567 ::zeroclaw_log::record!(
568 WARN,
569 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
570 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
571 .with_attrs(::serde_json::json!({"run_id": run_id})),
572 "SOP engine: active run not found"
573 );
574 anyhow::Error::msg(format!("Active run not found: {}", state.run_id))
575 })?;
576
577 if run.status != SopRunStatus::PausedCheckpoint {
578 bail!(
579 "Run {} is not paused at checkpoint (status: {})",
580 state.run_id,
581 run.status
582 );
583 }
584
585 let sop = self
586 .sops
587 .iter()
588 .find(|s| s.name == run.sop_name)
589 .ok_or_else(|| {
590 let sop_name = run.sop_name.clone();
591 ::zeroclaw_log::record!(
592 WARN,
593 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
594 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
595 .with_attrs(::serde_json::json!({"sop_name": sop_name})),
596 "SOP engine: sop no longer loaded (definition removed mid-run)"
597 );
598 anyhow::Error::msg(format!("SOP '{}' no longer loaded", run.sop_name))
599 })?
600 .clone();
601
602 run.status = SopRunStatus::Running;
603 run.waiting_since = None;
604 run.llm_calls_saved = state.llm_calls_saved;
605
606 let next_step_num = state.last_completed_step + 1;
608 if next_step_num > state.total_steps {
609 ::zeroclaw_log::record!(
610 INFO,
611 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
612 &format!(
613 "Deterministic SOP run {} completed on resume ({} LLM calls saved)",
614 state.run_id, state.llm_calls_saved
615 )
616 );
617 self.deterministic_savings.total_llm_calls_saved += state.llm_calls_saved;
618 self.deterministic_savings.total_runs += 1;
619 return Ok(self.finish_run(&state.run_id, SopRunStatus::Completed, None));
620 }
621
622 let run = self.active_runs.get_mut(&state.run_id).unwrap();
623 run.current_step = next_step_num;
624
625 let step_idx = (next_step_num - 1) as usize;
626 let step = sop.steps[step_idx].clone();
627
628 let last_output = state
630 .step_outputs
631 .get(&state.last_completed_step)
632 .cloned()
633 .unwrap_or(serde_json::Value::Null);
634
635 let run_id = state.run_id.clone();
636 self.resolve_deterministic_action(&sop, &run_id, &step, last_output)
637 }
638
639 fn resolve_deterministic_action(
641 &mut self,
642 sop: &Sop,
643 run_id: &str,
644 step: &SopStep,
645 input: serde_json::Value,
646 ) -> Result<SopRunAction> {
647 if step.kind == SopStepKind::Checkpoint {
648 if let Some(run) = self.active_runs.get_mut(run_id) {
650 run.status = SopRunStatus::PausedCheckpoint;
651 run.waiting_since = Some(now_iso8601());
652 }
653
654 let state_file = self.persist_deterministic_state(run_id, sop)?;
655
656 ::zeroclaw_log::record!(
657 INFO,
658 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
659 &format!(
660 "Deterministic SOP run {run_id}: checkpoint at step {} '{}', state persisted to {}",
661 step.number,
662 step.title,
663 state_file.display().to_string()
664 )
665 );
666
667 Ok(SopRunAction::CheckpointWait {
668 run_id: run_id.to_string(),
669 step: step.clone(),
670 state_file,
671 })
672 } else {
673 Ok(SopRunAction::DeterministicStep {
674 run_id: run_id.to_string(),
675 step: step.clone(),
676 input,
677 })
678 }
679 }
680
681 fn persist_deterministic_state(&self, run_id: &str, sop: &Sop) -> Result<PathBuf> {
683 let run = self.active_runs.get(run_id).ok_or_else(|| {
684 ::zeroclaw_log::record!(
685 WARN,
686 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
687 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
688 .with_attrs(::serde_json::json!({"run_id": run_id})),
689 "SOP engine: run not found in history"
690 );
691 anyhow::Error::msg(format!("Run not found: {run_id}"))
692 })?;
693
694 let mut step_outputs = HashMap::new();
695 for result in &run.step_results {
696 let value = serde_json::from_str(&result.output)
698 .unwrap_or_else(|_| serde_json::Value::String(result.output.clone()));
699 step_outputs.insert(result.step_number, value);
700 }
701
702 let state = DeterministicRunState {
703 run_id: run_id.to_string(),
704 sop_name: run.sop_name.clone(),
705 last_completed_step: run.current_step.saturating_sub(1),
706 total_steps: run.total_steps,
707 step_outputs,
708 persisted_at: now_iso8601(),
709 llm_calls_saved: run.llm_calls_saved,
710 paused_at_checkpoint: run.status == SopRunStatus::PausedCheckpoint,
711 };
712
713 let temp_dir = std::env::temp_dir();
715 let dir = sop.location.as_deref().unwrap_or(temp_dir.as_path());
716 let state_file = dir.join(format!("{run_id}.state.json"));
717 let json = serde_json::to_string_pretty(&state)?;
718 std::fs::write(&state_file, json)?;
719
720 Ok(state_file)
721 }
722
723 pub fn load_deterministic_state(path: &Path) -> Result<DeterministicRunState> {
725 let content = std::fs::read_to_string(path)?;
726 let state: DeterministicRunState = serde_json::from_str(&content)?;
727 Ok(state)
728 }
729
730 pub fn check_approval_timeouts(&mut self) -> Vec<SopRunAction> {
736 let timeout_secs = self.config.approval_timeout_secs;
737 if timeout_secs == 0 {
738 return Vec::new();
739 }
740
741 let timed_out: Vec<(String, bool)> = self
744 .active_runs
745 .values()
746 .filter(|r| r.status == SopRunStatus::WaitingApproval)
747 .filter(|r| {
748 r.waiting_since
749 .as_deref()
750 .is_some_and(|ts| cooldown_elapsed(ts, timeout_secs))
751 })
752 .map(|r| {
753 let is_critical =
754 self.sops
755 .iter()
756 .find(|s| s.name == r.sop_name)
757 .is_some_and(|s| {
758 matches!(s.priority, SopPriority::Critical | SopPriority::High)
759 });
760 (r.run_id.clone(), is_critical)
761 })
762 .collect();
763
764 let mut actions = Vec::new();
765 for (run_id, is_critical) in timed_out {
766 if is_critical {
767 ::zeroclaw_log::record!(
769 INFO,
770 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
771 .with_attrs(::serde_json::json!({"run_id": run_id})),
772 "SOP run : approval timeout — auto-approving (critical/high priority)"
773 );
774 match self.approve_step(&run_id) {
775 Ok(action) => actions.push(action),
776 Err(e) => ::zeroclaw_log::record!(
777 WARN,
778 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
779 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
780 .with_attrs(
781 ::serde_json::json!({"error": format!("{}", e), "run_id": run_id})
782 ),
783 "SOP run : auto-approve failed"
784 ),
785 }
786 } else {
787 ::zeroclaw_log::record!(
788 INFO,
789 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
790 .with_attrs(::serde_json::json!({"run_id": run_id})),
791 "SOP run : approval timeout — waiting indefinitely (non-critical)"
792 );
793 }
794 }
795
796 actions
797 }
798
799 pub fn set_sops_for_test(&mut self, sops: Vec<Sop>) {
804 self.sops = sops;
805 }
806
807 pub fn last_finished_run(&self, sop_name: &str) -> Option<&SopRun> {
810 self.finished_runs
811 .iter()
812 .rev()
813 .find(|r| r.sop_name == sop_name)
814 }
815
816 pub fn finish_run(
817 &mut self,
818 run_id: &str,
819 status: SopRunStatus,
820 reason: Option<String>,
821 ) -> SopRunAction {
822 let mut run = self.active_runs.remove(run_id).unwrap();
823 run.status = status;
824 run.completed_at = Some(now_iso8601());
825 let sop_name = run.sop_name.clone();
826 let run_id_owned = run.run_id.clone();
827 self.finished_runs.push(run);
828
829 let max = self.config.max_finished_runs;
831 if max > 0 && self.finished_runs.len() > max {
832 let excess = self.finished_runs.len() - max;
833 self.finished_runs.drain(..excess);
834 }
835
836 match status {
837 SopRunStatus::Failed => SopRunAction::Failed {
838 run_id: run_id_owned,
839 sop_name,
840 reason: reason.unwrap_or_default(),
841 },
842 _ => SopRunAction::Completed {
843 run_id: run_id_owned,
844 sop_name,
845 },
846 }
847 }
848}
849
850fn trigger_matches(trigger: &SopTrigger, event: &SopEvent) -> bool {
854 match (trigger, event.source) {
855 (SopTrigger::Mqtt { topic, condition }, SopTriggerSource::Mqtt) => {
856 let topic_match = event
857 .topic
858 .as_deref()
859 .is_some_and(|t| mqtt_topic_matches(topic, t));
860 if !topic_match {
861 return false;
862 }
863 match condition {
865 Some(cond) => evaluate_condition(cond, event.payload.as_deref()),
866 None => true,
867 }
868 }
869
870 (SopTrigger::Webhook { path }, SopTriggerSource::Webhook) => {
871 event.topic.as_deref().is_some_and(|t| t == path)
872 }
873
874 (
875 SopTrigger::Peripheral {
876 board,
877 signal,
878 condition,
879 },
880 SopTriggerSource::Peripheral,
881 ) => {
882 let topic_match = event.topic.as_deref().is_some_and(|t| {
883 let expected = format!("{board}/{signal}");
884 t == expected
885 });
886 if !topic_match {
887 return false;
888 }
889 match condition {
891 Some(cond) => evaluate_condition(cond, event.payload.as_deref()),
892 None => true,
893 }
894 }
895
896 (SopTrigger::Cron { expression }, SopTriggerSource::Cron) => {
897 event.topic.as_deref().is_some_and(|t| t == expression)
898 }
899
900 (SopTrigger::Manual, SopTriggerSource::Manual) => true,
901
902 _ => false,
903 }
904}
905
/// Checks whether an MQTT `topic` matches a topic filter `pattern`.
///
/// Both strings are compared level by level (levels are separated by `/`):
///
/// * `+` matches exactly one level, whatever its content.
/// * `#` matches the remainder of the topic, including the case where no
///   levels remain, so `plant/#` matches `plant` as well as `plant/a/b`.
/// * Any other level must be equal to the corresponding topic level.
///
/// Without a `#`, filter and topic must have the same number of levels.
fn mqtt_topic_matches(pattern: &str, topic: &str) -> bool {
    let mut filter_levels = pattern.split('/');
    let mut topic_levels = topic.split('/');

    loop {
        match (filter_levels.next(), topic_levels.next()) {
            // Multi-level wildcard: matches everything from here on, including
            // the parent level itself (topic already exhausted).
            (Some("#"), _) => return true,
            // Single-level wildcard: consumes exactly one topic level.
            (Some("+"), Some(_)) => {}
            (Some(expected), Some(actual)) => {
                if expected != actual {
                    return false;
                }
            }
            // Both exhausted at the same time: every level matched.
            (None, None) => return true,
            // One side has levels left over.
            (Some(_), None) | (None, Some(_)) => return false,
        }
    }
}
935
936fn resolve_step_action(sop: &Sop, step: &SopStep, run_id: String, context: String) -> SopRunAction {
940 if step.requires_confirmation {
942 return SopRunAction::WaitApproval {
943 run_id,
944 step: step.clone(),
945 context,
946 };
947 }
948
949 let needs_approval = match sop.execution_mode {
950 SopExecutionMode::Auto | SopExecutionMode::Deterministic => false,
953 SopExecutionMode::Supervised => {
954 step.number == 1
956 }
957 SopExecutionMode::StepByStep => true,
958 SopExecutionMode::PriorityBased => match sop.priority {
959 SopPriority::Critical | SopPriority::High => false,
960 SopPriority::Normal | SopPriority::Low => {
961 step.number == 1
963 }
964 },
965 };
966
967 if needs_approval {
968 SopRunAction::WaitApproval {
969 run_id,
970 step: step.clone(),
971 context,
972 }
973 } else {
974 SopRunAction::ExecuteStep {
975 run_id,
976 step: step.clone(),
977 context,
978 }
979 }
980}
981
982fn format_step_context(sop: &Sop, run: &SopRun, step: &SopStep) -> String {
986 let mut ctx = format!(
987 "[SOP: {} (run {}) — Step {} of {}]\n\n",
988 sop.name, run.run_id, step.number, run.total_steps
989 );
990
991 let _ = writeln!(
992 ctx,
993 "Trigger: {} {}",
994 run.trigger_event.source,
995 run.trigger_event.topic.as_deref().unwrap_or("(no topic)")
996 );
997
998 if let Some(ref payload) = run.trigger_event.payload {
999 let _ = writeln!(ctx, "Payload: {payload}");
1000 }
1001
1002 if let Some(prev) = run.step_results.last() {
1004 let _ = writeln!(
1005 ctx,
1006 "Previous: Step {} {} — {}",
1007 prev.step_number, prev.status, prev.output
1008 );
1009 }
1010
1011 let _ = write!(ctx, "\nCurrent step: **{}**\n{}\n", step.title, step.body);
1012
1013 if !step.suggested_tools.is_empty() {
1014 let _ = write!(
1015 ctx,
1016 "\nSuggested tools: {}\n",
1017 step.suggested_tools.join(", ")
1018 );
1019 }
1020
1021 ctx.push_str("\nWhen done, report your result.\n");
1022
1023 ctx
1024}
1025
1026pub fn now_iso8601() -> String {
1029 let now = std::time::SystemTime::now()
1031 .duration_since(std::time::UNIX_EPOCH)
1032 .unwrap_or_default();
1033 let secs = now.as_secs();
1035 let days = secs / 86400;
1036 let time_secs = secs % 86400;
1037 let hours = time_secs / 3600;
1038 let minutes = (time_secs % 3600) / 60;
1039 let seconds = time_secs % 60;
1040
1041 let (year, month, day) = days_to_ymd(days);
1043 format!("{year:04}-{month:02}-{day:02}T{hours:02}:{minutes:02}:{seconds:02}Z")
1044}
1045
/// Converts a count of days since 1970-01-01 into a `(year, month, day)`
/// triple in the proleptic Gregorian calendar.
///
/// Internally the calendar is shifted so that years start on March 1st,
/// which puts the leap day at the end of the year; the final two steps map
/// the shifted month and year back to the civil ones.
fn days_to_ymd(days: u64) -> (u64, u64, u64) {
    // Days in one 400-year Gregorian cycle.
    const DAYS_PER_ERA: u64 = 146_097;
    // Days from 0000-03-01 to 1970-01-01.
    const EPOCH_SHIFT: u64 = 719_468;

    let shifted = days + EPOCH_SHIFT;
    let era = shifted / DAYS_PER_ERA;
    let day_of_era = shifted % DAYS_PER_ERA;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March, 11 = February).
    let march_based_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_based_month + 2) / 5 + 1;
    let month = if march_based_month < 10 {
        march_based_month + 3
    } else {
        march_based_month - 9
    };

    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);
    (year, month, day)
}
1061
1062fn cooldown_elapsed(completed_at: &str, cooldown_secs: u64) -> bool {
1064 let completed = parse_iso8601_secs(completed_at);
1066 let now = std::time::SystemTime::now()
1067 .duration_since(std::time::UNIX_EPOCH)
1068 .unwrap_or_default()
1069 .as_secs();
1070
1071 match completed {
1072 Some(ts) => now.saturating_sub(ts) >= cooldown_secs,
1073 None => true, }
1075}
1076
/// Parses a UTC timestamp of the form `YYYY-MM-DDTHH:MM:SS` (with an
/// optional trailing `Z`) into seconds since the Unix epoch.
///
/// Returns `None` when the input does not have exactly that shape, when any
/// field is not a number or is out of range (month 1-12, day valid for the
/// month, hour 0-23, minute and second 0-59), or when the instant lies
/// before 1970-01-01 and therefore cannot be expressed as an unsigned value.
fn parse_iso8601_secs(input: &str) -> Option<u64> {
    /// Splits `text` on `sep` into exactly three unsigned numbers.
    fn parse_triple(text: &str, sep: char) -> Option<(u64, u64, u64)> {
        let mut fields = text.split(sep).map(|field| field.parse::<u64>().ok());
        let first = fields.next()??;
        let second = fields.next()??;
        let third = fields.next()??;
        // A fourth field means the input is malformed; never drop it silently.
        if fields.next().is_some() {
            return None;
        }
        Some((first, second, third))
    }

    let input = input.trim_end_matches('Z');
    let (date, time) = input.split_once('T')?;
    let (year, month, day) = parse_triple(date, '-')?;
    let (hour, min, sec) = parse_triple(time, ':')?;

    let is_leap_year = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
    let days_in_month = match month {
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        4 | 6 | 9 | 11 => 30,
        2 if is_leap_year => 29,
        2 => 28,
        _ => return None,
    };
    if day == 0 || day > days_in_month || hour > 23 || min > 59 || sec > 59 {
        return None;
    }

    // Days-from-civil conversion on a calendar whose year starts in March.
    // All steps that could underflow or overflow are checked so that odd
    // inputs produce `None` instead of a panic or a wrapped value.
    let year_adj = if month <= 2 { year.checked_sub(1)? } else { year };
    let month_adj = if month > 2 { month - 3 } else { month + 9 };
    let era = year_adj / 400;
    let yoe = year_adj % 400;
    let doy = (153 * month_adj + 2) / 5 + day - 1;
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
    let days = era
        .checked_mul(146_097)?
        .checked_add(doe)?
        .checked_sub(719_468)?;

    days.checked_mul(86_400)?
        .checked_add(hour * 3600 + min * 60 + sec)
}
1104
1105#[cfg(test)]
1106mod tests {
1107 use super::*;
1108 use crate::sop::types::SopExecutionMode;
1109
1110 fn manual_event() -> SopEvent {
1111 SopEvent {
1112 source: SopTriggerSource::Manual,
1113 topic: None,
1114 payload: None,
1115 timestamp: now_iso8601(),
1116 }
1117 }
1118
1119 fn mqtt_event(topic: &str, payload: &str) -> SopEvent {
1120 SopEvent {
1121 source: SopTriggerSource::Mqtt,
1122 topic: Some(topic.into()),
1123 payload: Some(payload.into()),
1124 timestamp: now_iso8601(),
1125 }
1126 }
1127
1128 fn test_sop(name: &str, mode: SopExecutionMode, priority: SopPriority) -> Sop {
1129 Sop {
1130 name: name.into(),
1131 description: format!("Test SOP: {name}"),
1132 version: "1.0.0".into(),
1133 priority,
1134 execution_mode: mode,
1135 triggers: vec![SopTrigger::Manual],
1136 steps: vec![
1137 SopStep {
1138 number: 1,
1139 title: "Step one".into(),
1140 body: "Do step one".into(),
1141 suggested_tools: vec!["shell".into()],
1142 requires_confirmation: false,
1143 kind: SopStepKind::default(),
1144 schema: None,
1145 },
1146 SopStep {
1147 number: 2,
1148 title: "Step two".into(),
1149 body: "Do step two".into(),
1150 suggested_tools: vec![],
1151 requires_confirmation: false,
1152 kind: SopStepKind::default(),
1153 schema: None,
1154 },
1155 ],
1156 cooldown_secs: 0,
1157 max_concurrent: 1,
1158 location: None,
1159 deterministic: false,
1160 }
1161 }
1162
1163 fn engine_with_sops(sops: Vec<Sop>) -> SopEngine {
1164 let mut engine = SopEngine::new(SopConfig::default());
1165 engine.sops = sops;
1166 engine
1167 }
1168
1169 fn extract_run_id(action: &SopRunAction) -> &str {
1171 match action {
1172 SopRunAction::ExecuteStep { run_id, .. }
1173 | SopRunAction::WaitApproval { run_id, .. }
1174 | SopRunAction::DeterministicStep { run_id, .. }
1175 | SopRunAction::CheckpointWait { run_id, .. }
1176 | SopRunAction::Completed { run_id, .. }
1177 | SopRunAction::Failed { run_id, .. } => run_id,
1178 }
1179 }
1180
1181 #[allow(dead_code)]
1183 fn first_active_run_id(engine: &SopEngine) -> String {
1184 engine
1185 .active_runs()
1186 .keys()
1187 .next()
1188 .expect("expected at least one active run")
1189 .clone()
1190 }
1191
/// A manually triggered SOP is matched by a manual event.
#[test]
fn match_manual_trigger() {
    let sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
    let engine = engine_with_sops(vec![sop]);

    let event = manual_event();
    let matches = engine.match_trigger(&event);

    assert_eq!(matches.len(), 1);
    assert_eq!(matches[0].name, "s1");
}
1205
1206 #[test]
1207 fn no_match_for_wrong_source() {
1208 let engine = engine_with_sops(vec![test_sop(
1209 "s1",
1210 SopExecutionMode::Auto,
1211 SopPriority::Normal,
1212 )]);
1213 let event = mqtt_event("sensors/temp", "{}");
1214 let matches = engine.match_trigger(&event);
1215 assert!(matches.is_empty());
1216 }
1217
1218 #[test]
1219 fn match_mqtt_trigger_exact() {
1220 let sop = Sop {
1221 triggers: vec![SopTrigger::Mqtt {
1222 topic: "plant/pump/pressure".into(),
1223 condition: None,
1224 }],
1225 ..test_sop(
1226 "pressure-sop",
1227 SopExecutionMode::Auto,
1228 SopPriority::Critical,
1229 )
1230 };
1231 let engine = engine_with_sops(vec![sop]);
1232 let matches = engine.match_trigger(&mqtt_event("plant/pump/pressure", "87.3"));
1233 assert_eq!(matches.len(), 1);
1234 }
1235
1236 #[test]
1237 fn match_mqtt_wildcard_plus() {
1238 let sop = Sop {
1239 triggers: vec![SopTrigger::Mqtt {
1240 topic: "plant/+/pressure".into(),
1241 condition: None,
1242 }],
1243 ..test_sop("wildcard-sop", SopExecutionMode::Auto, SopPriority::Normal)
1244 };
1245 let engine = engine_with_sops(vec![sop]);
1246 assert_eq!(
1247 engine
1248 .match_trigger(&mqtt_event("plant/pump_3/pressure", "87"))
1249 .len(),
1250 1
1251 );
1252 assert!(
1253 engine
1254 .match_trigger(&mqtt_event("plant/pump_3/temperature", "50"))
1255 .is_empty()
1256 );
1257 }
1258
1259 #[test]
1260 fn match_mqtt_wildcard_hash() {
1261 let sop = Sop {
1262 triggers: vec![SopTrigger::Mqtt {
1263 topic: "plant/#".into(),
1264 condition: None,
1265 }],
1266 ..test_sop("hash-sop", SopExecutionMode::Auto, SopPriority::Normal)
1267 };
1268 let engine = engine_with_sops(vec![sop]);
1269 assert_eq!(
1270 engine
1271 .match_trigger(&mqtt_event("plant/pump/pressure", "87"))
1272 .len(),
1273 1
1274 );
1275 assert_eq!(
1276 engine
1277 .match_trigger(&mqtt_event("plant/a/b/c/d", "x"))
1278 .len(),
1279 1
1280 );
1281 }
1282
1283 #[test]
1284 fn mqtt_topic_matching_edge_cases() {
1285 assert!(mqtt_topic_matches("a/b/c", "a/b/c"));
1286 assert!(!mqtt_topic_matches("a/b/c", "a/b/d"));
1287 assert!(!mqtt_topic_matches("a/b/c", "a/b"));
1288 assert!(!mqtt_topic_matches("a/b", "a/b/c"));
1289 assert!(mqtt_topic_matches("+/+/+", "a/b/c"));
1290 assert!(!mqtt_topic_matches("+/+", "a/b/c"));
1291 assert!(mqtt_topic_matches("#", "a/b/c"));
1292 assert!(mqtt_topic_matches("a/#", "a/b/c"));
1293 assert!(!mqtt_topic_matches("b/#", "a/b/c"));
1294 }
1295
1296 #[test]
1299 fn webhook_trigger_matches_exact_path() {
1300 let sop = Sop {
1301 triggers: vec![SopTrigger::Webhook {
1302 path: "/webhook".into(),
1303 }],
1304 ..test_sop("webhook-sop", SopExecutionMode::Auto, SopPriority::Normal)
1305 };
1306 let engine = engine_with_sops(vec![sop]);
1307
1308 let event = SopEvent {
1310 source: SopTriggerSource::Webhook,
1311 topic: Some("/webhook".into()),
1312 payload: None,
1313 timestamp: now_iso8601(),
1314 };
1315 assert_eq!(engine.match_trigger(&event).len(), 1);
1316 }
1317
1318 #[test]
1319 fn webhook_trigger_rejects_different_path() {
1320 let sop = Sop {
1321 triggers: vec![SopTrigger::Webhook {
1322 path: "/sop/deploy".into(),
1323 }],
1324 ..test_sop("deploy-sop", SopExecutionMode::Auto, SopPriority::Normal)
1325 };
1326 let engine = engine_with_sops(vec![sop]);
1327
1328 let event = SopEvent {
1330 source: SopTriggerSource::Webhook,
1331 topic: Some("/webhook".into()),
1332 payload: None,
1333 timestamp: now_iso8601(),
1334 };
1335 assert!(engine.match_trigger(&event).is_empty());
1336
1337 let event = SopEvent {
1339 source: SopTriggerSource::Webhook,
1340 topic: Some("/sop/deploy".into()),
1341 payload: None,
1342 timestamp: now_iso8601(),
1343 };
1344 assert_eq!(engine.match_trigger(&event).len(), 1);
1345 }
1346
1347 #[test]
1350 fn cron_trigger_matches_only_matching_expression() {
1351 let sop = Sop {
1352 triggers: vec![SopTrigger::Cron {
1353 expression: "0 */5 * * *".into(),
1354 }],
1355 ..test_sop("cron-sop", SopExecutionMode::Auto, SopPriority::Normal)
1356 };
1357 let engine = engine_with_sops(vec![sop]);
1358
1359 let event = SopEvent {
1361 source: SopTriggerSource::Cron,
1362 topic: Some("0 */5 * * *".into()),
1363 payload: None,
1364 timestamp: now_iso8601(),
1365 };
1366 assert_eq!(engine.match_trigger(&event).len(), 1);
1367
1368 let event = SopEvent {
1370 source: SopTriggerSource::Cron,
1371 topic: Some("0 */10 * * *".into()),
1372 payload: None,
1373 timestamp: now_iso8601(),
1374 };
1375 assert!(engine.match_trigger(&event).is_empty());
1376
1377 let event = SopEvent {
1379 source: SopTriggerSource::Cron,
1380 topic: None,
1381 payload: None,
1382 timestamp: now_iso8601(),
1383 };
1384 assert!(engine.match_trigger(&event).is_empty());
1385 }
1386
1387 #[test]
1390 fn mqtt_condition_filters_by_payload() {
1391 let sop = Sop {
1392 triggers: vec![SopTrigger::Mqtt {
1393 topic: "sensors/pressure".into(),
1394 condition: Some("$.value > 85".into()),
1395 }],
1396 ..test_sop("cond-sop", SopExecutionMode::Auto, SopPriority::Critical)
1397 };
1398 let engine = engine_with_sops(vec![sop]);
1399
1400 let matches = engine.match_trigger(&mqtt_event("sensors/pressure", r#"{"value": 90}"#));
1402 assert_eq!(matches.len(), 1);
1403
1404 let matches = engine.match_trigger(&mqtt_event("sensors/pressure", r#"{"value": 50}"#));
1406 assert!(matches.is_empty());
1407 }
1408
1409 #[test]
1410 fn mqtt_no_condition_matches_any_payload() {
1411 let sop = Sop {
1412 triggers: vec![SopTrigger::Mqtt {
1413 topic: "sensors/temp".into(),
1414 condition: None,
1415 }],
1416 ..test_sop("no-cond", SopExecutionMode::Auto, SopPriority::Normal)
1417 };
1418 let engine = engine_with_sops(vec![sop]);
1419
1420 let matches = engine.match_trigger(&mqtt_event("sensors/temp", "anything"));
1421 assert_eq!(matches.len(), 1);
1422 }
1423
1424 #[test]
1425 fn mqtt_condition_no_payload_fails_closed() {
1426 let sop = Sop {
1427 triggers: vec![SopTrigger::Mqtt {
1428 topic: "sensors/temp".into(),
1429 condition: Some("$.value > 0".into()),
1430 }],
1431 ..test_sop("no-payload", SopExecutionMode::Auto, SopPriority::Normal)
1432 };
1433 let engine = engine_with_sops(vec![sop]);
1434
1435 let event = SopEvent {
1437 source: SopTriggerSource::Mqtt,
1438 topic: Some("sensors/temp".into()),
1439 payload: None,
1440 timestamp: now_iso8601(),
1441 };
1442 assert!(engine.match_trigger(&event).is_empty());
1443 }
1444
1445 #[test]
1446 fn peripheral_condition_filters_by_payload() {
1447 let sop = Sop {
1448 triggers: vec![SopTrigger::Peripheral {
1449 board: "nucleo".into(),
1450 signal: "pin_3".into(),
1451 condition: Some("> 0".into()),
1452 }],
1453 ..test_sop("periph-cond", SopExecutionMode::Auto, SopPriority::High)
1454 };
1455 let engine = engine_with_sops(vec![sop]);
1456
1457 let event = SopEvent {
1459 source: SopTriggerSource::Peripheral,
1460 topic: Some("nucleo/pin_3".into()),
1461 payload: Some("1".into()),
1462 timestamp: now_iso8601(),
1463 };
1464 assert_eq!(engine.match_trigger(&event).len(), 1);
1465
1466 let event = SopEvent {
1468 source: SopTriggerSource::Peripheral,
1469 topic: Some("nucleo/pin_3".into()),
1470 payload: Some("0".into()),
1471 timestamp: now_iso8601(),
1472 };
1473 assert!(engine.match_trigger(&event).is_empty());
1474 }
1475
1476 #[test]
1477 fn peripheral_no_condition_matches_any() {
1478 let sop = Sop {
1479 triggers: vec![SopTrigger::Peripheral {
1480 board: "rpi".into(),
1481 signal: "gpio_5".into(),
1482 condition: None,
1483 }],
1484 ..test_sop("periph-nocond", SopExecutionMode::Auto, SopPriority::Normal)
1485 };
1486 let engine = engine_with_sops(vec![sop]);
1487
1488 let event = SopEvent {
1489 source: SopTriggerSource::Peripheral,
1490 topic: Some("rpi/gpio_5".into()),
1491 payload: Some("0".into()),
1492 timestamp: now_iso8601(),
1493 };
1494 assert_eq!(engine.match_trigger(&event).len(), 1);
1495 }
1496
1497 #[test]
1500 fn start_run_returns_first_step() {
1501 let mut engine = engine_with_sops(vec![test_sop(
1502 "s1",
1503 SopExecutionMode::Auto,
1504 SopPriority::Normal,
1505 )]);
1506 let action = engine.start_run("s1", manual_event()).unwrap();
1507 let run_id = extract_run_id(&action);
1508 assert!(run_id.starts_with("run-"));
1509 assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1510 assert_eq!(engine.active_runs().len(), 1);
1511 }
1512
1513 #[test]
1514 fn start_run_unknown_sop_fails() {
1515 let mut engine = engine_with_sops(vec![]);
1516 assert!(engine.start_run("nonexistent", manual_event()).is_err());
1517 }
1518
1519 #[test]
1520 fn advance_step_to_completion() {
1521 let mut engine = engine_with_sops(vec![test_sop(
1522 "s1",
1523 SopExecutionMode::Auto,
1524 SopPriority::Normal,
1525 )]);
1526 let action = engine.start_run("s1", manual_event()).unwrap();
1527 let run_id = extract_run_id(&action).to_string();
1528
1529 let action = engine
1531 .advance_step(
1532 &run_id,
1533 SopStepResult {
1534 step_number: 1,
1535 status: SopStepStatus::Completed,
1536 output: "done".into(),
1537 started_at: now_iso8601(),
1538 completed_at: Some(now_iso8601()),
1539 },
1540 )
1541 .unwrap();
1542
1543 assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1545
1546 let action = engine
1548 .advance_step(
1549 &run_id,
1550 SopStepResult {
1551 step_number: 2,
1552 status: SopStepStatus::Completed,
1553 output: "done".into(),
1554 started_at: now_iso8601(),
1555 completed_at: Some(now_iso8601()),
1556 },
1557 )
1558 .unwrap();
1559
1560 assert!(matches!(action, SopRunAction::Completed { .. }));
1561 assert!(engine.active_runs().is_empty());
1562 assert_eq!(engine.finished_runs(None).len(), 1);
1563 }
1564
1565 #[test]
1566 fn step_failure_ends_run() {
1567 let mut engine = engine_with_sops(vec![test_sop(
1568 "s1",
1569 SopExecutionMode::Auto,
1570 SopPriority::Normal,
1571 )]);
1572 let action = engine.start_run("s1", manual_event()).unwrap();
1573 let run_id = extract_run_id(&action).to_string();
1574
1575 let action = engine
1576 .advance_step(
1577 &run_id,
1578 SopStepResult {
1579 step_number: 1,
1580 status: SopStepStatus::Failed,
1581 output: "valve stuck".into(),
1582 started_at: now_iso8601(),
1583 completed_at: Some(now_iso8601()),
1584 },
1585 )
1586 .unwrap();
1587
1588 assert!(
1589 matches!(action, SopRunAction::Failed { ref reason, .. } if reason.contains("valve stuck"))
1590 );
1591 assert!(engine.active_runs().is_empty());
1592 }
1593
1594 #[test]
1595 fn cancel_run() {
1596 let mut engine = engine_with_sops(vec![test_sop(
1597 "s1",
1598 SopExecutionMode::Auto,
1599 SopPriority::Normal,
1600 )]);
1601 let action = engine.start_run("s1", manual_event()).unwrap();
1602 let run_id = extract_run_id(&action).to_string();
1603 engine.cancel_run(&run_id).unwrap();
1604 assert!(engine.active_runs().is_empty());
1605 let finished = engine.finished_runs(None);
1606 assert_eq!(finished[0].status, SopRunStatus::Cancelled);
1607 }
1608
1609 #[test]
1610 fn cancel_unknown_run_fails() {
1611 let mut engine = engine_with_sops(vec![]);
1612 assert!(engine.cancel_run("nonexistent").is_err());
1613 }
1614
1615 #[test]
1618 fn per_sop_concurrency_limit() {
1619 let mut engine = engine_with_sops(vec![test_sop(
1620 "s1",
1621 SopExecutionMode::Auto,
1622 SopPriority::Normal,
1623 )]);
1624 engine.start_run("s1", manual_event()).unwrap();
1626 assert!(!engine.can_start("s1"));
1627 assert!(engine.start_run("s1", manual_event()).is_err());
1628 }
1629
1630 #[test]
1631 fn global_concurrency_limit() {
1632 let sops = vec![
1633 test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal),
1634 test_sop("s2", SopExecutionMode::Auto, SopPriority::Normal),
1635 ];
1636 let mut engine = SopEngine::new(SopConfig {
1637 max_concurrent_total: 1,
1638 ..SopConfig::default()
1639 });
1640 engine.sops = sops;
1641
1642 engine.start_run("s1", manual_event()).unwrap();
1643 assert!(!engine.can_start("s2"));
1644 }
1645
1646 #[test]
1649 fn cooldown_blocks_immediate_restart() {
1650 let mut sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
1651 sop.cooldown_secs = 3600; let mut engine = engine_with_sops(vec![sop]);
1653
1654 let action = engine.start_run("s1", manual_event()).unwrap();
1655 let run_id = extract_run_id(&action).to_string();
1656 engine
1658 .advance_step(
1659 &run_id,
1660 SopStepResult {
1661 step_number: 1,
1662 status: SopStepStatus::Completed,
1663 output: "ok".into(),
1664 started_at: now_iso8601(),
1665 completed_at: Some(now_iso8601()),
1666 },
1667 )
1668 .unwrap();
1669 engine
1670 .advance_step(
1671 &run_id,
1672 SopStepResult {
1673 step_number: 2,
1674 status: SopStepStatus::Completed,
1675 output: "ok".into(),
1676 started_at: now_iso8601(),
1677 completed_at: Some(now_iso8601()),
1678 },
1679 )
1680 .unwrap();
1681
1682 assert!(!engine.can_start("s1"));
1684 }
1685
1686 #[test]
1689 fn auto_mode_executes_immediately() {
1690 let mut engine = engine_with_sops(vec![test_sop(
1691 "s1",
1692 SopExecutionMode::Auto,
1693 SopPriority::Normal,
1694 )]);
1695 let action = engine.start_run("s1", manual_event()).unwrap();
1696 assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1697 }
1698
1699 #[test]
1700 fn supervised_mode_waits_on_first_step() {
1701 let mut engine = engine_with_sops(vec![test_sop(
1702 "s1",
1703 SopExecutionMode::Supervised,
1704 SopPriority::Normal,
1705 )]);
1706 let action = engine.start_run("s1", manual_event()).unwrap();
1707 assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1708 }
1709
1710 #[test]
1711 fn step_by_step_waits_on_every_step() {
1712 let mut engine = engine_with_sops(vec![test_sop(
1713 "s1",
1714 SopExecutionMode::StepByStep,
1715 SopPriority::Normal,
1716 )]);
1717
1718 let action = engine.start_run("s1", manual_event()).unwrap();
1720 let run_id = extract_run_id(&action).to_string();
1721 assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1722
1723 let action = engine.approve_step(&run_id).unwrap();
1725 assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1726
1727 let action = engine
1729 .advance_step(
1730 &run_id,
1731 SopStepResult {
1732 step_number: 1,
1733 status: SopStepStatus::Completed,
1734 output: "ok".into(),
1735 started_at: now_iso8601(),
1736 completed_at: Some(now_iso8601()),
1737 },
1738 )
1739 .unwrap();
1740 assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1741 }
1742
1743 #[test]
1744 fn priority_based_critical_auto() {
1745 let mut engine = engine_with_sops(vec![test_sop(
1746 "s1",
1747 SopExecutionMode::PriorityBased,
1748 SopPriority::Critical,
1749 )]);
1750 let action = engine.start_run("s1", manual_event()).unwrap();
1751 assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1752 }
1753
1754 #[test]
1755 fn priority_based_normal_supervised() {
1756 let mut engine = engine_with_sops(vec![test_sop(
1757 "s1",
1758 SopExecutionMode::PriorityBased,
1759 SopPriority::Normal,
1760 )]);
1761 let action = engine.start_run("s1", manual_event()).unwrap();
1762 assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1764 }
1765
1766 #[test]
1767 fn requires_confirmation_overrides_auto() {
1768 let mut sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Critical);
1769 sop.steps[0].requires_confirmation = true;
1770 let mut engine = engine_with_sops(vec![sop]);
1771 let action = engine.start_run("s1", manual_event()).unwrap();
1772 assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1774 }
1775
1776 #[test]
1779 fn approve_transitions_to_execute() {
1780 let mut engine = engine_with_sops(vec![test_sop(
1781 "s1",
1782 SopExecutionMode::Supervised,
1783 SopPriority::Normal,
1784 )]);
1785 let action = engine.start_run("s1", manual_event()).unwrap();
1786 let run_id = extract_run_id(&action).to_string();
1787
1788 let run = engine.active_runs().get(&run_id).unwrap();
1790 assert_eq!(run.status, SopRunStatus::WaitingApproval);
1791
1792 let action = engine.approve_step(&run_id).unwrap();
1794 assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1795
1796 let run = engine.active_runs().get(&run_id).unwrap();
1797 assert_eq!(run.status, SopRunStatus::Running);
1798 }
1799
1800 #[test]
1801 fn approve_non_waiting_fails() {
1802 let mut engine = engine_with_sops(vec![test_sop(
1803 "s1",
1804 SopExecutionMode::Auto,
1805 SopPriority::Normal,
1806 )]);
1807 let action = engine.start_run("s1", manual_event()).unwrap();
1808 let run_id = extract_run_id(&action).to_string();
1809 assert!(engine.approve_step(&run_id).is_err());
1810 }
1811
1812 #[test]
1815 fn step_context_includes_sop_name_and_step() {
1816 let sop = test_sop(
1817 "pump-shutdown",
1818 SopExecutionMode::Auto,
1819 SopPriority::Critical,
1820 );
1821 let run = SopRun {
1822 run_id: "run-001".into(),
1823 sop_name: "pump-shutdown".into(),
1824 trigger_event: manual_event(),
1825 status: SopRunStatus::Running,
1826 current_step: 1,
1827 total_steps: 2,
1828 started_at: now_iso8601(),
1829 completed_at: None,
1830 step_results: Vec::new(),
1831 waiting_since: None,
1832 llm_calls_saved: 0,
1833 };
1834 let ctx = format_step_context(&sop, &run, &sop.steps[0]);
1835 assert!(ctx.contains("pump-shutdown"));
1836 assert!(ctx.contains("Step 1 of 2"));
1837 assert!(ctx.contains("Step one"));
1838 }
1839
1840 #[test]
1843 fn get_run_finds_active_and_finished() {
1844 let mut engine = engine_with_sops(vec![test_sop(
1845 "s1",
1846 SopExecutionMode::Auto,
1847 SopPriority::Normal,
1848 )]);
1849 let action = engine.start_run("s1", manual_event()).unwrap();
1850 let run_id = extract_run_id(&action).to_string();
1851
1852 assert!(engine.get_run(&run_id).is_some());
1854 assert_eq!(
1855 engine.get_run(&run_id).unwrap().status,
1856 SopRunStatus::Running
1857 );
1858
1859 engine
1861 .advance_step(
1862 &run_id,
1863 SopStepResult {
1864 step_number: 1,
1865 status: SopStepStatus::Completed,
1866 output: "ok".into(),
1867 started_at: now_iso8601(),
1868 completed_at: Some(now_iso8601()),
1869 },
1870 )
1871 .unwrap();
1872 engine
1873 .advance_step(
1874 &run_id,
1875 SopStepResult {
1876 step_number: 2,
1877 status: SopStepStatus::Completed,
1878 output: "ok".into(),
1879 started_at: now_iso8601(),
1880 completed_at: Some(now_iso8601()),
1881 },
1882 )
1883 .unwrap();
1884
1885 assert!(engine.get_run(&run_id).is_some());
1887 assert_eq!(
1888 engine.get_run(&run_id).unwrap().status,
1889 SopRunStatus::Completed
1890 );
1891
1892 assert!(engine.get_run("nonexistent").is_none());
1894 }
1895
1896 #[test]
1899 fn iso8601_roundtrip() {
1900 let ts = now_iso8601();
1901 let secs = parse_iso8601_secs(&ts);
1902 assert!(secs.is_some());
1903 let now = std::time::SystemTime::now()
1905 .duration_since(std::time::UNIX_EPOCH)
1906 .unwrap()
1907 .as_secs();
1908 assert!(now.abs_diff(secs.unwrap()) < 2);
1909 }
1910
1911 #[test]
1912 fn parse_known_timestamp() {
1913 let secs = parse_iso8601_secs("2026-01-01T00:00:00Z").unwrap();
1915 assert_eq!(secs, 20454 * 86400);
1917 }
1918
1919 #[test]
1922 fn timeout_auto_approves_critical() {
1923 let mut engine = SopEngine::new(SopConfig {
1924 approval_timeout_secs: 1, ..SopConfig::default()
1926 });
1927 let mut sop = test_sop("s1", SopExecutionMode::Supervised, SopPriority::Critical);
1928 sop.execution_mode = SopExecutionMode::Supervised;
1930 engine.set_sops_for_test(vec![sop]);
1931
1932 let action = engine.start_run("s1", manual_event()).unwrap();
1933 let run_id = extract_run_id(&action).to_string();
1934 assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1935
1936 let run = engine.active_runs.get_mut(&run_id).unwrap();
1938 run.waiting_since = Some("2020-01-01T00:00:00Z".into());
1939
1940 let actions = engine.check_approval_timeouts();
1941 assert_eq!(actions.len(), 1);
1942 assert!(matches!(actions[0], SopRunAction::ExecuteStep { .. }));
1943 }
1944
1945 #[test]
1946 fn timeout_does_not_auto_approve_normal() {
1947 let mut engine = SopEngine::new(SopConfig {
1948 approval_timeout_secs: 1,
1949 ..SopConfig::default()
1950 });
1951 engine.set_sops_for_test(vec![test_sop(
1952 "s1",
1953 SopExecutionMode::Supervised,
1954 SopPriority::Normal,
1955 )]);
1956
1957 let action = engine.start_run("s1", manual_event()).unwrap();
1958 let run_id = extract_run_id(&action).to_string();
1959
1960 let run = engine.active_runs.get_mut(&run_id).unwrap();
1962 run.waiting_since = Some("2020-01-01T00:00:00Z".into());
1963
1964 let actions = engine.check_approval_timeouts();
1966 assert!(actions.is_empty());
1967 assert_eq!(
1969 engine.get_run(&run_id).unwrap().status,
1970 SopRunStatus::WaitingApproval
1971 );
1972 }
1973
1974 #[test]
1975 fn timeout_zero_disables_check() {
1976 let mut engine = SopEngine::new(SopConfig {
1977 approval_timeout_secs: 0,
1978 ..SopConfig::default()
1979 });
1980 engine.set_sops_for_test(vec![test_sop(
1981 "s1",
1982 SopExecutionMode::Supervised,
1983 SopPriority::Critical,
1984 )]);
1985 let action = engine.start_run("s1", manual_event()).unwrap();
1986 let run_id = extract_run_id(&action).to_string();
1987
1988 let run = engine.active_runs.get_mut(&run_id).unwrap();
1989 run.waiting_since = Some("2020-01-01T00:00:00Z".into());
1990
1991 let actions = engine.check_approval_timeouts();
1992 assert!(actions.is_empty());
1993 }
1994
1995 #[test]
1996 fn waiting_since_set_on_wait_approval() {
1997 let mut engine = engine_with_sops(vec![test_sop(
1998 "s1",
1999 SopExecutionMode::Supervised,
2000 SopPriority::Normal,
2001 )]);
2002 let action = engine.start_run("s1", manual_event()).unwrap();
2003 let run_id = extract_run_id(&action).to_string();
2004
2005 let run = engine.get_run(&run_id).unwrap();
2006 assert_eq!(run.status, SopRunStatus::WaitingApproval);
2007 assert!(run.waiting_since.is_some());
2008 }
2009
2010 #[test]
2013 fn max_finished_runs_evicts_oldest() {
2014 let mut engine = SopEngine::new(SopConfig {
2015 max_finished_runs: 2,
2016 ..SopConfig::default()
2017 });
2018 let mut sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
2020 sop.steps = vec![sop.steps[0].clone()];
2021 sop.max_concurrent = 10;
2022 engine.sops = vec![sop];
2023
2024 let mut finished_ids = Vec::new();
2026 for _ in 0..3 {
2027 let action = engine.start_run("s1", manual_event()).unwrap();
2028 let rid = extract_run_id(&action).to_string();
2029 engine
2030 .advance_step(
2031 &rid,
2032 SopStepResult {
2033 step_number: 1,
2034 status: SopStepStatus::Completed,
2035 output: "ok".into(),
2036 started_at: now_iso8601(),
2037 completed_at: Some(now_iso8601()),
2038 },
2039 )
2040 .unwrap();
2041 finished_ids.push(rid);
2042 }
2043
2044 let finished = engine.finished_runs(None);
2046 assert_eq!(
2047 finished.len(),
2048 2,
2049 "eviction should cap at max_finished_runs"
2050 );
2051 assert_eq!(finished[0].run_id, finished_ids[1]);
2053 assert_eq!(finished[1].run_id, finished_ids[2]);
2054 }
2055
2056 #[test]
2057 fn max_finished_runs_zero_means_unlimited() {
2058 let mut engine = SopEngine::new(SopConfig {
2059 max_finished_runs: 0,
2060 ..SopConfig::default()
2061 });
2062 let mut sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
2063 sop.steps = vec![sop.steps[0].clone()];
2064 sop.max_concurrent = 10;
2065 engine.sops = vec![sop];
2066
2067 for _ in 0..5 {
2068 let action = engine.start_run("s1", manual_event()).unwrap();
2069 let rid = extract_run_id(&action).to_string();
2070 engine
2071 .advance_step(
2072 &rid,
2073 SopStepResult {
2074 step_number: 1,
2075 status: SopStepStatus::Completed,
2076 output: "ok".into(),
2077 started_at: now_iso8601(),
2078 completed_at: Some(now_iso8601()),
2079 },
2080 )
2081 .unwrap();
2082 }
2083
2084 assert_eq!(engine.finished_runs(None).len(), 5, "zero means unlimited");
2085 }
2086
2087 #[test]
2088 fn waiting_since_cleared_on_approve() {
2089 let mut engine = engine_with_sops(vec![test_sop(
2090 "s1",
2091 SopExecutionMode::Supervised,
2092 SopPriority::Normal,
2093 )]);
2094 let action = engine.start_run("s1", manual_event()).unwrap();
2095 let run_id = extract_run_id(&action).to_string();
2096 engine.approve_step(&run_id).unwrap();
2097
2098 let run = engine.get_run(&run_id).unwrap();
2099 assert_eq!(run.status, SopRunStatus::Running);
2100 assert!(run.waiting_since.is_none());
2101 }
2102
2103 fn deterministic_sop(name: &str) -> Sop {
2106 Sop {
2107 name: name.into(),
2108 description: format!("Deterministic SOP: {name}"),
2109 version: "1.0.0".into(),
2110 priority: SopPriority::Normal,
2111 execution_mode: SopExecutionMode::Deterministic,
2112 triggers: vec![SopTrigger::Manual],
2113 steps: vec![
2114 SopStep {
2115 number: 1,
2116 title: "Step one".into(),
2117 body: "Do step one".into(),
2118 suggested_tools: vec![],
2119 requires_confirmation: false,
2120 kind: SopStepKind::Execute,
2121 schema: None,
2122 },
2123 SopStep {
2124 number: 2,
2125 title: "Checkpoint".into(),
2126 body: "Pause for approval".into(),
2127 suggested_tools: vec![],
2128 requires_confirmation: false,
2129 kind: SopStepKind::Checkpoint,
2130 schema: None,
2131 },
2132 SopStep {
2133 number: 3,
2134 title: "Step three".into(),
2135 body: "Final step".into(),
2136 suggested_tools: vec![],
2137 requires_confirmation: false,
2138 kind: SopStepKind::Execute,
2139 schema: None,
2140 },
2141 ],
2142 cooldown_secs: 0,
2143 max_concurrent: 1,
2144 location: None,
2145 deterministic: true,
2146 }
2147 }
2148
2149 #[test]
2150 fn deterministic_start_returns_deterministic_step() {
2151 let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]);
2152 let action = engine.start_run("det-sop", manual_event()).unwrap();
2153 assert!(
2154 matches!(action, SopRunAction::DeterministicStep { ref step, .. } if step.number == 1),
2155 "First action should be DeterministicStep for step 1"
2156 );
2157 let run_id = extract_run_id(&action).to_string();
2158 assert!(run_id.starts_with("det-"));
2159 }
2160
2161 #[test]
2162 fn deterministic_start_routes_through_start_run() {
2163 let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]);
2164 let action = engine.start_run("det-sop", manual_event()).unwrap();
2166 assert!(matches!(action, SopRunAction::DeterministicStep { .. }));
2167 }
2168
2169 #[test]
2170 fn deterministic_advance_pipes_output() {
2171 let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]);
2172 let action = engine.start_run("det-sop", manual_event()).unwrap();
2173 let run_id = extract_run_id(&action).to_string();
2174
2175 let output = serde_json::json!({"result": "step1_done"});
2177 let action = engine
2178 .advance_deterministic_step(&run_id, output.clone(), None)
2179 .unwrap();
2180
2181 assert!(
2183 matches!(action, SopRunAction::CheckpointWait { ref step, .. } if step.number == 2),
2184 "Step 2 (checkpoint) should return CheckpointWait"
2185 );
2186 }
2187
2188 #[test]
2189 fn deterministic_checkpoint_pauses_run() {
2190 let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]);
2191 let action = engine.start_run("det-sop", manual_event()).unwrap();
2192 let run_id = extract_run_id(&action).to_string();
2193
2194 let action = engine
2196 .advance_deterministic_step(&run_id, serde_json::json!({"ok": true}), None)
2197 .unwrap();
2198
2199 assert!(matches!(action, SopRunAction::CheckpointWait { .. }));
2201
2202 let run = engine.get_run(&run_id).unwrap();
2204 assert_eq!(run.status, SopRunStatus::PausedCheckpoint);
2205 assert!(run.waiting_since.is_some());
2206 }
2207
2208 #[test]
2209 fn deterministic_completion_tracks_savings() {
2210 let mut sop = deterministic_sop("det-sop");
2211 sop.steps = vec![
2213 SopStep {
2214 number: 1,
2215 title: "Step one".into(),
2216 body: "Do it".into(),
2217 suggested_tools: vec![],
2218 requires_confirmation: false,
2219 kind: SopStepKind::Execute,
2220 schema: None,
2221 },
2222 SopStep {
2223 number: 2,
2224 title: "Step two".into(),
2225 body: "Do it too".into(),
2226 suggested_tools: vec![],
2227 requires_confirmation: false,
2228 kind: SopStepKind::Execute,
2229 schema: None,
2230 },
2231 ];
2232 let mut engine = engine_with_sops(vec![sop]);
2233
2234 let action = engine.start_run("det-sop", manual_event()).unwrap();
2235 let run_id = extract_run_id(&action).to_string();
2236
2237 let action = engine
2239 .advance_deterministic_step(&run_id, serde_json::json!("s1"), None)
2240 .unwrap();
2241 assert!(matches!(action, SopRunAction::DeterministicStep { .. }));
2242
2243 let action = engine
2245 .advance_deterministic_step(&run_id, serde_json::json!("s2"), None)
2246 .unwrap();
2247 assert!(matches!(action, SopRunAction::Completed { .. }));
2248
2249 let savings = engine.deterministic_savings();
2251 assert_eq!(savings.total_runs, 1);
2252 assert_eq!(savings.total_llm_calls_saved, 2);
2253 }
2254
2255 #[test]
2256 fn deterministic_non_deterministic_sop_rejected() {
2257 let mut engine = engine_with_sops(vec![test_sop(
2258 "s1",
2259 SopExecutionMode::Auto,
2260 SopPriority::Normal,
2261 )]);
2262 let result = engine.start_deterministic_run("s1", manual_event());
2263 assert!(result.is_err());
2264 assert!(
2265 result
2266 .unwrap_err()
2267 .to_string()
2268 .contains("not in deterministic mode")
2269 );
2270 }
2271
2272 #[test]
2273 fn new_engine_without_sops_dir_stays_empty() {
2274 let config = SopConfig {
2275 sops_dir: None,
2276 ..Default::default()
2277 };
2278 let engine = SopEngine::new(config);
2279 assert!(
2280 engine.sops().is_empty(),
2281 "engine without sops_dir must have no SOPs"
2282 );
2283 }
2284
2285 #[test]
2286 fn reload_loads_sops_when_sops_dir_is_configured() {
2287 let tmp = tempfile::tempdir().unwrap();
2288 let sops_dir = tmp.path().join("my_sops");
2289 let sop_subdir = sops_dir.join("test-sop");
2290 std::fs::create_dir_all(&sop_subdir).unwrap();
2291
2292 std::fs::write(
2293 sop_subdir.join("SOP.toml"),
2294 r#"
2295[sop]
2296name = "test-sop"
2297description = "A test SOP"
2298version = "1.0.0"
2299
2300[[triggers]]
2301type = "manual"
2302"#,
2303 )
2304 .unwrap();
2305
2306 let config = SopConfig {
2307 sops_dir: Some(sops_dir.to_string_lossy().into_owned()),
2308 ..Default::default()
2309 };
2310 let mut engine = SopEngine::new(config);
2311 engine.reload(tmp.path());
2312 assert_eq!(
2313 engine.sops().len(),
2314 1,
2315 "reload must populate SOPs from disk"
2316 );
2317 assert_eq!(engine.sops()[0].name, "test-sop");
2318 }
2319
2320 fn deterministic_sop_all_execute(name: &str) -> Sop {
2321 Sop {
2322 name: name.into(),
2323 description: format!("Deterministic SOP: {name}"),
2324 version: "1.0.0".into(),
2325 priority: SopPriority::Normal,
2326 execution_mode: SopExecutionMode::Deterministic,
2327 triggers: vec![SopTrigger::Manual],
2328 steps: vec![
2329 SopStep {
2330 number: 1,
2331 title: "Step one".into(),
2332 body: "Do step one".into(),
2333 suggested_tools: vec![],
2334 requires_confirmation: false,
2335 kind: SopStepKind::Execute,
2336 schema: None,
2337 },
2338 SopStep {
2339 number: 2,
2340 title: "Step two".into(),
2341 body: "Do step two".into(),
2342 suggested_tools: vec![],
2343 requires_confirmation: false,
2344 kind: SopStepKind::Execute,
2345 schema: None,
2346 },
2347 ],
2348 cooldown_secs: 0,
2349 max_concurrent: 1,
2350 location: None,
2351 deterministic: true,
2352 }
2353 }
2354
/// Drives a two-step deterministic run from start to completion using only
/// `advance_step`, checking the action returned after each transition.
#[test]
fn deterministic_run_drives_to_completion_through_advance_step() {
    // Builds a `Completed` result for a step; both timestamps are taken at call time.
    let completed = |step_number, output: &'static str| SopStepResult {
        step_number,
        status: SopStepStatus::Completed,
        output: output.into(),
        started_at: now_iso8601(),
        completed_at: Some(now_iso8601()),
    };

    let mut engine = engine_with_sops(vec![deterministic_sop_all_execute("det-run")]);

    // Starting the run must hand back step 1 as a deterministic step.
    let first = engine.start_run("det-run", manual_event()).unwrap();
    let run_id = extract_run_id(&first).to_string();
    assert!(matches!(
        &first,
        SopRunAction::DeterministicStep { step, .. } if step.number == 1
    ));

    // Reporting step 1 through the generic entry point must yield step 2.
    let second = engine
        .advance_step(&run_id, completed(1, "step1-output"))
        .unwrap();
    assert!(
        matches!(
            &second,
            SopRunAction::DeterministicStep { step, .. } if step.number == 2
        ),
        "advance_step on a deterministic run must route to the deterministic path"
    );

    // Reporting the last step must finish the run.
    let last = engine
        .advance_step(&run_id, completed(2, "step2-output"))
        .unwrap();
    assert!(
        matches!(last, SopRunAction::Completed { .. }),
        "deterministic run should complete after its final step"
    );
}
2398
/// Reporting a `Failed` result for the first step of a deterministic run via
/// `advance_step` must produce a `Failed` run action.
#[test]
fn deterministic_failed_step_fails_run_through_advance_step() {
    let mut engine = engine_with_sops(vec![deterministic_sop_all_execute("det-fail")]);
    let started = engine.start_run("det-fail", manual_event()).unwrap();
    let run_id = extract_run_id(&started).to_string();

    // The failing result the caller reports for step 1.
    let failure = SopStepResult {
        step_number: 1,
        status: SopStepStatus::Failed,
        output: "boom".into(),
        started_at: now_iso8601(),
        completed_at: Some(now_iso8601()),
    };

    let outcome = engine.advance_step(&run_id, failure).unwrap();
    assert!(
        matches!(outcome, SopRunAction::Failed { .. }),
        "a failed deterministic step must fail the run"
    );
}
2422
/// The `started_at` / `completed_at` values supplied by the caller must be the
/// ones stored on the run's recorded step result.
#[test]
fn deterministic_advance_step_preserves_caller_timestamps() {
    let mut engine = engine_with_sops(vec![deterministic_sop_all_execute("det-ts")]);
    let start_action = engine.start_run("det-ts", manual_event()).unwrap();
    let run_id = extract_run_id(&start_action).to_string();

    // Fixed timestamps, so the comparison below cannot pass by coincidence with
    // a clock reading taken during the test.
    let started = String::from("2026-01-01T00:00:00Z");
    let completed = String::from("2026-01-01T00:00:42Z");

    let submitted = SopStepResult {
        step_number: 1,
        status: SopStepStatus::Completed,
        output: "step1-output".into(),
        started_at: started.clone(),
        completed_at: Some(completed.clone()),
    };
    engine.advance_step(&run_id, submitted).unwrap();

    // Look the step result back up through the public run accessor.
    let run = engine.get_run(&run_id).unwrap();
    let recorded = run
        .step_results
        .iter()
        .find(|result| result.step_number == 1)
        .expect("step 1 result recorded");

    assert_eq!(recorded.started_at, started);
    assert_eq!(recorded.completed_at, Some(completed));
}
2454
/// A deterministic run paused at a checkpoint must resume via `approve_step`
/// and then finish via `advance_step`.
///
/// NOTE(review): relies on the `deterministic_sop` fixture defined elsewhere in
/// this module; the assertions below imply its step 2 is a checkpoint and
/// step 3 is the final step — confirm against that helper.
#[test]
fn deterministic_checkpoint_resumes_through_approve_step() {
    let mut engine = engine_with_sops(vec![deterministic_sop("det-cp")]);
    let started = engine.start_run("det-cp", manual_event()).unwrap();
    let run_id = extract_run_id(&started).to_string();

    // Finishing step 1 must park the run at the checkpoint.
    let paused = engine
        .advance_deterministic_step(&run_id, serde_json::json!("s1-out"), None)
        .unwrap();
    assert!(matches!(paused, SopRunAction::CheckpointWait { .. }));
    let status_at_checkpoint = engine.get_run(&run_id).unwrap().status;
    assert_eq!(status_at_checkpoint, SopRunStatus::PausedCheckpoint);

    // Approval must hand back step 3 as the next deterministic step.
    let resumed = engine.approve_step(&run_id).unwrap();
    assert!(
        matches!(
            &resumed,
            SopRunAction::DeterministicStep { step, .. } if step.number == 3
        ),
        "approving a deterministic checkpoint must resume to the next step"
    );

    // Reporting step 3 through the generic entry point must complete the run.
    let step_three = SopStepResult {
        step_number: 3,
        status: SopStepStatus::Completed,
        output: "s3-out".into(),
        started_at: now_iso8601(),
        completed_at: Some(now_iso8601()),
    };
    let finished = engine.advance_step(&run_id, step_three).unwrap();
    assert!(
        matches!(finished, SopRunAction::Completed { .. }),
        "deterministic run should complete after the post-checkpoint step"
    );
}
2499}