1use anyhow::Result;
8use std::time::{Duration, Instant};
9use tokio_util::sync::CancellationToken;
10
11use crate::approval::ApprovalManager;
12use crate::observability::{Observer, ObserverEvent};
13use crate::tools::Tool;
14
15use super::loop_::{ParsedToolCall, ToolLoopCancelled, scrub_credentials};
17
18pub fn find_tool<'a>(tools: &'a [Box<dyn Tool>], name: &str) -> Option<&'a dyn Tool> {
22 tools.iter().find(|t| t.name() == name).map(|t| t.as_ref())
23}
24
/// Result of a single tool invocation as produced by `execute_one_tool`.
pub struct ToolExecutionOutcome {
    /// Text produced for the call. On success this is the tool output after
    /// `scrub_credentials` (or `"(no output)"` when the tool returned an empty
    /// string); on failure it is an error message.
    pub output: String,
    /// `true` only when the tool was found, ran, and reported success.
    pub success: bool,
    /// Credential-scrubbed failure reason; `None` when the call succeeded.
    pub error_reason: Option<String>,
    /// Time elapsed between the start of the lookup/execution and its completion.
    pub duration: Duration,
    /// Receipt generated for a successful call when a receipt generator was
    /// supplied; `None` otherwise.
    pub receipt: Option<String>,
}
36
37pub async fn execute_one_tool(
40 call_name: &str,
41 call_arguments: serde_json::Value,
42 tool_call_id: Option<&str>,
43 tools_registry: &[Box<dyn Tool>],
44 activated_tools: Option<&std::sync::Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
45 observer: &dyn Observer,
46 cancellation_token: Option<&CancellationToken>,
47 receipt_generator: Option<&super::tool_receipts::ReceiptGenerator>,
48) -> Result<ToolExecutionOutcome> {
49 let full_args = call_arguments.to_string();
56 let tool_call_id_owned = tool_call_id.map(str::to_string);
57 observer.record_event(&ObserverEvent::ToolCallStart {
58 tool: call_name.to_string(),
59 tool_call_id: tool_call_id_owned.clone(),
60 arguments: Some(full_args.clone()),
61 channel: None,
62 agent_alias: None,
63 turn_id: None,
64 });
65 let start = Instant::now();
66
67 let static_tool = find_tool(tools_registry, call_name);
68 let activated_arc = if static_tool.is_none() {
69 activated_tools.and_then(|at| at.lock().unwrap().get_resolved(call_name))
70 } else {
71 None
72 };
73 let Some(tool) = static_tool.or(activated_arc.as_deref()) else {
74 let reason = format!("Unknown tool: {call_name}");
75 let duration = start.elapsed();
76 let scrubbed_reason = scrub_credentials(&reason);
77 observer.record_event(&ObserverEvent::ToolCall {
78 tool: call_name.to_string(),
79 tool_call_id: tool_call_id_owned.clone(),
80 duration,
81 success: false,
82 arguments: Some(full_args.clone()),
83 result: Some(scrubbed_reason.clone()),
84 channel: None,
85 agent_alias: None,
86 turn_id: None,
87 });
88 return Ok(ToolExecutionOutcome {
89 output: reason,
90 success: false,
91 error_reason: Some(scrubbed_reason),
92 duration,
93 receipt: None,
94 });
95 };
96
97 use ::zeroclaw_log::Instrument;
98 let tool_span = ::zeroclaw_log::info_span!(
99 target: "zeroclaw_log_internal_scope",
100 "zeroclaw_scope",
101 tool = %call_name,
102 );
103
104 let _start_guard = tool_span.clone().entered();
108 ::zeroclaw_log::record!(
109 DEBUG,
110 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Invoke)
111 .with_category(::zeroclaw_log::EventCategory::Tool)
112 .with_attrs(::serde_json::json!({
113 "tool": call_name,
114 "tool_call_id": tool_call_id,
115 "input": call_arguments,
116 })),
117 format!("tool call: {call_name}")
118 );
119 drop(_start_guard);
120
121 let tool_future = tool
122 .execute(call_arguments.clone())
123 .instrument(tool_span.clone());
124 let tool_result = if let Some(token) = cancellation_token {
125 tokio::select! {
126 () = token.cancelled() => return Err(ToolLoopCancelled.into()),
127 result = tool_future => result,
128 }
129 } else {
130 tool_future.await
131 };
132
133 let _result_guard = tool_span.entered();
134 match tool_result {
135 Ok(r) => {
136 let duration = start.elapsed();
137 if r.success {
138 ::zeroclaw_log::record!(
139 DEBUG,
140 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete)
141 .with_category(::zeroclaw_log::EventCategory::Tool)
142 .with_outcome(::zeroclaw_log::EventOutcome::Success)
143 .with_duration(duration.as_millis() as u64)
144 .with_attrs(::serde_json::json!({
145 "tool": call_name,
146 "tool_call_id": tool_call_id,
147 "input": call_arguments,
148 "output": r.output,
149 })),
150 format!("tool result: {call_name}")
151 );
152 } else {
153 ::zeroclaw_log::record!(
154 WARN,
155 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
156 .with_category(::zeroclaw_log::EventCategory::Tool)
157 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
158 .with_duration(duration.as_millis() as u64)
159 .with_attrs(::serde_json::json!({
160 "tool": call_name,
161 "tool_call_id": tool_call_id,
162 "input": call_arguments,
163 "error": r.error.clone().unwrap_or_default(),
164 "output": r.output,
165 })),
166 format!("tool failed: {call_name}")
167 );
168 }
169 if r.success {
170 let normalized_output = if r.output.is_empty() {
171 "(no output)"
172 } else {
173 &r.output
174 };
175 let output = scrub_credentials(normalized_output);
176 let receipt = receipt_generator.map(|receipt_gen| {
177 receipt_gen.generate_now(call_name, &call_arguments, &output)
178 });
179 observer.record_event(&ObserverEvent::ToolCall {
180 tool: call_name.to_string(),
181 tool_call_id: tool_call_id_owned.clone(),
182 duration,
183 success: true,
184 arguments: Some(full_args.clone()),
185 result: Some(output.clone()),
186 channel: None,
187 agent_alias: None,
188 turn_id: None,
189 });
190 Ok(ToolExecutionOutcome {
191 output,
192 success: true,
193 error_reason: None,
194 duration,
195 receipt,
196 })
197 } else {
198 let reason = r.error.unwrap_or(r.output);
199 let scrubbed_reason = scrub_credentials(&reason);
200 observer.record_event(&ObserverEvent::ToolCall {
201 tool: call_name.to_string(),
202 tool_call_id: tool_call_id_owned.clone(),
203 duration,
204 success: false,
205 arguments: Some(full_args.clone()),
206 result: Some(scrubbed_reason.clone()),
207 channel: None,
208 agent_alias: None,
209 turn_id: None,
210 });
211 Ok(ToolExecutionOutcome {
212 output: format!("Error: {reason}"),
213 success: false,
214 error_reason: Some(scrubbed_reason),
215 duration,
216 receipt: None,
217 })
218 }
219 }
220 Err(e) => {
221 let duration = start.elapsed();
222 ::zeroclaw_log::record!(
223 ERROR,
224 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
225 .with_category(::zeroclaw_log::EventCategory::Tool)
226 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
227 .with_duration(duration.as_millis() as u64)
228 .with_attrs(::serde_json::json!({
229 "tool": call_name,
230 "tool_call_id": tool_call_id,
231 "input": call_arguments,
232 "error": format!("{e:?}"),
233 })),
234 format!("tool error: {call_name}")
235 );
236 let reason = format!("Error executing {call_name}: {e}");
237 let scrubbed_reason = scrub_credentials(&reason);
238 observer.record_event(&ObserverEvent::ToolCall {
239 tool: call_name.to_string(),
240 tool_call_id: tool_call_id_owned.clone(),
241 duration,
242 success: false,
243 arguments: Some(full_args.clone()),
244 result: Some(scrubbed_reason.clone()),
245 channel: None,
246 agent_alias: None,
247 turn_id: None,
248 });
249 Ok(ToolExecutionOutcome {
250 output: reason,
251 success: false,
252 error_reason: Some(scrubbed_reason),
253 duration,
254 receipt: None,
255 })
256 }
257 }
258}
259
260pub fn should_execute_tools_in_parallel(
263 tool_calls: &[ParsedToolCall],
264 approval: Option<&ApprovalManager>,
265) -> bool {
266 if tool_calls.len() <= 1 {
267 return false;
268 }
269
270 if tool_calls.iter().any(|call| call.name == "tool_search") {
275 return false;
276 }
277
278 if let Some(mgr) = approval
279 && tool_calls.iter().any(|call| mgr.needs_approval(&call.name))
280 {
281 return false;
284 }
285
286 true
287}
288
289pub async fn execute_tools_parallel(
292 tool_calls: &[ParsedToolCall],
293 tools_registry: &[Box<dyn Tool>],
294 activated_tools: Option<&std::sync::Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
295 observer: &dyn Observer,
296 cancellation_token: Option<&CancellationToken>,
297 receipt_generator: Option<&super::tool_receipts::ReceiptGenerator>,
298) -> Result<Vec<ToolExecutionOutcome>> {
299 let futures: Vec<_> = tool_calls
300 .iter()
301 .map(|call| {
302 execute_one_tool(
303 &call.name,
304 call.arguments.clone(),
305 call.tool_call_id.as_deref(),
306 tools_registry,
307 activated_tools,
308 observer,
309 cancellation_token,
310 receipt_generator,
311 )
312 })
313 .collect();
314
315 let results = futures_util::future::join_all(futures).await;
316 results.into_iter().collect()
317}
318
319pub async fn execute_tools_sequential(
322 tool_calls: &[ParsedToolCall],
323 tools_registry: &[Box<dyn Tool>],
324 activated_tools: Option<&std::sync::Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
325 observer: &dyn Observer,
326 cancellation_token: Option<&CancellationToken>,
327 receipt_generator: Option<&super::tool_receipts::ReceiptGenerator>,
328) -> Result<Vec<ToolExecutionOutcome>> {
329 let mut outcomes = Vec::with_capacity(tool_calls.len());
330
331 for call in tool_calls {
332 outcomes.push(
333 execute_one_tool(
334 &call.name,
335 call.arguments.clone(),
336 call.tool_call_id.as_deref(),
337 tools_registry,
338 activated_tools,
339 observer,
340 cancellation_token,
341 receipt_generator,
342 )
343 .await?,
344 );
345 }
346
347 Ok(outcomes)
348}