Skip to main content

zeroclaw_runtime/tools/
sop_advance.rs

1use std::sync::{Arc, Mutex};
2
3use async_trait::async_trait;
4use serde_json::json;
5
6use crate::sop::types::{SopRunAction, SopStepResult, SopStepStatus};
7use crate::sop::{SopAuditLogger, SopEngine, SopMetricsCollector};
8use zeroclaw_api::tool::{Tool, ToolResult};
9
10/// Report a step result and advance an SOP run to the next step.
11pub struct SopAdvanceTool {
12    engine: Arc<Mutex<SopEngine>>,
13    audit: Option<Arc<SopAuditLogger>>,
14    collector: Option<Arc<SopMetricsCollector>>,
15}
16
17impl SopAdvanceTool {
18    pub fn new(engine: Arc<Mutex<SopEngine>>) -> Self {
19        Self {
20            engine,
21            audit: None,
22            collector: None,
23        }
24    }
25
26    pub fn with_audit(mut self, audit: Arc<SopAuditLogger>) -> Self {
27        self.audit = Some(audit);
28        self
29    }
30
31    pub fn with_collector(mut self, collector: Arc<SopMetricsCollector>) -> Self {
32        self.collector = Some(collector);
33        self
34    }
35}
36
37#[async_trait]
38impl Tool for SopAdvanceTool {
39    fn name(&self) -> &str {
40        "sop_advance"
41    }
42
43    fn description(&self) -> &str {
44        "Report the result of the current SOP step and advance to the next step. Provide the run_id, whether the step succeeded or failed, and a brief output summary."
45    }
46
47    fn parameters_schema(&self) -> serde_json::Value {
48        json!({
49            "type": "object",
50            "properties": {
51                "run_id": {
52                    "type": "string",
53                    "description": "The run ID to advance"
54                },
55                "status": {
56                    "type": "string",
57                    "enum": ["completed", "failed", "skipped"],
58                    "description": "Result status of the current step"
59                },
60                "output": {
61                    "type": "string",
62                    "description": "Brief summary of what happened in this step"
63                }
64            },
65            "required": ["run_id", "status", "output"]
66        })
67    }
68
69    async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
70        let run_id = args.get("run_id").and_then(|v| v.as_str()).ok_or_else(|| {
71            ::zeroclaw_log::record!(
72                WARN,
73                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
74                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
75                    .with_attrs(::serde_json::json!({"param": "run_id"})),
76                "tool argument validation failed"
77            );
78
79            anyhow::Error::msg("Missing 'run_id' parameter")
80        })?;
81
82        let status_str = args.get("status").and_then(|v| v.as_str()).ok_or_else(|| {
83            ::zeroclaw_log::record!(
84                WARN,
85                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
86                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
87                    .with_attrs(::serde_json::json!({"param": "status"})),
88                "tool argument validation failed"
89            );
90
91            anyhow::Error::msg("Missing 'status' parameter")
92        })?;
93
94        let output = args.get("output").and_then(|v| v.as_str()).ok_or_else(|| {
95            ::zeroclaw_log::record!(
96                WARN,
97                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
98                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
99                    .with_attrs(::serde_json::json!({"param": "output"})),
100                "tool argument validation failed"
101            );
102
103            anyhow::Error::msg("Missing 'output' parameter")
104        })?;
105
106        let step_status = match status_str {
107            "completed" => SopStepStatus::Completed,
108            "failed" => SopStepStatus::Failed,
109            "skipped" => SopStepStatus::Skipped,
110            other => {
111                return Ok(ToolResult {
112                    success: false,
113                    output: String::new(),
114                    error: Some(format!(
115                        "Invalid status '{other}'. Must be: completed, failed, or skipped"
116                    )),
117                });
118            }
119        };
120
121        // Lock engine, advance step, snapshot data for audit, then drop lock
122        let (action, step_result_ok, finished_run) = {
123            let mut engine = self.engine.lock().map_err(|e| {
124                ::zeroclaw_log::record!(
125                    ERROR,
126                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
127                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
128                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
129                    "SOP engine lock poisoned"
130                );
131
132                anyhow::Error::msg(format!("Engine lock poisoned: {e}"))
133            })?;
134
135            let current_step = engine
136                .get_run(run_id)
137                .map(|r| r.current_step)
138                .ok_or_else(|| {
139                    ::zeroclaw_log::record!(
140                        WARN,
141                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
142                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
143                            .with_attrs(::serde_json::json!({"run_id": run_id})),
144                        "sop_advance tool: run not found"
145                    );
146                    anyhow::Error::msg(format!("Run not found: {run_id}"))
147                })?;
148
149            let now = now_iso8601();
150            let step_result = SopStepResult {
151                step_number: current_step,
152                status: step_status,
153                output: output.to_string(),
154                started_at: now.clone(),
155                completed_at: Some(now),
156            };
157            let step_result_clone = step_result.clone();
158
159            match engine.advance_step(run_id, step_result) {
160                Ok(action) => {
161                    // Snapshot finished run for audit (Completed/Failed/Cancelled)
162                    let finished = match &action {
163                        SopRunAction::Completed { run_id, .. }
164                        | SopRunAction::Failed { run_id, .. } => engine.get_run(run_id).cloned(),
165                        _ => None,
166                    };
167                    // Only audit step result when advance succeeded
168                    (Ok(action), Some(step_result_clone), finished)
169                }
170                Err(e) => (Err(e), None, None),
171            }
172        };
173
174        // Audit logging (engine lock dropped, safe to await)
175        if let Some(ref audit) = self.audit {
176            if let Some(ref sr) = step_result_ok
177                && let Err(e) = audit.log_step_result(run_id, sr).await
178            {
179                ::zeroclaw_log::record!(
180                    WARN,
181                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
182                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
183                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
184                    "SOP audit log_step_result failed"
185                );
186            }
187            if let Some(ref run) = finished_run
188                && let Err(e) = audit.log_run_complete(run).await
189            {
190                ::zeroclaw_log::record!(
191                    WARN,
192                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
193                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
194                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
195                    "SOP audit log_run_complete failed"
196                );
197            }
198        }
199
200        // Metrics collector (independent of audit)
201        if let Some(ref collector) = self.collector
202            && let Some(ref run) = finished_run
203        {
204            collector.record_run_complete(run);
205        }
206
207        match action {
208            Ok(action) => {
209                let result_output = match action {
210                    SopRunAction::ExecuteStep {
211                        run_id, context, ..
212                    } => {
213                        format!("Step recorded. Next step for run {run_id}:\n\n{context}")
214                    }
215                    SopRunAction::WaitApproval {
216                        run_id, context, ..
217                    } => {
218                        format!(
219                            "Step recorded. Next step for run {run_id} (waiting for approval):\n\n{context}"
220                        )
221                    }
222                    SopRunAction::Completed { run_id, sop_name } => {
223                        format!("SOP '{sop_name}' run {run_id} completed successfully.")
224                    }
225                    SopRunAction::Failed {
226                        run_id,
227                        sop_name,
228                        reason,
229                    } => {
230                        format!("SOP '{sop_name}' run {run_id} failed: {reason}")
231                    }
232                    SopRunAction::DeterministicStep { run_id, step, .. } => {
233                        format!(
234                            "Step recorded. Next deterministic step for run {run_id}: {}",
235                            step.title
236                        )
237                    }
238                    SopRunAction::CheckpointWait { run_id, step, .. } => {
239                        format!(
240                            "Step recorded. Run {run_id} paused at checkpoint: {}",
241                            step.title
242                        )
243                    }
244                };
245                Ok(ToolResult {
246                    success: true,
247                    output: result_output,
248                    error: None,
249                })
250            }
251            Err(e) => Ok(ToolResult {
252                success: false,
253                output: String::new(),
254                error: Some(format!("Failed to advance step: {e}")),
255            }),
256        }
257    }
258}
259
260use crate::sop::engine::now_iso8601;
261
262#[cfg(test)]
263mod tests {
264    use super::*;
265    use crate::sop::engine::SopEngine;
266    use crate::sop::types::*;
267    use zeroclaw_config::schema::SopConfig;
268    use zeroclaw_memory::Memory;
269
270    fn test_sop() -> Sop {
271        Sop {
272            name: "test-sop".into(),
273            description: "Test SOP".into(),
274            version: "1.0.0".into(),
275            priority: SopPriority::Normal,
276            execution_mode: SopExecutionMode::Auto,
277            triggers: vec![SopTrigger::Manual],
278            steps: vec![
279                SopStep {
280                    number: 1,
281                    title: "Step one".into(),
282                    body: "Do step one".into(),
283                    suggested_tools: vec![],
284                    requires_confirmation: false,
285                    kind: SopStepKind::default(),
286                    schema: None,
287                },
288                SopStep {
289                    number: 2,
290                    title: "Step two".into(),
291                    body: "Do step two".into(),
292                    suggested_tools: vec![],
293                    requires_confirmation: false,
294                    kind: SopStepKind::default(),
295                    schema: None,
296                },
297            ],
298            cooldown_secs: 0,
299            max_concurrent: 1,
300            location: None,
301            deterministic: false,
302        }
303    }
304
305    fn engine_with_active_run() -> (Arc<Mutex<SopEngine>>, String) {
306        let mut engine = SopEngine::new(SopConfig::default());
307        engine.set_sops_for_test(vec![test_sop()]);
308        let event = SopEvent {
309            source: SopTriggerSource::Manual,
310            topic: None,
311            payload: None,
312            timestamp: "2026-02-19T12:00:00Z".into(),
313        };
314        engine.start_run("test-sop", event).unwrap();
315        let run_id = engine
316            .active_runs()
317            .keys()
318            .next()
319            .expect("expected active run")
320            .clone();
321        (Arc::new(Mutex::new(engine)), run_id)
322    }
323
324    #[tokio::test]
325    async fn advance_to_next_step() {
326        let (engine, run_id) = engine_with_active_run();
327        let tool = SopAdvanceTool::new(engine);
328        let result = tool
329            .execute(json!({
330                "run_id": run_id,
331                "status": "completed",
332                "output": "Step 1 done successfully"
333            }))
334            .await
335            .unwrap();
336        assert!(result.success);
337        assert!(result.output.contains("Next step"));
338        assert!(result.output.contains("Step two"));
339    }
340
341    #[tokio::test]
342    async fn advance_to_completion() {
343        let (engine, run_id) = engine_with_active_run();
344        let tool = SopAdvanceTool::new(engine.clone());
345
346        // Complete step 1
347        tool.execute(json!({
348            "run_id": run_id,
349            "status": "completed",
350            "output": "Step 1 done"
351        }))
352        .await
353        .unwrap();
354
355        // Complete step 2
356        let result = tool
357            .execute(json!({
358                "run_id": run_id,
359                "status": "completed",
360                "output": "Step 2 done"
361            }))
362            .await
363            .unwrap();
364        assert!(result.success);
365        assert!(result.output.contains("completed successfully"));
366    }
367
368    #[tokio::test]
369    async fn advance_with_failure() {
370        let (engine, run_id) = engine_with_active_run();
371        let tool = SopAdvanceTool::new(engine);
372        let result = tool
373            .execute(json!({
374                "run_id": run_id,
375                "status": "failed",
376                "output": "Valve stuck open"
377            }))
378            .await
379            .unwrap();
380        assert!(result.success); // tool succeeded, SOP failed
381        assert!(result.output.contains("failed"));
382        assert!(result.output.contains("Valve stuck open"));
383    }
384
385    #[tokio::test]
386    async fn advance_invalid_status() {
387        let (engine, run_id) = engine_with_active_run();
388        let tool = SopAdvanceTool::new(engine);
389        let result = tool
390            .execute(json!({
391                "run_id": run_id,
392                "status": "invalid",
393                "output": "whatever"
394            }))
395            .await
396            .unwrap();
397        assert!(!result.success);
398        assert!(result.error.unwrap().contains("Invalid status"));
399    }
400
401    #[tokio::test]
402    async fn advance_unknown_run() {
403        let engine = Arc::new(Mutex::new(SopEngine::new(SopConfig::default())));
404        let tool = SopAdvanceTool::new(engine);
405        let result = tool
406            .execute(json!({
407                "run_id": "nonexistent",
408                "status": "completed",
409                "output": "done"
410            }))
411            .await;
412        assert!(result.is_err());
413    }
414
415    #[test]
416    fn name_and_schema() {
417        let engine = Arc::new(Mutex::new(SopEngine::new(SopConfig::default())));
418        let tool = SopAdvanceTool::new(engine);
419        assert_eq!(tool.name(), "sop_advance");
420        let schema = tool.parameters_schema();
421        assert!(schema["properties"]["run_id"].is_object());
422        assert!(schema["properties"]["status"]["enum"].is_array());
423    }
424
425    #[tokio::test]
426    async fn advance_error_does_not_write_step_audit() {
427        // Use a run_id that doesn't exist — advance_step will fail
428        let engine = Arc::new(Mutex::new(SopEngine::new(SopConfig::default())));
429        let tmp = tempfile::tempdir().unwrap();
430        let mem_cfg = zeroclaw_config::schema::MemoryConfig {
431            backend: "sqlite".into(),
432            ..zeroclaw_config::schema::MemoryConfig::default()
433        };
434        let memory: Arc<dyn Memory> =
435            Arc::from(zeroclaw_memory::create_memory(&mem_cfg, tmp.path(), None).unwrap());
436        let audit = Arc::new(SopAuditLogger::new(memory.clone()));
437
438        let tool = SopAdvanceTool::new(engine).with_audit(audit.clone());
439        let result = tool
440            .execute(json!({
441                "run_id": "nonexistent",
442                "status": "completed",
443                "output": "done"
444            }))
445            .await;
446        // advance_step on nonexistent run returns Err (anyhow)
447        assert!(result.is_err());
448
449        // Verify no phantom audit entries were written
450        let runs = audit.list_runs().await.unwrap();
451        assert!(
452            runs.is_empty(),
453            "no audit entries should exist after advance error"
454        );
455    }
456
457    #[tokio::test]
458    async fn advance_success_writes_step_audit() {
459        let (engine, run_id) = engine_with_active_run();
460        let tmp = tempfile::tempdir().unwrap();
461        let mem_cfg = zeroclaw_config::schema::MemoryConfig {
462            backend: "sqlite".into(),
463            ..zeroclaw_config::schema::MemoryConfig::default()
464        };
465        let memory: Arc<dyn Memory> =
466            Arc::from(zeroclaw_memory::create_memory(&mem_cfg, tmp.path(), None).unwrap());
467        let audit = Arc::new(SopAuditLogger::new(memory.clone()));
468
469        let tool = SopAdvanceTool::new(engine).with_audit(audit.clone());
470        let result = tool
471            .execute(json!({
472                "run_id": run_id,
473                "status": "completed",
474                "output": "Step 1 done"
475            }))
476            .await
477            .unwrap();
478        assert!(result.success);
479
480        // Verify step audit was written
481        let entries = memory
482            .list(
483                Some(&zeroclaw_memory::traits::MemoryCategory::Custom(
484                    "sop".into(),
485                )),
486                None,
487            )
488            .await
489            .unwrap();
490        let step_keys: Vec<_> = entries
491            .iter()
492            .filter(|e| e.key.starts_with("sop_step_"))
493            .collect();
494        assert!(
495            !step_keys.is_empty(),
496            "step audit should be written on success"
497        );
498    }
499}