1use super::manifest::ToolManifest;
20use async_trait::async_trait;
21use serde_json::json;
22use std::path::PathBuf;
23use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
24use tokio::process::Command;
25use tokio::time::{Duration, timeout};
26use zeroclaw_api::attribution::ToolKind;
27use zeroclaw_api::tool::{Tool, ToolResult};
28use zeroclaw_api::tool_attribution;
29
// Associates `SubprocessTool` with the `Plugin` tool kind.
// NOTE(review): what `tool_attribution!` expands to is defined in `zeroclaw_api`,
// not in this file — confirm there before relying on specifics.
tool_attribution!(SubprocessTool, ToolKind::Plugin);

/// Maximum time, in seconds, to wait for the plugin to write its result line
/// to stdout before it is killed and a timeout error is reported.
const SUBPROCESS_TIMEOUT_SECS: u64 = 10;

/// Maximum time, in seconds, to wait for the plugin process to exit after its
/// result line has been read.
const PROCESS_EXIT_TIMEOUT_SECS: u64 = 5;
38
/// A tool implemented by an external plugin executable.
///
/// Every call to `execute` spawns `binary_path` as a child process, sends the
/// call arguments to it as a single JSON line on stdin, and reads one JSON
/// line back from its stdout.
pub struct SubprocessTool {
    /// Plugin manifest supplying the tool's name, description and parameter
    /// definitions.
    manifest: ToolManifest,
    /// Path of the executable that is spawned for each invocation.
    binary_path: PathBuf,
}
50
51impl SubprocessTool {
52 pub fn new(manifest: ToolManifest, binary_path: PathBuf) -> Self {
54 Self {
55 manifest,
56 binary_path,
57 }
58 }
59
60 fn build_schema_properties(
62 &self,
63 ) -> (
64 serde_json::Map<String, serde_json::Value>,
65 Vec<serde_json::Value>,
66 ) {
67 let mut properties = serde_json::Map::new();
68 let mut required = Vec::new();
69
70 for param in &self.manifest.parameters {
71 let mut prop = json!({
72 "type": param.r#type,
73 "description": param.description,
74 });
75
76 if let Some(default) = ¶m.default {
77 prop["default"] = default.clone();
78 }
79
80 properties.insert(param.name.clone(), prop);
81
82 if param.required {
83 required.push(serde_json::Value::String(param.name.clone()));
84 }
85 }
86
87 (properties, required)
88 }
89}
90
91#[async_trait]
92impl Tool for SubprocessTool {
93 fn name(&self) -> &str {
94 &self.manifest.tool.name
95 }
96
97 fn description(&self) -> &str {
98 &self.manifest.tool.description
99 }
100
101 fn parameters_schema(&self) -> serde_json::Value {
103 let (properties, required) = self.build_schema_properties();
104 json!({
105 "type": "object",
106 "properties": properties,
107 "required": required,
108 })
109 }
110
111 async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
122 let args_json = serde_json::to_string(&args).map_err(|e| {
123 ::zeroclaw_log::record!(
124 ERROR,
125 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
126 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
127 .with_attrs(::serde_json::json!({
128 "plugin": self.manifest.tool.name,
129 "error": format!("{}", e),
130 })),
131 "subprocess plugin: failed to serialise tool args"
132 );
133 anyhow::Error::msg(format!("failed to serialise args: {e}"))
134 })?;
135
136 let mut child = Command::new(&self.binary_path)
138 .stdin(std::process::Stdio::piped())
139 .stdout(std::process::Stdio::piped())
140 .stderr(std::process::Stdio::piped())
141 .spawn()
142 .map_err(|e| {
143 ::zeroclaw_log::record!(
144 ERROR,
145 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
146 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
147 .with_attrs(::serde_json::json!({
148 "plugin": self.manifest.tool.name,
149 "binary_path": self.binary_path.display().to_string(),
150 "error": format!("{}", e),
151 })),
152 "subprocess plugin spawn failed"
153 );
154 anyhow::Error::msg(format!(
155 "failed to spawn plugin '{}' at {}: {e}",
156 self.manifest.tool.name,
157 self.binary_path.display()
158 ))
159 })?;
160
161 if let Some(mut stdin) = child.stdin.take() {
165 let write_result = async {
166 stdin.write_all(args_json.as_bytes()).await?;
167 stdin.write_all(b"\n").await?;
168 Ok::<(), std::io::Error>(())
169 }
170 .await;
171 if let Err(e) = write_result
172 && e.kind() != std::io::ErrorKind::BrokenPipe
173 {
174 let _ = child.kill().await;
175 ::zeroclaw_log::record!(
176 ERROR,
177 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
178 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
179 .with_attrs(::serde_json::json!({
180 "plugin": self.manifest.tool.name,
181 "error": format!("{}", e),
182 })),
183 "subprocess plugin: failed to write args to stdin"
184 );
185 anyhow::bail!(
186 "failed to write args to plugin '{}' stdin: {}",
187 self.manifest.tool.name,
188 e
189 );
190 }
191 }
193
194 let stdout_handle = child.stdout.take();
196 let stderr_handle = child.stderr.take();
197
198 let read_result = match stdout_handle {
200 None => {
201 let _ = child.kill().await;
203 return Ok(ToolResult {
204 success: false,
205 output: String::new(),
206 error: Some(format!(
207 "plugin '{}': could not attach stdout pipe",
208 self.manifest.tool.name
209 )),
210 });
211 }
212 Some(stdout) => {
213 let mut reader = BufReader::new(stdout);
214 let mut line = String::new();
215 timeout(
216 Duration::from_secs(SUBPROCESS_TIMEOUT_SECS),
217 reader.read_line(&mut line),
218 )
219 .await
220 .map(|inner| inner.map(|_| line))
221 }
222 };
223
224 match read_result {
225 Err(_elapsed) => {
229 let _ = child.kill().await;
230 let _ = child.wait().await;
231 let stderr_msg = collect_stderr(stderr_handle).await;
232 Ok(ToolResult {
233 success: false,
234 output: String::new(),
235 error: Some(format!(
236 "plugin '{}' timed out after {}s{}",
237 self.manifest.tool.name,
238 SUBPROCESS_TIMEOUT_SECS,
239 if stderr_msg.is_empty() {
240 String::new()
241 } else {
242 format!("; stderr: {}", stderr_msg)
243 }
244 )),
245 })
246 }
247
248 Ok(Err(io_err)) => {
250 let _ = child.kill().await;
251 let _ = child.wait().await;
252 let stderr_msg = collect_stderr(stderr_handle).await;
253 Ok(ToolResult {
254 success: false,
255 output: String::new(),
256 error: Some(format!(
257 "plugin '{}': I/O error reading stdout: {}{}",
258 self.manifest.tool.name,
259 io_err,
260 if stderr_msg.is_empty() {
261 String::new()
262 } else {
263 format!("; stderr: {}", stderr_msg)
264 }
265 )),
266 })
267 }
268
269 Ok(Ok(line)) => {
273 let child_status =
274 timeout(Duration::from_secs(PROCESS_EXIT_TIMEOUT_SECS), child.wait())
275 .await
276 .ok()
277 .and_then(|r| r.ok());
278 let stderr_msg = collect_stderr(stderr_handle).await;
279 let line = line.trim();
280
281 if line.is_empty() {
282 return Ok(ToolResult {
283 success: false,
284 output: String::new(),
285 error: Some(format!(
286 "plugin '{}': empty stdout{}",
287 self.manifest.tool.name,
288 if stderr_msg.is_empty() {
289 String::new()
290 } else {
291 format!("; stderr: {}", stderr_msg)
292 }
293 )),
294 });
295 }
296
297 match serde_json::from_str::<ToolResult>(line) {
298 Ok(result) => {
299 if let Some(status) = child_status
302 && !status.success()
303 {
304 return Ok(ToolResult {
305 success: false,
306 output: String::new(),
307 error: Some(format!(
308 "plugin '{}' exited with {}{}",
309 self.manifest.tool.name,
310 status,
311 if stderr_msg.is_empty() {
312 String::new()
313 } else {
314 format!("; stderr: {}", stderr_msg)
315 }
316 )),
317 });
318 }
319 Ok(result)
320 }
321 Err(parse_err) => Ok(ToolResult {
322 success: false,
323 output: String::new(),
324 error: Some(format!(
325 "plugin '{}': failed to parse output as ToolResult: {} (got: {:?})",
326 self.manifest.tool.name,
327 parse_err,
328 if line.chars().count() > 200 {
331 let truncated: String = line.chars().take(200).collect();
332 format!("{}...", truncated)
333 } else {
334 line.to_string()
335 }
336 )),
337 }),
338 }
339 }
340 }
341 }
342}
343
344async fn collect_stderr(handle: Option<tokio::process::ChildStderr>) -> String {
347 use tokio::io::AsyncReadExt;
348 let Some(mut stderr) = handle else {
349 return String::new();
350 };
351 let mut buf = vec![0u8; 512];
352 match stderr.read(&mut buf).await {
353 Ok(n) if n > 0 => String::from_utf8_lossy(&buf[..n]).trim().to_string(),
354 _ => String::new(),
355 }
356}
357
#[cfg(test)]
mod tests {
    use super::*;
    use crate::manifest::{ExecConfig, ParameterDef, ToolManifest, ToolMeta};

