1use super::loop_;
2use crate::agent::dispatcher::{
3 NativeToolDispatcher, ParsedToolCall, ToolDispatcher, ToolExecutionResult, XmlToolDispatcher,
4};
5use crate::agent::eval::AutoClassifyExt;
6use crate::agent::memory_loader::{DefaultMemoryLoader, MemoryLoader};
7use crate::agent::prompt::{PromptContext, SystemPromptBuilder};
8use crate::approval::{ApprovalManager, ApprovalRequest, ApprovalRequirement, ApprovalResponse};
9use crate::observability::{self, Observer, ObserverEvent};
10use crate::platform;
11use crate::security::SecurityPolicy;
12use crate::tools::{self, Tool, ToolSpec};
13use anyhow::{Context, Result};
14use chrono::{Datelike, Timelike};
15use std::collections::{HashMap, VecDeque};
16use std::io::Write as IoWrite;
17use std::path::Path;
18use std::sync::Arc;
19use std::time::Instant;
20use zeroclaw_config::schema::Config;
21use zeroclaw_memory::{self, Memory, MemoryCategory};
22use zeroclaw_providers::{self, ChatMessage, ChatRequest, ConversationMessage, ModelProvider};
23use zeroclaw_tool_call_parser::strip_think_tags;
24
25pub use zeroclaw_api::agent::TurnEvent;
27
28use crate::channel_targets::build_channel_targets;
29
30pub struct Agent {
31 model_provider: Box<dyn ModelProvider>,
32 tools: Vec<Box<dyn Tool>>,
33 tool_specs: Vec<ToolSpec>,
34 memory: Arc<dyn Memory>,
35 observer: Arc<dyn Observer>,
36 prompt_builder: SystemPromptBuilder,
37 tool_dispatcher: Box<dyn ToolDispatcher>,
38 memory_loader: Box<dyn MemoryLoader>,
39 config: zeroclaw_config::schema::AliasedAgentConfig,
40 multimodal_config: zeroclaw_config::schema::MultimodalConfig,
41 model_name: String,
42 model_provider_name: String,
43 temperature: f64,
44 workspace_dir: std::path::PathBuf,
45 agent_workspace_dir: std::path::PathBuf,
50 identity_config: zeroclaw_config::schema::IdentityConfig,
51 skills: Vec<crate::skills::Skill>,
52 skills_prompt_mode: zeroclaw_config::schema::SkillsPromptInjectionMode,
53 auto_save: bool,
54 memory_session_id: Option<String>,
55 history: Vec<ConversationMessage>,
56 classification_config: zeroclaw_config::schema::QueryClassificationConfig,
57 available_hints: Vec<String>,
58 route_model_by_hint: HashMap<String, String>,
59 #[allow(dead_code)] allowed_tools: Option<Vec<String>>,
61 response_cache: Option<Arc<zeroclaw_memory::response_cache::ResponseCache>>,
62 security_summary: Option<String>,
65 channel_targets: Option<String>,
68 autonomy_level: crate::security::AutonomyLevel,
70 activated_tools: Option<Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
74 hook_runner: Option<Arc<crate::hooks::HookRunner>>,
77 approval_manager: Option<Arc<ApprovalManager>>,
79 channel_handles: AgentChannelHandles,
85}
86
87#[derive(Debug)]
88pub struct StreamedTurnSuccess {
89 pub response: String,
90 pub new_messages: Vec<ConversationMessage>,
91}
92
93#[derive(Debug)]
94pub struct StreamedTurnError {
95 pub error: anyhow::Error,
96 pub committed_response: String,
97 pub new_messages: Vec<ConversationMessage>,
98}
99
100#[derive(Clone, Default)]
103pub struct AgentChannelHandles {
104 pub ask_user: Option<tools::PerToolChannelHandle>,
105 pub reaction: tools::PerToolChannelHandle,
106 pub poll: Option<tools::PerToolChannelHandle>,
107 pub escalate: Option<tools::PerToolChannelHandle>,
108 pub channel_send: Option<tools::PerToolChannelHandle>,
109}
110
111impl AgentChannelHandles {
112 fn populated_handles(&self) -> Vec<Option<&tools::PerToolChannelHandle>> {
114 vec![
115 self.ask_user.as_ref(),
116 Some(&self.reaction),
117 self.poll.as_ref(),
118 self.escalate.as_ref(),
119 self.channel_send.as_ref(),
120 ]
121 }
122
123 pub fn register_channel(
126 &self,
127 name: impl Into<String>,
128 channel: Arc<dyn zeroclaw_api::channel::Channel>,
129 ) {
130 let name = name.into();
131 for handle in self.populated_handles().into_iter().flatten() {
132 handle.write().insert(name.clone(), Arc::clone(&channel));
133 }
134 }
135
136 pub fn unregister_channel(&self, name: &str) {
138 for handle in self.populated_handles().into_iter().flatten() {
139 handle.write().remove(name);
140 }
141 }
142
143 pub fn get_channel(&self, name: &str) -> Option<Arc<dyn zeroclaw_api::channel::Channel>> {
145 for handle in self.populated_handles().into_iter().flatten() {
146 if let Some(channel) = handle.read().get(name) {
147 return Some(Arc::clone(channel));
148 }
149 }
150 None
151 }
152}
153
154pub struct AgentBuilder {
155 model_provider: Option<Box<dyn ModelProvider>>,
156 tools: Option<Vec<Box<dyn Tool>>>,
157 memory: Option<Arc<dyn Memory>>,
158 observer: Option<Arc<dyn Observer>>,
159 prompt_builder: Option<SystemPromptBuilder>,
160 tool_dispatcher: Option<Box<dyn ToolDispatcher>>,
161 memory_loader: Option<Box<dyn MemoryLoader>>,
162 config: Option<zeroclaw_config::schema::AliasedAgentConfig>,
163 multimodal_config: Option<zeroclaw_config::schema::MultimodalConfig>,
164 model_name: Option<String>,
165 model_provider_name: Option<String>,
166 temperature: Option<f64>,
167 workspace_dir: Option<std::path::PathBuf>,
168 agent_workspace_dir: Option<std::path::PathBuf>,
169 identity_config: Option<zeroclaw_config::schema::IdentityConfig>,
170 skills: Option<Vec<crate::skills::Skill>>,
171 skills_prompt_mode: Option<zeroclaw_config::schema::SkillsPromptInjectionMode>,
172 auto_save: Option<bool>,
173 memory_session_id: Option<String>,
174 classification_config: Option<zeroclaw_config::schema::QueryClassificationConfig>,
175 available_hints: Option<Vec<String>>,
176 route_model_by_hint: Option<HashMap<String, String>>,
177 allowed_tools: Option<Vec<String>>,
178 response_cache: Option<Arc<zeroclaw_memory::response_cache::ResponseCache>>,
179 security_summary: Option<String>,
180 autonomy_level: Option<crate::security::AutonomyLevel>,
181 activated_tools: Option<Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
182 hook_runner: Option<Arc<crate::hooks::HookRunner>>,
183 approval_manager: Option<Arc<ApprovalManager>>,
184}
185
186impl Default for AgentBuilder {
187 fn default() -> Self {
188 Self::new()
189 }
190}
191
192impl AgentBuilder {
193 pub fn new() -> Self {
194 Self {
195 model_provider: None,
196 tools: None,
197 memory: None,
198 observer: None,
199 prompt_builder: None,
200 tool_dispatcher: None,
201 memory_loader: None,
202 config: None,
203 multimodal_config: None,
204 model_name: None,
205 model_provider_name: None,
206 temperature: None,
207 workspace_dir: None,
208 agent_workspace_dir: None,
209 identity_config: None,
210 skills: None,
211 skills_prompt_mode: None,
212 auto_save: None,
213 memory_session_id: None,
214 classification_config: None,
215 available_hints: None,
216 route_model_by_hint: None,
217 allowed_tools: None,
218 response_cache: None,
219 security_summary: None,
220 autonomy_level: None,
221 activated_tools: None,
222 hook_runner: None,
223 approval_manager: None,
224 }
225 }
226
227 pub fn model_provider(mut self, model_provider: Box<dyn ModelProvider>) -> Self {
228 self.model_provider = Some(model_provider);
229 self
230 }
231
232 pub fn tools(mut self, tools: Vec<Box<dyn Tool>>) -> Self {
233 self.tools = Some(tools);
234 self
235 }
236
237 pub fn memory(mut self, memory: Arc<dyn Memory>) -> Self {
238 self.memory = Some(memory);
239 self
240 }
241
242 pub fn observer(mut self, observer: Arc<dyn Observer>) -> Self {
243 self.observer = Some(observer);
244 self
245 }
246
247 pub fn prompt_builder(mut self, prompt_builder: SystemPromptBuilder) -> Self {
248 self.prompt_builder = Some(prompt_builder);
249 self
250 }
251
252 pub fn tool_dispatcher(mut self, tool_dispatcher: Box<dyn ToolDispatcher>) -> Self {
253 self.tool_dispatcher = Some(tool_dispatcher);
254 self
255 }
256
257 pub fn memory_loader(mut self, memory_loader: Box<dyn MemoryLoader>) -> Self {
258 self.memory_loader = Some(memory_loader);
259 self
260 }
261
262 pub fn config(mut self, config: zeroclaw_config::schema::AliasedAgentConfig) -> Self {
263 self.config = Some(config);
264 self
265 }
266
267 pub fn multimodal_config(
268 mut self,
269 multimodal_config: zeroclaw_config::schema::MultimodalConfig,
270 ) -> Self {
271 self.multimodal_config = Some(multimodal_config);
272 self
273 }
274
275 pub fn model_name(mut self, model_name: String) -> Self {
276 self.model_name = Some(model_name);
277 self
278 }
279
280 pub fn model_provider_name(mut self, name: String) -> Self {
281 self.model_provider_name = Some(name);
282 self
283 }
284
285 pub fn temperature(mut self, temperature: f64) -> Self {
286 self.temperature = Some(temperature);
287 self
288 }
289
290 pub fn workspace_dir(mut self, workspace_dir: std::path::PathBuf) -> Self {
291 self.workspace_dir = Some(workspace_dir);
292 self
293 }
294
295 pub fn agent_workspace_dir(mut self, agent_workspace_dir: std::path::PathBuf) -> Self {
296 self.agent_workspace_dir = Some(agent_workspace_dir);
297 self
298 }
299
300 pub fn identity_config(
301 mut self,
302 identity_config: zeroclaw_config::schema::IdentityConfig,
303 ) -> Self {
304 self.identity_config = Some(identity_config);
305 self
306 }
307
308 pub fn skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
309 self.skills = Some(skills);
310 self
311 }
312
313 pub fn skills_prompt_mode(
314 mut self,
315 skills_prompt_mode: zeroclaw_config::schema::SkillsPromptInjectionMode,
316 ) -> Self {
317 self.skills_prompt_mode = Some(skills_prompt_mode);
318 self
319 }
320
321 pub fn auto_save(mut self, auto_save: bool) -> Self {
322 self.auto_save = Some(auto_save);
323 self
324 }
325
326 pub fn memory_session_id(mut self, memory_session_id: Option<String>) -> Self {
327 self.memory_session_id = memory_session_id;
328 self
329 }
330
331 pub fn classification_config(
332 mut self,
333 classification_config: zeroclaw_config::schema::QueryClassificationConfig,
334 ) -> Self {
335 self.classification_config = Some(classification_config);
336 self
337 }
338
339 pub fn available_hints(mut self, available_hints: Vec<String>) -> Self {
340 self.available_hints = Some(available_hints);
341 self
342 }
343
344 pub fn route_model_by_hint(mut self, route_model_by_hint: HashMap<String, String>) -> Self {
345 self.route_model_by_hint = Some(route_model_by_hint);
346 self
347 }
348
349 pub fn allowed_tools(mut self, allowed_tools: Option<Vec<String>>) -> Self {
350 self.allowed_tools = allowed_tools;
351 self
352 }
353
354 pub fn response_cache(
355 mut self,
356 cache: Option<Arc<zeroclaw_memory::response_cache::ResponseCache>>,
357 ) -> Self {
358 self.response_cache = cache;
359 self
360 }
361
362 pub fn security_summary(mut self, summary: Option<String>) -> Self {
363 self.security_summary = summary;
364 self
365 }
366
367 pub fn autonomy_level(mut self, level: crate::security::AutonomyLevel) -> Self {
368 self.autonomy_level = Some(level);
369 self
370 }
371
372 pub fn activated_tools(
373 mut self,
374 activated: Option<Arc<std::sync::Mutex<tools::ActivatedToolSet>>>,
375 ) -> Self {
376 self.activated_tools = activated;
377 self
378 }
379
380 pub fn hook_runner(mut self, runner: Option<Arc<crate::hooks::HookRunner>>) -> Self {
381 self.hook_runner = runner;
382 self
383 }
384
385 pub fn approval_manager(mut self, manager: Option<Arc<ApprovalManager>>) -> Self {
386 self.approval_manager = manager;
387 self
388 }
389
390 pub fn build(self) -> Result<Agent> {
391 let mut tools = self.tools.ok_or_else(|| {
392 ::zeroclaw_log::record!(
393 ERROR,
394 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
395 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
396 .with_attrs(::serde_json::json!({"missing_field": "tools"})),
397 "AgentBuilder::build missing required field"
398 );
399 anyhow::Error::msg("tools are required")
400 })?;
401 let allowed = self.allowed_tools.clone();
402 if let Some(ref allow_list) = allowed {
403 tools.retain(|t| allow_list.iter().any(|name| name == t.name()));
404 }
405 let tool_specs = tools.iter().map(|tool| tool.spec()).collect();
406
407 Ok(Agent {
408 model_provider: self.model_provider.ok_or_else(|| {
409 ::zeroclaw_log::record!(
410 ERROR,
411 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
412 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
413 .with_attrs(::serde_json::json!({"missing_field": "model_provider"})),
414 "AgentBuilder::build missing required field"
415 );
416 anyhow::Error::msg("model_provider is required")
417 })?,
418 tools,
419 tool_specs,
420 memory: self.memory.ok_or_else(|| {
421 ::zeroclaw_log::record!(
422 ERROR,
423 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
424 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
425 .with_attrs(::serde_json::json!({"missing_field": "memory"})),
426 "AgentBuilder::build missing required field"
427 );
428 anyhow::Error::msg("memory is required")
429 })?,
430 observer: self.observer.ok_or_else(|| {
431 ::zeroclaw_log::record!(
432 ERROR,
433 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
434 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
435 .with_attrs(::serde_json::json!({"missing_field": "observer"})),
436 "AgentBuilder::build missing required field"
437 );
438 anyhow::Error::msg("observer is required")
439 })?,
440 prompt_builder: self
441 .prompt_builder
442 .unwrap_or_else(SystemPromptBuilder::with_defaults),
443 tool_dispatcher: self.tool_dispatcher.ok_or_else(|| {
444 ::zeroclaw_log::record!(
445 ERROR,
446 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
447 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
448 .with_attrs(::serde_json::json!({"missing_field": "tool_dispatcher"})),
449 "AgentBuilder::build missing required field"
450 );
451 anyhow::Error::msg("tool_dispatcher is required")
452 })?,
453 memory_loader: self
454 .memory_loader
455 .unwrap_or_else(|| Box::new(DefaultMemoryLoader::default())),
456 config: self.config.unwrap_or_default(),
457 multimodal_config: self.multimodal_config.unwrap_or_default(),
458 model_name: self.model_name.unwrap_or_else(|| "<unconfigured>".into()),
464 model_provider_name: self
465 .model_provider_name
466 .unwrap_or_else(|| "<unconfigured>".into()),
467 temperature: self.temperature.unwrap_or(0.7),
468 workspace_dir: self
469 .workspace_dir
470 .clone()
471 .unwrap_or_else(|| std::path::PathBuf::from(".")),
472 agent_workspace_dir: self.agent_workspace_dir.unwrap_or_else(|| {
473 self.workspace_dir
474 .clone()
475 .unwrap_or_else(|| std::path::PathBuf::from("."))
476 }),
477 identity_config: self.identity_config.unwrap_or_default(),
478 skills: self.skills.unwrap_or_default(),
479 skills_prompt_mode: self.skills_prompt_mode.unwrap_or_default(),
480 auto_save: self.auto_save.unwrap_or(false),
481 memory_session_id: self.memory_session_id,
482 history: Vec::new(),
483 classification_config: self.classification_config.unwrap_or_default(),
484 available_hints: self.available_hints.unwrap_or_default(),
485 route_model_by_hint: self.route_model_by_hint.unwrap_or_default(),
486 allowed_tools: allowed,
487 response_cache: self.response_cache,
488 security_summary: self.security_summary,
489 channel_targets: None,
490 autonomy_level: self
491 .autonomy_level
492 .unwrap_or(crate::security::AutonomyLevel::Supervised),
493 activated_tools: self.activated_tools,
494 hook_runner: self.hook_runner,
495 approval_manager: self.approval_manager,
496 channel_handles: AgentChannelHandles::default(),
497 })
498 }
499}
500
501impl Agent {
502 pub fn builder() -> AgentBuilder {
503 AgentBuilder::new()
504 }
505
506 pub fn history(&self) -> &[ConversationMessage] {
507 &self.history
508 }
509
510 pub fn channel_handles(&self) -> &AgentChannelHandles {
516 &self.channel_handles
517 }
518
519 pub fn populate_channels(
527 &self,
528 channel_map: &std::collections::HashMap<String, Arc<dyn zeroclaw_api::channel::Channel>>,
529 ) -> Vec<String> {
530 let mut names = Vec::new();
531 for (name, ch) in channel_map {
532 self.channel_handles.register_channel(name, Arc::clone(ch));
533 names.push(name.clone());
534 }
535 names
536 }
537
538 pub fn clear_history(&mut self) {
539 self.history.clear();
540 }
541
542 fn encode_response_cache_transcript(messages: &[ChatMessage]) -> String {
543 let mut transcript = String::new();
544 for message in messages.iter().filter(|message| message.role != "system") {
545 transcript.push_str("role=");
546 transcript.push_str(&message.role.len().to_string());
547 transcript.push(':');
548 transcript.push_str(&message.role);
549 transcript.push_str(";content=");
550 transcript.push_str(&message.content.len().to_string());
551 transcript.push(':');
552 transcript.push_str(&message.content);
553 transcript.push('\n');
554 }
555 transcript
556 }
557
558 fn response_cache_key_for_messages(
559 &self,
560 messages: &[ChatMessage],
561 effective_model: &str,
562 ) -> Option<String> {
563 if self.temperature != 0.0 || self.response_cache.is_none() {
564 return None;
565 }
566
567 let system = messages
568 .iter()
569 .find(|message| message.role == "system")
570 .map(|message| message.content.as_str());
571 let transcript = Self::encode_response_cache_transcript(messages);
572
573 Some(zeroclaw_memory::response_cache::ResponseCache::cache_key(
574 effective_model,
575 system,
576 &transcript,
577 ))
578 }
579
580 fn drain_steering_messages(
581 steering_rx: &mut Option<&mut tokio::sync::mpsc::Receiver<String>>,
582 ) -> Vec<String> {
583 let Some(rx) = steering_rx.as_deref_mut() else {
584 return Vec::new();
585 };
586
587 let mut messages = Vec::new();
588 loop {
589 match rx.try_recv() {
590 Ok(message) => messages.push(message),
591 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
592 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
593 }
594 }
595 messages
596 }
597
598 async fn append_streamed_user_message_to_history(
599 &mut self,
600 user_message: &str,
601 new_msgs: &mut Vec<ConversationMessage>,
602 ) {
603 let context = self
604 .memory_loader
605 .load_context(
606 self.memory.as_ref(),
607 user_message,
608 self.memory_session_id.as_deref(),
609 )
610 .await
611 .unwrap_or_default();
612
613 if self.auto_save {
614 let _ = self
615 .memory
616 .store(
617 "user_msg",
618 user_message,
619 MemoryCategory::Conversation,
620 self.memory_session_id.as_deref(),
621 )
622 .await;
623 }
624
625 let now = chrono::Local::now().format("%Y-%m-%d %H:%M:%S %Z");
626 let enriched = if context.is_empty() {
627 format!("[{now}] {user_message}")
628 } else {
629 format!("{context}[{now}] {user_message}")
630 };
631
632 let user_msg = ConversationMessage::Chat(ChatMessage::user(enriched));
633 new_msgs.push(user_msg.clone());
634 self.history.push(user_msg);
635 }
636
637 fn marked_partial_response(partial: &str, marker: &str) -> String {
638 if partial.is_empty() {
639 marker.to_string()
640 } else {
641 format!("{partial}\n\n{marker}")
642 }
643 }
644
645 fn append_streamed_assistant_message_to_history(
646 &mut self,
647 content: String,
648 new_msgs: &mut Vec<ConversationMessage>,
649 committed_response: &mut String,
650 ) {
651 let assistant_msg = ConversationMessage::Chat(ChatMessage::assistant(content.clone()));
652 new_msgs.push(assistant_msg.clone());
653 self.history.push(assistant_msg);
654 committed_response.push_str(&content);
655 }
656
657 fn should_send_tool_specs(&self) -> bool {
658 self.tool_dispatcher.should_send_tool_specs() && !self.tool_specs.is_empty()
659 }
660
661 fn parse_response_for_effective_tools(
662 &self,
663 response: &zeroclaw_providers::ChatResponse,
664 ) -> (String, Vec<ParsedToolCall>) {
665 if self.tool_specs.is_empty() {
666 return (strip_think_tags(response.text_or_empty()), Vec::new());
667 }
668
669 if self.config.strict_tool_parsing && response.tool_calls.is_empty() {
670 return (strip_think_tags(response.text_or_empty()), Vec::new());
671 }
672
673 self.tool_dispatcher.parse_response(response)
674 }
675
676 pub fn set_memory_session_id(&mut self, session_id: Option<String>) {
677 self.memory_session_id = session_id;
678 }
679
680 pub fn seed_history(&mut self, messages: &[ChatMessage]) {
686 if self.history.is_empty()
687 && let Ok(sys) = self.build_system_prompt()
688 {
689 self.history
690 .push(ConversationMessage::Chat(ChatMessage::system(sys)));
691 }
692 for msg in messages {
693 if msg.role != "system" {
694 self.history.push(ConversationMessage::Chat(msg.clone()));
695 }
696 }
697 }
698
699 pub fn seed_conversation_history(&mut self, messages: Vec<ConversationMessage>) {
704 if self.history.is_empty()
705 && let Ok(sys) = self.build_system_prompt()
706 {
707 self.history
708 .push(ConversationMessage::Chat(ChatMessage::system(sys)));
709 }
710 for msg in messages {
711 if matches!(&msg, ConversationMessage::Chat(m) if m.role == "system") {
713 continue;
714 }
715 self.history.push(msg);
716 }
717 self.trim_history();
722 }
723
724 pub async fn from_config(config: &Config, agent_alias: &str) -> Result<Self> {
725 Self::from_config_with_session_cwd(config, agent_alias, None).await
726 }
727
728 pub async fn from_config_with_session_cwd(
739 config: &Config,
740 agent_alias: &str,
741 session_cwd: Option<&Path>,
742 ) -> Result<Self> {
743 Self::from_config_with_session_cwd_and_mcp(config, agent_alias, session_cwd, true).await
744 }
745
746 pub async fn from_config_with_session_cwd_and_mcp(
752 config: &Config,
753 agent_alias: &str,
754 session_cwd: Option<&Path>,
755 initialize_mcp: bool,
756 ) -> Result<Self> {
757 Self::from_config_with_session_cwd_and_mcp_approval_mode(
758 config,
759 agent_alias,
760 session_cwd,
761 initialize_mcp,
762 false,
763 None,
764 )
765 .await
766 }
767
768 pub async fn from_config_with_session_cwd_and_mcp_backchannel(
776 config: &Config,
777 agent_alias: &str,
778 session_cwd: Option<&Path>,
779 initialize_mcp: bool,
780 canvas_store: Option<tools::CanvasStore>,
781 ) -> Result<Self> {
782 Self::from_config_with_session_cwd_and_mcp_approval_mode(
783 config,
784 agent_alias,
785 session_cwd,
786 initialize_mcp,
787 true,
788 canvas_store,
789 )
790 .await
791 }
792
793 async fn from_config_with_session_cwd_and_mcp_approval_mode(
794 config: &Config,
795 agent_alias: &str,
796 session_cwd: Option<&Path>,
797 initialize_mcp: bool,
798 approval_backchannel: bool,
799 canvas_store: Option<tools::CanvasStore>,
800 ) -> Result<Self> {
801 let agent_cfg = config
802 .agent(agent_alias)
803 .with_context(|| format!("agents.{agent_alias} is not configured"))?;
804 let risk_profile = config
805 .risk_profile_for_agent(agent_alias)
806 .with_context(|| {
807 format!(
808 "agents.{agent_alias}.risk_profile does not name a configured risk_profiles entry"
809 )
810 })?;
811
812 let observer: Arc<dyn Observer> =
813 Arc::from(observability::create_observer(&config.observability));
814 let runtime: Arc<dyn platform::RuntimeAdapter> =
815 Arc::from(platform::create_runtime(&config.runtime)?);
816 let agent_workspace = config.agent_workspace_dir(agent_alias);
821 if let Err(e) = tokio::fs::create_dir_all(&agent_workspace).await {
825 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"agent": agent_alias, "workspace": agent_workspace.display().to_string(), "e": e.to_string()})), "Failed to create per-agent workspace dir (continuing): ");
826 }
827 if let Err(e) = zeroclaw_config::schema::ensure_bootstrap_files(&agent_workspace).await {
833 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"agent": agent_alias, "workspace": agent_workspace.display().to_string(), "e": e.to_string()})), "Failed to ensure per-agent bootstrap files (continuing with whatever exists): ");
834 }
835 let runtime_profile = config.runtime_profile_for_agent(agent_alias);
836 let security = Arc::new({
837 let mut policy = SecurityPolicy::from_profiles(
838 risk_profile,
839 runtime_profile,
840 session_cwd.unwrap_or(&agent_workspace),
841 );
842 if session_cwd.is_some() {
848 policy.allowed_roots.push(agent_workspace.clone());
849 }
850 policy
851 });
852
853 let (provider_name, provider_alias, agent_model_provider) =
854 match config.resolved_model_provider_for_agent(agent_alias) {
855 Some(resolved) => (resolved.0, resolved.1, Some(resolved.2)),
856 None => {
857 let agent_ref = agent_cfg.model_provider.as_str();
858 if !agent_ref.is_empty() {
859 anyhow::bail!(
860 "agents.{agent_alias}.model_provider = \"{agent_ref}\" does not \
861 resolve to a configured [model_providers.<type>.<alias>] entry"
862 );
863 }
864 anyhow::bail!(
867 "agents.{agent_alias}.model_provider is empty — set it to a \
868 configured \"<type>.<alias>\" (e.g. \"anthropic.{agent_alias}\")"
869 );
870 }
871 };
872 let memory: Arc<dyn Memory> = zeroclaw_memory::create_memory_for_agent(
873 config,
874 agent_alias,
875 agent_model_provider.and_then(|e| e.api_key.as_deref()),
876 )
877 .await?;
878
879 let composio_key = if config.composio.enabled {
880 config.composio.api_key.as_deref()
881 } else {
882 None
883 };
884 let composio_entity_id = if config.composio.enabled {
885 Some(config.composio.entity_id.as_str())
886 } else {
887 None
888 };
889
890 let all_tools_result = tools::all_tools_with_runtime(
891 Arc::new(config.clone()),
892 &security,
893 risk_profile,
894 agent_alias,
895 runtime,
896 memory.clone(),
897 composio_key,
898 composio_entity_id,
899 &config.browser,
900 &config.http_request,
901 &config.web_fetch,
902 &security.workspace_dir,
903 &config.agents,
904 agent_model_provider.and_then(|e| e.api_key.as_deref()),
905 config,
906 canvas_store,
907 false,
908 );
909 let mut tools = all_tools_result.tools;
910 let delegate_handle = all_tools_result.delegate_handle;
911 let ask_user_handle = all_tools_result.ask_user_handle;
912 let reaction_handle = all_tools_result.reaction_handle;
913 let poll_handle = all_tools_result.poll_handle;
914 let escalate_handle = all_tools_result.escalate_handle;
915 let channel_send_handle = all_tools_result.channel_send_handle;
916
917 let before_policy_filter = tools.len();
932 crate::agent::loop_::apply_policy_tool_filter(&mut tools, Some(security.as_ref()), None);
933 if tools.len() != before_policy_filter {
934 ::zeroclaw_log::record!(
935 INFO,
936 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
937 .with_attrs(::serde_json::json!({
938 "before": before_policy_filter,
939 "retained": tools.len(),
940 "policy_allowed": security.allowed_tools.as_ref().map(|v| v.len()),
941 "policy_excluded": security.excluded_tools.as_ref().map(|v| v.len()),
942 })),
943 "Applied SecurityPolicy built-in tool filter (from_config path)"
944 );
945 }
946
947 let mut activated_tools: Option<Arc<std::sync::Mutex<tools::ActivatedToolSet>>> = None;
952 let mut mcp_elevation_arcs: Vec<Arc<dyn tools::Tool>> = Vec::new();
954 if initialize_mcp && config.mcp.enabled && !config.mcp.servers.is_empty() {
955 ::zeroclaw_log::record!(
956 INFO,
957 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
958 &format!(
959 "Initializing MCP client — {} server(s) configured",
960 config.mcp.servers.len()
961 )
962 );
963 match tools::McpRegistry::connect_all(&config.mcp.servers).await {
964 Ok(registry) => {
965 let registry = std::sync::Arc::new(registry);
966 mcp_elevation_arcs = tools::collect_mcp_elevation_arcs(®istry).await;
967 if config.mcp.deferred_loading {
968 let deferred_set = tools::DeferredMcpToolSet::from_registry(
969 std::sync::Arc::clone(®istry),
970 )
971 .await;
972 ::zeroclaw_log::record!(
973 INFO,
974 ::zeroclaw_log::Event::new(
975 module_path!(),
976 ::zeroclaw_log::Action::Note
977 ),
978 &format!(
979 "MCP deferred: {} tool stub(s) from {} server(s)",
980 deferred_set.len(),
981 registry.server_count()
982 )
983 );
984 let activated =
985 Arc::new(std::sync::Mutex::new(tools::ActivatedToolSet::new()));
986 activated_tools = Some(Arc::clone(&activated));
987 tools.push(Box::new(tools::ToolSearchTool::new(
988 deferred_set,
989 activated,
990 )));
991 } else {
992 let names = registry.tool_names();
993 let mut registered = 0usize;
994 for name in names {
995 if let Some(def) = registry.get_tool_def(&name).await {
996 let wrapper: std::sync::Arc<dyn tools::Tool> =
997 std::sync::Arc::new(tools::McpToolWrapper::new(
998 name,
999 def,
1000 std::sync::Arc::clone(®istry),
1001 ));
1002 if let Some(ref handle) = delegate_handle {
1003 handle.write().push(std::sync::Arc::clone(&wrapper));
1004 }
1005 tools.push(Box::new(tools::ArcToolRef(wrapper)));
1006 registered += 1;
1007 }
1008 }
1009 ::zeroclaw_log::record!(
1010 INFO,
1011 ::zeroclaw_log::Event::new(
1012 module_path!(),
1013 ::zeroclaw_log::Action::Note
1014 ),
1015 &format!(
1016 "MCP: {} tool(s) registered from {} server(s)",
1017 registered,
1018 registry.server_count()
1019 )
1020 );
1021 }
1022 }
1023 Err(e) => {
1024 ::zeroclaw_log::record!(
1025 ERROR,
1026 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1027 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1028 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1029 "MCP registry failed to initialize"
1030 );
1031 }
1032 }
1033 }
1034
1035 let model_name = match agent_model_provider
1036 .and_then(|e| e.model.as_deref())
1037 .map(str::trim)
1038 .filter(|m| !m.is_empty())
1039 {
1040 Some(m) => m.to_string(),
1041 None => anyhow::bail!(
1042 "agents.{agent_alias}.model_provider resolves to a model_provider entry \
1043 with no `model` set. Configure [model_providers.{provider_name}.<alias>] \
1044 model = \"...\".",
1045 ),
1046 };
1047
1048 let provider_ref = format!("{provider_name}.{provider_alias}");
1049 let provider_runtime_options = zeroclaw_providers::provider_runtime_options_for_alias(
1050 config,
1051 provider_name,
1052 provider_alias,
1053 );
1054
1055 let model_provider: Box<dyn ModelProvider> =
1056 zeroclaw_providers::create_routed_model_provider_with_options(
1057 config,
1058 &provider_ref,
1059 agent_model_provider.and_then(|e| e.api_key.as_deref()),
1060 agent_model_provider.and_then(|e| e.uri.as_deref()),
1061 &config.reliability,
1062 &config.model_routes,
1063 &model_name,
1064 &provider_runtime_options,
1065 )?;
1066
1067 let dispatcher_choice = agent_cfg.tool_dispatcher.as_str();
1068 let tool_dispatcher: Box<dyn ToolDispatcher> = match dispatcher_choice {
1069 "native" => Box::new(NativeToolDispatcher),
1070 "xml" => Box::new(XmlToolDispatcher),
1071 _ if model_provider.supports_native_tools() => Box::new(NativeToolDispatcher),
1072 _ => Box::new(XmlToolDispatcher),
1073 };
1074
1075 let route_model_by_hint: HashMap<String, String> = config
1076 .model_routes
1077 .iter()
1078 .map(|route| (route.hint.clone(), route.model.clone()))
1079 .collect();
1080 let available_hints: Vec<String> = route_model_by_hint.keys().cloned().collect();
1081
1082 let response_cache = if config.memory.response_cache_enabled {
1083 zeroclaw_memory::response_cache::ResponseCache::with_hot_cache(
1084 &config.data_dir,
1085 config.memory.response_cache_ttl_minutes,
1086 config.memory.response_cache_max_entries,
1087 config.memory.response_cache_hot_entries,
1088 )
1089 .ok()
1090 .map(Arc::new)
1091 } else {
1092 None
1093 };
1094
1095 let excluded = &risk_profile.excluded_tools;
1099 if !excluded.is_empty() {
1100 tools.retain(|t| !excluded.iter().any(|ex| ex == t.name()));
1101 }
1102
1103 let skills = crate::skills::load_skills_for_agent(&config.data_dir, config, agent_alias);
1109 let skill_resolution_registry: Vec<Arc<dyn tools::Tool>> = all_tools_result
1112 .unfiltered_tool_arcs
1113 .iter()
1114 .cloned()
1115 .chain(mcp_elevation_arcs.iter().cloned())
1116 .collect();
1117 tools::register_skill_tools_with_context(
1118 &mut tools,
1119 &skills,
1120 security.clone(),
1121 &skill_resolution_registry,
1122 );
1123
1124 let approval_manager = if approval_backchannel {
1125 ApprovalManager::for_non_interactive_backchannel(risk_profile)
1126 } else {
1127 ApprovalManager::for_non_interactive(risk_profile)
1128 };
1129
1130 let mut agent = Agent::builder()
1131 .model_provider(model_provider)
1132 .tools(tools)
1133 .memory(memory)
1134 .observer(observer)
1135 .response_cache(response_cache)
1136 .tool_dispatcher(tool_dispatcher)
1137 .memory_loader(Box::new(DefaultMemoryLoader::new(
1138 5,
1139 config.memory.min_relevance_score,
1140 )))
1141 .prompt_builder(SystemPromptBuilder::with_defaults())
1142 .config(agent_cfg.clone())
1143 .multimodal_config(config.multimodal.clone())
1144 .model_name(model_name)
1145 .model_provider_name(provider_name.to_string())
1146 .temperature(
1147 agent_model_provider
1148 .and_then(|e| e.temperature)
1149 .unwrap_or(0.7),
1150 )
1151 .workspace_dir(security.workspace_dir.clone())
1152 .agent_workspace_dir(agent_workspace.clone())
1153 .classification_config(config.query_classification.clone())
1154 .available_hints(available_hints)
1155 .route_model_by_hint(route_model_by_hint)
1156 .identity_config(agent_cfg.identity.clone())
1157 .skills(skills)
1158 .skills_prompt_mode(config.skills.prompt_injection_mode)
1159 .auto_save(config.memory.auto_save)
1160 .security_summary(Some(security.prompt_summary()))
1161 .autonomy_level(risk_profile.level)
1162 .activated_tools(activated_tools)
1163 .hook_runner(if config.hooks.enabled {
1164 let mut runner = crate::hooks::HookRunner::new();
1165 if config.hooks.builtin.command_logger {
1166 runner.register(Box::new(crate::hooks::builtin::CommandLoggerHook::new()));
1167 }
1168 if config.hooks.builtin.webhook_audit.enabled {
1169 runner.register(Box::new(crate::hooks::builtin::WebhookAuditHook::new(
1170 config.hooks.builtin.webhook_audit.clone(),
1171 )));
1172 }
1173 Some(Arc::new(runner))
1174 } else {
1175 None
1176 })
1177 .approval_manager(Some(Arc::new(approval_manager)))
1178 .build()?;
1179
1180 let channel_targets = build_channel_targets(config);
1184 if let Some(ref targets) = channel_targets
1185 && agent.tools.iter().any(|t| t.name() == "channel_send")
1186 {
1187 agent.channel_targets = Some(targets.clone());
1188 }
1189
1190 agent.channel_handles = AgentChannelHandles {
1193 ask_user: ask_user_handle,
1194 reaction: reaction_handle,
1195 poll: poll_handle,
1196 escalate: escalate_handle,
1197 channel_send: channel_send_handle,
1198 };
1199
1200 Ok(agent)
1201 }
1202
1203 fn trim_history(&mut self) {
1204 let max = self.config.max_history_messages;
1205 if self.history.len() <= max {
1206 return;
1207 }
1208
1209 let mut system_messages = Vec::new();
1210 let mut other_messages = Vec::new();
1211
1212 for msg in self.history.drain(..) {
1213 match &msg {
1214 ConversationMessage::Chat(chat) if chat.role == "system" => {
1215 system_messages.push(msg);
1216 }
1217 _ => other_messages.push(msg),
1218 }
1219 }
1220
1221 if other_messages.len() > max {
1222 let mut drop_count = other_messages.len() - max;
1223
1224 while drop_count < other_messages.len()
1232 && matches!(
1233 &other_messages[drop_count],
1234 ConversationMessage::ToolResults(_)
1235 )
1236 {
1237 drop_count += 1;
1238 }
1239
1240 other_messages.drain(0..drop_count);
1241 }
1242
1243 self.history = system_messages;
1244 self.history.extend(other_messages);
1245 }
1246
1247 fn build_system_prompt(&self) -> Result<String> {
1248 let expose_text_tool_protocol =
1249 !self.config.strict_tool_parsing || self.tool_dispatcher.should_send_tool_specs();
1250 let no_tools: Vec<Box<dyn Tool>> = Vec::new();
1251 let prompt_tools = if expose_text_tool_protocol {
1252 &self.tools
1253 } else {
1254 &no_tools
1255 };
1256 let instructions = self.tool_dispatcher.prompt_instructions(prompt_tools);
1257 let ctx = PromptContext {
1258 workspace_dir: &self.workspace_dir,
1259 agent_workspace_dir: &self.agent_workspace_dir,
1260 model_name: &self.model_name,
1261 tools: prompt_tools,
1262 skills: &self.skills,
1263 skills_prompt_mode: self.skills_prompt_mode,
1264 identity_config: Some(&self.identity_config),
1265 dispatcher_instructions: &instructions,
1266 sends_native_tool_specs: self.tool_dispatcher.should_send_tool_specs()
1267 && !prompt_tools.is_empty(),
1268 security_summary: self.security_summary.clone(),
1269 autonomy_level: self.autonomy_level,
1270 channel_targets: self.channel_targets.clone(),
1271 };
1272 self.prompt_builder.build(&ctx)
1273 }
1274
1275 async fn prepare_provider_messages(
1276 &self,
1277 messages: &[ChatMessage],
1278 ) -> Result<Vec<ChatMessage>> {
1279 let prepared = zeroclaw_providers::multimodal::prepare_messages_for_provider(
1280 messages,
1281 &self.multimodal_config,
1282 )
1283 .await?;
1284 Ok(prepared.messages)
1285 }
1286
1287 async fn execute_tool_call(&self, call: &ParsedToolCall) -> ToolExecutionResult {
1288 let start = Instant::now();
1289
1290 let mut tool_name = call.name.clone();
1294 let mut tool_args = call.arguments.clone();
1295 if let Some(ref hooks) = self.hook_runner {
1296 match hooks
1297 .run_before_tool_call(tool_name.clone(), tool_args.clone())
1298 .await
1299 {
1300 crate::hooks::HookResult::Continue((n, a)) => {
1301 tool_name = n;
1302 tool_args = a;
1303 }
1304 crate::hooks::HookResult::Cancel(reason) => {
1305 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"tool": call.name, "reason": reason.to_string()})), "tool call cancelled by hook");
1306 return ToolExecutionResult {
1307 name: call.name.clone(),
1308 output: format!("Cancelled by hook: {reason}"),
1309 success: false,
1310 tool_call_id: call.tool_call_id.clone(),
1311 };
1312 }
1313 }
1314 }
1315
1316 super::set_runtime_approved_arg(&tool_name, &mut tool_args, false);
1317
1318 let mut approval_requirement = self
1323 .approval_manager
1324 .as_deref()
1325 .map(|mgr| mgr.approval_requirement(&tool_name))
1326 .unwrap_or(ApprovalRequirement::NotRequired);
1327 if let Some(mgr) = self.approval_manager.as_deref()
1328 && approval_requirement == ApprovalRequirement::Prompt
1329 {
1330 let request = ApprovalRequest {
1331 tool_name: tool_name.clone(),
1332 arguments: tool_args.clone(),
1333 };
1334
1335 let (decision, decision_channel) = if mgr.is_non_interactive() {
1336 let ch_request = zeroclaw_api::channel::ChannelApprovalRequest {
1341 tool_name: request.tool_name.clone(),
1342 arguments_summary: crate::approval::summarize_args(&request.arguments),
1343 raw_arguments: Some(request.arguments.clone()),
1344 };
1345 let mut channel_decision: Option<zeroclaw_api::channel::ChannelApprovalResponse> =
1346 None;
1347 let mut decision_channel_name = String::new();
1348 let channels: Vec<(String, Arc<dyn zeroclaw_api::channel::Channel>)> = self
1351 .channel_handles
1352 .ask_user
1353 .as_ref()
1354 .map(|h| {
1355 h.read()
1356 .iter()
1357 .map(|(k, v)| (k.clone(), Arc::clone(v)))
1358 .collect()
1359 })
1360 .unwrap_or_default();
1361 for (ch_name, ch) in &channels {
1362 match ch.request_approval("", &ch_request).await {
1363 Ok(Some(r)) => {
1364 decision_channel_name = ch_name.clone();
1365 channel_decision = Some(r);
1366 break;
1367 }
1368 Ok(None) => continue,
1369 Err(e) => {
1370 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"tool": tool_name, "channel": ch_name, "error": format!("{}", e)})), "channel approval request failed");
1371 }
1372 }
1373 }
1374 let approval = match channel_decision {
1375 Some(zeroclaw_api::channel::ChannelApprovalResponse::Approve) => {
1376 ApprovalResponse::Yes
1377 }
1378 Some(zeroclaw_api::channel::ChannelApprovalResponse::AlwaysApprove) => {
1379 ApprovalResponse::Always
1380 }
1381 Some(zeroclaw_api::channel::ChannelApprovalResponse::Deny) => {
1382 ApprovalResponse::No
1383 }
1384 None => {
1385 ::zeroclaw_log::record!(
1386 WARN,
1387 ::zeroclaw_log::Event::new(
1388 module_path!(),
1389 ::zeroclaw_log::Action::Note
1390 )
1391 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1392 .with_attrs(::serde_json::json!({"tool": tool_name})),
1393 "no approval channel handled this request — denying. \
1394 Configure a back-channel (ACP or WS) that implements \
1395 request_approval to enable interactive approval."
1396 );
1397 ApprovalResponse::No
1398 }
1399 };
1400 (approval, decision_channel_name)
1401 } else {
1402 (mgr.prompt_cli(&request), String::new())
1403 };
1404
1405 mgr.record_decision(&tool_name, &tool_args, decision, &decision_channel);
1406
1407 if decision == ApprovalResponse::No {
1408 return ToolExecutionResult {
1409 name: tool_name,
1410 output: "Denied by user.".to_string(),
1411 success: false,
1412 tool_call_id: call.tool_call_id.clone(),
1413 };
1414 }
1415
1416 if matches!(decision, ApprovalResponse::Yes | ApprovalResponse::Always) {
1417 approval_requirement = ApprovalRequirement::Approved;
1418 }
1419 }
1420 super::set_runtime_approved_arg(
1421 &tool_name,
1422 &mut tool_args,
1423 approval_requirement == ApprovalRequirement::Approved,
1424 );
1425
1426 let args_json = tool_args.to_string();
1430 let tool_call_id = call.tool_call_id.clone();
1431
1432 let (result, success) =
1434 if let Some(tool) = self.tools.iter().find(|t| t.name() == tool_name) {
1435 match tool.execute(tool_args.clone()).await {
1436 Ok(r) => {
1437 let (outcome_text, ok) = if r.success {
1438 (r.output, true)
1439 } else {
1440 (format!("Error: {}", r.error.unwrap_or(r.output)), false)
1441 };
1442 self.observer.record_event(&ObserverEvent::ToolCall {
1443 tool: tool_name.clone(),
1444 tool_call_id: tool_call_id.clone(),
1445 duration: start.elapsed(),
1446 success: ok,
1447 arguments: Some(args_json.clone()),
1448 result: Some(super::loop_::scrub_credentials(&outcome_text)),
1449 });
1450 (outcome_text, ok)
1451 }
1452 Err(e) => {
1453 let err_text = format!("Error executing {}: {e}", tool_name);
1454 self.observer.record_event(&ObserverEvent::ToolCall {
1455 tool: tool_name.clone(),
1456 tool_call_id: tool_call_id.clone(),
1457 duration: start.elapsed(),
1458 success: false,
1459 arguments: Some(args_json.clone()),
1460 result: Some(super::loop_::scrub_credentials(&err_text)),
1461 });
1462 (err_text, false)
1463 }
1464 }
1465 } else if let Some(activated_arc) = self.activated_tools.as_ref() {
1466 let activated_opt = activated_arc.lock().unwrap().get_resolved(&tool_name);
1467 if let Some(tool) = activated_opt {
1468 match tool.execute(tool_args.clone()).await {
1469 Ok(r) => {
1470 let (outcome_text, ok) = if r.success {
1471 (r.output, true)
1472 } else {
1473 (format!("Error: {}", r.error.unwrap_or(r.output)), false)
1474 };
1475 self.observer.record_event(&ObserverEvent::ToolCall {
1476 tool: tool_name.clone(),
1477 tool_call_id: tool_call_id.clone(),
1478 duration: start.elapsed(),
1479 success: ok,
1480 arguments: Some(args_json.clone()),
1481 result: Some(super::loop_::scrub_credentials(&outcome_text)),
1482 });
1483 (outcome_text, ok)
1484 }
1485 Err(e) => {
1486 let err_text = format!("Error executing {}: {e}", tool_name);
1487 self.observer.record_event(&ObserverEvent::ToolCall {
1488 tool: tool_name.clone(),
1489 tool_call_id: tool_call_id.clone(),
1490 duration: start.elapsed(),
1491 success: false,
1492 arguments: Some(args_json.clone()),
1493 result: Some(super::loop_::scrub_credentials(&err_text)),
1494 });
1495 (err_text, false)
1496 }
1497 }
1498 } else {
1499 (format!("Unknown tool: {}", tool_name), false)
1500 }
1501 } else {
1502 (format!("Unknown tool: {}", tool_name), false)
1503 };
1504
1505 let duration = start.elapsed();
1506
1507 if let Some(ref hooks) = self.hook_runner {
1509 let tool_result_obj = crate::tools::ToolResult {
1510 success,
1511 output: result.clone(),
1512 error: None,
1513 };
1514 hooks
1515 .fire_after_tool_call(&tool_name, &tool_result_obj, duration)
1516 .await;
1517 }
1518
1519 ToolExecutionResult {
1520 name: tool_name,
1521 output: result,
1522 success,
1523 tool_call_id: call.tool_call_id.clone(),
1524 }
1525 }
1526
1527 async fn execute_tools(&self, calls: &[ParsedToolCall]) -> Vec<ToolExecutionResult> {
1528 let approval_required = self.approval_manager.as_deref().is_some_and(|mgr| {
1529 calls
1530 .iter()
1531 .any(|call| mgr.needs_approval(call.name.as_str()))
1532 });
1533 if !self.config.parallel_tools || approval_required {
1534 let mut results = Vec::with_capacity(calls.len());
1535 for call in calls {
1536 results.push(self.execute_tool_call(call).await);
1537 }
1538 return results;
1539 }
1540
1541 let futs: Vec<_> = calls
1542 .iter()
1543 .map(|call| self.execute_tool_call(call))
1544 .collect();
1545 futures_util::future::join_all(futs).await
1546 }
1547
1548 fn classify_model(&self, user_message: &str) -> String {
1549 if let Some(decision) =
1550 super::classifier::classify_with_decision(&self.classification_config, user_message)
1551 && self.available_hints.contains(&decision.hint)
1552 {
1553 let resolved_model = self
1554 .route_model_by_hint
1555 .get(&decision.hint)
1556 .map(String::as_str)
1557 .unwrap_or("unknown");
1558 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"hint": decision.hint.as_str(), "model": resolved_model, "rule_priority": decision.priority, "message_length": user_message.len()})), "Classified message route");
1559 return format!("hint:{}", decision.hint);
1560 }
1561
1562 if let Some(ref ac) = self.config.auto_classify {
1564 let tier = super::eval::estimate_complexity(user_message);
1565 if let Some(hint) = ac.hint_for(tier)
1566 && self.available_hints.contains(&hint.to_string())
1567 {
1568 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"hint": hint, "complexity": format!("{:?}", tier), "message_length": user_message.len()})), "Auto-classified by complexity");
1569 return format!("hint:{hint}");
1570 }
1571 }
1572
1573 self.model_name.clone()
1574 }
1575
1576 pub async fn turn(&mut self, user_message: &str) -> Result<String> {
1577 if self.history.is_empty() {
1578 let system_prompt = self.build_system_prompt()?;
1579 self.history
1580 .push(ConversationMessage::Chat(ChatMessage::system(
1581 system_prompt,
1582 )));
1583 }
1584
1585 let context = self
1586 .memory_loader
1587 .load_context(
1588 self.memory.as_ref(),
1589 user_message,
1590 self.memory_session_id.as_deref(),
1591 )
1592 .await
1593 .unwrap_or_default();
1594
1595 if self.auto_save {
1596 let _ = self
1597 .memory
1598 .store(
1599 "user_msg",
1600 user_message,
1601 MemoryCategory::Conversation,
1602 self.memory_session_id.as_deref(),
1603 )
1604 .await;
1605 }
1606
1607 let now = chrono::Local::now();
1608 let (year, month, day) = (now.year(), now.month(), now.day());
1609 let (hour, minute, second) = (now.hour(), now.minute(), now.second());
1610 let tz = now.format("%Z");
1611 let date_str =
1612 format!("{year:04}-{month:02}-{day:02} {hour:02}:{minute:02}:{second:02} {tz}");
1613
1614 let enriched = if context.is_empty() {
1615 format!("[CURRENT DATE & TIME: {date_str}]\n\n{user_message}")
1616 } else {
1617 format!("[CURRENT DATE & TIME: {date_str}]\n\n{context}\n\n{user_message}")
1618 };
1619
1620 self.history
1621 .push(ConversationMessage::Chat(ChatMessage::user(enriched)));
1622
1623 let effective_model = self.classify_model(user_message);
1624
1625 for _ in 0..self.config.max_tool_iterations {
1626 let messages = self.tool_dispatcher.to_provider_messages(&self.history);
1627 let prepared_messages = self.prepare_provider_messages(&messages).await?;
1628
1629 let cache_key =
1633 self.response_cache_key_for_messages(&prepared_messages, &effective_model);
1634
1635 if let (Some(cache), Some(key)) = (&self.response_cache, &cache_key) {
1636 if let Ok(Some(cached)) = cache.get(key) {
1637 self.observer.record_event(&ObserverEvent::CacheHit {
1638 cache_type: "response".into(),
1639 tokens_saved: 0,
1640 });
1641 self.history
1642 .push(ConversationMessage::Chat(ChatMessage::assistant(
1643 cached.clone(),
1644 )));
1645 self.trim_history();
1646 return Ok(cached);
1647 }
1648 self.observer.record_event(&ObserverEvent::CacheMiss {
1649 cache_type: "response".into(),
1650 });
1651 }
1652
1653 let llm_started_at = Instant::now();
1654 self.observer.record_event(&ObserverEvent::LlmRequest {
1655 model_provider: self.model_provider_name.clone(),
1656 model: effective_model.clone(),
1657 messages_count: messages.len(),
1658 });
1659
1660 let response = match self
1661 .model_provider
1662 .chat(
1663 ChatRequest {
1664 messages: &prepared_messages,
1665 tools: if self.should_send_tool_specs() {
1666 Some(&self.tool_specs)
1667 } else {
1668 None
1669 },
1670 thinking: None,
1671 },
1672 &effective_model,
1673 Some(self.temperature),
1674 )
1675 .await
1676 {
1677 Ok(resp) => {
1678 let (resp_input_tokens, resp_output_tokens) = resp
1679 .usage
1680 .as_ref()
1681 .map(|u| (u.input_tokens, u.output_tokens))
1682 .unwrap_or((None, None));
1683 self.observer.record_event(&ObserverEvent::LlmResponse {
1684 model_provider: self.model_provider_name.clone(),
1685 model: effective_model.clone(),
1686 duration: llm_started_at.elapsed(),
1687 success: true,
1688 error_message: None,
1689 input_tokens: resp_input_tokens,
1690 output_tokens: resp_output_tokens,
1691 });
1692 resp
1693 }
1694 Err(err) => {
1695 let safe_error = zeroclaw_providers::sanitize_api_error(&err.to_string());
1696 self.observer.record_event(&ObserverEvent::LlmResponse {
1697 model_provider: self.model_provider_name.clone(),
1698 model: effective_model.clone(),
1699 duration: llm_started_at.elapsed(),
1700 success: false,
1701 error_message: Some(safe_error),
1702 input_tokens: None,
1703 output_tokens: None,
1704 });
1705 return Err(err);
1706 }
1707 };
1708
1709 let (text, calls) = self.parse_response_for_effective_tools(&response);
1710 if calls.is_empty() {
1711 let final_text = if text.is_empty() && !self.tool_specs.is_empty() {
1712 response.text.unwrap_or_default()
1713 } else {
1714 text
1715 };
1716
1717 if let (Some(cache), Some(key)) = (&self.response_cache, &cache_key) {
1719 let token_count = response
1720 .usage
1721 .as_ref()
1722 .and_then(|u| u.output_tokens)
1723 .unwrap_or(0);
1724 #[allow(clippy::cast_possible_truncation)]
1725 let _ = cache.put(key, &effective_model, &final_text, token_count as u32);
1726 }
1727
1728 self.history
1729 .push(ConversationMessage::Chat(ChatMessage::assistant(
1730 final_text.clone(),
1731 )));
1732 self.trim_history();
1733
1734 return Ok(final_text);
1735 }
1736
1737 if !text.is_empty() {
1738 print!("{text}");
1739 let _ = std::io::stdout().flush();
1740 }
1741
1742 self.history.push(ConversationMessage::AssistantToolCalls {
1743 text: response.text.clone(),
1744 tool_calls: response.tool_calls.clone(),
1745 reasoning_content: response.reasoning_content.clone(),
1746 });
1747
1748 let results = self.execute_tools(&calls).await;
1749 let formatted = self.tool_dispatcher.format_results(&results);
1750 self.history.push(formatted);
1751 self.trim_history();
1752 }
1753
1754 anyhow::bail!(
1755 "Agent exceeded maximum tool iterations ({})",
1756 self.config.max_tool_iterations
1757 )
1758 }
1759
1760 pub async fn turn_streamed(
1771 &mut self,
1772 user_message: &str,
1773 event_tx: tokio::sync::mpsc::Sender<TurnEvent>,
1774 cancel_token: Option<tokio_util::sync::CancellationToken>,
1775 ) -> Result<(String, Vec<ConversationMessage>)> {
1776 self.turn_streamed_with_steering_state(user_message, event_tx, cancel_token, None)
1777 .await
1778 .map(|outcome| (outcome.response, outcome.new_messages))
1779 .map_err(|err| err.error)
1780 }
1781
1782 pub async fn turn_streamed_with_steering_state(
1783 &mut self,
1784 user_message: &str,
1785 event_tx: tokio::sync::mpsc::Sender<TurnEvent>,
1786 cancel_token: Option<tokio_util::sync::CancellationToken>,
1787 mut steering_rx: Option<&mut tokio::sync::mpsc::Receiver<String>>,
1788 ) -> std::result::Result<StreamedTurnSuccess, StreamedTurnError> {
1789 if self.history.is_empty() {
1791 let system_prompt = self
1792 .build_system_prompt()
1793 .map_err(|error| StreamedTurnError {
1794 error,
1795 committed_response: String::new(),
1796 new_messages: Vec::new(),
1797 })?;
1798 self.history
1799 .push(ConversationMessage::Chat(ChatMessage::system(
1800 system_prompt,
1801 )));
1802 }
1803
1804 let mut new_msgs: Vec<ConversationMessage> = Vec::new();
1805 self.append_streamed_user_message_to_history(user_message, &mut new_msgs)
1806 .await;
1807
1808 let effective_model = self.classify_model(user_message);
1809 let turn_started_at = std::time::Instant::now();
1810 let mut committed_response = String::new();
1811
1812 for _ in 0..self.config.max_tool_iterations {
1814 if cancel_token
1816 .as_ref()
1817 .is_some_and(tokio_util::sync::CancellationToken::is_cancelled)
1818 {
1819 self.append_streamed_assistant_message_to_history(
1820 "[interrupted by user]".to_string(),
1821 &mut new_msgs,
1822 &mut committed_response,
1823 );
1824 return Err(StreamedTurnError {
1825 error: crate::agent::loop_::ToolLoopCancelled.into(),
1826 committed_response,
1827 new_messages: new_msgs,
1828 });
1829 }
1830
1831 for steering_message in Self::drain_steering_messages(&mut steering_rx) {
1832 self.append_streamed_user_message_to_history(&steering_message, &mut new_msgs)
1833 .await;
1834 }
1835
1836 let messages = self.tool_dispatcher.to_provider_messages(&self.history);
1837 let prepared_messages = match self.prepare_provider_messages(&messages).await {
1838 Ok(messages) => messages,
1839 Err(error) => {
1840 return Err(StreamedTurnError {
1841 error,
1842 committed_response,
1843 new_messages: new_msgs,
1844 });
1845 }
1846 };
1847
1848 let cache_key =
1850 self.response_cache_key_for_messages(&prepared_messages, &effective_model);
1851
1852 if let (Some(cache), Some(key)) = (&self.response_cache, &cache_key) {
1853 if let Ok(Some(cached)) = cache.get(key) {
1854 self.observer.record_event(&ObserverEvent::CacheHit {
1855 cache_type: "response".into(),
1856 tokens_saved: 0,
1857 });
1858 let cached_msg =
1859 ConversationMessage::Chat(ChatMessage::assistant(cached.clone()));
1860 new_msgs.push(cached_msg.clone());
1861 self.history.push(cached_msg);
1862 self.trim_history();
1863 self.observer.record_event(&ObserverEvent::TurnComplete);
1864 self.observer.record_event(&ObserverEvent::AgentEnd {
1865 model_provider: self.model_provider_name.clone(),
1866 model: effective_model.clone(),
1867 duration: turn_started_at.elapsed(),
1868 tokens_used: None,
1869 cost_usd: None,
1870 });
1871 committed_response.push_str(&cached);
1872 return Ok(StreamedTurnSuccess {
1873 response: committed_response,
1874 new_messages: new_msgs,
1875 });
1876 }
1877 self.observer.record_event(&ObserverEvent::CacheMiss {
1878 cache_type: "response".into(),
1879 });
1880 }
1881
1882 use futures_util::StreamExt;
1886
1887 let llm_started_at = Instant::now();
1888 self.observer.record_event(&ObserverEvent::LlmRequest {
1889 model_provider: self.model_provider_name.clone(),
1890 model: effective_model.clone(),
1891 messages_count: messages.len(),
1892 });
1893
1894 let stream_opts = zeroclaw_providers::traits::StreamOptions::new(
1895 self.model_provider.supports_streaming(),
1896 );
1897 let mut stream = self.model_provider.stream_chat(
1898 zeroclaw_providers::ChatRequest {
1899 messages: &prepared_messages,
1900 tools: if self.should_send_tool_specs() {
1901 Some(&self.tool_specs)
1902 } else {
1903 None
1904 },
1905 thinking: None,
1906 },
1907 &effective_model,
1908 Some(self.temperature),
1909 stream_opts,
1910 );
1911
1912 let mut streamed_text = String::new();
1913 let mut streamed_reasoning = String::new();
1914 let mut streamed_tool_calls: Vec<zeroclaw_providers::traits::ToolCall> = Vec::new();
1915 let mut streamed_usage: Option<zeroclaw_providers::traits::TokenUsage> = None;
1916 let mut got_stream = false;
1917 let mut pre_executed_call_ids: HashMap<String, VecDeque<String>> = HashMap::new();
1918 let mut was_cancelled = false;
1919
1920 loop {
1925 let next_item = stream.next();
1926
1927 let item = if let Some(ref token) = cancel_token {
1928 tokio::select! {
1929 biased;
1930 () = token.cancelled() => {
1931 was_cancelled = true;
1932 break;
1933 }
1934 item = next_item => item,
1935 }
1936 } else {
1937 next_item.await
1938 };
1939
1940 let Some(item) = item else { break };
1941 match item {
1942 Ok(event) => match event {
1943 zeroclaw_providers::traits::StreamEvent::TextDelta(chunk) => {
1944 if let Some(reasoning) = chunk.reasoning
1945 && !reasoning.is_empty()
1946 {
1947 streamed_reasoning.push_str(&reasoning);
1951 let _ = event_tx
1952 .send(TurnEvent::Thinking { delta: reasoning })
1953 .await;
1954 }
1955 if !chunk.delta.is_empty() {
1956 got_stream = true;
1957 streamed_text.push_str(&chunk.delta);
1958 let _ =
1959 event_tx.send(TurnEvent::Chunk { delta: chunk.delta }).await;
1960 }
1961 }
1962 zeroclaw_providers::traits::StreamEvent::ToolCall(tc) => {
1963 got_stream = true;
1964 streamed_tool_calls.push(tc);
1967 }
1968 zeroclaw_providers::traits::StreamEvent::PreExecutedToolCall {
1969 name,
1970 args,
1971 } => {
1972 let call_id = uuid::Uuid::new_v4().to_string();
1973 pre_executed_call_ids
1974 .entry(name.clone())
1975 .or_default()
1976 .push_back(call_id.clone());
1977 let _ = event_tx
1978 .send(TurnEvent::ToolCall {
1979 id: call_id,
1980 name,
1981 args: serde_json::from_str(&args).unwrap_or_default(),
1982 })
1983 .await;
1984 }
1986 zeroclaw_providers::traits::StreamEvent::PreExecutedToolResult {
1987 name,
1988 output,
1989 } => {
1990 let result_id = pre_executed_call_ids
1991 .get_mut(&name)
1992 .and_then(|ids| ids.pop_front())
1993 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1994 let _ = event_tx
1995 .send(TurnEvent::ToolResult {
1996 id: result_id,
1997 name,
1998 output,
1999 })
2000 .await;
2001 }
2002 zeroclaw_providers::traits::StreamEvent::Usage(usage) => {
2003 streamed_usage = Some(usage);
2004 }
2005 zeroclaw_providers::traits::StreamEvent::Final => break,
2006 },
2007 Err(error) => {
2008 if got_stream || !committed_response.is_empty() {
2009 if !streamed_text.is_empty() {
2010 let partial = Self::marked_partial_response(
2011 &streamed_text,
2012 "[stream interrupted]",
2013 );
2014 self.append_streamed_assistant_message_to_history(
2015 partial,
2016 &mut new_msgs,
2017 &mut committed_response,
2018 );
2019 }
2020 let safe_error =
2021 zeroclaw_providers::sanitize_api_error(&error.to_string());
2022 self.observer.record_event(&ObserverEvent::LlmResponse {
2023 model_provider: self.model_provider_name.clone(),
2024 model: effective_model.clone(),
2025 duration: llm_started_at.elapsed(),
2026 success: false,
2027 error_message: Some(safe_error),
2028 input_tokens: None,
2029 output_tokens: None,
2030 });
2031 return Err(StreamedTurnError {
2032 error: anyhow::Error::msg(error.to_string()),
2033 committed_response,
2034 new_messages: new_msgs,
2035 });
2036 }
2037 break;
2038 }
2039 }
2040 }
2041 drop(stream);
2043
2044 if was_cancelled {
2048 let partial =
2049 Self::marked_partial_response(&streamed_text, "[interrupted by user]");
2050 self.append_streamed_assistant_message_to_history(
2051 partial,
2052 &mut new_msgs,
2053 &mut committed_response,
2054 );
2055 self.observer.record_event(&ObserverEvent::LlmResponse {
2056 model_provider: self.model_provider_name.clone(),
2057 model: effective_model.clone(),
2058 duration: llm_started_at.elapsed(),
2059 success: false,
2060 error_message: Some("request cancelled by user".into()),
2061 input_tokens: None,
2062 output_tokens: None,
2063 });
2064 return Err(StreamedTurnError {
2065 error: crate::agent::loop_::ToolLoopCancelled.into(),
2066 committed_response,
2067 new_messages: new_msgs,
2068 });
2069 }
2070
2071 let response = if got_stream {
2074 zeroclaw_providers::ChatResponse {
2080 text: Some(streamed_text),
2081 tool_calls: streamed_tool_calls,
2082 usage: streamed_usage.clone(),
2083 reasoning_content: if streamed_reasoning.is_empty() {
2084 None
2085 } else {
2086 Some(streamed_reasoning)
2087 },
2088 }
2089 } else {
2090 let chat_fut = self.model_provider.chat(
2092 ChatRequest {
2093 messages: &prepared_messages,
2094 tools: if self.should_send_tool_specs() {
2095 Some(&self.tool_specs)
2096 } else {
2097 None
2098 },
2099 thinking: None,
2100 },
2101 &effective_model,
2102 Some(self.temperature),
2103 );
2104 let chat_result = if let Some(ref token) = cancel_token {
2105 tokio::select! {
2106 biased;
2107 () = token.cancelled() => {
2108 self.append_streamed_assistant_message_to_history(
2109 "[interrupted by user]".to_string(),
2110 &mut new_msgs,
2111 &mut committed_response,
2112 );
2113 self.observer.record_event(&ObserverEvent::LlmResponse {
2114 model_provider: self.model_provider_name.clone(),
2115 model: effective_model.clone(),
2116 duration: llm_started_at.elapsed(),
2117 success: false,
2118 error_message: Some("request cancelled by user".into()),
2119 input_tokens: None,
2120 output_tokens: None,
2121 });
2122 return Err(StreamedTurnError {
2123 error: crate::agent::loop_::ToolLoopCancelled.into(),
2124 committed_response,
2125 new_messages: new_msgs,
2126 });
2127 }
2128 result = chat_fut => result,
2129 }
2130 } else {
2131 chat_fut.await
2132 };
2133 match chat_result {
2134 Ok(resp) => resp,
2135 Err(error) => {
2136 let safe_error = zeroclaw_providers::sanitize_api_error(&error.to_string());
2137 self.observer.record_event(&ObserverEvent::LlmResponse {
2138 model_provider: self.model_provider_name.clone(),
2139 model: effective_model.clone(),
2140 duration: llm_started_at.elapsed(),
2141 success: false,
2142 error_message: Some(safe_error),
2143 input_tokens: None,
2144 output_tokens: None,
2145 });
2146 return Err(StreamedTurnError {
2147 error,
2148 committed_response,
2149 new_messages: new_msgs,
2150 });
2151 }
2152 }
2153 };
2154
2155 let (resp_input_tokens, resp_output_tokens) = response
2156 .usage
2157 .as_ref()
2158 .map(|u| (u.input_tokens, u.output_tokens))
2159 .unwrap_or((None, None));
2160 self.observer.record_event(&ObserverEvent::LlmResponse {
2161 model_provider: self.model_provider_name.clone(),
2162 model: effective_model.clone(),
2163 duration: llm_started_at.elapsed(),
2164 success: true,
2165 error_message: None,
2166 input_tokens: resp_input_tokens,
2167 output_tokens: resp_output_tokens,
2168 });
2169
2170 if let Some(ref usage) = response.usage {
2175 let _ = event_tx
2176 .send(TurnEvent::Usage {
2177 input_tokens: usage.input_tokens,
2178 output_tokens: usage.output_tokens,
2179 cost_usd: None,
2180 })
2181 .await;
2182 }
2183
2184 let (text, mut calls) = self.parse_response_for_effective_tools(&response);
2185 if calls.is_empty() {
2186 let final_text = if text.is_empty() && !self.tool_specs.is_empty() {
2187 response.text.unwrap_or_default()
2188 } else {
2189 text
2190 };
2191
2192 let steering_messages = Self::drain_steering_messages(&mut steering_rx);
2193 if !steering_messages.is_empty() {
2194 if !final_text.is_empty() {
2195 let assistant_msg =
2196 ConversationMessage::Chat(ChatMessage::assistant(final_text.clone()));
2197 new_msgs.push(assistant_msg.clone());
2198 self.history.push(assistant_msg);
2199 committed_response.push_str(&final_text);
2200 self.trim_history();
2201 }
2202
2203 for steering_message in steering_messages {
2204 self.append_streamed_user_message_to_history(
2205 &steering_message,
2206 &mut new_msgs,
2207 )
2208 .await;
2209 }
2210 continue;
2211 }
2212
2213 if let (Some(cache), Some(key)) = (&self.response_cache, &cache_key) {
2215 let token_count = response
2216 .usage
2217 .as_ref()
2218 .and_then(|u| u.output_tokens)
2219 .unwrap_or(0);
2220 #[allow(clippy::cast_possible_truncation)]
2221 let _ = cache.put(key, &effective_model, &final_text, token_count as u32);
2222 }
2223
2224 if !got_stream && !final_text.is_empty() {
2226 let _ = event_tx
2227 .send(TurnEvent::Chunk {
2228 delta: final_text.clone(),
2229 })
2230 .await;
2231 }
2232
2233 new_msgs.push(ConversationMessage::Chat(ChatMessage::assistant(
2234 final_text.clone(),
2235 )));
2236 self.history
2237 .push(ConversationMessage::Chat(ChatMessage::assistant(
2238 final_text.clone(),
2239 )));
2240 committed_response.push_str(&final_text);
2241 self.trim_history();
2242 self.observer.record_event(&ObserverEvent::TurnComplete);
2243 self.observer.record_event(&ObserverEvent::AgentEnd {
2244 model_provider: self.model_provider_name.clone(),
2245 model: effective_model.clone(),
2246 duration: turn_started_at.elapsed(),
2247 tokens_used: None,
2248 cost_usd: None,
2249 });
2250 return Ok(StreamedTurnSuccess {
2251 response: committed_response,
2252 new_messages: new_msgs,
2253 });
2254 }
2255
2256 for call in &mut calls {
2258 if call.tool_call_id.is_none() {
2259 call.tool_call_id = Some(uuid::Uuid::new_v4().to_string());
2260 }
2261 }
2262
2263 let tool_call_msg = ConversationMessage::AssistantToolCalls {
2265 text: response.text.clone(),
2266 tool_calls: response.tool_calls.clone(),
2267 reasoning_content: response.reasoning_content.clone(),
2268 };
2269 new_msgs.push(tool_call_msg.clone());
2270 self.history.push(tool_call_msg);
2271
2272 for call in &calls {
2274 let call_id = call.tool_call_id.as_ref().unwrap().clone();
2275 let _ = event_tx
2276 .send(TurnEvent::ToolCall {
2277 id: call_id,
2278 name: call.name.clone(),
2279 args: call.arguments.clone(),
2280 })
2281 .await;
2282 }
2283
2284 let results = self.execute_tools(&calls).await;
2285
2286 for result in &results {
2288 let result_id = result.tool_call_id.as_ref().unwrap().clone();
2289 let _ = event_tx
2290 .send(TurnEvent::ToolResult {
2291 id: result_id,
2292 name: result.name.clone(),
2293 output: result.output.clone(),
2294 })
2295 .await;
2296 }
2297
2298 let formatted = self.tool_dispatcher.format_results(&results);
2299 new_msgs.push(formatted.clone());
2300 self.history.push(formatted);
2301 self.trim_history();
2302 }
2303
2304 Err(StreamedTurnError {
2305 error: anyhow::Error::msg(format!(
2306 "Agent exceeded maximum tool iterations ({})",
2307 self.config.max_tool_iterations
2308 )),
2309 committed_response,
2310 new_messages: new_msgs,
2311 })
2312 }
2313
2314 pub async fn run_single(&mut self, message: &str) -> Result<String> {
2315 self.turn(message).await
2316 }
2317
2318 pub async fn run_interactive(&mut self) -> Result<()> {
2319 println!("🦀 ZeroClaw Interactive Mode");
2320 println!("Type /quit to exit.\n");
2321
2322 let (tx, mut rx) = tokio::sync::mpsc::channel(32);
2323 let cli = crate::agent::loop_::CLI_CHANNEL_FN
2324 .get()
2325 .expect("CLI channel factory not registered — call register_cli_channel_fn at startup")(
2326 );
2327
2328 let listen_handle = tokio::spawn(async move {
2329 let _ = zeroclaw_api::channel::Channel::listen(&*cli, tx).await;
2330 });
2331
2332 while let Some(msg) = rx.recv().await {
2333 let response = match self.turn(&msg.content).await {
2334 Ok(resp) => resp,
2335 Err(e) => {
2336 eprintln!("\nError: {e}\n");
2337 continue;
2338 }
2339 };
2340 println!("\n{response}\n");
2341 }
2342
2343 listen_handle.abort();
2344 Ok(())
2345 }
2346}
2347
2348pub async fn run(
2349 config: Config,
2350 agent_alias: &str,
2351 message: Option<String>,
2352 provider_override: Option<String>,
2353 model_override: Option<String>,
2354 temperature: f64,
2355) -> Result<()> {
2356 let start = Instant::now();
2357
2358 let mut effective_config = config;
2359 if let Some(p) = provider_override {
2360 if let Some((type_key, alias_key)) = p.split_once('.') {
2363 effective_config
2364 .providers
2365 .models
2366 .ensure(type_key, alias_key);
2367 } else {
2368 effective_config.providers.models.ensure(&p, "default");
2369 }
2370 }
2371 if let Some(entry) = effective_config.first_model_provider_mut() {
2372 if let Some(m) = model_override {
2373 entry.model = Some(m);
2374 }
2375 entry.temperature = Some(temperature);
2376 }
2377
2378 let mut agent = Agent::from_config(&effective_config, agent_alias).await?;
2379
2380 let channels_seed = loop_::seed_channel_handles(
2382 &agent.channel_handles.ask_user,
2383 &agent.channel_handles.reaction,
2384 &agent.channel_handles.poll,
2385 &agent.channel_handles.escalate,
2386 &agent.channel_handles.channel_send,
2387 );
2388 if channels_seed > 0 {
2389 ::zeroclaw_log::record!(
2390 INFO,
2391 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2392 .with_attrs(::serde_json::json!({"count": channels_seed})),
2393 &format!("Registered {} channel(s) for CLI agent", channels_seed),
2394 );
2395 }
2396
2397 let provider_name = effective_config
2398 .first_model_provider_alias()
2399 .unwrap_or_else(|| "openrouter.default".to_string());
2400 let model_name = effective_config
2405 .first_model_provider()
2406 .and_then(|e| e.model.as_deref())
2407 .map(str::trim)
2408 .filter(|m| !m.is_empty())
2409 .map(ToString::to_string)
2410 .or_else(|| effective_config.resolve_default_model())
2411 .unwrap_or_else(|| "<unresolved>".to_string());
2412
2413 agent.observer.record_event(&ObserverEvent::AgentStart {
2414 model_provider: provider_name.clone(),
2415 model: model_name.clone(),
2416 });
2417
2418 if let Some(msg) = message {
2419 let response = agent.run_single(&msg).await?;
2420 println!("{response}");
2421 } else {
2422 agent.run_interactive().await?;
2423 }
2424
2425 agent.observer.record_event(&ObserverEvent::AgentEnd {
2426 model_provider: provider_name,
2427 model: model_name,
2428 duration: start.elapsed(),
2429 tokens_used: None,
2430 cost_usd: None,
2431 });
2432
2433 Ok(())
2434}
2435
2436#[cfg(test)]
2437mod tests {
2438 use super::*;
2439 use async_trait::async_trait;
2440 use parking_lot::Mutex;
2441 use std::collections::HashMap;
2442 use std::sync::atomic::{AtomicUsize, Ordering};
2443 use zeroclaw_api::observability_traits::ObserverMetric;
2444
2445 zeroclaw_api::mock_tool_attribution!(
2446 CountingTool,
2447 NamedMockTool,
2448 MockTool,
2449 CapturingApprovalArgTool,
2450 );
2451
2452 struct MockModelProvider {
2453 responses: Mutex<Vec<zeroclaw_providers::ChatResponse>>,
2454 }
2455
2456 #[async_trait]
2457 impl ModelProvider for MockModelProvider {
2458 async fn chat_with_system(
2459 &self,
2460 _system_prompt: Option<&str>,
2461 _message: &str,
2462 _model: &str,
2463 _temperature: Option<f64>,
2464 ) -> Result<String> {
2465 Ok("ok".into())
2466 }
2467
2468 async fn chat(
2469 &self,
2470 _request: ChatRequest<'_>,
2471 _model: &str,
2472 _temperature: Option<f64>,
2473 ) -> Result<zeroclaw_providers::ChatResponse> {
2474 let mut guard = self.responses.lock();
2475 if guard.is_empty() {
2476 return Ok(zeroclaw_providers::ChatResponse {
2477 text: Some("done".into()),
2478 tool_calls: vec![],
2479 usage: None,
2480 reasoning_content: None,
2481 });
2482 }
2483 Ok(guard.remove(0))
2484 }
2485 }
2486 impl ::zeroclaw_api::attribution::Attributable for MockModelProvider {
2487 fn role(&self) -> ::zeroclaw_api::attribution::Role {
2488 ::zeroclaw_api::attribution::Role::Provider(
2489 ::zeroclaw_api::attribution::ProviderKind::Model(
2490 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
2491 ),
2492 )
2493 }
2494 fn alias(&self) -> &str {
2495 "MockModelProvider"
2496 }
2497 }
2498
2499 struct ModelCaptureModelProvider {
2500 responses: Mutex<Vec<zeroclaw_providers::ChatResponse>>,
2501 seen_models: Arc<Mutex<Vec<String>>>,
2502 }
2503
2504 #[async_trait]
2505 impl ModelProvider for ModelCaptureModelProvider {
2506 async fn chat_with_system(
2507 &self,
2508 _system_prompt: Option<&str>,
2509 _message: &str,
2510 _model: &str,
2511 _temperature: Option<f64>,
2512 ) -> Result<String> {
2513 Ok("ok".into())
2514 }
2515
2516 async fn chat(
2517 &self,
2518 _request: ChatRequest<'_>,
2519 model: &str,
2520 _temperature: Option<f64>,
2521 ) -> Result<zeroclaw_providers::ChatResponse> {
2522 self.seen_models.lock().push(model.to_string());
2523 let mut guard = self.responses.lock();
2524 if guard.is_empty() {
2525 return Ok(zeroclaw_providers::ChatResponse {
2526 text: Some("done".into()),
2527 tool_calls: vec![],
2528 usage: None,
2529 reasoning_content: None,
2530 });
2531 }
2532 Ok(guard.remove(0))
2533 }
2534 }
2535 impl ::zeroclaw_api::attribution::Attributable for ModelCaptureModelProvider {
2536 fn role(&self) -> ::zeroclaw_api::attribution::Role {
2537 ::zeroclaw_api::attribution::Role::Provider(
2538 ::zeroclaw_api::attribution::ProviderKind::Model(
2539 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
2540 ),
2541 )
2542 }
2543 fn alias(&self) -> &str {
2544 "ModelCaptureModelProvider"
2545 }
2546 }
2547
2548 struct TranscriptCaptureModelProvider {
2549 responses: Mutex<Vec<zeroclaw_providers::ChatResponse>>,
2550 seen_messages: Arc<Mutex<Vec<Vec<ChatMessage>>>>,
2551 }
2552
2553 #[async_trait]
2554 impl ModelProvider for TranscriptCaptureModelProvider {
2555 async fn chat_with_system(
2556 &self,
2557 _system_prompt: Option<&str>,
2558 _message: &str,
2559 _model: &str,
2560 _temperature: Option<f64>,
2561 ) -> Result<String> {
2562 Ok("ok".into())
2563 }
2564
2565 async fn chat(
2566 &self,
2567 request: ChatRequest<'_>,
2568 _model: &str,
2569 _temperature: Option<f64>,
2570 ) -> Result<zeroclaw_providers::ChatResponse> {
2571 self.seen_messages.lock().push(request.messages.to_vec());
2572 let mut responses = self.responses.lock();
2573 if responses.is_empty() {
2574 return Ok(zeroclaw_providers::ChatResponse {
2575 text: Some("done".into()),
2576 tool_calls: vec![],
2577 usage: None,
2578 reasoning_content: None,
2579 });
2580 }
2581 Ok(responses.remove(0))
2582 }
2583 }
2584
2585 impl ::zeroclaw_api::attribution::Attributable for TranscriptCaptureModelProvider {
2586 fn role(&self) -> ::zeroclaw_api::attribution::Role {
2587 ::zeroclaw_api::attribution::Role::Provider(
2588 ::zeroclaw_api::attribution::ProviderKind::Model(
2589 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
2590 ),
2591 )
2592 }
2593 fn alias(&self) -> &str {
2594 "TranscriptCaptureModelProvider"
2595 }
2596 }
2597
2598 struct StreamingSteeringModelProvider {
2599 seen_messages: Arc<Mutex<Vec<Vec<ChatMessage>>>>,
2600 call_count: AtomicUsize,
2601 fail_on_call: Option<usize>,
2602 fail_after_delta_on_call: Option<usize>,
2603 }
2604
2605 #[async_trait]
2606 impl ModelProvider for StreamingSteeringModelProvider {
2607 async fn chat_with_system(
2608 &self,
2609 _system_prompt: Option<&str>,
2610 _message: &str,
2611 _model: &str,
2612 _temperature: Option<f64>,
2613 ) -> Result<String> {
2614 Ok("ok".into())
2615 }
2616
2617 async fn chat(
2618 &self,
2619 request: ChatRequest<'_>,
2620 _model: &str,
2621 _temperature: Option<f64>,
2622 ) -> Result<zeroclaw_providers::ChatResponse> {
2623 let call = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
2624 self.seen_messages.lock().push(request.messages.to_vec());
2625 if self.fail_on_call == Some(call) {
2626 anyhow::bail!("synthetic provider failure on call {call}");
2627 }
2628 if self.fail_after_delta_on_call == Some(call) {
2629 anyhow::bail!("synthetic provider failure after delta on call {call}");
2630 }
2631 Ok(zeroclaw_providers::ChatResponse {
2632 text: Some(if call == 1 { "draft" } else { "final" }.into()),
2633 tool_calls: vec![],
2634 usage: None,
2635 reasoning_content: None,
2636 })
2637 }
2638
2639 fn supports_streaming(&self) -> bool {
2640 true
2641 }
2642
2643 fn stream_chat(
2644 &self,
2645 request: ChatRequest<'_>,
2646 _model: &str,
2647 _temperature: Option<f64>,
2648 _options: zeroclaw_providers::traits::StreamOptions,
2649 ) -> futures_util::stream::BoxStream<
2650 'static,
2651 zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
2652 > {
2653 use futures_util::StreamExt as _;
2654
2655 let call = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
2656 self.seen_messages.lock().push(request.messages.to_vec());
2657 let should_fail = self.fail_on_call == Some(call);
2658 let should_fail_after_delta = self.fail_after_delta_on_call == Some(call);
2659 let delta = if call == 1 { "draft" } else { "final" }.to_string();
2660 futures_util::stream::unfold(0, move |step| {
2661 let delta = delta.clone();
2662 async move {
2663 match step {
2664 0 if should_fail => Some((
2665 Err(zeroclaw_providers::traits::StreamError::ModelProvider(
2666 "synthetic provider failure".into(),
2667 )),
2668 1,
2669 )),
2670 0 => Some((
2671 Ok(zeroclaw_providers::traits::StreamEvent::TextDelta(
2672 zeroclaw_providers::traits::StreamChunk {
2673 delta,
2674 is_final: false,
2675 reasoning: None,
2676 token_count: 0,
2677 },
2678 )),
2679 1,
2680 )),
2681 1 if should_fail_after_delta => Some((
2682 Err(zeroclaw_providers::traits::StreamError::ModelProvider(
2683 "synthetic provider failure after delta".into(),
2684 )),
2685 2,
2686 )),
2687 1 => {
2688 tokio::time::sleep(std::time::Duration::from_millis(150)).await;
2689 Some((Ok(zeroclaw_providers::traits::StreamEvent::Final), 2))
2690 }
2691 _ => None,
2692 }
2693 }
2694 })
2695 .boxed()
2696 }
2697 }
2698
2699 impl ::zeroclaw_api::attribution::Attributable for StreamingSteeringModelProvider {
2700 fn role(&self) -> ::zeroclaw_api::attribution::Role {
2701 ::zeroclaw_api::attribution::Role::Provider(
2702 ::zeroclaw_api::attribution::ProviderKind::Model(
2703 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
2704 ),
2705 )
2706 }
2707 fn alias(&self) -> &str {
2708 "StreamingSteeringModelProvider"
2709 }
2710 }
2711
2712 #[derive(Default)]
2713 struct CapturingObserver {
2714 events: parking_lot::Mutex<Vec<ObserverEvent>>,
2715 }
2716
2717 impl Observer for CapturingObserver {
2718 fn record_event(&self, event: &ObserverEvent) {
2719 self.events.lock().push(event.clone());
2720 }
2721 fn record_metric(&self, _metric: &ObserverMetric) {}
2722 fn name(&self) -> &str {
2723 "capturing"
2724 }
2725 fn as_any(&self) -> &dyn std::any::Any {
2726 self
2727 }
2728 fn flush(&self) {}
2729 }
2730
2731 struct MultimodalCaptureProvider {
2732 seen_user_messages: Arc<Mutex<Vec<String>>>,
2733 streamed: bool,
2734 }
2735
2736 #[async_trait]
2737 impl ModelProvider for MultimodalCaptureProvider {
2738 async fn chat_with_system(
2739 &self,
2740 _system_prompt: Option<&str>,
2741 _message: &str,
2742 _model: &str,
2743 _temperature: Option<f64>,
2744 ) -> Result<String> {
2745 Ok("ok".into())
2746 }
2747
2748 async fn chat(
2749 &self,
2750 request: ChatRequest<'_>,
2751 _model: &str,
2752 _temperature: Option<f64>,
2753 ) -> Result<zeroclaw_providers::ChatResponse> {
2754 if let Some(message) = request.messages.iter().rfind(|msg| msg.role == "user") {
2755 self.seen_user_messages.lock().push(message.content.clone());
2756 }
2757 Ok(zeroclaw_providers::ChatResponse {
2758 text: Some("done".into()),
2759 tool_calls: vec![],
2760 usage: None,
2761 reasoning_content: None,
2762 })
2763 }
2764
2765 fn stream_chat(
2766 &self,
2767 request: ChatRequest<'_>,
2768 _model: &str,
2769 _temperature: Option<f64>,
2770 _options: zeroclaw_providers::traits::StreamOptions,
2771 ) -> futures_util::stream::BoxStream<
2772 'static,
2773 zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
2774 > {
2775 use futures_util::stream::{self, StreamExt};
2776
2777 if let Some(message) = request.messages.iter().rfind(|msg| msg.role == "user") {
2778 self.seen_user_messages.lock().push(message.content.clone());
2779 }
2780
2781 if self.streamed {
2782 let chunk = zeroclaw_providers::traits::StreamEvent::TextDelta(
2783 zeroclaw_providers::traits::StreamChunk {
2784 delta: "stream-done".into(),
2785 is_final: false,
2786 reasoning: None,
2787 token_count: 0,
2788 },
2789 );
2790 stream::iter(vec![
2791 Ok(chunk),
2792 Ok(zeroclaw_providers::traits::StreamEvent::Final),
2793 ])
2794 .boxed()
2795 } else {
2796 stream::iter(vec![Ok(zeroclaw_providers::traits::StreamEvent::Final)]).boxed()
2797 }
2798 }
2799
2800 fn supports_vision(&self) -> bool {
2801 true
2802 }
2803 }
2804 impl ::zeroclaw_api::attribution::Attributable for MultimodalCaptureProvider {
2805 fn role(&self) -> ::zeroclaw_api::attribution::Role {
2806 ::zeroclaw_api::attribution::Role::Provider(
2807 ::zeroclaw_api::attribution::ProviderKind::Model(
2808 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
2809 ),
2810 )
2811 }
2812 fn alias(&self) -> &str {
2813 "MultimodalCaptureProvider"
2814 }
2815 }
2816
2817 struct MockTool;
2818
2819 #[async_trait]
2820 impl Tool for MockTool {
2821 fn name(&self) -> &str {
2822 "echo"
2823 }
2824
2825 fn description(&self) -> &str {
2826 "echo"
2827 }
2828
2829 fn parameters_schema(&self) -> serde_json::Value {
2830 serde_json::json!({"type": "object"})
2831 }
2832
2833 async fn execute(&self, _args: serde_json::Value) -> Result<crate::tools::ToolResult> {
2834 Ok(crate::tools::ToolResult {
2835 success: true,
2836 output: "tool-out".into(),
2837 error: None,
2838 })
2839 }
2840 }
2841
2842 struct CountingTool {
2843 calls: Arc<AtomicUsize>,
2844 }
2845
2846 #[async_trait]
2847 impl Tool for CountingTool {
2848 fn name(&self) -> &str {
2849 "echo"
2850 }
2851
2852 fn description(&self) -> &str {
2853 "echo"
2854 }
2855
2856 fn parameters_schema(&self) -> serde_json::Value {
2857 serde_json::json!({"type": "object"})
2858 }
2859
2860 async fn execute(&self, _args: serde_json::Value) -> Result<crate::tools::ToolResult> {
2861 self.calls.fetch_add(1, Ordering::SeqCst);
2862 Ok(crate::tools::ToolResult {
2863 success: true,
2864 output: "tool-out".into(),
2865 error: None,
2866 })
2867 }
2868 }
2869
2870 struct CapturingApprovalArgTool {
2871 name: &'static str,
2872 output: &'static str,
2873 calls: Arc<AtomicUsize>,
2874 last_args: Arc<std::sync::Mutex<Option<serde_json::Value>>>,
2875 }
2876
2877 #[async_trait]
2878 impl Tool for CapturingApprovalArgTool {
2879 fn name(&self) -> &str {
2880 self.name
2881 }
2882
2883 fn description(&self) -> &str {
2884 self.name
2885 }
2886
2887 fn parameters_schema(&self) -> serde_json::Value {
2888 serde_json::json!({"type": "object"})
2889 }
2890
2891 async fn execute(&self, args: serde_json::Value) -> Result<crate::tools::ToolResult> {
2892 self.calls.fetch_add(1, Ordering::SeqCst);
2893 *self.last_args.lock().unwrap() = Some(args);
2894 Ok(crate::tools::ToolResult {
2895 success: true,
2896 output: self.output.into(),
2897 error: None,
2898 })
2899 }
2900 }
2901
2902 struct ApprovalChannel {
2903 response: zeroclaw_api::channel::ChannelApprovalResponse,
2904 requests: Arc<AtomicUsize>,
2905 }
2906
2907 impl ::zeroclaw_api::attribution::Attributable for ApprovalChannel {
2908 fn role(&self) -> ::zeroclaw_api::attribution::Role {
2909 ::zeroclaw_api::attribution::Role::Channel(
2910 ::zeroclaw_api::attribution::ChannelKind::AcpChannel,
2911 )
2912 }
2913 fn alias(&self) -> &str {
2914 "test"
2915 }
2916 }
2917
2918 #[async_trait]
2919 impl zeroclaw_api::channel::Channel for ApprovalChannel {
2920 fn name(&self) -> &str {
2921 "acp"
2922 }
2923
2924 async fn send(&self, _message: &zeroclaw_api::channel::SendMessage) -> anyhow::Result<()> {
2925 Ok(())
2926 }
2927
2928 async fn listen(
2929 &self,
2930 _tx: tokio::sync::mpsc::Sender<zeroclaw_api::channel::ChannelMessage>,
2931 ) -> anyhow::Result<()> {
2932 Ok(())
2933 }
2934
2935 async fn request_approval(
2936 &self,
2937 _recipient: &str,
2938 _request: &zeroclaw_api::channel::ChannelApprovalRequest,
2939 ) -> anyhow::Result<Option<zeroclaw_api::channel::ChannelApprovalResponse>> {
2940 self.requests.fetch_add(1, Ordering::SeqCst);
2941 Ok(Some(self.response))
2942 }
2943 }
2944
2945 #[tokio::test]
2946 async fn turn_without_tools_returns_text() {
2947 let model_provider = Box::new(MockModelProvider {
2948 responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
2949 text: Some("hello".into()),
2950 tool_calls: vec![],
2951 usage: None,
2952 reasoning_content: None,
2953 }]),
2954 });
2955
2956 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
2957 backend: "none".into(),
2958 ..zeroclaw_config::schema::MemoryConfig::default()
2959 };
2960 let mem: Arc<dyn Memory> = Arc::from(
2961 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
2962 .expect("memory creation should succeed with valid config"),
2963 );
2964
2965 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
2966 let mut agent = Agent::builder()
2967 .model_provider(model_provider)
2968 .tools(vec![Box::new(MockTool)])
2969 .memory(mem)
2970 .observer(observer)
2971 .tool_dispatcher(Box::new(XmlToolDispatcher))
2972 .workspace_dir(std::path::PathBuf::from("/tmp"))
2973 .build()
2974 .expect("agent builder should succeed with valid config");
2975
2976 let response = agent.turn("hi").await.unwrap();
2977 assert_eq!(response, "hello");
2978 }
2979
2980 #[tokio::test]
2981 async fn direct_agent_strict_tool_parsing_ignores_xml_dispatcher_calls() {
2982 let provider = Box::new(MockModelProvider {
2983 responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
2984 text: Some(
2985 r#"<tool_call>{"name":"echo","arguments":{"value":"ignored"}}</tool_call>"#
2986 .into(),
2987 ),
2988 tool_calls: vec![],
2989 usage: None,
2990 reasoning_content: None,
2991 }]),
2992 });
2993
2994 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
2995 backend: "none".into(),
2996 ..zeroclaw_config::schema::MemoryConfig::default()
2997 };
2998 let mem: Arc<dyn Memory> = Arc::from(
2999 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3000 .expect("memory creation should succeed with valid config"),
3001 );
3002 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3003 let calls = Arc::new(AtomicUsize::new(0));
3004 let agent_config = zeroclaw_config::schema::AliasedAgentConfig {
3005 strict_tool_parsing: true,
3006 ..zeroclaw_config::schema::AliasedAgentConfig::default()
3007 };
3008 let mut agent = Agent::builder()
3009 .model_provider(provider)
3010 .tools(vec![Box::new(CountingTool {
3011 calls: Arc::clone(&calls),
3012 })])
3013 .memory(mem)
3014 .observer(observer)
3015 .tool_dispatcher(Box::new(XmlToolDispatcher))
3016 .config(agent_config)
3017 .workspace_dir(std::path::PathBuf::from("/tmp"))
3018 .build()
3019 .expect("agent builder should succeed with valid config");
3020
3021 let system_prompt = agent
3022 .build_system_prompt()
3023 .expect("system prompt should render");
3024 assert!(
3025 !system_prompt.contains("## Tools"),
3026 "strict parsing should not advertise text tool instructions"
3027 );
3028 assert!(
3029 !system_prompt.contains("<tool_call"),
3030 "strict parsing should not advertise XML tool calls"
3031 );
3032
3033 let response = agent.turn("hi").await.unwrap();
3034
3035 assert_eq!(calls.load(Ordering::SeqCst), 0);
3036 assert!(response.contains("<tool_call>"));
3037 }
3038
3039 #[test]
3040 fn native_agent_prompt_omits_duplicate_tools_section() {
3041 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3042 backend: "none".into(),
3043 ..zeroclaw_config::schema::MemoryConfig::default()
3044 };
3045 let workspace = tempfile::TempDir::new().expect("temp dir");
3046 let mem: Arc<dyn Memory> = Arc::from(
3047 zeroclaw_memory::create_memory(&memory_cfg, workspace.path(), None)
3048 .expect("memory creation should succeed with valid config"),
3049 );
3050 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3051
3052 let native_agent = Agent::builder()
3053 .model_provider(Box::new(MockModelProvider {
3054 responses: Mutex::new(vec![]),
3055 }))
3056 .tools(vec![Box::new(MockTool)])
3057 .memory(Arc::clone(&mem))
3058 .observer(Arc::clone(&observer))
3059 .tool_dispatcher(Box::new(NativeToolDispatcher))
3060 .workspace_dir(workspace.path().to_path_buf())
3061 .build()
3062 .expect("agent builder should succeed with valid config");
3063 let native_prompt = native_agent.build_system_prompt().unwrap();
3064 assert!(!native_prompt.contains("## Tools"));
3065 assert!(!native_prompt.contains("echo"));
3066
3067 let xml_agent = Agent::builder()
3068 .model_provider(Box::new(MockModelProvider {
3069 responses: Mutex::new(vec![]),
3070 }))
3071 .tools(vec![Box::new(MockTool)])
3072 .memory(mem)
3073 .observer(observer)
3074 .tool_dispatcher(Box::new(XmlToolDispatcher))
3075 .workspace_dir(workspace.path().to_path_buf())
3076 .build()
3077 .expect("agent builder should succeed with valid config");
3078 let xml_prompt = xml_agent.build_system_prompt().unwrap();
3079 assert!(xml_prompt.contains("## Tools"));
3080 assert!(xml_prompt.contains("echo"));
3081 assert!(xml_prompt.contains("## Tool Use Protocol"));
3082 }
3083
3084 #[tokio::test]
3085 async fn direct_agent_tool_execution_requests_acp_approval() {
3086 let model_provider = Box::new(MockModelProvider {
3087 responses: Mutex::new(vec![]),
3088 });
3089 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3090 backend: "none".into(),
3091 ..zeroclaw_config::schema::MemoryConfig::default()
3092 };
3093 let mem: Arc<dyn Memory> = Arc::from(
3094 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3095 .expect("memory creation should succeed with valid config"),
3096 );
3097 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3098 let tool_calls = Arc::new(AtomicUsize::new(0));
3099 let approval_requests = Arc::new(AtomicUsize::new(0));
3100 let approval_cfg = zeroclaw_config::schema::RiskProfileConfig {
3101 always_ask: vec!["echo".into()],
3102 ..zeroclaw_config::schema::RiskProfileConfig::default()
3103 };
3104 let mut agent = Agent::builder()
3105 .model_provider(model_provider)
3106 .tools(vec![Box::new(CountingTool {
3107 calls: Arc::clone(&tool_calls),
3108 })])
3109 .memory(mem)
3110 .observer(observer)
3111 .tool_dispatcher(Box::new(NativeToolDispatcher))
3112 .workspace_dir(std::path::PathBuf::from("/tmp"))
3113 .approval_manager(Some(Arc::new(ApprovalManager::for_non_interactive(
3114 &approval_cfg,
3115 ))))
3116 .build()
3117 .expect("agent builder should succeed with valid config");
3118
3119 let handle: tools::PerToolChannelHandle =
3120 Arc::new(parking_lot::RwLock::new(HashMap::new()));
3121 agent.channel_handles.ask_user = Some(Arc::clone(&handle));
3122 let channel: Arc<dyn zeroclaw_api::channel::Channel> = Arc::new(ApprovalChannel {
3123 response: zeroclaw_api::channel::ChannelApprovalResponse::Approve,
3124 requests: Arc::clone(&approval_requests),
3125 });
3126 agent.channel_handles().register_channel("acp", channel);
3127
3128 let result = agent
3129 .execute_tool_call(&ParsedToolCall {
3130 name: "echo".into(),
3131 arguments: serde_json::json!({"message": "hi"}),
3132 tool_call_id: Some("tc1".into()),
3133 })
3134 .await;
3135
3136 assert!(result.success);
3137 assert_eq!(result.output, "tool-out");
3138 assert_eq!(approval_requests.load(Ordering::SeqCst), 1);
3139 assert_eq!(tool_calls.load(Ordering::SeqCst), 1);
3140 }
3141
3142 #[tokio::test]
3143 async fn direct_agent_tool_execution_denies_when_acp_rejects() {
3144 let model_provider = Box::new(MockModelProvider {
3145 responses: Mutex::new(vec![]),
3146 });
3147 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3148 backend: "none".into(),
3149 ..zeroclaw_config::schema::MemoryConfig::default()
3150 };
3151 let mem: Arc<dyn Memory> = Arc::from(
3152 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3153 .expect("memory creation should succeed with valid config"),
3154 );
3155 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3156 let tool_calls = Arc::new(AtomicUsize::new(0));
3157 let approval_requests = Arc::new(AtomicUsize::new(0));
3158 let approval_cfg = zeroclaw_config::schema::RiskProfileConfig {
3159 always_ask: vec!["echo".into()],
3160 ..zeroclaw_config::schema::RiskProfileConfig::default()
3161 };
3162 let mut agent = Agent::builder()
3163 .model_provider(model_provider)
3164 .tools(vec![Box::new(CountingTool {
3165 calls: Arc::clone(&tool_calls),
3166 })])
3167 .memory(mem)
3168 .observer(observer)
3169 .tool_dispatcher(Box::new(NativeToolDispatcher))
3170 .workspace_dir(std::path::PathBuf::from("/tmp"))
3171 .approval_manager(Some(Arc::new(ApprovalManager::for_non_interactive(
3172 &approval_cfg,
3173 ))))
3174 .build()
3175 .expect("agent builder should succeed with valid config");
3176
3177 let handle: tools::PerToolChannelHandle =
3178 Arc::new(parking_lot::RwLock::new(HashMap::new()));
3179 agent.channel_handles.ask_user = Some(Arc::clone(&handle));
3180 let channel: Arc<dyn zeroclaw_api::channel::Channel> = Arc::new(ApprovalChannel {
3181 response: zeroclaw_api::channel::ChannelApprovalResponse::Deny,
3182 requests: Arc::clone(&approval_requests),
3183 });
3184 agent.channel_handles().register_channel("acp", channel);
3185
3186 let result = agent
3187 .execute_tool_call(&ParsedToolCall {
3188 name: "echo".into(),
3189 arguments: serde_json::json!({"message": "hi"}),
3190 tool_call_id: Some("tc1".into()),
3191 })
3192 .await;
3193
3194 assert!(!result.success);
3195 assert_eq!(result.output, "Denied by user.");
3196 assert_eq!(approval_requests.load(Ordering::SeqCst), 1);
3197 assert_eq!(tool_calls.load(Ordering::SeqCst), 0);
3198 }
3199
3200 #[tokio::test]
3201 async fn direct_agent_shell_does_not_trust_model_supplied_approved_arg() {
3202 let provider = Box::new(MockModelProvider {
3203 responses: Mutex::new(vec![]),
3204 });
3205 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3206 backend: "none".into(),
3207 ..zeroclaw_config::schema::MemoryConfig::default()
3208 };
3209 let mem: Arc<dyn Memory> = Arc::from(
3210 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3211 .expect("memory creation should succeed with valid config"),
3212 );
3213 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3214 let tool_calls = Arc::new(AtomicUsize::new(0));
3215 let approval_requests = Arc::new(AtomicUsize::new(0));
3216 let captured_args = Arc::new(std::sync::Mutex::new(None));
3217 let approval_cfg = zeroclaw_config::schema::RiskProfileConfig::default();
3218 let mut agent = Agent::builder()
3219 .model_provider(provider)
3220 .tools(vec![Box::new(CapturingApprovalArgTool {
3221 name: "shell",
3222 output: "shell-out",
3223 calls: Arc::clone(&tool_calls),
3224 last_args: Arc::clone(&captured_args),
3225 })])
3226 .memory(mem)
3227 .observer(observer)
3228 .tool_dispatcher(Box::new(NativeToolDispatcher))
3229 .workspace_dir(std::path::PathBuf::from("/tmp"))
3230 .approval_manager(Some(Arc::new(
3231 ApprovalManager::for_non_interactive_backchannel(&approval_cfg),
3232 )))
3233 .build()
3234 .expect("agent builder should succeed with valid config");
3235
3236 let handle: tools::PerToolChannelHandle =
3237 Arc::new(parking_lot::RwLock::new(HashMap::new()));
3238 agent.channel_handles.ask_user = Some(Arc::clone(&handle));
3239 let channel: Arc<dyn zeroclaw_api::channel::Channel> = Arc::new(ApprovalChannel {
3240 response: zeroclaw_api::channel::ChannelApprovalResponse::Deny,
3241 requests: Arc::clone(&approval_requests),
3242 });
3243 agent.channel_handles().register_channel("acp", channel);
3244
3245 let result = agent
3246 .execute_tool_call(&ParsedToolCall {
3247 name: "shell".into(),
3248 arguments: serde_json::json!({
3249 "command": "touch should-not-run",
3250 "approved": true
3251 }),
3252 tool_call_id: Some("tc1".into()),
3253 })
3254 .await;
3255
3256 assert!(!result.success);
3257 assert_eq!(result.output, "Denied by user.");
3258 assert_eq!(approval_requests.load(Ordering::SeqCst), 1);
3259 assert_eq!(tool_calls.load(Ordering::SeqCst), 0);
3260 assert!(captured_args.lock().unwrap().is_none());
3261 }
3262
3263 #[tokio::test]
3264 async fn direct_agent_shell_marks_args_approved_after_backchannel_approval() {
3265 let provider = Box::new(MockModelProvider {
3266 responses: Mutex::new(vec![]),
3267 });
3268 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3269 backend: "none".into(),
3270 ..zeroclaw_config::schema::MemoryConfig::default()
3271 };
3272 let mem: Arc<dyn Memory> = Arc::from(
3273 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3274 .expect("memory creation should succeed with valid config"),
3275 );
3276 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3277 let tool_calls = Arc::new(AtomicUsize::new(0));
3278 let approval_requests = Arc::new(AtomicUsize::new(0));
3279 let captured_args = Arc::new(std::sync::Mutex::new(None));
3280 let approval_cfg = zeroclaw_config::schema::RiskProfileConfig::default();
3281 let mut agent = Agent::builder()
3282 .model_provider(provider)
3283 .tools(vec![Box::new(CapturingApprovalArgTool {
3284 name: "shell",
3285 output: "shell-out",
3286 calls: Arc::clone(&tool_calls),
3287 last_args: Arc::clone(&captured_args),
3288 })])
3289 .memory(mem)
3290 .observer(observer)
3291 .tool_dispatcher(Box::new(NativeToolDispatcher))
3292 .workspace_dir(std::path::PathBuf::from("/tmp"))
3293 .approval_manager(Some(Arc::new(
3294 ApprovalManager::for_non_interactive_backchannel(&approval_cfg),
3295 )))
3296 .build()
3297 .expect("agent builder should succeed with valid config");
3298
3299 let handle: tools::PerToolChannelHandle =
3300 Arc::new(parking_lot::RwLock::new(HashMap::new()));
3301 agent.channel_handles.ask_user = Some(Arc::clone(&handle));
3302 let channel: Arc<dyn zeroclaw_api::channel::Channel> = Arc::new(ApprovalChannel {
3303 response: zeroclaw_api::channel::ChannelApprovalResponse::Approve,
3304 requests: Arc::clone(&approval_requests),
3305 });
3306 agent.channel_handles().register_channel("acp", channel);
3307
3308 let result = agent
3309 .execute_tool_call(&ParsedToolCall {
3310 name: "shell".into(),
3311 arguments: serde_json::json!({
3312 "command": "touch should-run-after-human-approval",
3313 "approved": false
3314 }),
3315 tool_call_id: Some("tc1".into()),
3316 })
3317 .await;
3318
3319 assert!(result.success);
3320 assert_eq!(result.output, "shell-out");
3321 assert_eq!(approval_requests.load(Ordering::SeqCst), 1);
3322 assert_eq!(tool_calls.load(Ordering::SeqCst), 1);
3323 let args = captured_args
3324 .lock()
3325 .unwrap()
3326 .clone()
3327 .expect("shell tool should capture executed args");
3328 assert_eq!(args["approved"], true);
3329 }
3330
3331 #[tokio::test]
3332 async fn direct_agent_shell_keeps_runtime_approval_from_always_allowlist() {
3333 let provider = Box::new(MockModelProvider {
3334 responses: Mutex::new(vec![]),
3335 });
3336 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3337 backend: "none".into(),
3338 ..zeroclaw_config::schema::MemoryConfig::default()
3339 };
3340 let mem: Arc<dyn Memory> = Arc::from(
3341 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3342 .expect("memory creation should succeed with valid config"),
3343 );
3344 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3345 let tool_calls = Arc::new(AtomicUsize::new(0));
3346 let approval_requests = Arc::new(AtomicUsize::new(0));
3347 let captured_args = Arc::new(std::sync::Mutex::new(None));
3348 let approval_cfg = zeroclaw_config::schema::RiskProfileConfig::default();
3349 let mut agent = Agent::builder()
3350 .model_provider(provider)
3351 .tools(vec![Box::new(CapturingApprovalArgTool {
3352 name: "shell",
3353 output: "shell-out",
3354 calls: Arc::clone(&tool_calls),
3355 last_args: Arc::clone(&captured_args),
3356 })])
3357 .memory(mem)
3358 .observer(observer)
3359 .tool_dispatcher(Box::new(NativeToolDispatcher))
3360 .workspace_dir(std::path::PathBuf::from("/tmp"))
3361 .approval_manager(Some(Arc::new(
3362 ApprovalManager::for_non_interactive_backchannel(&approval_cfg),
3363 )))
3364 .build()
3365 .expect("agent builder should succeed with valid config");
3366
3367 let handle: tools::PerToolChannelHandle =
3368 Arc::new(parking_lot::RwLock::new(HashMap::new()));
3369 agent.channel_handles.ask_user = Some(Arc::clone(&handle));
3370 let channel: Arc<dyn zeroclaw_api::channel::Channel> = Arc::new(ApprovalChannel {
3371 response: zeroclaw_api::channel::ChannelApprovalResponse::AlwaysApprove,
3372 requests: Arc::clone(&approval_requests),
3373 });
3374 agent.channel_handles().register_channel("acp", channel);
3375
3376 let first_result = agent
3377 .execute_tool_call(&ParsedToolCall {
3378 name: "shell".into(),
3379 arguments: serde_json::json!({
3380 "command": "touch should-run-after-always-approval",
3381 "approved": false
3382 }),
3383 tool_call_id: Some("tc1".into()),
3384 })
3385 .await;
3386 let second_result = agent
3387 .execute_tool_call(&ParsedToolCall {
3388 name: "shell".into(),
3389 arguments: serde_json::json!({
3390 "command": "touch should-run-from-allowlist",
3391 "approved": false
3392 }),
3393 tool_call_id: Some("tc2".into()),
3394 })
3395 .await;
3396
3397 assert!(first_result.success);
3398 assert!(second_result.success);
3399 assert_eq!(approval_requests.load(Ordering::SeqCst), 1);
3400 assert_eq!(tool_calls.load(Ordering::SeqCst), 2);
3401 let args = captured_args
3402 .lock()
3403 .unwrap()
3404 .clone()
3405 .expect("shell tool should capture executed args");
3406 assert_eq!(args["approved"], true);
3407 }
3408
3409 #[tokio::test]
3410 async fn direct_agent_cron_add_does_not_trust_model_supplied_approved_arg() {
3411 let provider = Box::new(MockModelProvider {
3412 responses: Mutex::new(vec![]),
3413 });
3414 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3415 backend: "none".into(),
3416 ..zeroclaw_config::schema::MemoryConfig::default()
3417 };
3418 let mem: Arc<dyn Memory> = Arc::from(
3419 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3420 .expect("memory creation should succeed with valid config"),
3421 );
3422 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3423 let tool_calls = Arc::new(AtomicUsize::new(0));
3424 let captured_args = Arc::new(std::sync::Mutex::new(None));
3425 let agent = Agent::builder()
3426 .model_provider(provider)
3427 .tools(vec![Box::new(CapturingApprovalArgTool {
3428 name: "cron_add",
3429 output: "cron-out",
3430 calls: Arc::clone(&tool_calls),
3431 last_args: Arc::clone(&captured_args),
3432 })])
3433 .memory(mem)
3434 .observer(observer)
3435 .tool_dispatcher(Box::new(NativeToolDispatcher))
3436 .workspace_dir(std::path::PathBuf::from("/tmp"))
3437 .build()
3438 .expect("agent builder should succeed with valid config");
3439
3440 let result = agent
3441 .execute_tool_call(&ParsedToolCall {
3442 name: "cron_add".into(),
3443 arguments: serde_json::json!({
3444 "command": "echo should-not-be-model-approved",
3445 "approved": true
3446 }),
3447 tool_call_id: Some("tc1".into()),
3448 })
3449 .await;
3450
3451 assert!(result.success);
3452 assert_eq!(result.output, "cron-out");
3453 assert_eq!(tool_calls.load(Ordering::SeqCst), 1);
3454 let args = captured_args
3455 .lock()
3456 .unwrap()
3457 .clone()
3458 .expect("cron_add tool should capture executed args");
3459 assert_eq!(args["approved"], false);
3460 }
3461
3462 #[tokio::test]
3463 async fn turn_with_native_dispatcher_handles_tool_results_variant() {
3464 let model_provider = Box::new(MockModelProvider {
3465 responses: Mutex::new(vec![
3466 zeroclaw_providers::ChatResponse {
3467 text: Some(String::new()),
3468 tool_calls: vec![zeroclaw_providers::ToolCall {
3469 id: "tc1".into(),
3470 name: "echo".into(),
3471 arguments: "{}".into(),
3472 extra_content: None,
3473 }],
3474 usage: None,
3475 reasoning_content: None,
3476 },
3477 zeroclaw_providers::ChatResponse {
3478 text: Some("done".into()),
3479 tool_calls: vec![],
3480 usage: None,
3481 reasoning_content: None,
3482 },
3483 ]),
3484 });
3485
3486 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3487 backend: "none".into(),
3488 ..zeroclaw_config::schema::MemoryConfig::default()
3489 };
3490 let mem: Arc<dyn Memory> = Arc::from(
3491 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3492 .expect("memory creation should succeed with valid config"),
3493 );
3494
3495 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3496 let mut agent = Agent::builder()
3497 .model_provider(model_provider)
3498 .tools(vec![Box::new(MockTool)])
3499 .memory(mem)
3500 .observer(observer)
3501 .tool_dispatcher(Box::new(NativeToolDispatcher))
3502 .workspace_dir(std::path::PathBuf::from("/tmp"))
3503 .build()
3504 .expect("agent builder should succeed with valid config");
3505
3506 let response = agent.turn("hi").await.unwrap();
3507 assert_eq!(response, "done");
3508 assert!(
3509 agent
3510 .history()
3511 .iter()
3512 .any(|msg| matches!(msg, ConversationMessage::ToolResults(_)))
3513 );
3514 }
3515
3516 #[tokio::test]
3517 async fn turn_routes_with_hint_when_query_classification_matches() {
3518 let seen_models = Arc::new(Mutex::new(Vec::new()));
3519 let model_provider = Box::new(ModelCaptureModelProvider {
3520 responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
3521 text: Some("classified".into()),
3522 tool_calls: vec![],
3523 usage: None,
3524 reasoning_content: None,
3525 }]),
3526 seen_models: seen_models.clone(),
3527 });
3528
3529 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3530 backend: "none".into(),
3531 ..zeroclaw_config::schema::MemoryConfig::default()
3532 };
3533 let mem: Arc<dyn Memory> = Arc::from(
3534 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3535 .expect("memory creation should succeed with valid config"),
3536 );
3537
3538 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3539 let mut route_model_by_hint = HashMap::new();
3540 route_model_by_hint.insert("fast".to_string(), "anthropic/claude-haiku-4-5".to_string());
3541 let mut agent = Agent::builder()
3542 .model_provider(model_provider)
3543 .tools(vec![Box::new(MockTool)])
3544 .memory(mem)
3545 .observer(observer)
3546 .tool_dispatcher(Box::new(NativeToolDispatcher))
3547 .workspace_dir(std::path::PathBuf::from("/tmp"))
3548 .classification_config(zeroclaw_config::schema::QueryClassificationConfig {
3549 enabled: true,
3550 rules: vec![zeroclaw_config::schema::ClassificationRule {
3551 hint: "fast".to_string(),
3552 keywords: vec!["quick".to_string()],
3553 patterns: vec![],
3554 min_length: None,
3555 max_length: None,
3556 priority: 10,
3557 }],
3558 })
3559 .available_hints(vec!["fast".to_string()])
3560 .route_model_by_hint(route_model_by_hint)
3561 .build()
3562 .expect("agent builder should succeed with valid config");
3563
3564 let response = agent.turn("quick summary please").await.unwrap();
3565 assert_eq!(response, "classified");
3566 let seen = seen_models.lock();
3567 assert_eq!(seen.as_slice(), &["hint:fast".to_string()]);
3568 }
3569
3570 #[tokio::test]
3571 async fn from_config_passes_extra_headers_to_custom_provider() {
3572 use axum::{Json, Router, http::HeaderMap, routing::post};
3573 use tempfile::TempDir;
3574 use tokio::net::TcpListener;
3575
3576 let captured_headers: Arc<std::sync::Mutex<Option<HashMap<String, String>>>> =
3577 Arc::new(std::sync::Mutex::new(None));
3578 let captured_headers_clone = captured_headers.clone();
3579
3580 let app = Router::new().route(
3581 "/chat/completions",
3582 post(
3583 move |headers: HeaderMap, Json(_body): Json<serde_json::Value>| {
3584 let captured_headers = captured_headers_clone.clone();
3585 async move {
3586 let collected = headers
3587 .iter()
3588 .filter_map(|(name, value)| {
3589 value
3590 .to_str()
3591 .ok()
3592 .map(|value| (name.as_str().to_string(), value.to_string()))
3593 })
3594 .collect();
3595 *captured_headers.lock().unwrap() = Some(collected);
3596 Json(serde_json::json!({
3597 "choices": [{
3598 "message": {
3599 "content": "hello from mock"
3600 }
3601 }]
3602 }))
3603 }
3604 },
3605 ),
3606 );
3607
3608 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
3609 let mock_addr = listener.local_addr().unwrap();
3610 let server_handle = tokio::spawn(async move {
3611 axum::serve(listener, app).await.unwrap();
3612 });
3613
3614 let tmp = TempDir::new().expect("temp dir");
3615 let workspace_dir = tmp.path().join("workspace");
3616 std::fs::create_dir_all(&workspace_dir).unwrap();
3617
3618 let mut config = zeroclaw_config::schema::Config {
3619 data_dir: workspace_dir,
3620 config_path: tmp.path().join("config.toml"),
3621 ..Default::default()
3622 };
3623 {
3624 let entry = config
3634 .providers
3635 .models
3636 .ensure("custom", "default")
3637 .expect("custom model_provider type slot");
3638 entry.api_key = Some("test-key".to_string());
3639 entry.model = Some("test-model".to_string());
3640 entry.uri = Some(format!("http://{mock_addr}"));
3641 entry.extra_headers.insert(
3642 "User-Agent".to_string(),
3643 "zeroclaw-web-test/1.0".to_string(),
3644 );
3645 entry
3646 .extra_headers
3647 .insert("X-Title".to_string(), "zeroclaw-web".to_string());
3648 }
3649 config.memory.backend = "none".to_string();
3650 config.memory.auto_save = false;
3651
3652 config.risk_profiles.insert(
3656 "test-profile".to_string(),
3657 zeroclaw_config::schema::RiskProfileConfig::default(),
3658 );
3659 let provider_alias = config
3660 .first_model_provider_type()
3661 .expect("model_provider configured above")
3662 .to_string();
3663 let agent_cfg = zeroclaw_config::schema::AliasedAgentConfig {
3664 model_provider: format!("{provider_alias}.default").into(),
3665 risk_profile: "test-profile".to_string(),
3666 ..zeroclaw_config::schema::AliasedAgentConfig::default()
3667 };
3668 config.agents.insert("test-agent".to_string(), agent_cfg);
3669
3670 let mut agent = Agent::from_config(&config, "test-agent")
3671 .await
3672 .expect("agent from config");
3673 let response = agent.turn("hello").await.expect("agent turn");
3674
3675 assert_eq!(response, "hello from mock");
3676
3677 let headers = captured_headers
3678 .lock()
3679 .unwrap()
3680 .clone()
3681 .expect("captured headers");
3682 assert_eq!(
3683 headers.get("user-agent").map(String::as_str),
3684 Some("zeroclaw-web-test/1.0")
3685 );
3686 assert_eq!(
3687 headers.get("x-title").map(String::as_str),
3688 Some("zeroclaw-web")
3689 );
3690
3691 server_handle.abort();
3692 }
3693
3694 #[tokio::test]
3695 async fn from_config_accepts_openai_alias_with_requires_openai_auth() {
3696 use tempfile::TempDir;
3697 use zeroclaw_config::schema::{
3698 AliasedAgentConfig, Config, ModelProviderConfig, OpenAIModelProviderConfig,
3699 RiskProfileConfig, WireApi,
3700 };
3701
3702 let tmp = TempDir::new().expect("temp dir");
3703 let workspace_dir = tmp.path().join("workspace");
3704 std::fs::create_dir_all(&workspace_dir).expect("workspace dir");
3705
3706 let mut config = Config {
3707 data_dir: workspace_dir,
3708 config_path: tmp.path().join("config.toml"),
3709 ..Default::default()
3710 };
3711 config.memory.backend = "none".to_string();
3712 config.memory.auto_save = false;
3713 config
3714 .risk_profiles
3715 .insert("test-profile".to_string(), RiskProfileConfig::default());
3716 config.providers.models.openai.insert(
3717 "codex".to_string(),
3718 OpenAIModelProviderConfig {
3719 base: ModelProviderConfig {
3720 model: Some("gpt-5.4".to_string()),
3721 requires_openai_auth: true,
3722 wire_api: Some(WireApi::Responses),
3723 ..ModelProviderConfig::default()
3724 },
3725 },
3726 );
3727 config.agents.insert(
3728 "test-agent".to_string(),
3729 AliasedAgentConfig {
3730 model_provider: "openai.codex".into(),
3731 risk_profile: "test-profile".to_string(),
3732 ..AliasedAgentConfig::default()
3733 },
3734 );
3735
3736 let result = Agent::from_config(&config, "test-agent").await;
3737
3738 assert!(
3739 result.is_ok(),
3740 "openai alias with requires_openai_auth should construct via Codex OAuth path: {}",
3741 result.err().unwrap()
3742 );
3743 }
3744
3745 #[test]
3746 fn builder_allowed_tools_none_keeps_all_tools() {
3747 let model_provider = Box::new(MockModelProvider {
3748 responses: Mutex::new(vec![]),
3749 });
3750
3751 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3752 backend: "none".into(),
3753 ..zeroclaw_config::schema::MemoryConfig::default()
3754 };
3755 let mem: Arc<dyn Memory> = Arc::from(
3756 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3757 .expect("memory creation should succeed with valid config"),
3758 );
3759
3760 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3761 let agent = Agent::builder()
3762 .model_provider(model_provider)
3763 .tools(vec![Box::new(MockTool)])
3764 .memory(mem)
3765 .observer(observer)
3766 .tool_dispatcher(Box::new(NativeToolDispatcher))
3767 .workspace_dir(std::path::PathBuf::from("/tmp"))
3768 .allowed_tools(None)
3769 .build()
3770 .expect("agent builder should succeed with valid config");
3771
3772 assert_eq!(agent.tool_specs.len(), 1);
3773 assert_eq!(agent.tool_specs[0].name, "echo");
3774 }
3775
3776 #[test]
3777 fn builder_allowed_tools_some_filters_tools() {
3778 let model_provider = Box::new(MockModelProvider {
3779 responses: Mutex::new(vec![]),
3780 });
3781
3782 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3783 backend: "none".into(),
3784 ..zeroclaw_config::schema::MemoryConfig::default()
3785 };
3786 let mem: Arc<dyn Memory> = Arc::from(
3787 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3788 .expect("memory creation should succeed with valid config"),
3789 );
3790
3791 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3792 let agent = Agent::builder()
3793 .model_provider(model_provider)
3794 .tools(vec![Box::new(MockTool)])
3795 .memory(mem)
3796 .observer(observer)
3797 .tool_dispatcher(Box::new(NativeToolDispatcher))
3798 .workspace_dir(std::path::PathBuf::from("/tmp"))
3799 .allowed_tools(Some(vec!["nonexistent".to_string()]))
3800 .build()
3801 .expect("agent builder should succeed with valid config");
3802
3803 assert!(
3804 agent.tool_specs.is_empty(),
3805 "No tools should match a non-existent allowlist entry"
3806 );
3807 }
3808
3809 #[test]
3813 fn session_cwd_keeps_workspace_in_allowed_roots() {
3814 let workspace = std::env::temp_dir().join("zeroclaw_test_session_cwd_workspace");
3815 let session = std::env::temp_dir().join("zeroclaw_test_session_cwd_session");
3816 let _ = std::fs::create_dir_all(&workspace);
3817 let _ = std::fs::create_dir_all(&session);
3818
3819 let skill_file = workspace.join("SKILL.md");
3820 let _ = std::fs::write(&skill_file, "body");
3821 let skill_resolved = std::fs::canonicalize(&skill_file).unwrap_or(skill_file);
3823
3824 let risk_profile = zeroclaw_config::schema::RiskProfileConfig::default();
3825
3826 let mut policy = SecurityPolicy::from_risk_profile(&risk_profile, &session);
3828 policy.allowed_roots.push(workspace.clone());
3829 assert!(
3830 policy.is_resolved_path_allowed(&skill_resolved),
3831 "workspace skills must remain readable when session_cwd differs"
3832 );
3833
3834 let policy_no_push = SecurityPolicy::from_risk_profile(&risk_profile, &session);
3837 assert!(
3838 !policy_no_push.is_resolved_path_allowed(&skill_resolved),
3839 "without allowed_roots.push, workspace files must be outside the sandbox"
3840 );
3841 }
3842
3843 #[test]
3844 fn seed_history_prepends_system_and_skips_system_from_seed() {
3845 let model_provider = Box::new(MockModelProvider {
3846 responses: Mutex::new(vec![]),
3847 });
3848
3849 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3850 backend: "none".into(),
3851 ..zeroclaw_config::schema::MemoryConfig::default()
3852 };
3853 let mem: Arc<dyn Memory> = Arc::from(
3854 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3855 .expect("memory creation should succeed with valid config"),
3856 );
3857
3858 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3859 let mut agent = Agent::builder()
3860 .model_provider(model_provider)
3861 .tools(vec![Box::new(MockTool)])
3862 .memory(mem)
3863 .observer(observer)
3864 .tool_dispatcher(Box::new(NativeToolDispatcher))
3865 .workspace_dir(std::path::PathBuf::from("/tmp"))
3866 .build()
3867 .expect("agent builder should succeed with valid config");
3868
3869 let seed = vec![
3870 ChatMessage::system("old system prompt"),
3871 ChatMessage::user("hello"),
3872 ChatMessage::assistant("hi there"),
3873 ];
3874 agent.seed_history(&seed);
3875
3876 let history = agent.history();
3877 assert!(matches!(&history[0], ConversationMessage::Chat(m) if m.role == "system"));
3879 assert!(
3881 matches!(&history[1], ConversationMessage::Chat(m) if m.role == "user" && m.content == "hello")
3882 );
3883 assert!(
3884 matches!(&history[2], ConversationMessage::Chat(m) if m.role == "assistant" && m.content == "hi there")
3885 );
3886 assert_eq!(history.len(), 3);
3887 }
3888
3889 #[test]
3890 fn seed_conversation_history_preserves_tool_call_variants() {
3891 use zeroclaw_api::model_provider::{
3892 ChatMessage, ConversationMessage, ToolCall, ToolResultMessage,
3893 };
3894
3895 let provider = Box::new(MockModelProvider {
3896 responses: Mutex::new(vec![]),
3897 });
3898
3899 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3900 backend: "none".into(),
3901 ..zeroclaw_config::schema::MemoryConfig::default()
3902 };
3903 let mem: Arc<dyn Memory> = Arc::from(
3904 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3905 .expect("memory creation should succeed with valid config"),
3906 );
3907
3908 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3909 let mut agent = Agent::builder()
3910 .model_provider(provider)
3911 .tools(vec![Box::new(MockTool)])
3912 .memory(mem)
3913 .observer(observer)
3914 .tool_dispatcher(Box::new(NativeToolDispatcher))
3915 .workspace_dir(std::path::PathBuf::from("/tmp"))
3916 .build()
3917 .expect("agent builder should succeed with valid config");
3918
3919 let messages = vec![
3920 ConversationMessage::Chat(ChatMessage::user("run it")),
3921 ConversationMessage::AssistantToolCalls {
3922 text: None,
3923 tool_calls: vec![ToolCall {
3924 id: "tc-1".into(),
3925 name: "shell".into(),
3926 arguments: r#"{"command":"ls"}"#.into(),
3927 extra_content: None,
3928 }],
3929 reasoning_content: None,
3930 },
3931 ConversationMessage::ToolResults(vec![ToolResultMessage {
3932 tool_call_id: "tc-1".into(),
3933 content: "ok".into(),
3934 }]),
3935 ConversationMessage::Chat(ChatMessage::assistant("done")),
3936 ];
3937
3938 agent.seed_conversation_history(messages);
3939
3940 let non_system: Vec<_> = agent
3942 .history()
3943 .iter()
3944 .filter(|m| !matches!(m, ConversationMessage::Chat(c) if c.role == "system"))
3945 .collect();
3946
3947 assert_eq!(non_system.len(), 4);
3948 assert!(
3949 matches!(non_system[1], ConversationMessage::AssistantToolCalls { tool_calls, .. } if tool_calls[0].id == "tc-1")
3950 );
3951 assert!(
3952 matches!(non_system[2], ConversationMessage::ToolResults(r) if r[0].tool_call_id == "tc-1")
3953 );
3954 }
3955
3956 struct StreamToolCaptureModelProvider {
3959 tools_received: Arc<Mutex<Vec<bool>>>,
3960 call_count: Arc<Mutex<usize>>,
3961 }
3962
3963 #[async_trait]
3964 impl ModelProvider for StreamToolCaptureModelProvider {
3965 async fn chat_with_system(
3966 &self,
3967 _system_prompt: Option<&str>,
3968 _message: &str,
3969 _model: &str,
3970 _temperature: Option<f64>,
3971 ) -> Result<String> {
3972 Ok("ok".into())
3973 }
3974
3975 async fn chat(
3976 &self,
3977 request: ChatRequest<'_>,
3978 _model: &str,
3979 _temperature: Option<f64>,
3980 ) -> Result<zeroclaw_providers::ChatResponse> {
3981 self.tools_received.lock().push(request.tools.is_some());
3982 let mut count = self.call_count.lock();
3983 *count += 1;
3984 if *count == 1 {
3985 Ok(zeroclaw_providers::ChatResponse {
3986 text: Some(String::new()),
3987 tool_calls: vec![zeroclaw_providers::ToolCall {
3988 id: "00000000-0000-0000-0000-000000000001".into(),
3989 name: "echo".into(),
3990 arguments: "{}".into(),
3991 extra_content: None,
3992 }],
3993 usage: None,
3994 reasoning_content: None,
3995 })
3996 } else {
3997 Ok(zeroclaw_providers::ChatResponse {
3998 text: Some("stream-done".into()),
3999 tool_calls: vec![],
4000 usage: None,
4001 reasoning_content: None,
4002 })
4003 }
4004 }
4005
4006 fn supports_native_tools(&self) -> bool {
4007 true
4008 }
4009
4010 fn stream_chat(
4011 &self,
4012 request: ChatRequest<'_>,
4013 _model: &str,
4014 _temperature: Option<f64>,
4015 _options: zeroclaw_providers::traits::StreamOptions,
4016 ) -> futures_util::stream::BoxStream<
4017 'static,
4018 zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
4019 > {
4020 use futures_util::stream::{self, StreamExt};
4021 self.tools_received.lock().push(request.tools.is_some());
4022 let mut count = self.call_count.lock();
4023 *count += 1;
4024 if *count == 1 {
4025 let tc = zeroclaw_providers::traits::StreamEvent::ToolCall(
4026 zeroclaw_providers::ToolCall {
4027 id: "00000000-0000-0000-0000-000000000001".into(),
4028 name: "echo".into(),
4029 arguments: "{}".into(),
4030 extra_content: None,
4031 },
4032 );
4033 stream::iter(vec![
4034 Ok(tc),
4035 Ok(zeroclaw_providers::traits::StreamEvent::Final),
4036 ])
4037 .boxed()
4038 } else {
4039 let chunk = zeroclaw_providers::traits::StreamEvent::TextDelta(
4040 zeroclaw_providers::traits::StreamChunk {
4041 delta: "stream-done".into(),
4042 is_final: false,
4043 reasoning: None,
4044 token_count: 0,
4045 },
4046 );
4047 stream::iter(vec![
4048 Ok(chunk),
4049 Ok(zeroclaw_providers::traits::StreamEvent::Final),
4050 ])
4051 .boxed()
4052 }
4053 }
4054 }
4055 impl ::zeroclaw_api::attribution::Attributable for StreamToolCaptureModelProvider {
4056 fn role(&self) -> ::zeroclaw_api::attribution::Role {
4057 ::zeroclaw_api::attribution::Role::Provider(
4058 ::zeroclaw_api::attribution::ProviderKind::Model(
4059 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
4060 ),
4061 )
4062 }
4063 fn alias(&self) -> &str {
4064 "StreamToolCaptureModelProvider"
4065 }
4066 }
4067
4068 #[tokio::test]
4069 async fn turn_streamed_passes_tool_specs_to_provider() {
4070 let tools_received = Arc::new(Mutex::new(Vec::new()));
4071 let model_provider = Box::new(StreamToolCaptureModelProvider {
4072 tools_received: tools_received.clone(),
4073 call_count: Arc::new(Mutex::new(0)),
4074 });
4075
4076 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4077 backend: "none".into(),
4078 ..zeroclaw_config::schema::MemoryConfig::default()
4079 };
4080 let mem: Arc<dyn Memory> = Arc::from(
4081 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4082 .expect("memory creation should succeed with valid config"),
4083 );
4084
4085 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4086 let mut agent = Agent::builder()
4087 .model_provider(model_provider)
4088 .tools(vec![Box::new(MockTool)])
4089 .memory(mem)
4090 .observer(observer)
4091 .tool_dispatcher(Box::new(NativeToolDispatcher))
4092 .workspace_dir(std::path::PathBuf::from("/tmp"))
4093 .build()
4094 .expect("agent builder should succeed with valid config");
4095
4096 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
4097 let (response, _) = agent
4098 .turn_streamed("use the echo tool", event_tx, None)
4099 .await
4100 .unwrap();
4101 assert_eq!(response, "stream-done");
4102
4103 let received = tools_received.lock();
4105 assert!(
4106 received.len() >= 2,
4107 "Expected at least 2 stream_chat calls, got {}",
4108 received.len()
4109 );
4110 assert!(
4111 received[0],
4112 "First stream_chat call should have received tool specs"
4113 );
4114 assert!(
4115 received[1],
4116 "Second stream_chat call should have received tool specs"
4117 );
4118
4119 let mut events = Vec::new();
4121 while let Ok(ev) = event_rx.try_recv() {
4122 events.push(ev);
4123 }
4124 let has_tool_call = events
4125 .iter()
4126 .any(|e| matches!(e, TurnEvent::ToolCall { name, .. } if name == "echo"));
4127 let has_tool_result = events
4128 .iter()
4129 .any(|e| matches!(e, TurnEvent::ToolResult { name, .. } if name == "echo"));
4130 assert!(
4131 has_tool_call,
4132 "Should have emitted a ToolCall event for 'echo'"
4133 );
4134 assert!(
4135 has_tool_result,
4136 "Should have emitted a ToolResult event for 'echo'"
4137 );
4138
4139 let call_id = events
4141 .iter()
4142 .find_map(|e| {
4143 if let TurnEvent::ToolCall { id, .. } = e {
4144 Some(id.clone())
4145 } else {
4146 None
4147 }
4148 })
4149 .expect("ToolCall should have an ID");
4150
4151 let result_id = events
4152 .iter()
4153 .find_map(|e| {
4154 if let TurnEvent::ToolResult { id, .. } = e {
4155 Some(id.clone())
4156 } else {
4157 None
4158 }
4159 })
4160 .expect("ToolResult should have an ID");
4161
4162 assert_eq!(
4163 call_id, result_id,
4164 "ToolCall and ToolResult should share the same ID for correlation"
4165 );
4166
4167 assert!(
4169 uuid::Uuid::parse_str(&call_id).is_ok(),
4170 "Generated ID should be a valid UUID: got '{}'",
4171 call_id
4172 );
4173 }
4174
4175 struct PreExecutedToolModelProvider;
4176
4177 #[async_trait]
4178 impl ModelProvider for PreExecutedToolModelProvider {
4179 async fn chat_with_system(
4180 &self,
4181 _system_prompt: Option<&str>,
4182 _message: &str,
4183 _model: &str,
4184 _temperature: Option<f64>,
4185 ) -> Result<String> {
4186 Ok(String::new())
4187 }
4188
4189 async fn chat(
4190 &self,
4191 _request: ChatRequest<'_>,
4192 _model: &str,
4193 _temperature: Option<f64>,
4194 ) -> Result<zeroclaw_providers::ChatResponse> {
4195 Ok(zeroclaw_providers::ChatResponse {
4196 text: Some(String::new()),
4197 tool_calls: vec![],
4198 usage: None,
4199 reasoning_content: None,
4200 })
4201 }
4202
4203 fn stream_chat(
4204 &self,
4205 _request: ChatRequest<'_>,
4206 _model: &str,
4207 _temperature: Option<f64>,
4208 _options: zeroclaw_providers::traits::StreamOptions,
4209 ) -> futures_util::stream::BoxStream<
4210 'static,
4211 zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
4212 > {
4213 use futures_util::stream::{self, StreamExt};
4214
4215 stream::iter(vec![
4216 Ok(
4217 zeroclaw_providers::traits::StreamEvent::PreExecutedToolCall {
4218 name: "file_read".into(),
4219 args: "{\"path\":\"a.txt\"}".into(),
4220 },
4221 ),
4222 Ok(
4223 zeroclaw_providers::traits::StreamEvent::PreExecutedToolCall {
4224 name: "shell".into(),
4225 args: "{\"command\":\"pwd\"}".into(),
4226 },
4227 ),
4228 Ok(
4229 zeroclaw_providers::traits::StreamEvent::PreExecutedToolResult {
4230 name: "file_read".into(),
4231 output: "a".into(),
4232 },
4233 ),
4234 Ok(
4235 zeroclaw_providers::traits::StreamEvent::PreExecutedToolResult {
4236 name: "shell".into(),
4237 output: "b".into(),
4238 },
4239 ),
4240 Ok(zeroclaw_providers::traits::StreamEvent::Final),
4241 ])
4242 .boxed()
4243 }
4244 }
4245 impl ::zeroclaw_api::attribution::Attributable for PreExecutedToolModelProvider {
4246 fn role(&self) -> ::zeroclaw_api::attribution::Role {
4247 ::zeroclaw_api::attribution::Role::Provider(
4248 ::zeroclaw_api::attribution::ProviderKind::Model(
4249 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
4250 ),
4251 )
4252 }
4253 fn alias(&self) -> &str {
4254 "PreExecutedToolModelProvider"
4255 }
4256 }
4257
4258 #[tokio::test]
4259 async fn pre_executed_tool_results_keep_ids_when_calls_overlap() {
4260 let model_provider = Box::new(PreExecutedToolModelProvider);
4261
4262 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4263 backend: "none".into(),
4264 ..zeroclaw_config::schema::MemoryConfig::default()
4265 };
4266 let mem: Arc<dyn Memory> = Arc::from(
4267 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4268 .expect("memory creation should succeed with valid config"),
4269 );
4270
4271 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4272 let mut agent = Agent::builder()
4273 .model_provider(model_provider)
4274 .tools(vec![Box::new(MockTool)])
4275 .memory(mem)
4276 .observer(observer)
4277 .tool_dispatcher(Box::new(NativeToolDispatcher))
4278 .workspace_dir(std::path::PathBuf::from("/tmp"))
4279 .build()
4280 .expect("agent builder should succeed with valid config");
4281
4282 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
4283 let _ = agent
4284 .turn_streamed("use pre-executed tools", event_tx, None)
4285 .await
4286 .unwrap();
4287
4288 let mut call_ids = HashMap::new();
4289 let mut result_ids = HashMap::new();
4290 while let Ok(event) = event_rx.try_recv() {
4291 match event {
4292 TurnEvent::ToolCall { id, name, .. } => {
4293 call_ids.insert(name, id);
4294 }
4295 TurnEvent::ToolResult { id, name, .. } => {
4296 result_ids.insert(name, id);
4297 }
4298 _ => {}
4299 }
4300 }
4301
4302 assert_eq!(call_ids.len(), 2, "expected two pre-executed tool calls");
4303 assert_eq!(
4304 result_ids.len(),
4305 2,
4306 "expected two pre-executed tool results"
4307 );
4308 assert_eq!(call_ids.get("file_read"), result_ids.get("file_read"));
4309 assert_eq!(call_ids.get("shell"), result_ids.get("shell"));
4310 }
4311
4312 #[tokio::test]
4313 async fn turn_normalizes_user_image_markers_before_provider_call() {
4314 let seen_user_messages = Arc::new(Mutex::new(Vec::new()));
4315 let provider = Box::new(MultimodalCaptureProvider {
4316 seen_user_messages: seen_user_messages.clone(),
4317 streamed: false,
4318 });
4319
4320 let temp = tempfile::tempdir().expect("tempdir");
4321 let image_path = temp.path().join("agent-turn.png");
4322 std::fs::write(
4323 &image_path,
4324 [0x89, b'P', b'N', b'G', b'\r', b'\n', 0x1a, b'\n'],
4325 )
4326 .expect("write fixture");
4327
4328 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4329 backend: "none".into(),
4330 ..zeroclaw_config::schema::MemoryConfig::default()
4331 };
4332 let mem: Arc<dyn Memory> = Arc::from(
4333 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4334 .expect("memory creation should succeed with valid config"),
4335 );
4336
4337 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4338 let mut agent = Agent::builder()
4339 .model_provider(provider)
4340 .tools(vec![Box::new(MockTool)])
4341 .memory(mem)
4342 .observer(observer)
4343 .tool_dispatcher(Box::new(NativeToolDispatcher))
4344 .workspace_dir(std::path::PathBuf::from("/tmp"))
4345 .multimodal_config(zeroclaw_config::schema::MultimodalConfig::default())
4346 .build()
4347 .expect("agent builder should succeed with valid config");
4348
4349 agent
4350 .turn(&format!(
4351 "inspect [IMAGE:{}]",
4352 image_path.display().to_string()
4353 ))
4354 .await
4355 .expect("turn should succeed");
4356
4357 let seen = seen_user_messages.lock();
4358 let last = seen.last().expect("provider should receive a user message");
4359 assert!(
4360 last.contains("data:image/png;base64,"),
4361 "expected normalized data URI in provider request, got: {last}"
4362 );
4363 }
4364
4365 #[tokio::test]
4366 async fn turn_streamed_normalizes_user_image_markers_before_provider_call() {
4367 let seen_user_messages = Arc::new(Mutex::new(Vec::new()));
4368 let provider = Box::new(MultimodalCaptureProvider {
4369 seen_user_messages: seen_user_messages.clone(),
4370 streamed: true,
4371 });
4372
4373 let temp = tempfile::tempdir().expect("tempdir");
4374 let image_path = temp.path().join("agent-stream.png");
4375 std::fs::write(
4376 &image_path,
4377 [0x89, b'P', b'N', b'G', b'\r', b'\n', 0x1a, b'\n'],
4378 )
4379 .expect("write fixture");
4380
4381 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4382 backend: "none".into(),
4383 ..zeroclaw_config::schema::MemoryConfig::default()
4384 };
4385 let mem: Arc<dyn Memory> = Arc::from(
4386 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4387 .expect("memory creation should succeed with valid config"),
4388 );
4389
4390 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4391 let mut agent = Agent::builder()
4392 .model_provider(provider)
4393 .tools(vec![Box::new(MockTool)])
4394 .memory(mem)
4395 .observer(observer)
4396 .tool_dispatcher(Box::new(NativeToolDispatcher))
4397 .workspace_dir(std::path::PathBuf::from("/tmp"))
4398 .multimodal_config(zeroclaw_config::schema::MultimodalConfig::default())
4399 .build()
4400 .expect("agent builder should succeed with valid config");
4401
4402 let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(8);
4403 agent
4404 .turn_streamed(
4405 &format!("inspect [IMAGE:{}]", image_path.display().to_string()),
4406 event_tx,
4407 None,
4408 )
4409 .await
4410 .expect("turn_streamed should succeed");
4411
4412 let seen = seen_user_messages.lock();
4413 let last = seen.last().expect("provider should receive a user message");
4414 assert!(
4415 last.contains("data:image/png;base64,"),
4416 "expected normalized data URI in provider request, got: {last}"
4417 );
4418 }
4419
4420 #[test]
4436 fn trim_history_does_not_leave_orphan_tool_results() {
4437 use zeroclaw_providers::{ToolCall, ToolResultMessage};
4438
4439 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4440 backend: "none".into(),
4441 ..zeroclaw_config::schema::MemoryConfig::default()
4442 };
4443 let mem: Arc<dyn Memory> = Arc::from(
4444 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4445 .expect("memory creation should succeed with valid config"),
4446 );
4447
4448 let agent_config = zeroclaw_config::schema::AliasedAgentConfig {
4452 max_history_messages: 4,
4453 ..zeroclaw_config::schema::AliasedAgentConfig::default()
4454 };
4455
4456 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4457 let mut agent = Agent::builder()
4458 .model_provider(Box::new(MockModelProvider {
4459 responses: Mutex::new(vec![]),
4460 }))
4461 .tools(vec![Box::new(MockTool)])
4462 .memory(mem)
4463 .observer(observer)
4464 .tool_dispatcher(Box::new(NativeToolDispatcher))
4465 .workspace_dir(std::path::PathBuf::from("/tmp"))
4466 .config(agent_config)
4467 .build()
4468 .expect("agent builder should succeed with valid config");
4469
4470 for i in 1..=3 {
4472 agent.history.push(ConversationMessage::AssistantToolCalls {
4473 text: Some(format!("Calling tool {i}")),
4474 tool_calls: vec![ToolCall {
4475 id: format!("tc{i}"),
4476 name: format!("tool{i}"),
4477 arguments: "{}".into(),
4478 extra_content: None,
4479 }],
4480 reasoning_content: None,
4481 });
4482 if i < 3 {
4486 agent
4487 .history
4488 .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
4489 tool_call_id: format!("tc{i}"),
4490 content: format!("result{i}"),
4491 }]));
4492 }
4493 }
4494
4495 assert_eq!(agent.history.len(), 5);
4496 agent.trim_history();
4497
4498 if let Some(first) = agent.history.first() {
4502 assert!(
4503 !matches!(first, ConversationMessage::ToolResults(_)),
4504 "trim_history left an orphan ToolResults at the head of the \
4505 history; this would cause Anthropic to reject the next \
4506 request with 'unexpected tool_use_id found in tool_result \
4507 blocks'"
4508 );
4509 }
4510
4511 for window in agent.history.windows(2) {
4514 if matches!(&window[1], ConversationMessage::ToolResults(_)) {
4515 assert!(
4516 matches!(&window[0], ConversationMessage::AssistantToolCalls { .. }),
4517 "ToolResults entry is not preceded by an AssistantToolCalls \
4518 entry — pair was split during trim"
4519 );
4520 }
4521 }
4522 }
4523
4524 #[tokio::test]
4532 async fn narration_with_tool_calls_produces_no_consecutive_assistant_entries() {
4533 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4534 backend: "none".into(),
4535 ..zeroclaw_config::schema::MemoryConfig::default()
4536 };
4537 let mem: Arc<dyn Memory> = Arc::from(
4538 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4539 .expect("memory creation should succeed with valid config"),
4540 );
4541
4542 let model_provider = Box::new(MockModelProvider {
4543 responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
4544 text: Some("I will echo the message.".into()),
4545 tool_calls: vec![zeroclaw_providers::ToolCall {
4546 id: "tc1".into(),
4547 name: "echo".into(),
4548 arguments: "{}".into(),
4549 extra_content: None,
4550 }],
4551 usage: None,
4552 reasoning_content: None,
4553 }]),
4554 });
4555
4556 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4557 let mut agent = Agent::builder()
4558 .model_provider(model_provider)
4559 .tools(vec![Box::new(MockTool)])
4560 .memory(mem)
4561 .observer(observer)
4562 .tool_dispatcher(Box::new(NativeToolDispatcher))
4563 .workspace_dir(std::path::PathBuf::from("/tmp"))
4564 .build()
4565 .expect("agent builder should succeed with valid config");
4566
4567 agent.turn("hi").await.unwrap();
4568
4569 let history = agent.history();
4570 for window in history.windows(2) {
4571 let prev_is_assistant_chat = matches!(
4572 &window[0],
4573 ConversationMessage::Chat(m) if m.role == "assistant"
4574 );
4575 let next_is_tool_calls =
4576 matches!(&window[1], ConversationMessage::AssistantToolCalls { .. });
4577 assert!(
4578 !(prev_is_assistant_chat && next_is_tool_calls),
4579 "history contains Chat(assistant) immediately before AssistantToolCalls — \
4580 duplicate narration push was not removed"
4581 );
4582 }
4583 }
4584
4585 struct NarrationStreamModelProvider {
4589 call_count: Arc<Mutex<usize>>,
4590 }
4591
4592 #[async_trait]
4593 impl ModelProvider for NarrationStreamModelProvider {
4594 async fn chat_with_system(
4595 &self,
4596 _system_prompt: Option<&str>,
4597 _message: &str,
4598 _model: &str,
4599 _temperature: Option<f64>,
4600 ) -> Result<String> {
4601 Ok("ok".into())
4602 }
4603
4604 async fn chat(
4605 &self,
4606 _request: ChatRequest<'_>,
4607 _model: &str,
4608 _temperature: Option<f64>,
4609 ) -> Result<zeroclaw_providers::ChatResponse> {
4610 Ok(zeroclaw_providers::ChatResponse {
4611 text: Some("done".into()),
4612 tool_calls: vec![],
4613 usage: None,
4614 reasoning_content: None,
4615 })
4616 }
4617
4618 fn supports_native_tools(&self) -> bool {
4619 true
4620 }
4621
4622 fn stream_chat(
4623 &self,
4624 _request: ChatRequest<'_>,
4625 _model: &str,
4626 _temperature: Option<f64>,
4627 _options: zeroclaw_providers::traits::StreamOptions,
4628 ) -> futures_util::stream::BoxStream<
4629 'static,
4630 zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
4631 > {
4632 use futures_util::stream::{self, StreamExt};
4633 let mut count = self.call_count.lock();
4634 *count += 1;
4635 if *count == 1 {
4636 stream::iter(vec![
4637 Ok(zeroclaw_providers::traits::StreamEvent::TextDelta(
4638 zeroclaw_providers::traits::StreamChunk {
4639 delta: "I will echo the message.".into(),
4640 is_final: false,
4641 reasoning: None,
4642 token_count: 0,
4643 },
4644 )),
4645 Ok(zeroclaw_providers::traits::StreamEvent::ToolCall(
4646 zeroclaw_providers::ToolCall {
4647 id: "tc1".into(),
4648 name: "echo".into(),
4649 arguments: "{}".into(),
4650 extra_content: None,
4651 },
4652 )),
4653 Ok(zeroclaw_providers::traits::StreamEvent::Final),
4654 ])
4655 .boxed()
4656 } else {
4657 stream::iter(vec![
4658 Ok(zeroclaw_providers::traits::StreamEvent::TextDelta(
4659 zeroclaw_providers::traits::StreamChunk {
4660 delta: "done".into(),
4661 is_final: false,
4662 reasoning: None,
4663 token_count: 0,
4664 },
4665 )),
4666 Ok(zeroclaw_providers::traits::StreamEvent::Final),
4667 ])
4668 .boxed()
4669 }
4670 }
4671 }
4672 impl ::zeroclaw_api::attribution::Attributable for NarrationStreamModelProvider {
4673 fn role(&self) -> ::zeroclaw_api::attribution::Role {
4674 ::zeroclaw_api::attribution::Role::Provider(
4675 ::zeroclaw_api::attribution::ProviderKind::Model(
4676 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
4677 ),
4678 )
4679 }
4680 fn alias(&self) -> &str {
4681 "NarrationStreamModelProvider"
4682 }
4683 }
4684
4685 #[tokio::test]
4686 async fn streaming_narration_with_tool_calls_produces_no_consecutive_assistant_entries() {
4687 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4688 backend: "none".into(),
4689 ..zeroclaw_config::schema::MemoryConfig::default()
4690 };
4691 let mem: Arc<dyn Memory> = Arc::from(
4692 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4693 .expect("memory creation should succeed with valid config"),
4694 );
4695
4696 let model_provider = Box::new(NarrationStreamModelProvider {
4697 call_count: Arc::new(Mutex::new(0)),
4698 });
4699
4700 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4701 let mut agent = Agent::builder()
4702 .model_provider(model_provider)
4703 .tools(vec![Box::new(MockTool)])
4704 .memory(mem)
4705 .observer(observer)
4706 .tool_dispatcher(Box::new(NativeToolDispatcher))
4707 .workspace_dir(std::path::PathBuf::from("/tmp"))
4708 .build()
4709 .expect("agent builder should succeed with valid config");
4710
4711 let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
4712 agent.turn_streamed("hi", event_tx, None).await.unwrap();
4713
4714 let history = agent.history();
4715 for window in history.windows(2) {
4716 let prev_is_assistant_chat = matches!(
4717 &window[0],
4718 ConversationMessage::Chat(m) if m.role == "assistant"
4719 );
4720 let next_is_tool_calls =
4721 matches!(&window[1], ConversationMessage::AssistantToolCalls { .. });
4722 assert!(
4723 !(prev_is_assistant_chat && next_is_tool_calls),
4724 "streaming path: history contains Chat(assistant) immediately before \
4725 AssistantToolCalls — duplicate narration push was not removed"
4726 );
4727 }
4728 }
4729
4730 #[tokio::test]
4731 async fn response_cache_key_uses_full_provider_visible_transcript() {
4732 let tmp = tempfile::tempdir().expect("temp response cache dir");
4733 let cache = Arc::new(
4734 zeroclaw_memory::response_cache::ResponseCache::new(tmp.path(), 60, 100)
4735 .expect("response cache should initialize"),
4736 );
4737
4738 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4739 backend: "none".into(),
4740 ..zeroclaw_config::schema::MemoryConfig::default()
4741 };
4742 let mem_a: Arc<dyn Memory> = Arc::from(
4743 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4744 .expect("memory creation should succeed with valid config"),
4745 );
4746 let mem_b: Arc<dyn Memory> = Arc::from(
4747 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4748 .expect("memory creation should succeed with valid config"),
4749 );
4750
4751 let seen_a = Arc::new(Mutex::new(Vec::new()));
4752 let seen_b = Arc::new(Mutex::new(Vec::new()));
4753 let provider_a = Box::new(TranscriptCaptureModelProvider {
4754 responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
4755 text: Some("from prior transcript".into()),
4756 tool_calls: vec![],
4757 usage: None,
4758 reasoning_content: None,
4759 }]),
4760 seen_messages: seen_a.clone(),
4761 });
4762 let provider_b = Box::new(TranscriptCaptureModelProvider {
4763 responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
4764 text: Some("from fresh transcript".into()),
4765 tool_calls: vec![],
4766 usage: None,
4767 reasoning_content: None,
4768 }]),
4769 seen_messages: seen_b.clone(),
4770 });
4771
4772 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4773 let mut agent_a = Agent::builder()
4774 .model_provider(provider_a)
4775 .tools(vec![Box::new(MockTool)])
4776 .memory(mem_a)
4777 .observer(observer.clone())
4778 .response_cache(Some(cache.clone()))
4779 .tool_dispatcher(Box::new(NativeToolDispatcher))
4780 .workspace_dir(std::path::PathBuf::from("/tmp"))
4781 .model_name("test-model".into())
4782 .temperature(0.0)
4783 .build()
4784 .expect("agent builder should succeed with valid config");
4785 agent_a.seed_history(&[
4786 ChatMessage::user("earlier turn"),
4787 ChatMessage::assistant("earlier answer"),
4788 ]);
4789
4790 let mut agent_b = Agent::builder()
4791 .model_provider(provider_b)
4792 .tools(vec![Box::new(MockTool)])
4793 .memory(mem_b)
4794 .observer(observer)
4795 .response_cache(Some(cache))
4796 .tool_dispatcher(Box::new(NativeToolDispatcher))
4797 .workspace_dir(std::path::PathBuf::from("/tmp"))
4798 .model_name("test-model".into())
4799 .temperature(0.0)
4800 .build()
4801 .expect("agent builder should succeed with valid config");
4802
4803 assert_eq!(
4804 agent_a.turn("same final prompt").await.unwrap(),
4805 "from prior transcript"
4806 );
4807 assert_eq!(
4808 agent_b.turn("same final prompt").await.unwrap(),
4809 "from fresh transcript"
4810 );
4811 assert_eq!(seen_a.lock().len(), 1);
4812 assert_eq!(
4813 seen_b.lock().len(),
4814 1,
4815 "fresh transcript must not reuse a cache entry written for a different prior transcript"
4816 );
4817 }
4818
4819 #[tokio::test]
4820 async fn turn_streamed_with_steering_commits_streamed_output_before_continuing() {
4821 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4822 backend: "none".into(),
4823 ..zeroclaw_config::schema::MemoryConfig::default()
4824 };
4825 let mem: Arc<dyn Memory> = Arc::from(
4826 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4827 .expect("memory creation should succeed with valid config"),
4828 );
4829
4830 let seen_messages = Arc::new(Mutex::new(Vec::new()));
4831 let model_provider = Box::new(StreamingSteeringModelProvider {
4832 seen_messages: seen_messages.clone(),
4833 call_count: AtomicUsize::new(0),
4834 fail_on_call: None,
4835 fail_after_delta_on_call: None,
4836 });
4837 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4838 let mut agent = Agent::builder()
4839 .model_provider(model_provider)
4840 .tools(vec![Box::new(MockTool)])
4841 .memory(mem)
4842 .observer(observer)
4843 .tool_dispatcher(Box::new(NativeToolDispatcher))
4844 .workspace_dir(std::path::PathBuf::from("/tmp"))
4845 .build()
4846 .expect("agent builder should succeed with valid config");
4847
4848 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
4849 let (steering_tx, mut steering_rx) = tokio::sync::mpsc::channel::<String>(4);
4850 let handle = tokio::spawn(async move {
4851 agent
4852 .turn_streamed_with_steering_state("first", event_tx, None, Some(&mut steering_rx))
4853 .await
4854 });
4855
4856 loop {
4857 match event_rx.recv().await.expect("turn event should arrive") {
4858 TurnEvent::Chunk { delta } if delta == "draft" => {
4859 steering_tx
4860 .send("second".into())
4861 .await
4862 .expect("steering message should enqueue");
4863 break;
4864 }
4865 _ => {}
4866 }
4867 }
4868
4869 let outcome = handle
4870 .await
4871 .expect("turn task should finish")
4872 .expect("steered turn should succeed");
4873 assert_eq!(outcome.response, "draftfinal");
4874
4875 let new_chat_messages: Vec<_> = outcome
4876 .new_messages
4877 .iter()
4878 .filter_map(|msg| match msg {
4879 ConversationMessage::Chat(message) => {
4880 Some((message.role.as_str(), message.content.as_str()))
4881 }
4882 _ => None,
4883 })
4884 .collect();
4885 assert!(
4886 new_chat_messages
4887 .iter()
4888 .any(|(role, content)| { *role == "assistant" && *content == "draft" }),
4889 "already streamed output must be committed before the steering continuation"
4890 );
4891 assert!(
4892 new_chat_messages
4893 .iter()
4894 .any(|(role, content)| { *role == "user" && content.contains("second") }),
4895 "accepted steering must be retained as its own user turn"
4896 );
4897
4898 let seen = seen_messages.lock();
4899 assert_eq!(seen.len(), 2);
4900 let second_call = &seen[1];
4901 assert!(
4902 second_call
4903 .iter()
4904 .any(|msg| msg.role == "assistant" && msg.content == "draft"),
4905 "second provider call must see the committed streamed assistant text"
4906 );
4907 assert!(
4908 second_call
4909 .iter()
4910 .filter(|msg| msg.role == "user")
4911 .any(|msg| msg.content.contains("second")),
4912 "second provider call must include the accepted steering user message"
4913 );
4914 }
4915
4916 #[tokio::test]
4917 async fn turn_streamed_with_steering_error_returns_committed_partial_output() {
4918 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4919 backend: "none".into(),
4920 ..zeroclaw_config::schema::MemoryConfig::default()
4921 };
4922 let mem: Arc<dyn Memory> = Arc::from(
4923 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4924 .expect("memory creation should succeed with valid config"),
4925 );
4926
4927 let model_provider = Box::new(StreamingSteeringModelProvider {
4928 seen_messages: Arc::new(Mutex::new(Vec::new())),
4929 call_count: AtomicUsize::new(0),
4930 fail_on_call: Some(2),
4931 fail_after_delta_on_call: None,
4932 });
4933 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4934 let mut agent = Agent::builder()
4935 .model_provider(model_provider)
4936 .tools(vec![Box::new(MockTool)])
4937 .memory(mem)
4938 .observer(observer)
4939 .tool_dispatcher(Box::new(NativeToolDispatcher))
4940 .workspace_dir(std::path::PathBuf::from("/tmp"))
4941 .build()
4942 .expect("agent builder should succeed with valid config");
4943
4944 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
4945 let (steering_tx, mut steering_rx) = tokio::sync::mpsc::channel::<String>(4);
4946 let handle = tokio::spawn(async move {
4947 agent
4948 .turn_streamed_with_steering_state("first", event_tx, None, Some(&mut steering_rx))
4949 .await
4950 });
4951
4952 loop {
4953 match event_rx.recv().await.expect("turn event should arrive") {
4954 TurnEvent::Chunk { delta } if delta == "draft" => {
4955 steering_tx
4956 .send("second".into())
4957 .await
4958 .expect("steering message should enqueue");
4959 break;
4960 }
4961 _ => {}
4962 }
4963 }
4964
4965 let err = handle
4966 .await
4967 .expect("turn task should finish")
4968 .expect_err("second provider call should fail");
4969 assert_eq!(err.committed_response, "draft");
4970 assert!(
4971 err.new_messages.iter().any(|msg| {
4972 matches!(msg, ConversationMessage::Chat(message) if message.role == "assistant" && message.content == "draft")
4973 }),
4974 "committed partial assistant output should be returned for persistence after continuation failure"
4975 );
4976 assert!(
4977 err.new_messages.iter().any(|msg| {
4978 matches!(msg, ConversationMessage::Chat(message) if message.role == "user" && message.content.contains("second"))
4979 }),
4980 "accepted steering user message should still be returned after continuation failure"
4981 );
4982 }
4983
4984 #[tokio::test]
4985 async fn turn_streamed_error_after_delta_returns_visible_partial_output() {
4986 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4987 backend: "none".into(),
4988 ..zeroclaw_config::schema::MemoryConfig::default()
4989 };
4990 let mem: Arc<dyn Memory> = Arc::from(
4991 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4992 .expect("memory creation should succeed with valid config"),
4993 );
4994
4995 let model_provider = Box::new(StreamingSteeringModelProvider {
4996 seen_messages: Arc::new(Mutex::new(Vec::new())),
4997 call_count: AtomicUsize::new(0),
4998 fail_on_call: None,
4999 fail_after_delta_on_call: Some(1),
5000 });
5001 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5002 let mut agent = Agent::builder()
5003 .model_provider(model_provider)
5004 .tools(vec![Box::new(MockTool)])
5005 .memory(mem)
5006 .observer(observer)
5007 .tool_dispatcher(Box::new(NativeToolDispatcher))
5008 .workspace_dir(std::path::PathBuf::from("/tmp"))
5009 .build()
5010 .expect("agent builder should succeed with valid config");
5011
5012 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
5013 let handle = tokio::spawn(async move {
5014 agent
5015 .turn_streamed_with_steering_state("first", event_tx, None, None)
5016 .await
5017 });
5018
5019 assert!(
5020 matches!(
5021 event_rx.recv().await,
5022 Some(TurnEvent::Chunk { delta }) if delta == "draft"
5023 ),
5024 "the client should see the streamed text before the provider error"
5025 );
5026
5027 let err = handle
5028 .await
5029 .expect("turn task should finish")
5030 .expect_err("provider stream failure should be returned");
5031 assert!(
5032 err.error
5033 .to_string()
5034 .contains("synthetic provider failure after delta"),
5035 "unexpected error: {}",
5036 err.error
5037 );
5038 assert!(
5039 err.committed_response.contains("draft"),
5040 "visible streamed text should be committed after a provider stream error"
5041 );
5042 assert!(
5043 err.committed_response.contains("[stream interrupted]"),
5044 "persisted partial text should mark that the stream did not complete"
5045 );
5046 assert!(
5047 err.new_messages.iter().any(|msg| {
5048 matches!(msg, ConversationMessage::Chat(message) if message.role == "assistant" && message.content.contains("draft"))
5049 }),
5050 "new messages should carry the visible assistant partial for gateway persistence"
5051 );
5052 }
5053
5054 #[tokio::test]
5055 async fn turn_streamed_cancel_before_output_returns_interruption_message() {
5056 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5057 backend: "none".into(),
5058 ..zeroclaw_config::schema::MemoryConfig::default()
5059 };
5060 let mem: Arc<dyn Memory> = Arc::from(
5061 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5062 .expect("memory creation should succeed with valid config"),
5063 );
5064
5065 let model_provider = Box::new(StreamingSteeringModelProvider {
5066 seen_messages: Arc::new(Mutex::new(Vec::new())),
5067 call_count: AtomicUsize::new(0),
5068 fail_on_call: None,
5069 fail_after_delta_on_call: None,
5070 });
5071 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5072 let mut agent = Agent::builder()
5073 .model_provider(model_provider)
5074 .tools(vec![Box::new(MockTool)])
5075 .memory(mem)
5076 .observer(observer)
5077 .tool_dispatcher(Box::new(NativeToolDispatcher))
5078 .workspace_dir(std::path::PathBuf::from("/tmp"))
5079 .build()
5080 .expect("agent builder should succeed with valid config");
5081
5082 let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
5083 let cancel_token = tokio_util::sync::CancellationToken::new();
5084 cancel_token.cancel();
5085
5086 let err = agent
5087 .turn_streamed_with_steering_state("first", event_tx, Some(cancel_token), None)
5088 .await
5089 .expect_err("pre-cancelled turn should return cancellation");
5090
5091 assert!(
5092 crate::agent::loop_::is_tool_loop_cancelled(&err.error),
5093 "unexpected error: {}",
5094 err.error
5095 );
5096 assert_eq!(err.committed_response, "[interrupted by user]");
5097 assert!(
5098 err.new_messages.iter().any(|msg| {
5099 matches!(msg, ConversationMessage::Chat(message) if message.role == "assistant" && message.content == "[interrupted by user]")
5100 }),
5101 "cancelled turn should include an assistant interruption marker for persistence"
5102 );
5103 }
5104
5105 #[tokio::test]
5106 async fn turn_streamed_stream_error_after_delta_emits_llm_response_failure() {
5107 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5108 backend: "none".into(),
5109 ..zeroclaw_config::schema::MemoryConfig::default()
5110 };
5111 let mem: Arc<dyn Memory> = Arc::from(
5112 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5113 .expect("memory creation should succeed with valid config"),
5114 );
5115
5116 let model_provider = Box::new(StreamingSteeringModelProvider {
5117 seen_messages: Arc::new(Mutex::new(Vec::new())),
5118 call_count: AtomicUsize::new(0),
5119 fail_on_call: None,
5120 fail_after_delta_on_call: Some(1),
5121 });
5122 let capturing = Arc::new(CapturingObserver::default());
5123 let observer: Arc<dyn Observer> = capturing.clone();
5124 let mut agent = Agent::builder()
5125 .model_provider(model_provider)
5126 .tools(vec![Box::new(MockTool)])
5127 .memory(mem)
5128 .observer(observer)
5129 .tool_dispatcher(Box::new(NativeToolDispatcher))
5130 .workspace_dir(std::path::PathBuf::from("/tmp"))
5131 .build()
5132 .expect("agent builder should succeed with valid config");
5133
5134 let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
5135 let err = agent
5136 .turn_streamed_with_steering_state("test", event_tx, None, None)
5137 .await
5138 .expect_err("provider stream failure should be returned");
5139
5140 assert!(
5141 err.committed_response.contains("draft")
5142 && err.committed_response.contains("[stream interrupted]"),
5143 "unexpected committed_response: {}",
5144 err.committed_response
5145 );
5146
5147 let events = capturing.events.lock();
5148 let request = events
5149 .iter()
5150 .find(|e| matches!(e, ObserverEvent::LlmRequest { .. }))
5151 .expect("LlmRequest should have been recorded");
5152 let response = events
5153 .iter()
5154 .find(|e| matches!(e, ObserverEvent::LlmResponse { .. }))
5155 .expect("LlmResponse should have been recorded");
5156
5157 assert_eq!(
5158 events
5159 .iter()
5160 .filter(|e| matches!(e, ObserverEvent::LlmRequest { .. }))
5161 .count(),
5162 1,
5163 "exactly one LlmRequest expected"
5164 );
5165 assert_eq!(
5166 events
5167 .iter()
5168 .filter(|e| matches!(e, ObserverEvent::LlmResponse { .. }))
5169 .count(),
5170 1,
5171 "exactly one LlmResponse expected"
5172 );
5173
5174 let (
5175 ObserverEvent::LlmRequest {
5176 model_provider: req_provider,
5177 model: req_model,
5178 ..
5179 },
5180 ObserverEvent::LlmResponse {
5181 model_provider: resp_provider,
5182 model: resp_model,
5183 success,
5184 error_message,
5185 ..
5186 },
5187 ) = (request, response)
5188 else {
5189 panic!("matched event variants should be LlmRequest and LlmResponse");
5190 };
5191
5192 assert!(!success, "LlmResponse on stream error must be a failure");
5193 assert!(
5194 error_message.as_deref().is_some_and(|m| !m.is_empty()),
5195 "failure LlmResponse must carry a non-empty error_message"
5196 );
5197 assert_eq!(req_provider, resp_provider, "provider should match");
5198 assert_eq!(req_model, resp_model, "model should match");
5199 }
5200
5201 #[tokio::test]
5202 async fn turn_streamed_cancel_during_stream_emits_llm_response_failure() {
5203 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5204 backend: "none".into(),
5205 ..zeroclaw_config::schema::MemoryConfig::default()
5206 };
5207 let mem: Arc<dyn Memory> = Arc::from(
5208 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5209 .expect("memory creation should succeed with valid config"),
5210 );
5211
5212 let model_provider = Box::new(StreamingSteeringModelProvider {
5213 seen_messages: Arc::new(Mutex::new(Vec::new())),
5214 call_count: AtomicUsize::new(0),
5215 fail_on_call: None,
5216 fail_after_delta_on_call: None,
5217 });
5218 let capturing = Arc::new(CapturingObserver::default());
5219 let observer: Arc<dyn Observer> = capturing.clone();
5220 let mut agent = Agent::builder()
5221 .model_provider(model_provider)
5222 .tools(vec![Box::new(MockTool)])
5223 .memory(mem)
5224 .observer(observer)
5225 .tool_dispatcher(Box::new(NativeToolDispatcher))
5226 .workspace_dir(std::path::PathBuf::from("/tmp"))
5227 .build()
5228 .expect("agent builder should succeed with valid config");
5229
5230 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
5231 let cancel_token = tokio_util::sync::CancellationToken::new();
5232 let cancel_for_task = cancel_token.clone();
5233
5234 let canceller = tokio::spawn(async move {
5235 while let Some(event) = event_rx.recv().await {
5236 if matches!(event, TurnEvent::Chunk { ref delta } if delta == "draft") {
5237 cancel_for_task.cancel();
5238 break;
5239 }
5240 }
5241 while event_rx.recv().await.is_some() {}
5242 });
5243
5244 let err = agent
5245 .turn_streamed_with_steering_state("test", event_tx, Some(cancel_token), None)
5246 .await
5247 .expect_err("cancelled turn should return cancellation");
5248
5249 canceller.await.expect("canceller task should finish");
5250
5251 assert!(
5252 crate::agent::loop_::is_tool_loop_cancelled(&err.error),
5253 "cancelled turn should carry the cancellation error: {}",
5254 err.error
5255 );
5256
5257 let events = capturing.events.lock();
5258 assert_eq!(
5259 events
5260 .iter()
5261 .filter(|e| matches!(e, ObserverEvent::LlmRequest { .. }))
5262 .count(),
5263 1,
5264 "exactly one LlmRequest expected"
5265 );
5266 assert_eq!(
5267 events
5268 .iter()
5269 .filter(|e| matches!(e, ObserverEvent::LlmResponse { .. }))
5270 .count(),
5271 1,
5272 "exactly one LlmResponse expected"
5273 );
5274
5275 let request = events
5276 .iter()
5277 .find(|e| matches!(e, ObserverEvent::LlmRequest { .. }))
5278 .expect("LlmRequest should have been recorded");
5279 let response = events
5280 .iter()
5281 .find(|e| matches!(e, ObserverEvent::LlmResponse { .. }))
5282 .expect("LlmResponse should have been recorded");
5283
5284 let (
5285 ObserverEvent::LlmRequest {
5286 model_provider: req_provider,
5287 model: req_model,
5288 ..
5289 },
5290 ObserverEvent::LlmResponse {
5291 model_provider: resp_provider,
5292 model: resp_model,
5293 success,
5294 error_message,
5295 ..
5296 },
5297 ) = (request, response)
5298 else {
5299 panic!("matched event variants should be LlmRequest and LlmResponse");
5300 };
5301
5302 assert!(!success, "cancellation LlmResponse must be a failure");
5303 assert_eq!(
5304 error_message.as_deref(),
5305 Some("request cancelled by user"),
5306 "cancellation LlmResponse must carry the fixed cancel message"
5307 );
5308 assert_eq!(req_provider, resp_provider, "provider should match");
5309 assert_eq!(req_model, resp_model, "model should match");
5310 }
5311
5312 struct NamedMockTool {
5317 tool_name: String,
5318 }
5319
5320 impl NamedMockTool {
5321 fn new(name: &str) -> Self {
5322 Self {
5323 tool_name: name.to_string(),
5324 }
5325 }
5326 }
5327
5328 #[async_trait]
5329 impl Tool for NamedMockTool {
5330 fn name(&self) -> &str {
5331 &self.tool_name
5332 }
5333
5334 fn description(&self) -> &str {
5335 "mock"
5336 }
5337
5338 fn parameters_schema(&self) -> serde_json::Value {
5339 serde_json::json!({"type": "object"})
5340 }
5341
5342 async fn execute(&self, _args: serde_json::Value) -> Result<crate::tools::ToolResult> {
5343 Ok(crate::tools::ToolResult {
5344 success: true,
5345 output: "ok".into(),
5346 error: None,
5347 })
5348 }
5349 }
5350
5351 fn make_skill(name: &str, tool_names: &[&str]) -> crate::skills::Skill {
5352 crate::skills::Skill {
5353 name: name.to_string(),
5354 description: format!("{name} skill"),
5355 version: "0.1.0".to_string(),
5356 author: None,
5357 tags: vec![],
5358 tools: tool_names
5359 .iter()
5360 .map(|t| crate::skills::SkillTool {
5361 name: t.to_string(),
5362 description: format!("{t} tool"),
5363 kind: "shell".to_string(),
5364 command: format!("echo {t}"),
5365 args: std::collections::HashMap::new(),
5366 target: None,
5367 locked_args: std::collections::HashMap::new(),
5368 })
5369 .collect(),
5370 prompts: vec![],
5371 location: None,
5372 }
5373 }
5374
5375 #[test]
5376 fn register_skill_tools_adds_skill_tools_to_registry() {
5377 let security = Arc::new(crate::security::SecurityPolicy::default());
5378 let mut tools: Vec<Box<dyn Tool>> = vec![Box::new(NamedMockTool::new("builtin_a"))];
5379
5380 let skills = vec![make_skill("deploy", &["run", "status"])];
5381 tools::register_skill_tools(&mut tools, &skills, security);
5382
5383 let names: Vec<&str> = tools.iter().map(|t| t.name()).collect();
5384 assert_eq!(names, &["builtin_a", "deploy__run", "deploy__status"]);
5385 }
5386
5387 #[test]
5388 fn register_skill_tools_skips_shadowed_builtins() {
5389 let security = Arc::new(crate::security::SecurityPolicy::default());
5390 let mut tools: Vec<Box<dyn Tool>> = vec![Box::new(NamedMockTool::new("my_skill__run"))];
5392
5393 let skills = vec![make_skill("my_skill", &["run"])];
5394 tools::register_skill_tools(&mut tools, &skills, security);
5395
5396 assert_eq!(tools.len(), 1);
5398 assert_eq!(tools[0].name(), "my_skill__run");
5399 }
5400
5401 #[test]
5402 fn from_config_policy_filter_blocks_raw_target_but_keeps_scoped_wrapper() {
5403 use crate::skills::{Skill, SkillTool};
5412
5413 let shell: Arc<dyn Tool> = Arc::new(NamedMockTool::new("shell"));
5414 let file_read: Arc<dyn Tool> = Arc::new(NamedMockTool::new("file_read"));
5415 let resolution: Vec<Arc<dyn Tool>> = vec![Arc::clone(&shell), Arc::clone(&file_read)];
5418
5419 let mut tools: Vec<Box<dyn Tool>> = vec![
5420 Box::new(crate::tools::ArcToolRef(Arc::clone(&shell))),
5421 Box::new(crate::tools::ArcToolRef(Arc::clone(&file_read))),
5422 ];
5423
5424 let policy = crate::security::SecurityPolicy {
5428 allowed_tools: Some(vec!["file_read".to_string()]),
5429 workspace_dir: std::env::temp_dir(),
5430 ..crate::security::SecurityPolicy::default()
5431 };
5432 crate::agent::loop_::apply_policy_tool_filter(&mut tools, Some(&policy), None);
5433 assert!(
5434 !tools.iter().any(|t| t.name() == "shell"),
5435 "raw shell must be removed by the allowlist on the from_config path"
5436 );
5437 assert!(
5438 tools.iter().any(|t| t.name() == "file_read"),
5439 "allowlisted file_read must survive the filter"
5440 );
5441
5442 let skill = Skill {
5443 name: "ops".to_string(),
5444 description: "d".to_string(),
5445 version: "1".to_string(),
5446 author: None,
5447 tags: vec![],
5448 tools: vec![SkillTool {
5449 name: "use_shell".to_string(),
5450 description: "scoped shell".to_string(),
5451 kind: "builtin".to_string(),
5452 command: String::new(),
5453 args: std::collections::HashMap::new(),
5454 target: Some("shell".to_string()),
5455 locked_args: std::collections::HashMap::new(),
5456 }],
5457 prompts: vec![],
5458 location: None,
5459 };
5460 tools::register_skill_tools_with_context(
5461 &mut tools,
5462 &[skill],
5463 Arc::new(crate::security::SecurityPolicy::default()),
5464 &resolution,
5465 );
5466
5467 assert!(
5468 !tools.iter().any(|t| t.name() == "shell"),
5469 "raw shell must STILL be unavailable after skill registration"
5470 );
5471 assert!(
5472 tools.iter().any(|t| t.name() == "ops__use_shell"),
5473 "the scoped elevation wrapper must remain the only callable path to shell"
5474 );
5475 }
5476
5477 #[test]
5478 fn excluded_tools_filters_matching_tools() {
5479 let mut tools: Vec<Box<dyn Tool>> = vec![
5480 Box::new(NamedMockTool::new("shell")),
5481 Box::new(NamedMockTool::new("file_write")),
5482 Box::new(NamedMockTool::new("web_search")),
5483 ];
5484
5485 let excluded = ["shell".to_string(), "file_write".to_string()];
5486 tools.retain(|t| !excluded.iter().any(|ex| ex == t.name()));
5487
5488 let names: Vec<&str> = tools.iter().map(|t| t.name()).collect();
5489 assert_eq!(names, &["web_search"]);
5490 }
5491
5492 #[test]
5493 fn excluded_tools_preserves_non_excluded() {
5494 let mut tools: Vec<Box<dyn Tool>> = vec![
5495 Box::new(NamedMockTool::new("shell")),
5496 Box::new(NamedMockTool::new("file_read")),
5497 Box::new(NamedMockTool::new("web_fetch")),
5498 ];
5499
5500 let excluded = ["shell".to_string()];
5502 tools.retain(|t| !excluded.iter().any(|ex| ex == t.name()));
5503
5504 let names: Vec<&str> = tools.iter().map(|t| t.name()).collect();
5505 assert_eq!(names, &["file_read", "web_fetch"]);
5506 }
5507
5508 #[test]
5509 fn empty_excluded_tools_preserves_all() {
5510 let mut tools: Vec<Box<dyn Tool>> = vec![
5511 Box::new(NamedMockTool::new("shell")),
5512 Box::new(NamedMockTool::new("file_read")),
5513 ];
5514
5515 let excluded: Vec<String> = vec![];
5516 if !excluded.is_empty() {
5517 tools.retain(|t| !excluded.iter().any(|ex| ex == t.name()));
5518 }
5519
5520 assert_eq!(tools.len(), 2);
5521 }
5522
5523 #[tokio::test]
5529 async fn turn_streamed_returns_new_messages_at_history_limit() {
5530 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5531 backend: "none".into(),
5532 ..zeroclaw_config::schema::MemoryConfig::default()
5533 };
5534 let mem: Arc<dyn Memory> = Arc::from(
5535 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5536 .expect("memory creation should succeed with valid config"),
5537 );
5538
5539 let agent_config = zeroclaw_config::schema::AliasedAgentConfig {
5542 max_history_messages: 4,
5543 ..zeroclaw_config::schema::AliasedAgentConfig::default()
5544 };
5545
5546 let provider = Box::new(NarrationStreamModelProvider {
5548 call_count: Arc::new(Mutex::new(0)),
5549 });
5550
5551 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5552 let mut agent = Agent::builder()
5553 .model_provider(provider)
5554 .tools(vec![Box::new(MockTool)])
5555 .memory(mem)
5556 .observer(observer)
5557 .tool_dispatcher(Box::new(NativeToolDispatcher))
5558 .workspace_dir(std::path::PathBuf::from("/tmp"))
5559 .config(agent_config)
5560 .build()
5561 .expect("agent builder should succeed with valid config");
5562
5563 agent
5568 .history
5569 .push(ConversationMessage::Chat(ChatMessage::system("sys")));
5570 for i in 0..2 {
5571 agent
5572 .history
5573 .push(ConversationMessage::Chat(ChatMessage::user(format!(
5574 "old {i}"
5575 ))));
5576 agent
5577 .history
5578 .push(ConversationMessage::Chat(ChatMessage::assistant(format!(
5579 "old reply {i}"
5580 ))));
5581 }
5582 let (event_tx, _rx) = tokio::sync::mpsc::channel::<TurnEvent>(8);
5587 let (_, new_msgs) = agent
5588 .turn_streamed("new question", event_tx, None)
5589 .await
5590 .expect("turn_streamed should succeed");
5591
5592 let has_user = new_msgs
5594 .iter()
5595 .any(|m| matches!(m, ConversationMessage::Chat(c) if c.role == "user"));
5596 assert!(
5597 has_user,
5598 "new_msgs must include the user message even after trim; got: {new_msgs:?}"
5599 );
5600
5601 let has_assistant = new_msgs
5603 .iter()
5604 .any(|m| matches!(m, ConversationMessage::Chat(c) if c.role == "assistant"));
5605 assert!(
5606 has_assistant,
5607 "new_msgs must include the assistant reply even after trim; got: {new_msgs:?}"
5608 );
5609 }
5610
5611 #[tokio::test]
5619 async fn from_config_runtime_profile_propagates_budget_caps() {
5620 use axum::{Json, Router, routing::post};
5621 use tempfile::TempDir;
5622 use tokio::net::TcpListener;
5623
5624 let app = Router::new().route(
5625 "/chat/completions",
5626 post(|Json(_body): Json<serde_json::Value>| async move {
5627 Json(serde_json::json!({
5628 "choices": [{
5629 "message": {
5630 "content": "ok"
5631 }
5632 }]
5633 }))
5634 }),
5635 );
5636
5637 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
5638 let mock_addr = listener.local_addr().unwrap();
5639 let server_handle = tokio::spawn(async move {
5640 axum::serve(listener, app).await.unwrap();
5641 });
5642
5643 let tmp = TempDir::new().expect("temp dir");
5644 let workspace_dir = tmp.path().join("workspace");
5645 std::fs::create_dir_all(&workspace_dir).unwrap();
5646
5647 let mut config = zeroclaw_config::schema::Config {
5648 data_dir: workspace_dir,
5649 config_path: tmp.path().join("config.toml"),
5650 ..Default::default()
5651 };
5652 {
5653 let entry = config
5654 .providers
5655 .models
5656 .ensure("custom", "default")
5657 .expect("custom model_provider type slot");
5658 entry.api_key = Some("test-key".to_string());
5659 entry.model = Some("test-model".to_string());
5660 entry.uri = Some(format!("http://{mock_addr}"));
5661 }
5662 config.memory.backend = "none".to_string();
5663 config.memory.auto_save = false;
5664
5665 config.risk_profiles.insert(
5666 "test-profile".to_string(),
5667 zeroclaw_config::schema::RiskProfileConfig::default(),
5668 );
5669
5670 config.runtime_profiles.insert(
5672 "test-runtime".to_string(),
5673 zeroclaw_config::schema::RuntimeProfileConfig {
5674 max_actions_per_hour: 99,
5675 ..zeroclaw_config::schema::RuntimeProfileConfig::default()
5676 },
5677 );
5678
5679 let provider_alias = config
5680 .first_model_provider_type()
5681 .expect("model_provider configured above")
5682 .to_string();
5683 let agent_cfg = zeroclaw_config::schema::AliasedAgentConfig {
5684 model_provider: format!("{provider_alias}.default").into(),
5685 risk_profile: "test-profile".to_string(),
5686 runtime_profile: "test-runtime".to_string(),
5687 ..zeroclaw_config::schema::AliasedAgentConfig::default()
5688 };
5689 config.agents.insert("test-agent".to_string(), agent_cfg);
5690
5691 let agent = Agent::from_config(&config, "test-agent")
5692 .await
5693 .expect("agent from config");
5694
5695 let summary = agent
5696 .security_summary
5697 .as_deref()
5698 .expect("security_summary should be populated by from_config");
5699
5700 assert!(
5701 summary.contains("99"),
5702 "expected security_summary to contain runtime max_actions_per_hour=99; got: {summary}"
5703 );
5704
5705 server_handle.abort();
5706 }
5707
5708 #[test]
5709 fn excluded_tools_then_skill_registration_end_to_end() {
5710 let security = Arc::new(crate::security::SecurityPolicy::default());
5711 let mut tools: Vec<Box<dyn Tool>> = vec![
5712 Box::new(NamedMockTool::new("shell")),
5713 Box::new(NamedMockTool::new("file_read")),
5714 Box::new(NamedMockTool::new("web_fetch")),
5715 ];
5716
5717 let excluded = ["shell".to_string()];
5719 tools.retain(|t| !excluded.iter().any(|ex| ex == t.name()));
5720
5721 let skills = vec![make_skill("ops", &["deploy", "rollback"])];
5723 tools::register_skill_tools(&mut tools, &skills, security);
5724
5725 let names: Vec<&str> = tools.iter().map(|t| t.name()).collect();
5726 assert_eq!(
5727 names,
5728 &["file_read", "web_fetch", "ops__deploy", "ops__rollback"]
5729 );
5730 }
5731
5732 #[test]
5736 fn channel_targets_guarded_when_channel_send_excluded() {
5737 let mut tools: Vec<Box<dyn Tool>> = vec![
5738 Box::new(NamedMockTool::new("shell")),
5739 Box::new(NamedMockTool::new("file_read")),
5740 Box::new(NamedMockTool::new("channel_send")),
5741 ];
5742
5743 let excluded = ["channel_send".to_string()];
5745 tools.retain(|t| !excluded.iter().any(|ex| ex == t.name()));
5746
5747 let channel_send_present = tools.iter().any(|t| t.name() == "channel_send");
5749 assert!(
5750 !channel_send_present,
5751 "channel_send should have been excluded by the policy filter"
5752 );
5753
5754 }
5757}