Skip to main content

zeroclaw_hardware/
subprocess.rs

1//! SubprocessTool — wraps any external binary as a [`Tool`].
2//!
3//! Plugins do not need to be written in Rust. Any executable that follows the
4//! ZeroClaw subprocess protocol is a valid tool:
5//!
6//! **Protocol (stdin/stdout, one line each):**
7//! ```text
8//! Host → binary stdin:  {"device":"pico0","pin":5}\n
9//! Binary → stdout:      {"success":true,"output":"done","error":null}\n
10//! ```
11//!
12//! Error protocol:
13//! - **Timeout (10 s)** — process is killed; `ToolResult::error` contains timeout message.
//! - **Non-zero exit** — a parsed result is overridden with a failure; `ToolResult::error` contains the exit status and any captured stderr.
15//! - **Empty / unparseable stdout** — `ToolResult::error` describes the failure.
16//!
17//! The schema advertised to the LLM is auto-generated from [`ToolManifest::parameters`].
18
19use super::manifest::ToolManifest;
20use async_trait::async_trait;
21use serde_json::json;
22use std::path::PathBuf;
23use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
24use tokio::process::Command;
25use tokio::time::{Duration, timeout};
26use zeroclaw_api::attribution::ToolKind;
27use zeroclaw_api::tool::{Tool, ToolResult};
28use zeroclaw_api::tool_attribution;
29
// Invokes the `tool_attribution!` macro for `SubprocessTool` with
// `ToolKind::Plugin`.
// NOTE(review): presumably this tags every subprocess-backed tool as a plugin
// for attribution purposes — the macro expansion is not visible here; confirm
// against `zeroclaw_api::tool_attribution`.
tool_attribution!(SubprocessTool, ToolKind::Plugin);
31
/// Deadline, in seconds, for the plugin to produce its first line on stdout.
///
/// When the deadline elapses before a line has been read, `execute` kills the
/// child process and returns a "timed out" error result.
const SUBPROCESS_TIMEOUT_SECS: u64 = 10;