    /// Builds a minimal manifest named `name` with the given parameters.
    fn make_manifest(name: &str, params: Vec<ParameterDef>) -> ToolManifest {
        ToolManifest {
            tool: ToolMeta {
                name: name.to_string(),
                version: "1.0.0".to_string(),
                description: format!("Test tool: {}", name),
            },
            exec: ExecConfig {
                binary: "tool".to_string(),
            },
            transport: None,
            parameters: params,
        }
    }

    /// Builds a parameter definition with no default value.
    fn make_param(name: &str, ty: &str, required: bool) -> ParameterDef {
        ParameterDef {
            name: name.to_string(),
            r#type: ty.to_string(),
            description: format!("param {}", name),
            required,
            default: None,
        }
    }

    /// Writes `body` to `tool.sh` inside `dir`, marks it executable and returns
    /// its path. Unix-only because it relies on a shebang and POSIX mode bits.
    #[cfg(unix)]
    fn write_script(dir: &std::path::Path, body: &str) -> PathBuf {
        use std::os::unix::fs::PermissionsExt;

        let script_path = dir.join("tool.sh");
        std::fs::write(&script_path, body).unwrap();
        std::fs::set_permissions(&script_path, std::fs::Permissions::from_mode(0o755)).unwrap();
        script_path
    }

