1use std::sync::{Arc, Mutex};
2
3use async_trait::async_trait;
4use serde_json::json;
5
6use crate::sop::types::{SopRunAction, SopStepResult, SopStepStatus};
7use crate::sop::{SopAuditLogger, SopEngine, SopMetricsCollector};
8use zeroclaw_api::tool::{Tool, ToolResult};
9
/// Tool that records the result of a run's current SOP step and advances the run.
pub struct SopAdvanceTool {
    /// Shared SOP engine; locked while the run is looked up and the step is advanced.
    engine: Arc<Mutex<SopEngine>>,
    /// Optional audit logger that receives recorded step results and finished runs.
    audit: Option<Arc<SopAuditLogger>>,
    /// Optional metrics collector that is notified when a run finishes.
    collector: Option<Arc<SopMetricsCollector>>,
}
16
17impl SopAdvanceTool {
18 pub fn new(engine: Arc<Mutex<SopEngine>>) -> Self {
19 Self {
20 engine,
21 audit: None,
22 collector: None,
23 }
24 }
25
26 pub fn with_audit(mut self, audit: Arc<SopAuditLogger>) -> Self {
27 self.audit = Some(audit);
28 self
29 }
30
31 pub fn with_collector(mut self, collector: Arc<SopMetricsCollector>) -> Self {
32 self.collector = Some(collector);
33 self
34 }
35}
36
37#[async_trait]
38impl Tool for SopAdvanceTool {
39 fn name(&self) -> &str {
40 "sop_advance"
41 }
42
43 fn description(&self) -> &str {
44 "Report the result of the current SOP step and advance to the next step. Provide the run_id, whether the step succeeded or failed, and a brief output summary."
45 }
46
47 fn parameters_schema(&self) -> serde_json::Value {
48 json!({
49 "type": "object",
50 "properties": {
51 "run_id": {
52 "type": "string",
53 "description": "The run ID to advance"
54 },
55 "status": {
56 "type": "string",
57 "enum": ["completed", "failed", "skipped"],
58 "description": "Result status of the current step"
59 },
60 "output": {
61 "type": "string",
62 "description": "Brief summary of what happened in this step"
63 }
64 },
65 "required": ["run_id", "status", "output"]
66 })
67 }
68
69 async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
70 let run_id = args.get("run_id").and_then(|v| v.as_str()).ok_or_else(|| {
71 ::zeroclaw_log::record!(
72 WARN,
73 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
74 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
75 .with_attrs(::serde_json::json!({"param": "run_id"})),
76 "tool argument validation failed"
77 );
78
79 anyhow::Error::msg("Missing 'run_id' parameter")
80 })?;
81
82 let status_str = args.get("status").and_then(|v| v.as_str()).ok_or_else(|| {
83 ::zeroclaw_log::record!(
84 WARN,
85 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
86 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
87 .with_attrs(::serde_json::json!({"param": "status"})),
88 "tool argument validation failed"
89 );
90
91 anyhow::Error::msg("Missing 'status' parameter")
92 })?;
93
94 let output = args.get("output").and_then(|v| v.as_str()).ok_or_else(|| {
95 ::zeroclaw_log::record!(
96 WARN,
97 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
98 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
99 .with_attrs(::serde_json::json!({"param": "output"})),
100 "tool argument validation failed"
101 );
102
103 anyhow::Error::msg("Missing 'output' parameter")
104 })?;
105
106 let step_status = match status_str {
107 "completed" => SopStepStatus::Completed,
108 "failed" => SopStepStatus::Failed,
109 "skipped" => SopStepStatus::Skipped,
110 other => {
111 return Ok(ToolResult {
112 success: false,
113 output: String::new(),
114 error: Some(format!(
115 "Invalid status '{other}'. Must be: completed, failed, or skipped"
116 )),
117 });
118 }
119 };
120
121 let (action, step_result_ok, finished_run) = {
123 let mut engine = self.engine.lock().map_err(|e| {
124 ::zeroclaw_log::record!(
125 ERROR,
126 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
127 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
128 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
129 "SOP engine lock poisoned"
130 );
131
132 anyhow::Error::msg(format!("Engine lock poisoned: {e}"))
133 })?;
134
135 let current_step = engine
136 .get_run(run_id)
137 .map(|r| r.current_step)
138 .ok_or_else(|| {
139 ::zeroclaw_log::record!(
140 WARN,
141 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
142 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
143 .with_attrs(::serde_json::json!({"run_id": run_id})),
144 "sop_advance tool: run not found"
145 );
146 anyhow::Error::msg(format!("Run not found: {run_id}"))
147 })?;
148
149 let now = now_iso8601();
150 let step_result = SopStepResult {
151 step_number: current_step,
152 status: step_status,
153 output: output.to_string(),
154 started_at: now.clone(),
155 completed_at: Some(now),
156 };
157 let step_result_clone = step_result.clone();
158
159 match engine.advance_step(run_id, step_result) {
160 Ok(action) => {
161 let finished = match &action {
163 SopRunAction::Completed { run_id, .. }
164 | SopRunAction::Failed { run_id, .. } => engine.get_run(run_id).cloned(),
165 _ => None,
166 };
167 (Ok(action), Some(step_result_clone), finished)
169 }
170 Err(e) => (Err(e), None, None),
171 }
172 };
173
174 if let Some(ref audit) = self.audit {
176 if let Some(ref sr) = step_result_ok
177 && let Err(e) = audit.log_step_result(run_id, sr).await
178 {
179 ::zeroclaw_log::record!(
180 WARN,
181 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
182 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
183 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
184 "SOP audit log_step_result failed"
185 );
186 }
187 if let Some(ref run) = finished_run
188 && let Err(e) = audit.log_run_complete(run).await
189 {
190 ::zeroclaw_log::record!(
191 WARN,
192 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
193 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
194 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
195 "SOP audit log_run_complete failed"
196 );
197 }
198 }
199
200 if let Some(ref collector) = self.collector
202 && let Some(ref run) = finished_run
203 {
204 collector.record_run_complete(run);
205 }
206
207 match action {
208 Ok(action) => {
209 let result_output = match action {
210 SopRunAction::ExecuteStep {
211 run_id, context, ..
212 } => {
213 format!("Step recorded. Next step for run {run_id}:\n\n{context}")
214 }
215 SopRunAction::WaitApproval {
216 run_id, context, ..
217 } => {
218 format!(
219 "Step recorded. Next step for run {run_id} (waiting for approval):\n\n{context}"
220 )
221 }
222 SopRunAction::Completed { run_id, sop_name } => {
223 format!("SOP '{sop_name}' run {run_id} completed successfully.")
224 }
225 SopRunAction::Failed {
226 run_id,
227 sop_name,
228 reason,
229 } => {
230 format!("SOP '{sop_name}' run {run_id} failed: {reason}")
231 }
232 SopRunAction::DeterministicStep { run_id, step, .. } => {
233 format!(
234 "Step recorded. Next deterministic step for run {run_id}: {}",
235 step.title
236 )
237 }
238 SopRunAction::CheckpointWait { run_id, step, .. } => {
239 format!(
240 "Step recorded. Run {run_id} paused at checkpoint: {}",
241 step.title
242 )
243 }
244 };
245 Ok(ToolResult {
246 success: true,
247 output: result_output,
248 error: None,
249 })
250 }
251 Err(e) => Ok(ToolResult {
252 success: false,
253 output: String::new(),
254 error: Some(format!("Failed to advance step: {e}")),
255 }),
256 }
257 }
258}
259
260use crate::sop::engine::now_iso8601;
261
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sop::engine::SopEngine;
    use crate::sop::types::*;
    use zeroclaw_config::schema::SopConfig;
    use zeroclaw_memory::Memory;

    /// Two-step, manually triggered SOP used by every test in this module.
    fn test_sop() -> Sop {
        Sop {
            name: "test-sop".into(),
            description: "Test SOP".into(),
            version: "1.0.0".into(),
            priority: SopPriority::Normal,
            execution_mode: SopExecutionMode::Auto,
            triggers: vec![SopTrigger::Manual],
            steps: vec![
                SopStep {
                    number: 1,
                    title: "Step one".into(),
                    body: "Do step one".into(),
                    suggested_tools: vec![],
                    requires_confirmation: false,
                    kind: SopStepKind::default(),
                    schema: None,
                },
                SopStep {
                    number: 2,
                    title: "Step two".into(),
                    body: "Do step two".into(),
                    suggested_tools: vec![],
                    requires_confirmation: false,
                    kind: SopStepKind::default(),
                    schema: None,
                },
            ],
            cooldown_secs: 0,
            max_concurrent: 1,
            location: None,
            deterministic: false,
        }
    }

    /// Builds an engine with one started run of [`test_sop`] and returns the
    /// shared engine together with that run's id.
    fn engine_with_active_run() -> (Arc<Mutex<SopEngine>>, String) {
        let mut engine = SopEngine::new(SopConfig::default());
        engine.set_sops_for_test(vec![test_sop()]);
        let event = SopEvent {
            source: SopTriggerSource::Manual,
            topic: None,
            payload: None,
            timestamp: "2026-02-19T12:00:00Z".into(),
        };
        engine.start_run("test-sop", event).unwrap();
        let run_id = engine
            .active_runs()
            .keys()
            .next()
            .expect("expected active run")
            .clone();
        (Arc::new(Mutex::new(engine)), run_id)
    }

    /// Builds an audit logger over a fresh `sqlite` memory store created in a
    /// temporary directory.
    ///
    /// The `TempDir` is returned so the caller can keep the directory alive
    /// for the duration of the test.
    fn sqlite_audit() -> (tempfile::TempDir, Arc<dyn Memory>, Arc<SopAuditLogger>) {
        let tmp = tempfile::tempdir().unwrap();
        let mem_cfg = zeroclaw_config::schema::MemoryConfig {
            backend: "sqlite".into(),
            ..zeroclaw_config::schema::MemoryConfig::default()
        };
        let memory: Arc<dyn Memory> =
            Arc::from(zeroclaw_memory::create_memory(&mem_cfg, tmp.path(), None).unwrap());
        let audit = Arc::new(SopAuditLogger::new(memory.clone()));
        (tmp, memory, audit)
    }

    /// Completing step one yields the prompt for step two.
    #[tokio::test]
    async fn advance_to_next_step() {
        let (engine, run_id) = engine_with_active_run();
        let tool = SopAdvanceTool::new(engine);
        let result = tool
            .execute(json!({
                "run_id": run_id,
                "status": "completed",
                "output": "Step 1 done successfully"
            }))
            .await
            .unwrap();
        assert!(result.success);
        assert!(result.output.contains("Next step"));
        assert!(result.output.contains("Step two"));
    }

    /// Completing both steps reports the run as completed.
    #[tokio::test]
    async fn advance_to_completion() {
        let (engine, run_id) = engine_with_active_run();
        let tool = SopAdvanceTool::new(engine);

        tool.execute(json!({
            "run_id": run_id,
            "status": "completed",
            "output": "Step 1 done"
        }))
        .await
        .unwrap();

        let result = tool
            .execute(json!({
                "run_id": run_id,
                "status": "completed",
                "output": "Step 2 done"
            }))
            .await
            .unwrap();
        assert!(result.success);
        assert!(result.output.contains("completed successfully"));
    }

    /// Reporting a failed step is itself a successful tool call whose output
    /// describes the failure.
    #[tokio::test]
    async fn advance_with_failure() {
        let (engine, run_id) = engine_with_active_run();
        let tool = SopAdvanceTool::new(engine);
        let result = tool
            .execute(json!({
                "run_id": run_id,
                "status": "failed",
                "output": "Valve stuck open"
            }))
            .await
            .unwrap();
        assert!(result.success);
        assert!(result.output.contains("failed"));
        assert!(result.output.contains("Valve stuck open"));
    }

    /// An unrecognised status is reported through `ToolResult::error`.
    #[tokio::test]
    async fn advance_invalid_status() {
        let (engine, run_id) = engine_with_active_run();
        let tool = SopAdvanceTool::new(engine);
        let result = tool
            .execute(json!({
                "run_id": run_id,
                "status": "invalid",
                "output": "whatever"
            }))
            .await
            .unwrap();
        assert!(!result.success);
        assert!(result.error.unwrap().contains("Invalid status"));
    }

    /// An unknown run id makes `execute` return `Err`.
    #[tokio::test]
    async fn advance_unknown_run() {
        let engine = Arc::new(Mutex::new(SopEngine::new(SopConfig::default())));
        let tool = SopAdvanceTool::new(engine);
        let result = tool
            .execute(json!({
                "run_id": "nonexistent",
                "status": "completed",
                "output": "done"
            }))
            .await;
        assert!(result.is_err());
    }

    /// The tool exposes its name and a schema with the expected properties.
    #[test]
    fn name_and_schema() {
        let engine = Arc::new(Mutex::new(SopEngine::new(SopConfig::default())));
        let tool = SopAdvanceTool::new(engine);
        assert_eq!(tool.name(), "sop_advance");
        let schema = tool.parameters_schema();
        assert!(schema["properties"]["run_id"].is_object());
        assert!(schema["properties"]["status"]["enum"].is_array());
    }

    /// A failed advance must not leave any audit entries behind.
    #[tokio::test]
    async fn advance_error_does_not_write_step_audit() {
        let engine = Arc::new(Mutex::new(SopEngine::new(SopConfig::default())));
        // `_tmp` keeps the temporary directory alive until the test ends.
        let (_tmp, _memory, audit) = sqlite_audit();

        let tool = SopAdvanceTool::new(engine).with_audit(audit.clone());
        let result = tool
            .execute(json!({
                "run_id": "nonexistent",
                "status": "completed",
                "output": "done"
            }))
            .await;
        assert!(result.is_err());

        let runs = audit.list_runs().await.unwrap();
        assert!(
            runs.is_empty(),
            "no audit entries should exist after advance error"
        );
    }

    /// A successful advance writes a `sop_step_*` entry to the audit store.
    #[tokio::test]
    async fn advance_success_writes_step_audit() {
        let (engine, run_id) = engine_with_active_run();
        // `_tmp` keeps the temporary directory alive until the test ends.
        let (_tmp, memory, audit) = sqlite_audit();

        let tool = SopAdvanceTool::new(engine).with_audit(audit.clone());
        let result = tool
            .execute(json!({
                "run_id": run_id,
                "status": "completed",
                "output": "Step 1 done"
            }))
            .await
            .unwrap();
        assert!(result.success);

        let entries = memory
            .list(
                Some(&zeroclaw_memory::traits::MemoryCategory::Custom(
                    "sop".into(),
                )),
                None,
            )
            .await
            .unwrap();
        let step_keys: Vec<_> = entries
            .iter()
            .filter(|e| e.key.starts_with("sop_step_"))
            .collect();
        assert!(
            !step_keys.is_empty(),
            "step audit should be written on success"
        );
    }
}