/// Upper bound, in seconds, on how long `execute` waits for the child process
/// to exit after its stdout line has been read.
const PROCESS_EXIT_TIMEOUT_SECS: u64 = 5;
38
/// A tool backed by an external subprocess.
///
/// A new process is spawned for every call to `execute`. The binary receives
/// the LLM-supplied JSON arguments on stdin (one line, `\n`-terminated) and
/// must write a single `ToolResult`-compatible JSON object to stdout before
/// exiting.
pub struct SubprocessTool {
    /// Parsed plugin manifest (tool metadata + parameter definitions).
    /// Supplies the tool name, description and the parameter schema.
    manifest: ToolManifest,
    /// Path to the entry-point binary, handed as-is to `Command::new`.
    /// It is expected to be resolved to an absolute path by the caller; this
    /// type performs no validation of its own.
    binary_path: PathBuf,
}
50
51impl SubprocessTool {
52    /// Create a new `SubprocessTool` from a manifest and resolved binary path.
53    pub fn new(manifest: ToolManifest, binary_path: PathBuf) -> Self {
54        Self {
55            manifest,
56            binary_path,
57        }
58    }
59
60    /// Build JSON Schema `properties` and `required` arrays from the manifest.
61    fn build_schema_properties(
62        &self,
63    ) -> (
64        serde_json::Map<String, serde_json::Value>,
65        Vec<serde_json::Value>,
66    ) {
67        let mut properties = serde_json::Map::new();
68        let mut required = Vec::new();
69
70        for param in &self.manifest.parameters {
71            let mut prop = json!({
72                "type": param.r#type,
73                "description": param.description,
74            });
75
76            if let Some(default) = &param.default {
77                prop["default"] = default.clone();
78            }
79
80            properties.insert(param.name.clone(), prop);
81
82            if param.required {
83                required.push(serde_json::Value::String(param.name.clone()));
84            }
85        }
86
87        (properties, required)
88    }
89}
90
91#[async_trait]
92impl Tool for SubprocessTool {
93    fn name(&self) -> &str {
94        &self.manifest.tool.name
95    }
96
97    fn description(&self) -> &str {
98        &self.manifest.tool.description
99    }
100
101    /// JSON Schema Draft 7 — auto-generated from `manifest.parameters`.
102    fn parameters_schema(&self) -> serde_json::Value {
103        let (properties, required) = self.build_schema_properties();
104        json!({
105            "type": "object",
106            "properties": properties,
107            "required": required,
108        })
109    }
110
111    /// Spawn the binary, write args to stdin, read `ToolResult` from stdout.
112    ///
113    /// Steps:
114    /// 1. Serialize `args` to a JSON string.
115    /// 2. Spawn `binary_path` with piped stdin/stdout/stderr.
116    /// 3. Write `<json>\n` to child stdin; close stdin (signal EOF).
117    /// 4. Read one line from child stdout (10 s timeout).
118    /// 5. Kill the child process.
119    /// 6. Deserialize the line to `ToolResult`.
120    /// 7. On timeout → return error `ToolResult`; on empty/bad output → error.
121    async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
122        let args_json = serde_json::to_string(&args).map_err(|e| {
123            ::zeroclaw_log::record!(
124                ERROR,
125                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
126                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
127                    .with_attrs(::serde_json::json!({
128                        "plugin": self.manifest.tool.name,
129                        "error": format!("{}", e),
130                    })),
131                "subprocess plugin: failed to serialise tool args"
132            );
133            anyhow::Error::msg(format!("failed to serialise args: {e}"))
134        })?;
135
136        // Spawn child process.
137        let mut child = Command::new(&self.binary_path)
138            .stdin(std::process::Stdio::piped())
139            .stdout(std::process::Stdio::piped())
140            .stderr(std::process::Stdio::piped())
141            .spawn()
142            .map_err(|e| {
143                ::zeroclaw_log::record!(
144                    ERROR,
145                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
146                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
147                        .with_attrs(::serde_json::json!({
148                            "plugin": self.manifest.tool.name,
149                            "binary_path": self.binary_path.display().to_string(),
150                            "error": format!("{}", e),
151                        })),
152                    "subprocess plugin spawn failed"
153                );
154                anyhow::Error::msg(format!(
155                    "failed to spawn plugin '{}' at {}: {e}",
156                    self.manifest.tool.name,
157                    self.binary_path.display()
158                ))
159            })?;
160
161        // Write JSON args + newline to stdin, then drop stdin to signal EOF.
162        // BrokenPipe is tolerated — the child may exit before reading stdin
163        // (e.g. tools that only use command-line args or produce fixed output).
164        if let Some(mut stdin) = child.stdin.take() {
165            let write_result = async {
166                stdin.write_all(args_json.as_bytes()).await?;
167                stdin.write_all(b"\n").await?;
168                Ok::<(), std::io::Error>(())
169            }
170            .await;
171            if let Err(e) = write_result
172                && e.kind() != std::io::ErrorKind::BrokenPipe
173            {
174                let _ = child.kill().await;
175                ::zeroclaw_log::record!(
176                    ERROR,
177                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
178                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
179                        .with_attrs(::serde_json::json!({
180                            "plugin": self.manifest.tool.name,
181                            "error": format!("{}", e),
182                        })),
183                    "subprocess plugin: failed to write args to stdin"
184                );
185                anyhow::bail!(
186                    "failed to write args to plugin '{}' stdin: {}",
187                    self.manifest.tool.name,
188                    e
189                );
190            }
191            // stdin dropped here → child receives EOF
192        }
193
194        // Take stdout and stderr handles before we move `child`.
195        let stdout_handle = child.stdout.take();
196        let stderr_handle = child.stderr.take();
197
198        // Read one line from stdout with a hard timeout.
199        let read_result = match stdout_handle {
200            None => {
201                // No stdout — kill and error.
202                let _ = child.kill().await;
203                return Ok(ToolResult {
204                    success: false,
205                    output: String::new(),
206                    error: Some(format!(
207                        "plugin '{}': could not attach stdout pipe",
208                        self.manifest.tool.name
209                    )),
210                });
211            }
212            Some(stdout) => {
213                let mut reader = BufReader::new(stdout);
214                let mut line = String::new();
215                timeout(
216                    Duration::from_secs(SUBPROCESS_TIMEOUT_SECS),
217                    reader.read_line(&mut line),
218                )
219                .await
220                .map(|inner| inner.map(|_| line))
221            }
222        };
223
224        match read_result {
225            // ── Timeout ────────────────────────────────────────────────────
226            // The read deadline elapsed — force-kill the plugin and collect
227            // any stderr it emitted before dying.
228            Err(_elapsed) => {
229                let _ = child.kill().await;
230                let _ = child.wait().await;
231                let stderr_msg = collect_stderr(stderr_handle).await;
232                Ok(ToolResult {
233                    success: false,
234                    output: String::new(),
235                    error: Some(format!(
236                        "plugin '{}' timed out after {}s{}",
237                        self.manifest.tool.name,
238                        SUBPROCESS_TIMEOUT_SECS,
239                        if stderr_msg.is_empty() {
240                            String::new()
241                        } else {
242                            format!("; stderr: {}", stderr_msg)
243                        }
244                    )),
245                })
246            }
247
248            // ── I/O error reading stdout ───────────────────────────────────
249            Ok(Err(io_err)) => {
250                let _ = child.kill().await;
251                let _ = child.wait().await;
252                let stderr_msg = collect_stderr(stderr_handle).await;
253                Ok(ToolResult {
254                    success: false,
255                    output: String::new(),
256                    error: Some(format!(
257                        "plugin '{}': I/O error reading stdout: {}{}",
258                        self.manifest.tool.name,
259                        io_err,
260                        if stderr_msg.is_empty() {
261                            String::new()
262                        } else {
263                            format!("; stderr: {}", stderr_msg)
264                        }
265                    )),
266                })
267            }
268
269            // ── Got a line ────────────────────────────────────────────────
270            // Let the process finish naturally — plugins that write their
271            // result and then do cleanup should not be interrupted.
272            Ok(Ok(line)) => {
273                let child_status =
274                    timeout(Duration::from_secs(PROCESS_EXIT_TIMEOUT_SECS), child.wait())
275                        .await
276                        .ok()
277                        .and_then(|r| r.ok());
278                let stderr_msg = collect_stderr(stderr_handle).await;
279                let line = line.trim();
280
281                if line.is_empty() {
282                    return Ok(ToolResult {
283                        success: false,
284                        output: String::new(),
285                        error: Some(format!(
286                            "plugin '{}': empty stdout{}",
287                            self.manifest.tool.name,
288                            if stderr_msg.is_empty() {
289                                String::new()
290                            } else {
291                                format!("; stderr: {}", stderr_msg)
292                            }
293                        )),
294                    });
295                }
296
297                match serde_json::from_str::<ToolResult>(line) {
298                    Ok(result) => {
299                        // Non-zero exit overrides a parsed result: the plugin
300                        // signalled failure even if it wrote a success line.
301                        if let Some(status) = child_status
302                            && !status.success()
303                        {
304                            return Ok(ToolResult {
305                                success: false,
306                                output: String::new(),
307                                error: Some(format!(
308                                    "plugin '{}' exited with {}{}",
309                                    self.manifest.tool.name,
310                                    status,
311                                    if stderr_msg.is_empty() {
312                                        String::new()
313                                    } else {
314                                        format!("; stderr: {}", stderr_msg)
315                                    }
316                                )),
317                            });
318                        }
319                        Ok(result)
320                    }
321                    Err(parse_err) => Ok(ToolResult {
322                        success: false,
323                        output: String::new(),
324                        error: Some(format!(
325                            "plugin '{}': failed to parse output as ToolResult: {} (got: {:?})",
326                            self.manifest.tool.name,
327                            parse_err,
328                            // Truncate oversized output in the error message.
329                            // Use char-based truncation to avoid panic on multi-byte UTF-8.
330                            if line.chars().count() > 200 {
331                                let truncated: String = line.chars().take(200).collect();
332                                format!("{}...", truncated)
333                            } else {
334                                line.to_string()
335                            }
336                        )),
337                    }),
338                }
339            }
340        }
341    }
342}
343
344/// Collect up to 512 bytes from an optional stderr handle.
345/// Used to enrich error messages when a plugin writes nothing to stdout.
346async fn collect_stderr(handle: Option<tokio::process::ChildStderr>) -> String {
347    use tokio::io::AsyncReadExt;
348    let Some(mut stderr) = handle else {
349        return String::new();
350    };
351    let mut buf = vec![0u8; 512];
352    match stderr.read(&mut buf).await {
353        Ok(n) if n > 0 => String::from_utf8_lossy(&buf[..n]).trim().to_string(),
354        _ => String::new(),
355    }
356}
357
#[cfg(test)]
mod tests {
    use super::*;
    use crate::manifest::{ExecConfig, ParameterDef, ToolManifest, ToolMeta};