    #[test]
    fn name_and_description_come_from_manifest() {
        let m = make_manifest("gpio_test", vec![]);
        let tool = SubprocessTool::new(m, PathBuf::from("/bin/true"));
        assert_eq!(tool.name(), "gpio_test");
        assert_eq!(tool.description(), "Test tool: gpio_test");
    }

    #[test]
    fn schema_reflects_parameter_definitions() {
        let params = vec![
            make_param("device", "string", true),
            make_param("pin", "integer", true),
            make_param("value", "integer", false),
        ];
        let m = make_manifest("gpio_write", params);
        let tool = SubprocessTool::new(m, PathBuf::from("/bin/true"));
        let schema = tool.parameters_schema();

        assert_eq!(schema["type"], "object");
        assert_eq!(schema["properties"]["device"]["type"], "string");
        assert_eq!(schema["properties"]["pin"]["type"], "integer");

        let required = schema["required"].as_array().unwrap();
        let req_names: Vec<&str> = required.iter().map(|v| v.as_str().unwrap()).collect();
        assert!(req_names.contains(&"device"));
        assert!(req_names.contains(&"pin"));
        assert!(!req_names.contains(&"value"));
    }

    #[test]
    fn schema_parameterless_tool_has_empty_required() {
        let m = make_manifest("noop", vec![]);
        let tool = SubprocessTool::new(m, PathBuf::from("/bin/true"));
        let schema = tool.parameters_schema();
        let required = schema["required"].as_array().unwrap();
        assert!(required.is_empty());
    }

    // The tests below execute a `#!/bin/sh` script, so they only make sense on Unix.
    #[cfg(unix)]
    #[tokio::test]
    async fn execute_successful_subprocess() {
        let result_json = r#"{"success":true,"output":"ok","error":null}"#;

        let m = make_manifest("echo_tool", vec![]);

        // The script drains stdin (the JSON args) and then prints the result line.
        let dir = tempfile::tempdir().unwrap();
        let script_path = write_script(
            dir.path(),
            &format!("#!/bin/sh\ncat > /dev/null\necho '{}'\n", result_json),
        );

        let tool = SubprocessTool::new(m, script_path);
        let result = tool
            .execute(serde_json::json!({}))
            .await
            .expect("execute should not return Err");

        assert!(result.success, "expected success=true, got: {:?}", result);
        assert_eq!(result.output, "ok");
        assert!(result.error.is_none());
    }

    #[cfg(unix)]
    #[tokio::test]
    #[ignore = "slow: waits SUBPROCESS_TIMEOUT_SECS (~10 s) to elapse — run manually"]
    async fn execute_timeout_kills_process_and_returns_error() {
        // The script never writes to stdout, so the read must time out.
        let dir = tempfile::tempdir().unwrap();
        let script_path = write_script(dir.path(), "#!/bin/sh\nsleep 60\n");

        let m = make_manifest("sleep_tool", vec![]);
        let tool = SubprocessTool::new(m, script_path);
        let result = tool
            .execute(serde_json::json!({}))
            .await
            .expect("should not propagate Err");

        assert!(!result.success);
        let err = result.error.unwrap();
        assert!(
            err.contains("timed out"),
            "expected 'timed out' in error, got: {}",
            err
        );
    }
}