    /// Build a minimal manifest named `name` that declares `params`.
    fn make_manifest(name: &str, params: Vec<ParameterDef>) -> ToolManifest {
        ToolManifest {
            tool: ToolMeta {
                name: name.to_string(),
                version: "1.0.0".to_string(),
                description: format!("Test tool: {}", name),
            },
            exec: ExecConfig {
                binary: "tool".to_string(),
            },
            transport: None,
            parameters: params,
        }
    }

    /// Build a parameter definition with no default value.
    fn make_param(name: &str, ty: &str, required: bool) -> ParameterDef {
        ParameterDef {
            name: name.to_string(),
            r#type: ty.to_string(),
            description: format!("param {}", name),
            required,
            default: None,
        }
    }

    /// Write `body` to `tool.sh` inside `dir`, mark it executable and return
    /// its path. Unix-only because it relies on `#!/bin/sh` and mode bits.
    #[cfg(unix)]
    fn write_executable_script(dir: &std::path::Path, body: &str) -> PathBuf {
        use std::os::unix::fs::PermissionsExt;

        let script_path = dir.join("tool.sh");
        std::fs::write(&script_path, body).unwrap();
        std::fs::set_permissions(&script_path, std::fs::Permissions::from_mode(0o755)).unwrap();
        script_path
    }

    #[test]
    fn name_and_description_come_from_manifest() {
        let m = make_manifest("gpio_test", vec![]);
        let tool = SubprocessTool::new(m, PathBuf::from("/bin/true"));
        assert_eq!(tool.name(), "gpio_test");
        assert_eq!(tool.description(), "Test tool: gpio_test");
    }

    #[test]
    fn schema_reflects_parameter_definitions() {
        let params = vec![
            make_param("device", "string", true),
            make_param("pin", "integer", true),
            make_param("value", "integer", false),
        ];
        let m = make_manifest("gpio_write", params);
        let tool = SubprocessTool::new(m, PathBuf::from("/bin/true"));
        let schema = tool.parameters_schema();

        assert_eq!(schema["type"], "object");
        assert_eq!(schema["properties"]["device"]["type"], "string");
        assert_eq!(schema["properties"]["pin"]["type"], "integer");

        let required = schema["required"].as_array().unwrap();
        let req_names: Vec<&str> = required.iter().map(|v| v.as_str().unwrap()).collect();
        assert!(req_names.contains(&"device"));
        assert!(req_names.contains(&"pin"));
        assert!(!req_names.contains(&"value"));
    }

    #[test]
    fn schema_parameterless_tool_has_empty_required() {
        let m = make_manifest("noop", vec![]);
        let tool = SubprocessTool::new(m, PathBuf::from("/bin/true"));
        let schema = tool.parameters_schema();
        let required = schema["required"].as_array().unwrap();
        assert!(required.is_empty());
    }

    /// Verify that a binary which exits 0 with valid ToolResult JSON on stdout
    /// is deserialised correctly.
    #[cfg(unix)]
    #[tokio::test]
    async fn execute_successful_subprocess() {
        let result_json = r#"{"success":true,"output":"ok","error":null}"#;

        // The script drains stdin (the JSON args written by `execute`), then
        // prints a fixed, valid ToolResult line and exits 0.
        let dir = tempfile::tempdir().unwrap();
        let script_path = write_executable_script(
            dir.path(),
            &format!("#!/bin/sh\ncat > /dev/null\necho '{}'\n", result_json),
        );

        let tool = SubprocessTool::new(make_manifest("echo_tool", vec![]), script_path);
        let result = tool
            .execute(serde_json::json!({}))
            .await
            .expect("execute should not return Err");

        assert!(result.success, "expected success=true, got: {:?}", result);
        assert_eq!(result.output, "ok");
        assert!(result.error.is_none());
    }

    /// A binary that hangs forever should be killed and return a timeout error.
    #[cfg(unix)]
    #[tokio::test]
    #[ignore = "slow: waits SUBPROCESS_TIMEOUT_SECS (~10 s) to elapse — run manually"]
    async fn execute_timeout_kills_process_and_returns_error() {
        // The script never writes to stdout, so SubprocessTool should kill it
        // and return a "timed out" error once SUBPROCESS_TIMEOUT_SECS elapses.
        // `exec` replaces the shell with `sleep`, so the kill reaches the
        // sleeping process itself instead of orphaning a forked `sleep` that
        // would keep the stdout/stderr pipes open.
        let dir = tempfile::tempdir().unwrap();
        let script_path = write_executable_script(dir.path(), "#!/bin/sh\nexec sleep 60\n");

        let tool = SubprocessTool::new(make_manifest("sleep_tool", vec![]), script_path);
        let result = tool
            .execute(serde_json::json!({}))
            .await
            .expect("should not propagate Err");

        assert!(!result.success);
        let err = result.error.unwrap();
        assert!(
            err.contains("timed out"),
            "expected 'timed out' in error, got: {}",
            err
        );
    }
}