1use crate::agent::dispatcher::{
2 NativeToolDispatcher, ParsedToolCall, ToolDispatcher, ToolExecutionResult, XmlToolDispatcher,
3};
4use crate::agent::eval::AutoClassifyExt;
5use crate::agent::prompt::{PromptContext, SystemPromptBuilder};
6use crate::approval::{ApprovalManager, ApprovalRequest, ApprovalRequirement, ApprovalResponse};
7use crate::observability::{self, Observer, ObserverEvent};
8use crate::platform;
9use crate::security::SecurityPolicy;
10use crate::tools::{self, Tool, ToolSpec};
11use anyhow::{Context, Result};
12use chrono::{Datelike, Timelike};
13use std::collections::{HashMap, VecDeque};
14use std::io::Write as IoWrite;
15use std::path::Path;
16use std::sync::Arc;
17use std::time::Instant;
18use zeroclaw_config::schema::Config;
19use zeroclaw_memory::{self, Memory, MemoryCategory};
20use zeroclaw_providers::{
21 self, ChatMessage, ChatRequest, ConversationMessage, ModelProvider, ToolResultMessage,
22};
23use zeroclaw_tool_call_parser::strip_think_tags;
24
25pub use zeroclaw_api::agent::TurnEvent;
27
28pub fn build_session_model_provider(
35 config: &Config,
36 model_provider_ref: &str,
37 model_override: Option<&str>,
38) -> Result<(Box<dyn ModelProvider>, String, String)> {
39 let (model_provider_name, model_provider_alias) = model_provider_ref
40 .split_once('.')
41 .map(|(t, a)| (t.to_string(), a.to_string()))
42 .ok_or_else(|| {
43 anyhow::Error::msg(format!(
44 "model_provider reference `{model_provider_ref}` must be `<type>.<alias>`"
45 ))
46 })?;
47
48 let entry = config
49 .providers
50 .models
51 .find(&model_provider_name, &model_provider_alias);
52 let model_name = model_override
53 .map(str::trim)
54 .filter(|m| !m.is_empty())
55 .map(str::to_string)
56 .or_else(|| {
57 entry
58 .and_then(|e| e.model.as_deref())
59 .map(str::trim)
60 .filter(|m| !m.is_empty())
61 .map(str::to_string)
62 })
63 .ok_or_else(|| {
64 anyhow::Error::msg(format!(
65 "model_provider `{model_provider_ref}` has no `model` configured and no model \
66 override was supplied"
67 ))
68 })?;
69
70 let model_provider_runtime_options = zeroclaw_providers::provider_runtime_options_for_alias(
71 config,
72 &model_provider_name,
73 &model_provider_alias,
74 );
75
76 let model_provider = zeroclaw_providers::create_routed_model_provider_with_options(
77 config,
78 model_provider_ref,
79 entry.and_then(|e| e.api_key.as_deref()),
80 entry.and_then(|e| e.uri.as_deref()),
81 &config.reliability,
82 &config.model_routes,
83 &model_name,
84 &model_provider_runtime_options,
85 )?;
86
87 Ok((model_provider, model_provider_name, model_name))
88}
89
90struct TurnGuard {
91 observer: Arc<dyn Observer>,
92 model_provider: String,
93 model: String,
94 turn_id: Option<String>,
95 turn_started_at: Instant,
96 agent_alias: Option<String>,
97 total_input_tokens: u64,
98 total_output_tokens: u64,
99 saw_usage: bool,
100 done: bool,
101}
102
103impl TurnGuard {
104 fn fire(&mut self) {
105 if self.done {
106 return;
107 }
108 self.done = true;
109 self.observer.record_event(&ObserverEvent::AgentEnd {
110 model_provider: self.model_provider.clone(),
111 model: self.model.clone(),
112 duration: self.turn_started_at.elapsed(),
113 tokens_used: self.saw_usage.then_some(
114 zeroclaw_api::observability_traits::TurnTokenUsage {
115 input_tokens: self.total_input_tokens,
116 output_tokens: self.total_output_tokens,
117 },
118 ),
119 cost_usd: None,
120 channel: None,
121 agent_alias: self.agent_alias.clone(),
122 turn_id: self.turn_id.clone(),
123 });
124 }
125}
126
127impl Drop for TurnGuard {
128 fn drop(&mut self) {
129 self.fire();
130 }
131}
132
133pub struct Agent {
134 model_provider: Box<dyn ModelProvider>,
135 tools: Vec<Box<dyn Tool>>,
136 tool_specs: Vec<ToolSpec>,
137 memory: Arc<dyn Memory>,
138 observer: Arc<dyn Observer>,
139 prompt_builder: SystemPromptBuilder,
140 tool_dispatcher: Box<dyn ToolDispatcher>,
141 memory_strategy: Arc<dyn zeroclaw_api::memory_traits::MemoryStrategy>,
142 config: zeroclaw_config::schema::AliasedAgentConfig,
143 multimodal_config: zeroclaw_config::schema::MultimodalConfig,
144 model_name: String,
145 model_provider_name: String,
146 temperature: Option<f64>,
147 workspace_dir: std::path::PathBuf,
148 agent_workspace_dir: std::path::PathBuf,
153 identity_config: zeroclaw_config::schema::IdentityConfig,
154 skills: Vec<crate::skills::Skill>,
155 skills_prompt_mode: zeroclaw_config::schema::SkillsPromptInjectionMode,
156 auto_save: bool,
157 memory_session_id: Option<String>,
158 history: Vec<ConversationMessage>,
159 classification_config: zeroclaw_config::schema::QueryClassificationConfig,
160 available_hints: Vec<String>,
161 route_model_by_hint: HashMap<String, String>,
162 #[allow(dead_code)] allowed_tools: Option<Vec<String>>,
164 response_cache: Option<Arc<zeroclaw_memory::response_cache::ResponseCache>>,
165 security_summary: Option<String>,
168 autonomy_level: crate::security::AutonomyLevel,
170 activated_tools: Option<Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
174 hook_runner: Option<Arc<crate::hooks::HookRunner>>,
177 approval_manager: Option<Arc<ApprovalManager>>,
179 agent_alias: String,
182 channel_handles: AgentChannelHandles,
189 #[allow(dead_code)]
193 exclude_memory: bool,
194 image_cache: zeroclaw_providers::multimodal::LocalImageCache,
198}
199
200impl Drop for Agent {
201 fn drop(&mut self) {
202 ::zeroclaw_log::record!(
203 INFO,
204 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
205 .with_category(::zeroclaw_log::EventCategory::Agent)
206 .with_attrs(::serde_json::json!({
207 "model_provider": self.model_provider_name,
208 "model": self.model_name,
209 "history_messages_freed": self.history.len(),
210 })),
211 "Agent dropped; conversation history and per-session state freed"
212 );
213 }
214}
215
216#[derive(Debug)]
217pub struct StreamedTurnSuccess {
218 pub response: String,
219 pub new_messages: Vec<ConversationMessage>,
220}
221
222#[derive(Debug)]
223pub struct StreamedTurnError {
224 pub error: anyhow::Error,
225 pub committed_response: String,
226 pub new_messages: Vec<ConversationMessage>,
227}
228
229#[derive(Clone, Default)]
232pub struct AgentChannelHandles {
233 pub ask_user: Option<tools::PerToolChannelHandle>,
234 pub reaction: tools::PerToolChannelHandle,
235 pub poll: Option<tools::PerToolChannelHandle>,
236 pub escalate: Option<tools::PerToolChannelHandle>,
237}
238
239impl AgentChannelHandles {
240 fn populated_handles(&self) -> Vec<Option<&tools::PerToolChannelHandle>> {
242 vec![
243 self.ask_user.as_ref(),
244 Some(&self.reaction),
245 self.poll.as_ref(),
246 self.escalate.as_ref(),
247 ]
248 }
249
250 pub fn register_channel(
253 &self,
254 name: impl Into<String>,
255 channel: Arc<dyn zeroclaw_api::channel::Channel>,
256 ) {
257 let name = name.into();
258 for handle in self.populated_handles().into_iter().flatten() {
259 handle.write().insert(name.clone(), Arc::clone(&channel));
260 }
261 }
262
263 pub fn unregister_channel(&self, name: &str) {
265 for handle in self.populated_handles().into_iter().flatten() {
266 handle.write().remove(name);
267 }
268 }
269
270 pub fn get_channel(&self, name: &str) -> Option<Arc<dyn zeroclaw_api::channel::Channel>> {
272 for handle in self.populated_handles().into_iter().flatten() {
273 if let Some(channel) = handle.read().get(name) {
274 return Some(Arc::clone(channel));
275 }
276 }
277 None
278 }
279}
280
281pub struct AgentBuilder {
282 model_provider: Option<Box<dyn ModelProvider>>,
283 tools: Option<Vec<Box<dyn Tool>>>,
284 memory: Option<Arc<dyn Memory>>,
285 observer: Option<Arc<dyn Observer>>,
286 prompt_builder: Option<SystemPromptBuilder>,
287 tool_dispatcher: Option<Box<dyn ToolDispatcher>>,
288 memory_strategy: Option<Arc<dyn zeroclaw_api::memory_traits::MemoryStrategy>>,
289 config: Option<zeroclaw_config::schema::AliasedAgentConfig>,
290 multimodal_config: Option<zeroclaw_config::schema::MultimodalConfig>,
291 model_name: Option<String>,
292 model_provider_name: Option<String>,
293 temperature: Option<f64>,
294 workspace_dir: Option<std::path::PathBuf>,
295 agent_workspace_dir: Option<std::path::PathBuf>,
296 identity_config: Option<zeroclaw_config::schema::IdentityConfig>,
297 skills: Option<Vec<crate::skills::Skill>>,
298 skills_prompt_mode: Option<zeroclaw_config::schema::SkillsPromptInjectionMode>,
299 auto_save: Option<bool>,
300 memory_session_id: Option<String>,
301 classification_config: Option<zeroclaw_config::schema::QueryClassificationConfig>,
302 available_hints: Option<Vec<String>>,
303 route_model_by_hint: Option<HashMap<String, String>>,
304 allowed_tools: Option<Vec<String>>,
305 response_cache: Option<Arc<zeroclaw_memory::response_cache::ResponseCache>>,
306 security_summary: Option<String>,
307 autonomy_level: Option<crate::security::AutonomyLevel>,
308 activated_tools: Option<Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
309 hook_runner: Option<Arc<crate::hooks::HookRunner>>,
310 approval_manager: Option<Arc<ApprovalManager>>,
311 agent_alias: Option<String>,
312 exclude_memory: bool,
313}
314
315impl Default for AgentBuilder {
316 fn default() -> Self {
317 Self::new()
318 }
319}
320
321impl AgentBuilder {
322 pub fn new() -> Self {
323 Self {
324 model_provider: None,
325 tools: None,
326 memory: None,
327 observer: None,
328 prompt_builder: None,
329 tool_dispatcher: None,
330 memory_strategy: None,
331 config: None,
332 multimodal_config: None,
333 model_name: None,
334 model_provider_name: None,
335 temperature: None,
336 workspace_dir: None,
337 agent_workspace_dir: None,
338 identity_config: None,
339 skills: None,
340 skills_prompt_mode: None,
341 auto_save: None,
342 memory_session_id: None,
343 classification_config: None,
344 available_hints: None,
345 route_model_by_hint: None,
346 allowed_tools: None,
347 response_cache: None,
348 security_summary: None,
349 autonomy_level: None,
350 activated_tools: None,
351 hook_runner: None,
352 approval_manager: None,
353 agent_alias: None,
354 exclude_memory: false,
355 }
356 }
357
358 pub fn model_provider(mut self, model_provider: Box<dyn ModelProvider>) -> Self {
359 self.model_provider = Some(model_provider);
360 self
361 }
362
363 pub fn tools(mut self, tools: Vec<Box<dyn Tool>>) -> Self {
364 self.tools = Some(tools);
365 self
366 }
367
368 pub fn memory(mut self, memory: Arc<dyn Memory>) -> Self {
369 self.memory = Some(memory);
370 self
371 }
372
373 pub fn observer(mut self, observer: Arc<dyn Observer>) -> Self {
374 self.observer = Some(observer);
375 self
376 }
377
378 pub fn prompt_builder(mut self, prompt_builder: SystemPromptBuilder) -> Self {
379 self.prompt_builder = Some(prompt_builder);
380 self
381 }
382
383 pub fn tool_dispatcher(mut self, tool_dispatcher: Box<dyn ToolDispatcher>) -> Self {
384 self.tool_dispatcher = Some(tool_dispatcher);
385 self
386 }
387
388 pub fn memory_strategy(
389 mut self,
390 memory_strategy: Arc<dyn zeroclaw_api::memory_traits::MemoryStrategy>,
391 ) -> Self {
392 self.memory_strategy = Some(memory_strategy);
393 self
394 }
395
396 pub fn config(mut self, config: zeroclaw_config::schema::AliasedAgentConfig) -> Self {
397 self.config = Some(config);
398 self
399 }
400
401 pub fn multimodal_config(
402 mut self,
403 multimodal_config: zeroclaw_config::schema::MultimodalConfig,
404 ) -> Self {
405 self.multimodal_config = Some(multimodal_config);
406 self
407 }
408
409 pub fn model_name(mut self, model_name: String) -> Self {
410 self.model_name = Some(model_name);
411 self
412 }
413
414 pub fn model_provider_name(mut self, name: String) -> Self {
415 self.model_provider_name = Some(name);
416 self
417 }
418
419 pub fn temperature(mut self, temperature: Option<f64>) -> Self {
420 self.temperature = temperature;
421 self
422 }
423
424 pub fn workspace_dir(mut self, workspace_dir: std::path::PathBuf) -> Self {
425 self.workspace_dir = Some(workspace_dir);
426 self
427 }
428
429 pub fn agent_workspace_dir(mut self, agent_workspace_dir: std::path::PathBuf) -> Self {
430 self.agent_workspace_dir = Some(agent_workspace_dir);
431 self
432 }
433
434 pub fn identity_config(
435 mut self,
436 identity_config: zeroclaw_config::schema::IdentityConfig,
437 ) -> Self {
438 self.identity_config = Some(identity_config);
439 self
440 }
441
442 pub fn skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
443 self.skills = Some(skills);
444 self
445 }
446
447 pub fn skills_prompt_mode(
448 mut self,
449 skills_prompt_mode: zeroclaw_config::schema::SkillsPromptInjectionMode,
450 ) -> Self {
451 self.skills_prompt_mode = Some(skills_prompt_mode);
452 self
453 }
454
455 pub fn auto_save(mut self, auto_save: bool) -> Self {
456 self.auto_save = Some(auto_save);
457 self
458 }
459
460 pub fn memory_session_id(mut self, memory_session_id: Option<String>) -> Self {
461 self.memory_session_id = memory_session_id;
462 self
463 }
464
465 pub fn classification_config(
466 mut self,
467 classification_config: zeroclaw_config::schema::QueryClassificationConfig,
468 ) -> Self {
469 self.classification_config = Some(classification_config);
470 self
471 }
472
473 pub fn available_hints(mut self, available_hints: Vec<String>) -> Self {
474 self.available_hints = Some(available_hints);
475 self
476 }
477
478 pub fn route_model_by_hint(mut self, route_model_by_hint: HashMap<String, String>) -> Self {
479 self.route_model_by_hint = Some(route_model_by_hint);
480 self
481 }
482
483 pub fn allowed_tools(mut self, allowed_tools: Option<Vec<String>>) -> Self {
484 self.allowed_tools = allowed_tools;
485 self
486 }
487
488 pub fn response_cache(
489 mut self,
490 cache: Option<Arc<zeroclaw_memory::response_cache::ResponseCache>>,
491 ) -> Self {
492 self.response_cache = cache;
493 self
494 }
495
496 pub fn security_summary(mut self, summary: Option<String>) -> Self {
497 self.security_summary = summary;
498 self
499 }
500
501 pub fn autonomy_level(mut self, level: crate::security::AutonomyLevel) -> Self {
502 self.autonomy_level = Some(level);
503 self
504 }
505
506 pub fn activated_tools(
507 mut self,
508 activated: Option<Arc<std::sync::Mutex<tools::ActivatedToolSet>>>,
509 ) -> Self {
510 self.activated_tools = activated;
511 self
512 }
513
514 pub fn hook_runner(mut self, runner: Option<Arc<crate::hooks::HookRunner>>) -> Self {
515 self.hook_runner = runner;
516 self
517 }
518
519 pub fn approval_manager(mut self, manager: Option<Arc<ApprovalManager>>) -> Self {
520 self.approval_manager = manager;
521 self
522 }
523
524 pub fn agent_alias(mut self, alias: String) -> Self {
526 self.agent_alias = Some(alias);
527 self
528 }
529
530 pub fn exclude_memory(mut self, exclude: bool) -> Self {
536 self.exclude_memory = exclude;
537 self
538 }
539
540 pub fn build(self) -> Result<Agent> {
541 let mut tools = self.tools.ok_or_else(|| {
542 ::zeroclaw_log::record!(
543 ERROR,
544 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
545 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
546 .with_attrs(::serde_json::json!({"missing_field": "tools"})),
547 "AgentBuilder::build missing required field"
548 );
549 anyhow::Error::msg("tools are required")
550 })?;
551 let allowed = self.allowed_tools.clone();
552 if let Some(ref allow_list) = allowed {
553 tools.retain(|t| allow_list.iter().any(|name| name == t.name()));
554 }
555
556 let exclude_memory = self.exclude_memory;
559 if exclude_memory {
560 tools.retain(|t| !zeroclaw_tools::MEMORY_TOOL_NAMES.contains(&t.name()));
561 }
562
563 let tool_specs = tools.iter().map(|tool| tool.spec()).collect();
564 let workspace_dir = self
565 .workspace_dir
566 .clone()
567 .unwrap_or_else(|| std::path::PathBuf::from("."));
568
569 let memory: Arc<dyn Memory> = if exclude_memory {
570 Arc::new(zeroclaw_memory::NoneMemory::new("none"))
571 } else {
572 self.memory.ok_or_else(|| {
573 ::zeroclaw_log::record!(
574 ERROR,
575 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
576 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
577 .with_attrs(::serde_json::json!({"missing_field": "memory"})),
578 "AgentBuilder::build missing required field"
579 );
580 anyhow::Error::msg("memory is required")
581 })?
582 };
583 let memory_strategy = if exclude_memory {
586 None
587 } else {
588 self.memory_strategy
589 };
590
591 Ok(Agent {
592 model_provider: self.model_provider.ok_or_else(|| {
593 ::zeroclaw_log::record!(
594 ERROR,
595 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
596 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
597 .with_attrs(::serde_json::json!({"missing_field": "model_provider"})),
598 "AgentBuilder::build missing required field"
599 );
600 anyhow::Error::msg("model_provider is required")
601 })?,
602 tools,
603 tool_specs,
604 memory: memory.clone(),
605 observer: self.observer.ok_or_else(|| {
606 ::zeroclaw_log::record!(
607 ERROR,
608 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
609 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
610 .with_attrs(::serde_json::json!({"missing_field": "observer"})),
611 "AgentBuilder::build missing required field"
612 );
613 anyhow::Error::msg("observer is required")
614 })?,
615 prompt_builder: self
616 .prompt_builder
617 .unwrap_or_else(SystemPromptBuilder::with_defaults),
618 tool_dispatcher: self.tool_dispatcher.ok_or_else(|| {
619 ::zeroclaw_log::record!(
620 ERROR,
621 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
622 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
623 .with_attrs(::serde_json::json!({"missing_field": "tool_dispatcher"})),
624 "AgentBuilder::build missing required field"
625 );
626 anyhow::Error::msg("tool_dispatcher is required")
627 })?,
628 memory_strategy: memory_strategy.unwrap_or_else(|| {
629 Arc::new(
630 crate::agent::memory_strategy::DefaultMemoryStrategy::with_config(
631 memory.clone(),
632 zeroclaw_config::schema::MemoryConfig::default(),
633 workspace_dir.clone(),
634 ),
635 )
636 }),
637 config: self.config.unwrap_or_default(),
638 multimodal_config: self.multimodal_config.unwrap_or_default(),
639 model_name: self.model_name.unwrap_or_else(|| "<unconfigured>".into()),
645 model_provider_name: self
646 .model_provider_name
647 .unwrap_or_else(|| "<unconfigured>".into()),
648 temperature: self.temperature,
649 workspace_dir: self
651 .workspace_dir
652 .clone()
653 .unwrap_or_else(|| std::path::PathBuf::from(".")),
654 agent_workspace_dir: self.agent_workspace_dir.unwrap_or_else(|| {
655 self.workspace_dir
656 .clone()
657 .unwrap_or_else(|| std::path::PathBuf::from("."))
658 }),
659 identity_config: self.identity_config.unwrap_or_default(),
660 skills: self.skills.unwrap_or_default(),
661 skills_prompt_mode: self.skills_prompt_mode.unwrap_or_default(),
662 auto_save: if exclude_memory {
663 false
664 } else {
665 self.auto_save.unwrap_or(false)
666 },
667 memory_session_id: self.memory_session_id,
668 history: Vec::new(),
669 classification_config: self.classification_config.unwrap_or_default(),
670 available_hints: self.available_hints.unwrap_or_default(),
671 route_model_by_hint: self.route_model_by_hint.unwrap_or_default(),
672 allowed_tools: allowed,
673 response_cache: self.response_cache,
674 security_summary: self.security_summary,
675 autonomy_level: self
676 .autonomy_level
677 .unwrap_or(crate::security::AutonomyLevel::Supervised),
678 activated_tools: self.activated_tools,
679 hook_runner: self.hook_runner,
680 approval_manager: self.approval_manager,
681 agent_alias: self.agent_alias.unwrap_or_default(),
682 channel_handles: AgentChannelHandles::default(),
683 exclude_memory,
684 image_cache: zeroclaw_providers::multimodal::LocalImageCache::new(),
685 })
686 }
687}
688
689impl Agent {
690 pub fn builder() -> AgentBuilder {
691 AgentBuilder::new()
692 }
693
694 fn new_turn_id() -> String {
695 uuid::Uuid::new_v4().to_string()
696 }
697
698 fn observer_agent_alias(&self) -> Option<String> {
699 if self.agent_alias.is_empty() {
700 None
701 } else {
702 Some(self.agent_alias.clone())
703 }
704 }
705
706 pub fn history(&self) -> &[ConversationMessage] {
707 &self.history
708 }
709
710 pub fn channel_handles(&self) -> &AgentChannelHandles {
716 &self.channel_handles
717 }
718
719 pub fn populate_channels(
727 &self,
728 channel_map: &std::collections::HashMap<String, Arc<dyn zeroclaw_api::channel::Channel>>,
729 ) -> Vec<String> {
730 let mut names = Vec::new();
731 for (name, ch) in channel_map {
732 self.channel_handles.register_channel(name, Arc::clone(ch));
733 names.push(name.clone());
734 }
735 names
736 }
737
738 pub fn attribution_fields(&self) -> (String, String, String) {
743 (
744 self.agent_alias.clone(),
745 self.model_provider_name.clone(),
746 self.model_name.clone(),
747 )
748 }
749
750 pub fn clear_history(&mut self) {
751 self.history.clear();
752 }
753
754 fn encode_response_cache_transcript(messages: &[ChatMessage]) -> String {
755 let mut transcript = String::new();
756 for message in messages.iter().filter(|message| message.role != "system") {
757 transcript.push_str("role=");
758 transcript.push_str(&message.role.len().to_string());
759 transcript.push(':');
760 transcript.push_str(&message.role);
761 transcript.push_str(";content=");
762 transcript.push_str(&message.content.len().to_string());
763 transcript.push(':');
764 transcript.push_str(&message.content);
765 transcript.push('\n');
766 }
767 transcript
768 }
769
770 fn response_cache_key_for_messages(
771 &self,
772 messages: &[ChatMessage],
773 effective_model: &str,
774 ) -> Option<String> {
775 if self.temperature != Some(0.0) || self.response_cache.is_none() {
776 return None;
777 }
778
779 let system = messages
780 .iter()
781 .find(|message| message.role == "system")
782 .map(|message| message.content.as_str());
783 let transcript = Self::encode_response_cache_transcript(messages);
784
785 Some(zeroclaw_memory::response_cache::ResponseCache::cache_key(
786 effective_model,
787 system,
788 &transcript,
789 ))
790 }
791
792 fn drain_steering_messages(
793 steering_rx: &mut Option<&mut tokio::sync::mpsc::Receiver<String>>,
794 ) -> Vec<String> {
795 let Some(rx) = steering_rx.as_deref_mut() else {
796 return Vec::new();
797 };
798
799 let mut messages = Vec::new();
800 loop {
801 match rx.try_recv() {
802 Ok(message) => messages.push(message),
803 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
804 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
805 }
806 }
807 messages
808 }
809
810 async fn append_streamed_user_message_to_history(
811 &mut self,
812 user_message: &str,
813 new_msgs: &mut Vec<ConversationMessage>,
814 ) {
815 let context = self
816 .memory_strategy
817 .load_context(user_message, self.memory_session_id.as_deref())
818 .await
819 .unwrap_or_default();
820
821 if self.auto_save {
822 let _ = self
823 .memory
824 .store(
825 "user_msg",
826 user_message,
827 MemoryCategory::Conversation,
828 self.memory_session_id.as_deref(),
829 )
830 .await;
831 }
832
833 let now = chrono::Local::now().format("%Y-%m-%d %H:%M:%S %Z");
834 let enriched = if context.is_empty() {
835 format!("[{now}] {user_message}")
836 } else {
837 format!("{context}[{now}] {user_message}")
838 };
839
840 let user_msg = ConversationMessage::Chat(ChatMessage::user(enriched));
841 new_msgs.push(user_msg.clone());
842 self.history.push(user_msg);
843 }
844
845 fn marked_partial_response(partial: &str, marker: &str) -> String {
846 if partial.is_empty() {
847 marker.to_string()
848 } else {
849 format!("{partial}\n\n{marker}")
850 }
851 }
852
853 async fn send_turn_event(
864 event_tx: &tokio::sync::mpsc::Sender<TurnEvent>,
865 cancel_token: Option<&tokio_util::sync::CancellationToken>,
866 event: TurnEvent,
867 ) -> bool {
868 match cancel_token {
869 Some(token) => {
870 tokio::select! {
871 biased;
872 () = token.cancelled() => false,
873 _ = event_tx.send(event) => true,
874 }
875 }
876 None => {
877 let _ = event_tx.send(event).await;
878 true
879 }
880 }
881 }
882
883 fn append_streamed_assistant_message_to_history(
884 &mut self,
885 content: String,
886 new_msgs: &mut Vec<ConversationMessage>,
887 committed_response: &mut String,
888 ) {
889 let assistant_msg = ConversationMessage::Chat(ChatMessage::assistant(content.clone()));
890 new_msgs.push(assistant_msg.clone());
891 self.history.push(assistant_msg);
892 committed_response.push_str(&content);
893 }
894
895 fn synthesize_cancelled_tool_results(
896 &mut self,
897 completed: Vec<ToolResultMessage>,
898 remaining: &[zeroclaw_providers::ToolCall],
899 new_msgs: &mut Vec<ConversationMessage>,
900 ) {
901 let mut results = completed;
902 results.extend(remaining.iter().map(|call| ToolResultMessage {
903 tool_call_id: call.id.clone(),
904 content: "[interrupted by user before this tool produced a result]".to_string(),
905 }));
906 if results.is_empty() {
907 return;
908 }
909 let msg = ConversationMessage::ToolResults(results);
910 new_msgs.push(msg.clone());
911 self.history.push(msg);
912 }
913
914 fn should_send_tool_specs(&self) -> bool {
915 self.tool_dispatcher.should_send_tool_specs() && !self.tool_specs.is_empty()
916 }
917
918 fn parse_response_for_effective_tools(
919 &self,
920 response: &zeroclaw_providers::ChatResponse,
921 ) -> (String, Vec<ParsedToolCall>) {
922 if self.tool_specs.is_empty() {
923 return (strip_think_tags(response.text_or_empty()), Vec::new());
924 }
925
926 if self.config.resolved.strict_tool_parsing && response.tool_calls.is_empty() {
927 return (strip_think_tags(response.text_or_empty()), Vec::new());
928 }
929
930 self.tool_dispatcher.parse_response(response)
931 }
932
933 pub fn set_memory_session_id(&mut self, session_id: Option<String>) {
934 self.memory_session_id = session_id;
935 }
936
937 pub fn set_temperature(&mut self, temperature: Option<f64>) {
938 self.temperature = temperature;
939 }
940
941 #[cfg(test)]
942 pub fn temperature_for_test(&self) -> Option<f64> {
943 self.temperature
944 }
945
946 pub fn set_model_name(&mut self, model_name: String) {
947 self.model_name = model_name;
948 }
949
950 pub fn set_model_provider(&mut self, model_provider: Box<dyn ModelProvider>) {
951 self.model_provider = model_provider;
952 }
953
954 pub fn set_model_provider_name(&mut self, model_provider_name: String) {
955 self.model_provider_name = model_provider_name;
956 }
957
958 #[cfg(test)]
961 pub fn tool_names(&self) -> Vec<&str> {
962 self.tools.iter().map(|t| t.name()).collect()
963 }
964
965 pub fn seed_history(&mut self, messages: &[ChatMessage]) {
971 if self.history.is_empty()
972 && let Ok(sys) = self.build_system_prompt()
973 {
974 self.history
975 .push(ConversationMessage::Chat(ChatMessage::system(sys)));
976 }
977 for msg in messages {
978 if msg.role != "system" {
979 self.history.push(ConversationMessage::Chat(msg.clone()));
980 }
981 }
982 }
983
984 pub fn seed_conversation_history(&mut self, messages: Vec<ConversationMessage>) {
989 if self.history.is_empty()
990 && let Ok(sys) = self.build_system_prompt()
991 {
992 self.history
993 .push(ConversationMessage::Chat(ChatMessage::system(sys)));
994 }
995 for msg in messages {
996 if matches!(&msg, ConversationMessage::Chat(m) if m.role == "system") {
998 continue;
999 }
1000 self.history.push(msg);
1001 }
1002 self.trim_history();
1007 }
1008
1009 pub async fn from_config(config: &Config, agent_alias: &str) -> Result<Self> {
1010 Self::from_config_with_session_cwd(config, agent_alias, None).await
1011 }
1012
1013 pub async fn from_config_with_session_cwd(
1024 config: &Config,
1025 agent_alias: &str,
1026 session_cwd: Option<&Path>,
1027 ) -> Result<Self> {
1028 Self::from_config_with_session_cwd_and_mcp(config, agent_alias, session_cwd, true).await
1029 }
1030
1031 pub async fn from_config_with_session_cwd_and_mcp(
1037 config: &Config,
1038 agent_alias: &str,
1039 session_cwd: Option<&Path>,
1040 initialize_mcp: bool,
1041 ) -> Result<Self> {
1042 Self::from_config_with_session_cwd_and_mcp_approval_mode(
1043 config,
1044 agent_alias,
1045 session_cwd,
1046 initialize_mcp,
1047 false,
1048 false,
1049 None,
1050 )
1051 .await
1052 }
1053
1054 pub async fn from_config_with_session_cwd_and_mcp_backchannel(
1061 config: &Config,
1062 agent_alias: &str,
1063 session_cwd: Option<&Path>,
1064 initialize_mcp: bool,
1065 exclude_memory: bool,
1066 ) -> Result<Self> {
1067 Self::from_config_with_session_cwd_and_mcp_approval_mode(
1068 config,
1069 agent_alias,
1070 session_cwd,
1071 initialize_mcp,
1072 true,
1073 exclude_memory,
1074 None,
1075 )
1076 .await
1077 }
1078
1079 pub async fn from_config_with_tui_env(
1084 config: &Config,
1085 agent_alias: &str,
1086 session_cwd: Option<&Path>,
1087 initialize_mcp: bool,
1088 exclude_memory: bool,
1089 tui_env: Option<std::collections::HashMap<String, String>>,
1090 ) -> Result<Self> {
1091 Self::from_config_with_session_cwd_and_mcp_approval_mode(
1092 config,
1093 agent_alias,
1094 session_cwd,
1095 initialize_mcp,
1096 true,
1097 exclude_memory,
1098 tui_env,
1099 )
1100 .await
1101 }
1102
1103 async fn from_config_with_session_cwd_and_mcp_approval_mode(
1104 config: &Config,
1105 agent_alias: &str,
1106 session_cwd: Option<&Path>,
1107 initialize_mcp: bool,
1108 approval_backchannel: bool,
1109 exclude_memory: bool,
1110 tui_env: Option<std::collections::HashMap<String, String>>,
1111 ) -> Result<Self> {
1112 let agent_cfg = config
1113 .agent(agent_alias)
1114 .with_context(|| format!("agents.{agent_alias} is not configured"))?;
1115 let risk_profile = config
1116 .risk_profile_for_agent(agent_alias)
1117 .with_context(|| {
1118 format!(
1119 "agents.{agent_alias}.risk_profile does not name a configured risk_profiles entry"
1120 )
1121 })?;
1122
1123 let observer: Arc<dyn Observer> =
1124 Arc::from(observability::create_observer(&config.observability));
1125 let runtime: Arc<dyn platform::RuntimeAdapter> =
1126 Arc::from(platform::create_runtime(&config.runtime)?);
1127 let agent_workspace = config.agent_workspace_dir(agent_alias);
1132 if let Err(e) = tokio::fs::create_dir_all(&agent_workspace).await {
1136 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"agent": agent_alias, "workspace": agent_workspace.display().to_string(), "e": e.to_string()})), "Failed to create per-agent workspace dir (continuing): ");
1137 }
1138 if let Err(e) = zeroclaw_config::schema::ensure_bootstrap_files(&agent_workspace).await {
1144 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"agent": agent_alias, "workspace": agent_workspace.display().to_string(), "e": e.to_string()})), "Failed to ensure per-agent bootstrap files (continuing with whatever exists): ");
1145 }
1146 let security = Arc::new({
1147 let mut policy = SecurityPolicy::for_agent(config, agent_alias).with_context(|| {
1152 format!("agents.{agent_alias}: failed to build security policy")
1153 })?;
1154 if let Some(cwd) = session_cwd {
1160 policy.workspace_dir = cwd.to_path_buf();
1161 policy.allowed_roots.push(agent_workspace.clone());
1162 }
1163 policy
1164 });
1165
1166 let (provider_name, provider_alias, agent_model_provider) =
1167 match config.resolved_model_provider_for_agent(agent_alias) {
1168 Some(resolved) => (resolved.0, resolved.1, Some(resolved.2)),
1169 None => {
1170 let agent_ref = agent_cfg.model_provider.as_str();
1171 if !agent_ref.is_empty() {
1172 anyhow::bail!(
1173 "agents.{agent_alias}.model_provider = \"{agent_ref}\" does not \
1174 resolve to a configured [providers.models.<type>.<alias>] entry"
1175 );
1176 }
1177 anyhow::bail!(
1180 "agents.{agent_alias}.model_provider is empty — set it to a \
1181 configured \"<type>.<alias>\" (e.g. \"anthropic.{agent_alias}\")"
1182 );
1183 }
1184 };
1185 let memory: Arc<dyn Memory> = zeroclaw_memory::create_memory_for_agent(
1186 config,
1187 agent_alias,
1188 agent_model_provider.and_then(|e| e.api_key.as_deref()),
1189 )
1190 .await?;
1191
1192 let composio_key = if config.composio.enabled {
1193 config.composio.api_key.as_deref()
1194 } else {
1195 None
1196 };
1197 let composio_entity_id = if config.composio.enabled {
1198 Some(config.composio.entity_id.as_str())
1199 } else {
1200 None
1201 };
1202
1203 let all_tools_result = tools::all_tools_with_runtime(
1204 Arc::new(config.clone()),
1205 &security,
1206 risk_profile,
1207 agent_alias,
1208 runtime,
1209 memory.clone(),
1210 composio_key,
1211 composio_entity_id,
1212 &config.browser,
1213 &config.http_request,
1214 &config.web_fetch,
1215 &security.workspace_dir,
1216 &config.agents,
1217 agent_model_provider.and_then(|e| e.api_key.as_deref()),
1218 config,
1219 None,
1220 false,
1221 tui_env,
1222 );
1223 let mut tools = all_tools_result.tools;
1224 let delegate_handle = all_tools_result.delegate_handle;
1225 let ask_user_handle = all_tools_result.ask_user_handle;
1226 let reaction_handle = all_tools_result.reaction_handle;
1227 let poll_handle = all_tools_result.poll_handle;
1228 let escalate_handle = all_tools_result.escalate_handle;
1229
1230 let before_policy_filter = tools.len();
1244 crate::agent::loop_::apply_policy_tool_filter(&mut tools, Some(security.as_ref()), None);
1245 if tools.len() != before_policy_filter {
1246 ::zeroclaw_log::record!(
1247 INFO,
1248 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1249 .with_attrs(::serde_json::json!({
1250 "before": before_policy_filter,
1251 "retained": tools.len(),
1252 "policy_allowed": security.allowed_tools.as_ref().map(|v| v.len()),
1253 "policy_excluded": security.excluded_tools.as_ref().map(|v| v.len()),
1254 })),
1255 "Applied SecurityPolicy built-in tool filter (from_config path)"
1256 );
1257 }
1258
1259 let mut activated_tools: Option<Arc<std::sync::Mutex<tools::ActivatedToolSet>>> = None;
1264 let mut mcp_elevation_arcs: Vec<Arc<dyn tools::Tool>> = Vec::new();
1266 if initialize_mcp && config.mcp.enabled && !config.mcp.servers.is_empty() {
1267 ::zeroclaw_log::record!(
1268 INFO,
1269 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1270 &format!(
1271 "Initializing MCP client — {} server(s) configured",
1272 config.mcp.servers.len()
1273 )
1274 );
1275 match tools::McpRegistry::connect_all(&config.mcp.servers).await {
1276 Ok(registry) => {
1277 let registry = std::sync::Arc::new(registry);
1278 mcp_elevation_arcs = tools::collect_mcp_elevation_arcs(®istry).await;
1279 let mcp_policy =
1280 crate::agent::loop_::mcp_tool_access_policy(security.as_ref(), None);
1281 if config.mcp.deferred_loading {
1282 let deferred_set = tools::DeferredMcpToolSet::from_registry(
1283 std::sync::Arc::clone(®istry),
1284 )
1285 .await;
1286 ::zeroclaw_log::record!(
1287 INFO,
1288 ::zeroclaw_log::Event::new(
1289 module_path!(),
1290 ::zeroclaw_log::Action::Note
1291 ),
1292 &format!(
1293 "MCP deferred: {} tool stub(s) from {} server(s)",
1294 deferred_set.len(),
1295 registry.server_count()
1296 )
1297 );
1298 let allowed_stub_count = crate::agent::loop_::mcp_allowed_tool_count(
1299 deferred_set
1300 .stubs
1301 .iter()
1302 .map(|stub| stub.prefixed_name.as_str()),
1303 mcp_policy.as_ref(),
1304 );
1305 if allowed_stub_count > 0 {
1306 let activated =
1307 Arc::new(std::sync::Mutex::new(tools::ActivatedToolSet::new()));
1308 activated_tools = Some(Arc::clone(&activated));
1309 let mut tool_search =
1310 tools::ToolSearchTool::new(deferred_set, activated);
1311 if let Some(policy) = mcp_policy {
1312 tool_search = tool_search.with_access_policy(policy);
1313 }
1314 tools.push(Box::new(tool_search));
1315 }
1316 } else {
1317 let names = registry.tool_names();
1318 let mut registered = 0usize;
1319 let mut skipped = 0usize;
1320 for name in names {
1321 if !crate::agent::loop_::eager_mcp_tool_allowed(
1322 &name,
1323 mcp_policy.as_ref(),
1324 ) {
1325 skipped += 1;
1326 continue;
1327 }
1328 if let Some(def) = registry.get_tool_def(&name).await {
1329 let wrapper: std::sync::Arc<dyn tools::Tool> =
1330 std::sync::Arc::new(tools::McpToolWrapper::new(
1331 name,
1332 def,
1333 std::sync::Arc::clone(®istry),
1334 ));
1335 if crate::agent::loop_::register_eager_mcp_tool_if_allowed(
1336 wrapper,
1337 &mut tools,
1338 delegate_handle.as_ref(),
1339 mcp_policy.as_ref(),
1340 ) {
1341 registered += 1;
1342 }
1343 }
1344 }
1345 ::zeroclaw_log::record!(
1346 INFO,
1347 ::zeroclaw_log::Event::new(
1348 module_path!(),
1349 ::zeroclaw_log::Action::Note
1350 ),
1351 &format!(
1352 "MCP: {} tool(s) registered from {} server(s), {} skipped by policy",
1353 registered,
1354 registry.server_count(),
1355 skipped
1356 )
1357 );
1358 }
1359 }
1360 Err(e) => {
1361 ::zeroclaw_log::record!(
1362 ERROR,
1363 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1364 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1365 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1366 "MCP registry failed to initialize"
1367 );
1368 }
1369 }
1370 }
1371
1372 let model_name = match agent_model_provider
1373 .and_then(|e| e.model.as_deref())
1374 .map(str::trim)
1375 .filter(|m| !m.is_empty())
1376 {
1377 Some(m) => m.to_string(),
1378 None => anyhow::bail!(
1379 "agents.{agent_alias}.model_provider resolves to a model_provider entry \
1380 with no `model` set. Configure [providers.models.{provider_name}.<alias>] \
1381 model = \"...\".",
1382 ),
1383 };
1384
1385 let provider_ref = format!("{provider_name}.{provider_alias}");
1386 let provider_runtime_options = zeroclaw_providers::provider_runtime_options_for_alias(
1387 config,
1388 provider_name,
1389 provider_alias,
1390 );
1391
1392 let model_provider: Box<dyn ModelProvider> =
1393 zeroclaw_providers::create_routed_model_provider_with_options(
1394 config,
1395 &provider_ref,
1396 agent_model_provider.and_then(|e| e.api_key.as_deref()),
1397 agent_model_provider.and_then(|e| e.uri.as_deref()),
1398 &config.reliability,
1399 &config.model_routes,
1400 &model_name,
1401 &provider_runtime_options,
1402 )?;
1403
1404 let dispatcher_choice = agent_cfg.resolved.tool_dispatcher.as_str();
1405 let tool_dispatcher: Box<dyn ToolDispatcher> = match dispatcher_choice {
1406 "native" => Box::new(NativeToolDispatcher),
1407 "xml" => Box::new(XmlToolDispatcher),
1408 _ if model_provider.supports_native_tools() => Box::new(NativeToolDispatcher),
1409 _ => Box::new(XmlToolDispatcher),
1410 };
1411
1412 let route_model_by_hint: HashMap<String, String> = config
1413 .model_routes
1414 .iter()
1415 .map(|route| (route.hint.clone(), route.model.clone()))
1416 .collect();
1417 let available_hints: Vec<String> = route_model_by_hint.keys().cloned().collect();
1418
1419 let response_cache = if config.memory.response_cache_enabled {
1420 zeroclaw_memory::response_cache::ResponseCache::with_hot_cache(
1421 &config.data_dir,
1422 config.memory.response_cache_ttl_minutes,
1423 config.memory.response_cache_max_entries,
1424 config.memory.response_cache_hot_entries,
1425 )
1426 .ok()
1427 .map(Arc::new)
1428 } else {
1429 None
1430 };
1431
1432 let excluded = &risk_profile.excluded_tools;
1436 if !excluded.is_empty() {
1437 tools.retain(|t| !excluded.iter().any(|ex| ex == t.name()));
1438 }
1439
1440 let skills = crate::skills::load_skills_for_agent_from_config(config, agent_alias);
1446 let skill_resolution_registry: Vec<Arc<dyn tools::Tool>> = all_tools_result
1449 .unfiltered_tool_arcs
1450 .iter()
1451 .cloned()
1452 .chain(mcp_elevation_arcs.iter().cloned())
1453 .collect();
1454 tools::register_skill_tools_with_context(
1455 &mut tools,
1456 &skills,
1457 security.clone(),
1458 &skill_resolution_registry,
1459 );
1460
1461 let approval_manager = if approval_backchannel {
1462 ApprovalManager::for_non_interactive_backchannel(risk_profile)
1463 } else {
1464 ApprovalManager::for_non_interactive(risk_profile)
1465 };
1466
1467 let mut agent = Agent::builder()
1468 .model_provider(model_provider)
1469 .tools(tools)
1470 .memory(memory.clone())
1471 .observer(observer)
1472 .response_cache(response_cache)
1473 .tool_dispatcher(tool_dispatcher)
1474 .memory_strategy(Arc::new(
1475 crate::agent::memory_strategy::DefaultMemoryStrategy::with_config_and_limit(
1476 memory.clone(),
1477 config.memory.clone(),
1478 security.workspace_dir.clone(),
1479 config.effective_memory_recall_limit(agent_alias),
1480 ),
1481 ))
1482 .prompt_builder(SystemPromptBuilder::with_defaults())
1483 .config(
1484 config
1485 .resolved_agent_config(agent_alias)
1486 .unwrap_or_else(|| agent_cfg.clone()),
1487 )
1488 .multimodal_config(config.multimodal.clone())
1489 .agent_alias(agent_alias.to_string())
1490 .model_name(model_name)
1491 .model_provider_name(provider_name.to_string())
1492 .temperature(agent_model_provider.and_then(|e| e.temperature))
1493 .workspace_dir(security.workspace_dir.clone())
1494 .agent_workspace_dir(agent_workspace.clone())
1495 .classification_config(config.query_classification.clone())
1496 .available_hints(available_hints)
1497 .route_model_by_hint(route_model_by_hint)
1498 .identity_config(agent_cfg.identity.clone())
1499 .skills(skills)
1500 .skills_prompt_mode(config.skills.prompt_injection_mode)
1501 .auto_save(config.memory.auto_save)
1502 .exclude_memory(exclude_memory)
1503 .security_summary(Some(security.prompt_summary()))
1504 .autonomy_level(risk_profile.level)
1505 .activated_tools(activated_tools)
1506 .hook_runner(if config.hooks.enabled {
1507 let mut runner = crate::hooks::HookRunner::new();
1508 if config.hooks.builtin.command_logger {
1509 runner.register(Box::new(crate::hooks::builtin::CommandLoggerHook::new()));
1510 }
1511 if config.hooks.builtin.webhook_audit.enabled {
1512 runner.register(Box::new(crate::hooks::builtin::WebhookAuditHook::new(
1513 config.hooks.builtin.webhook_audit.clone(),
1514 )));
1515 }
1516 Some(Arc::new(runner))
1517 } else {
1518 None
1519 })
1520 .approval_manager(Some(Arc::new(approval_manager)))
1521 .build()?;
1522
1523 agent.channel_handles = AgentChannelHandles {
1526 ask_user: ask_user_handle,
1527 reaction: reaction_handle,
1528 poll: poll_handle,
1529 escalate: escalate_handle,
1530 };
1531
1532 Ok(agent)
1533 }
1534
1535 fn trim_history(&mut self) {
1536 let max = self.config.resolved.max_history_messages;
1537 if self.history.len() <= max {
1538 return;
1539 }
1540
1541 let mut system_messages = Vec::new();
1542 let mut other_messages = Vec::new();
1543
1544 for msg in self.history.drain(..) {
1545 match &msg {
1546 ConversationMessage::Chat(chat) if chat.role == "system" => {
1547 system_messages.push(msg);
1548 }
1549 _ => other_messages.push(msg),
1550 }
1551 }
1552
1553 if other_messages.len() > max {
1554 let initial_drop_count = other_messages.len() - max;
1555 let mut drop_count = initial_drop_count;
1556
1557 ::zeroclaw_log::record!(
1558 DEBUG,
1559 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1560 .with_category(::zeroclaw_log::EventCategory::Agent)
1561 .with_attrs(::serde_json::json!({
1562 "total_messages": other_messages.len(),
1563 "max_history": max,
1564 "initial_drop_count": initial_drop_count,
1565 })),
1566 "trim_history: dropping oldest messages"
1567 );
1568
1569 let before_orphan_tr = drop_count;
1577 while drop_count < other_messages.len()
1578 && matches!(
1579 &other_messages[drop_count],
1580 ConversationMessage::ToolResults(_)
1581 )
1582 {
1583 drop_count += 1;
1584 }
1585 if drop_count > before_orphan_tr {
1586 ::zeroclaw_log::record!(
1587 DEBUG,
1588 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1589 .with_category(::zeroclaw_log::EventCategory::Agent)
1590 .with_attrs(::serde_json::json!({
1591 "extra_dropped": drop_count - before_orphan_tr,
1592 })),
1593 "trim_history: dropped orphan ToolResults at head"
1594 );
1595 }
1596
1597 let before_orphan_ac = drop_count;
1604 while drop_count < other_messages.len()
1605 && matches!(
1606 &other_messages[drop_count],
1607 ConversationMessage::AssistantToolCalls { .. }
1608 )
1609 {
1610 drop_count += 1;
1612 if drop_count < other_messages.len()
1613 && matches!(
1614 &other_messages[drop_count],
1615 ConversationMessage::ToolResults(_)
1616 )
1617 {
1618 drop_count += 1;
1619 }
1620 }
1621 if drop_count > before_orphan_ac {
1622 ::zeroclaw_log::record!(
1623 DEBUG,
1624 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1625 .with_category(::zeroclaw_log::EventCategory::Agent)
1626 .with_attrs(::serde_json::json!({
1627 "extra_dropped": drop_count - before_orphan_ac,
1628 })),
1629 "trim_history: dropped orphan AssistantToolCalls at head"
1630 );
1631 }
1632
1633 if drop_count >= other_messages.len() {
1643 ::zeroclaw_log::record!(
1644 WARN,
1645 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1646 .with_category(::zeroclaw_log::EventCategory::Agent)
1647 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1648 .with_attrs(::serde_json::json!({
1649 "history_len": other_messages.len(),
1650 "max_history_messages": max,
1651 })),
1652 "trim_history: orphan-cascade would empty all non-system messages; skipping trim to preserve conversation"
1653 );
1654 self.history = system_messages;
1655 self.history.extend(other_messages);
1656 return;
1657 }
1658
1659 ::zeroclaw_log::record!(
1660 DEBUG,
1661 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete)
1662 .with_category(::zeroclaw_log::EventCategory::Agent)
1663 .with_outcome(::zeroclaw_log::EventOutcome::Success)
1664 .with_attrs(::serde_json::json!({
1665 "total_dropped": drop_count,
1666 "remaining": other_messages.len() - drop_count,
1667 })),
1668 "trim_history: complete"
1669 );
1670
1671 other_messages.drain(0..drop_count);
1672 }
1673
1674 self.history = system_messages;
1675 self.history.extend(other_messages);
1676 }
1677
1678 fn build_system_prompt(&self) -> Result<String> {
1679 let expose_text_tool_protocol = !self.config.resolved.strict_tool_parsing
1680 || self.tool_dispatcher.should_send_tool_specs();
1681 let no_tools: Vec<Box<dyn Tool>> = Vec::new();
1682 let prompt_tools = if expose_text_tool_protocol {
1683 &self.tools
1684 } else {
1685 &no_tools
1686 };
1687 let instructions = self.tool_dispatcher.prompt_instructions(prompt_tools);
1688 let ctx = PromptContext {
1689 workspace_dir: &self.workspace_dir,
1690 agent_workspace_dir: &self.agent_workspace_dir,
1691 model_name: &self.model_name,
1692 tools: prompt_tools,
1693 skills: &self.skills,
1694 skills_prompt_mode: self.skills_prompt_mode,
1695 identity_config: Some(&self.identity_config),
1696 dispatcher_instructions: &instructions,
1697 sends_native_tool_specs: self.tool_dispatcher.should_send_tool_specs()
1698 && !prompt_tools.is_empty(),
1699 security_summary: self.security_summary.clone(),
1700 autonomy_level: self.autonomy_level,
1701 };
1702 self.prompt_builder.build(&ctx)
1703 }
1704
1705 async fn prepare_provider_messages(
1706 &mut self,
1707 messages: &[ChatMessage],
1708 ) -> Result<Vec<ChatMessage>> {
1709 let prepared = zeroclaw_providers::multimodal::prepare_messages_for_provider_cached(
1710 messages,
1711 &self.multimodal_config,
1712 &mut self.image_cache,
1713 )
1714 .await?;
1715 Ok(prepared.messages)
1716 }
1717
1718 async fn execute_tool_call(&self, call: &ParsedToolCall, turn_id: &str) -> ToolExecutionResult {
1719 let start = Instant::now();
1720
1721 let mut tool_name = call.name.clone();
1725 let mut tool_args = call.arguments.clone();
1726 if let Some(ref hooks) = self.hook_runner {
1727 match hooks
1728 .run_before_tool_call(tool_name.clone(), tool_args.clone())
1729 .await
1730 {
1731 crate::hooks::HookResult::Continue((n, a)) => {
1732 tool_name = n;
1733 tool_args = a;
1734 }
1735 crate::hooks::HookResult::Cancel(reason) => {
1736 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"tool": call.name, "reason": reason.to_string()})), "tool call cancelled by hook");
1737 return ToolExecutionResult {
1738 name: call.name.clone(),
1739 output: format!("Cancelled by hook: {reason}"),
1740 success: false,
1741 tool_call_id: call.tool_call_id.clone(),
1742 };
1743 }
1744 }
1745 }
1746
1747 super::set_runtime_approved_arg(&tool_name, &mut tool_args, false);
1748
1749 let mut approval_requirement = self
1754 .approval_manager
1755 .as_deref()
1756 .map(|mgr| mgr.approval_requirement(&tool_name))
1757 .unwrap_or(ApprovalRequirement::NotRequired);
1758 if let Some(mgr) = self.approval_manager.as_deref()
1759 && approval_requirement == ApprovalRequirement::Prompt
1760 {
1761 let request = ApprovalRequest {
1762 tool_name: tool_name.clone(),
1763 arguments: tool_args.clone(),
1764 };
1765
1766 let (decision, decision_channel) = if mgr.is_non_interactive() {
1767 let ch_request = zeroclaw_api::channel::ChannelApprovalRequest {
1772 tool_name: request.tool_name.clone(),
1773 arguments_summary: crate::approval::summarize_args(&request.arguments),
1774 raw_arguments: Some(request.arguments.clone()),
1775 };
1776 let mut channel_decision: Option<zeroclaw_api::channel::ChannelApprovalResponse> =
1777 None;
1778 let mut decision_channel_name = String::new();
1779 let channels: Vec<(String, Arc<dyn zeroclaw_api::channel::Channel>)> = self
1782 .channel_handles
1783 .ask_user
1784 .as_ref()
1785 .map(|h| {
1786 h.read()
1787 .iter()
1788 .map(|(k, v)| (k.clone(), Arc::clone(v)))
1789 .collect()
1790 })
1791 .unwrap_or_default();
1792 for (ch_name, ch) in &channels {
1793 match ch.request_approval("", &ch_request).await {
1794 Ok(Some(r)) => {
1795 decision_channel_name = ch_name.clone();
1796 channel_decision = Some(r);
1797 break;
1798 }
1799 Ok(None) => continue,
1800 Err(e) => {
1801 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"tool": tool_name, "channel": ch_name, "error": format!("{}", e)})), "channel approval request failed");
1802 }
1803 }
1804 }
1805 let approval = match channel_decision {
1806 Some(zeroclaw_api::channel::ChannelApprovalResponse::Approve) => {
1807 ApprovalResponse::Yes
1808 }
1809 Some(zeroclaw_api::channel::ChannelApprovalResponse::AlwaysApprove) => {
1810 ApprovalResponse::Always
1811 }
1812 Some(zeroclaw_api::channel::ChannelApprovalResponse::Deny) => {
1813 ApprovalResponse::No
1814 }
1815 Some(zeroclaw_api::channel::ChannelApprovalResponse::DenyWithEdit {
1816 replacement,
1817 }) => ApprovalResponse::ReplaceWith(replacement),
1818 None => {
1819 ::zeroclaw_log::record!(
1820 WARN,
1821 ::zeroclaw_log::Event::new(
1822 module_path!(),
1823 ::zeroclaw_log::Action::Note
1824 )
1825 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1826 .with_attrs(::serde_json::json!({"tool": tool_name})),
1827 "no approval channel handled this request — denying. \
1828 Configure a back-channel (ACP or WS) that implements \
1829 request_approval to enable interactive approval."
1830 );
1831 ApprovalResponse::No
1832 }
1833 };
1834 (approval, decision_channel_name)
1835 } else {
1836 (mgr.prompt_cli(&request), String::new())
1837 };
1838
1839 mgr.record_decision(&tool_name, &tool_args, &decision, &decision_channel);
1840
1841 if decision == ApprovalResponse::No {
1842 return ToolExecutionResult {
1843 name: tool_name,
1844 output: "Denied by user.".to_string(),
1845 success: false,
1846 tool_call_id: call.tool_call_id.clone(),
1847 };
1848 }
1849
1850 if let ApprovalResponse::ReplaceWith(replacement) = &decision {
1851 return ToolExecutionResult {
1852 name: tool_name,
1853 output: crate::approval::sanitize_tool_replacement(replacement),
1854 success: true,
1855 tool_call_id: call.tool_call_id.clone(),
1856 };
1857 }
1858
1859 if matches!(decision, ApprovalResponse::Yes | ApprovalResponse::Always) {
1860 approval_requirement = ApprovalRequirement::Approved;
1861 }
1862 }
1863 super::set_runtime_approved_arg(
1864 &tool_name,
1865 &mut tool_args,
1866 approval_requirement == ApprovalRequirement::Approved,
1867 );
1868
1869 let args_json = tool_args.to_string();
1873 let tool_call_id = call.tool_call_id.clone();
1874
1875 ::zeroclaw_log::record!(
1877 DEBUG,
1878 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Invoke)
1879 .with_category(::zeroclaw_log::EventCategory::Tool)
1880 .with_attrs(::serde_json::json!({
1881 "tool": tool_name,
1882 "tool_call_id": tool_call_id,
1883 "input": args_json,
1884 })),
1885 format!("tool call: {tool_name}")
1886 );
1887
1888 let (result, success) =
1890 if let Some(tool) = self.tools.iter().find(|t| t.name() == tool_name) {
1891 match tool.execute(tool_args.clone()).await {
1892 Ok(r) => {
1893 let (outcome_text, ok) = if r.success {
1894 (r.output, true)
1895 } else {
1896 (format!("Error: {}", r.error.unwrap_or(r.output)), false)
1897 };
1898 self.observer.record_event(&ObserverEvent::ToolCall {
1899 tool: tool_name.clone(),
1900 tool_call_id: tool_call_id.clone(),
1901 duration: start.elapsed(),
1902 success: ok,
1903 arguments: Some(args_json.clone()),
1904 result: Some(super::loop_::scrub_credentials(&outcome_text)),
1905 channel: None,
1906 agent_alias: self.observer_agent_alias(),
1907 turn_id: Some(turn_id.to_string()),
1908 });
1909 (outcome_text, ok)
1910 }
1911 Err(e) => {
1912 let err_text = format!("Error executing {}: {e}", tool_name);
1913 self.observer.record_event(&ObserverEvent::ToolCall {
1914 tool: tool_name.clone(),
1915 tool_call_id: tool_call_id.clone(),
1916 duration: start.elapsed(),
1917 success: false,
1918 arguments: Some(args_json.clone()),
1919 result: Some(super::loop_::scrub_credentials(&err_text)),
1920 channel: None,
1921 agent_alias: self.observer_agent_alias(),
1922 turn_id: Some(turn_id.to_string()),
1923 });
1924 (err_text, false)
1925 }
1926 }
1927 } else if let Some(activated_arc) = self.activated_tools.as_ref() {
1928 let activated_opt = activated_arc.lock().unwrap().get_resolved(&tool_name);
1929 if let Some(tool) = activated_opt {
1930 match tool.execute(tool_args.clone()).await {
1931 Ok(r) => {
1932 let (outcome_text, ok) = if r.success {
1933 (r.output, true)
1934 } else {
1935 (format!("Error: {}", r.error.unwrap_or(r.output)), false)
1936 };
1937 self.observer.record_event(&ObserverEvent::ToolCall {
1938 tool: tool_name.clone(),
1939 tool_call_id: tool_call_id.clone(),
1940 duration: start.elapsed(),
1941 success: ok,
1942 arguments: Some(args_json.clone()),
1943 result: Some(super::loop_::scrub_credentials(&outcome_text)),
1944 channel: None,
1945 agent_alias: self.observer_agent_alias(),
1946 turn_id: Some(turn_id.to_string()),
1947 });
1948 (outcome_text, ok)
1949 }
1950 Err(e) => {
1951 let err_text = format!("Error executing {}: {e}", tool_name);
1952 self.observer.record_event(&ObserverEvent::ToolCall {
1953 tool: tool_name.clone(),
1954 tool_call_id: tool_call_id.clone(),
1955 duration: start.elapsed(),
1956 success: false,
1957 arguments: Some(args_json.clone()),
1958 result: Some(super::loop_::scrub_credentials(&err_text)),
1959 channel: None,
1960 agent_alias: self.observer_agent_alias(),
1961 turn_id: Some(turn_id.to_string()),
1962 });
1963 (err_text, false)
1964 }
1965 }
1966 } else {
1967 (format!("Unknown tool: {}", tool_name), false)
1968 }
1969 } else {
1970 (format!("Unknown tool: {}", tool_name), false)
1971 };
1972
1973 let duration = start.elapsed();
1974
1975 if success {
1977 ::zeroclaw_log::record!(
1978 DEBUG,
1979 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete)
1980 .with_category(::zeroclaw_log::EventCategory::Tool)
1981 .with_outcome(::zeroclaw_log::EventOutcome::Success)
1982 .with_duration(duration.as_millis() as u64)
1983 .with_attrs(::serde_json::json!({
1984 "tool": tool_name,
1985 "tool_call_id": tool_call_id,
1986 "input": args_json,
1987 "output": result,
1988 })),
1989 format!("tool result: {tool_name}")
1990 );
1991 } else {
1992 ::zeroclaw_log::record!(
1993 WARN,
1994 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1995 .with_category(::zeroclaw_log::EventCategory::Tool)
1996 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1997 .with_duration(duration.as_millis() as u64)
1998 .with_attrs(::serde_json::json!({
1999 "tool": tool_name,
2000 "tool_call_id": tool_call_id,
2001 "input": args_json,
2002 "output": result,
2003 })),
2004 format!("tool failed: {tool_name}")
2005 );
2006 }
2007
2008 if let Some(ref hooks) = self.hook_runner {
2010 let tool_result_obj = crate::tools::ToolResult {
2011 success,
2012 output: result.clone(),
2013 error: None,
2014 };
2015 hooks
2016 .fire_after_tool_call(&tool_name, &tool_result_obj, duration)
2017 .await;
2018 }
2019
2020 ToolExecutionResult {
2021 name: tool_name,
2022 output: result,
2023 success,
2024 tool_call_id: call.tool_call_id.clone(),
2025 }
2026 }
2027
2028 async fn execute_tools(
2029 &self,
2030 calls: &[ParsedToolCall],
2031 turn_id: &str,
2032 ) -> Vec<ToolExecutionResult> {
2033 let approval_required = self.approval_manager.as_deref().is_some_and(|mgr| {
2034 calls
2035 .iter()
2036 .any(|call| mgr.needs_approval(call.name.as_str()))
2037 });
2038 if !self.config.resolved.parallel_tools || approval_required {
2039 let mut results = Vec::with_capacity(calls.len());
2040 for call in calls {
2041 results.push(self.execute_tool_call(call, turn_id).await);
2042 }
2043 return results;
2044 }
2045
2046 let futs: Vec<_> = calls
2047 .iter()
2048 .map(|call| self.execute_tool_call(call, turn_id))
2049 .collect();
2050 futures_util::future::join_all(futs).await
2051 }
2052
2053 fn classify_model(&self, user_message: &str) -> String {
2054 if let Some(decision) =
2055 super::classifier::classify_with_decision(&self.classification_config, user_message)
2056 && self.available_hints.contains(&decision.hint)
2057 {
2058 let resolved_model = self
2059 .route_model_by_hint
2060 .get(&decision.hint)
2061 .map(String::as_str)
2062 .unwrap_or("unknown");
2063 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"hint": decision.hint.as_str(), "model": resolved_model, "rule_priority": decision.priority, "message_length": user_message.len()})), "Classified message route");
2064 return format!("hint:{}", decision.hint);
2065 }
2066
2067 if let Some(ref ac) = self.config.resolved.auto_classify {
2069 let tier = super::eval::estimate_complexity(user_message);
2070 if let Some(hint) = ac.hint_for(tier)
2071 && self.available_hints.contains(&hint.to_string())
2072 {
2073 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"hint": hint, "complexity": format!("{:?}", tier), "message_length": user_message.len()})), "Auto-classified by complexity");
2074 return format!("hint:{hint}");
2075 }
2076 }
2077
2078 self.model_name.clone()
2079 }
2080
2081 pub async fn turn(&mut self, user_message: &str) -> Result<String> {
2082 if user_message.trim().is_empty() {
2092 ::zeroclaw_log::record!(
2093 WARN,
2094 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
2095 .with_category(::zeroclaw_log::EventCategory::Agent)
2096 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2097 .with_attrs(::serde_json::json!({
2098 "reason": "empty_user_message",
2099 "entry_point": "Agent::turn",
2100 "raw_len": user_message.len(),
2101 })),
2102 "Refusing blank user turn (would emit timestamp-only message and risk prompt-template bleed-through)"
2103 );
2104 return Err(anyhow::Error::msg(
2105 "empty user message: refusing to dispatch a blank turn",
2106 ));
2107 }
2108
2109 if self.history.is_empty() {
2110 let system_prompt = self.build_system_prompt()?;
2111 self.history
2112 .push(ConversationMessage::Chat(ChatMessage::system(
2113 system_prompt,
2114 )));
2115 }
2116
2117 let context = self
2118 .memory_strategy
2119 .load_context(user_message, self.memory_session_id.as_deref())
2120 .await
2121 .unwrap_or_default();
2122
2123 if self.auto_save {
2124 let _ = self
2125 .memory
2126 .store(
2127 "user_msg",
2128 user_message,
2129 MemoryCategory::Conversation,
2130 self.memory_session_id.as_deref(),
2131 )
2132 .await;
2133 }
2134
2135 let now = chrono::Local::now();
2136 let (year, month, day) = (now.year(), now.month(), now.day());
2137 let (hour, minute, second) = (now.hour(), now.minute(), now.second());
2138 let tz = now.format("%Z");
2139 let date_str =
2140 format!("{year:04}-{month:02}-{day:02} {hour:02}:{minute:02}:{second:02} {tz}");
2141
2142 let enriched = if context.is_empty() {
2143 format!("[CURRENT DATE & TIME: {date_str}]\n\n{user_message}")
2144 } else {
2145 format!("[CURRENT DATE & TIME: {date_str}]\n\n{context}\n\n{user_message}")
2146 };
2147
2148 self.history
2149 .push(ConversationMessage::Chat(ChatMessage::user(enriched)));
2150
2151 let effective_model = self.classify_model(user_message);
2152
2153 let turn_id = Self::new_turn_id();
2154 let turn_started_at = Instant::now();
2155
2156 self.observer.record_event(&ObserverEvent::AgentStart {
2157 model_provider: self.model_provider_name.clone(),
2158 model: effective_model.clone(),
2159 channel: None,
2160 agent_alias: self.observer_agent_alias(),
2161 turn_id: Some(turn_id.clone()),
2162 });
2163
2164 let mut guard = TurnGuard {
2165 observer: Arc::clone(&self.observer),
2166 model_provider: self.model_provider_name.clone(),
2167 model: effective_model.clone(),
2168 turn_id: Some(turn_id.clone()),
2169 turn_started_at,
2170 agent_alias: self.observer_agent_alias(),
2171 total_input_tokens: 0,
2172 total_output_tokens: 0,
2173 saw_usage: false,
2174 done: false,
2175 };
2176
2177 for _ in 0..self.config.resolved.max_tool_iterations {
2178 let messages = self.tool_dispatcher.to_provider_messages(&self.history);
2179 let prepared_messages = self.prepare_provider_messages(&messages).await?;
2180
2181 let cache_key =
2185 self.response_cache_key_for_messages(&prepared_messages, &effective_model);
2186
2187 if let (Some(cache), Some(key)) = (&self.response_cache, &cache_key) {
2188 if let Ok(Some(cached)) = cache.get(key) {
2189 self.observer.record_event(&ObserverEvent::CacheHit {
2190 cache_type: "response".into(),
2191 tokens_saved: 0,
2192 });
2193 self.history
2194 .push(ConversationMessage::Chat(ChatMessage::assistant(
2195 cached.clone(),
2196 )));
2197 self.trim_history();
2198 return Ok(cached);
2199 }
2200 self.observer.record_event(&ObserverEvent::CacheMiss {
2201 cache_type: "response".into(),
2202 });
2203 }
2204
2205 {
2207 let msg_count = prepared_messages.len();
2208 let content_chars: usize = prepared_messages.iter().map(|m| m.content.len()).sum();
2209 ::zeroclaw_log::record!(
2210 DEBUG,
2211 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note,)
2212 .with_attrs(::serde_json::json!({
2213 "msg_count": msg_count,
2214 "content_chars": content_chars,
2215 "approx_tokens": content_chars / 4,
2216 "model": effective_model,
2217 })),
2218 "agent: outbound prompt size (non-streaming)"
2219 );
2220 }
2221
2222 let llm_started_at = Instant::now();
2223 self.observer.record_event(&ObserverEvent::LlmRequest {
2224 model_provider: self.model_provider_name.clone(),
2225 model: effective_model.clone(),
2226 messages_count: messages.len(),
2227 channel: None,
2228 agent_alias: self.observer_agent_alias(),
2229 turn_id: Some(turn_id.clone()),
2230 });
2231
2232 let response = match self
2233 .model_provider
2234 .chat(
2235 ChatRequest {
2236 messages: &prepared_messages,
2237 tools: if self.should_send_tool_specs() {
2238 Some(&self.tool_specs)
2239 } else {
2240 None
2241 },
2242 thinking: None,
2243 },
2244 &effective_model,
2245 self.temperature,
2246 )
2247 .await
2248 {
2249 Ok(resp) => {
2250 let (resp_input_tokens, resp_output_tokens) = resp
2251 .usage
2252 .as_ref()
2253 .map(|u| (u.input_tokens, u.output_tokens))
2254 .unwrap_or((None, None));
2255 if let Some(input) = resp_input_tokens {
2256 guard.total_input_tokens = guard.total_input_tokens.saturating_add(input);
2257 guard.saw_usage = true;
2258 }
2259 if let Some(output) = resp_output_tokens {
2260 guard.total_output_tokens =
2261 guard.total_output_tokens.saturating_add(output);
2262 guard.saw_usage = true;
2263 }
2264 self.observer.record_event(&ObserverEvent::LlmResponse {
2265 model_provider: self.model_provider_name.clone(),
2266 model: effective_model.clone(),
2267 duration: llm_started_at.elapsed(),
2268 success: true,
2269 error_message: None,
2270 input_tokens: resp_input_tokens,
2271 output_tokens: resp_output_tokens,
2272 channel: None,
2273 agent_alias: self.observer_agent_alias(),
2274 turn_id: Some(turn_id.clone()),
2275 });
2276 resp
2277 }
2278 Err(err) => {
2279 let safe_error = zeroclaw_providers::sanitize_api_error(&err.to_string());
2280 self.observer.record_event(&ObserverEvent::LlmResponse {
2281 model_provider: self.model_provider_name.clone(),
2282 model: effective_model.clone(),
2283 duration: llm_started_at.elapsed(),
2284 success: false,
2285 error_message: Some(safe_error),
2286 input_tokens: None,
2287 output_tokens: None,
2288 channel: None,
2289 agent_alias: self.observer_agent_alias(),
2290 turn_id: Some(turn_id.clone()),
2291 });
2292 return Err(err);
2293 }
2294 };
2295
2296 let (text, calls) = self.parse_response_for_effective_tools(&response);
2297 if calls.is_empty() {
2298 let final_text = if text.is_empty() && !self.tool_specs.is_empty() {
2299 response.text.unwrap_or_default()
2300 } else {
2301 text
2302 };
2303
2304 if let (Some(cache), Some(key)) = (&self.response_cache, &cache_key) {
2306 let token_count = response
2307 .usage
2308 .as_ref()
2309 .and_then(|u| u.output_tokens)
2310 .unwrap_or(0);
2311 #[allow(clippy::cast_possible_truncation)]
2312 let _ = cache.put(key, &effective_model, &final_text, token_count as u32);
2313 }
2314
2315 self.history
2316 .push(ConversationMessage::Chat(ChatMessage::assistant(
2317 final_text.clone(),
2318 )));
2319 self.trim_history();
2320
2321 return Ok(final_text);
2322 }
2323
2324 if !text.is_empty() {
2325 print!("{text}");
2326 let _ = std::io::stdout().flush();
2327 }
2328
2329 self.history.push(ConversationMessage::AssistantToolCalls {
2330 text: response.text.clone(),
2331 tool_calls: response.tool_calls.clone(),
2332 reasoning_content: response.reasoning_content.clone(),
2333 });
2334
2335 let results = self.execute_tools(&calls, &turn_id).await;
2336 let formatted = self.tool_dispatcher.format_results(&results);
2337 self.history.push(formatted);
2338 self.trim_history();
2339 }
2340
2341 anyhow::bail!(
2342 "Agent exceeded maximum tool iterations ({})",
2343 self.config.resolved.max_tool_iterations
2344 )
2345 }
2346
2347 pub async fn turn_streamed(
2358 &mut self,
2359 user_message: &str,
2360 event_tx: tokio::sync::mpsc::Sender<TurnEvent>,
2361 cancel_token: Option<tokio_util::sync::CancellationToken>,
2362 ) -> Result<(String, Vec<ConversationMessage>)> {
2363 if user_message.trim().is_empty() {
2367 ::zeroclaw_log::record!(
2368 WARN,
2369 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
2370 .with_category(::zeroclaw_log::EventCategory::Agent)
2371 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2372 .with_attrs(::serde_json::json!({
2373 "reason": "empty_user_message",
2374 "entry_point": "Agent::turn_streamed",
2375 "raw_len": user_message.len(),
2376 })),
2377 "Refusing blank user turn (would emit timestamp-only message and risk prompt-template bleed-through)"
2378 );
2379 return Err(anyhow::Error::msg(
2380 "empty user message: refusing to dispatch a blank turn",
2381 ));
2382 }
2383
2384 self.turn_streamed_with_steering_state(user_message, event_tx, cancel_token, None)
2385 .await
2386 .map(|outcome| (outcome.response, outcome.new_messages))
2387 .map_err(|err| err.error)
2388 }
2389
2390 pub async fn turn_streamed_with_steering_state(
2391 &mut self,
2392 user_message: &str,
2393 event_tx: tokio::sync::mpsc::Sender<TurnEvent>,
2394 cancel_token: Option<tokio_util::sync::CancellationToken>,
2395 mut steering_rx: Option<&mut tokio::sync::mpsc::Receiver<String>>,
2396 ) -> std::result::Result<StreamedTurnSuccess, StreamedTurnError> {
2397 if user_message.trim().is_empty() {
2401 ::zeroclaw_log::record!(
2402 WARN,
2403 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
2404 .with_category(::zeroclaw_log::EventCategory::Agent)
2405 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2406 .with_attrs(::serde_json::json!({
2407 "reason": "empty_user_message",
2408 "entry_point": "Agent::turn_streamed_with_steering_state",
2409 "raw_len": user_message.len(),
2410 })),
2411 "Refusing blank user turn (would emit timestamp-only message and risk prompt-template bleed-through)"
2412 );
2413 return Err(StreamedTurnError {
2414 error: anyhow::Error::msg("empty user message: refusing to dispatch a blank turn"),
2415 committed_response: String::new(),
2416 new_messages: Vec::new(),
2417 });
2418 }
2419
2420 if self.history.is_empty() {
2422 let system_prompt = self
2423 .build_system_prompt()
2424 .map_err(|error| StreamedTurnError {
2425 error,
2426 committed_response: String::new(),
2427 new_messages: Vec::new(),
2428 })?;
2429 self.history
2430 .push(ConversationMessage::Chat(ChatMessage::system(
2431 system_prompt,
2432 )));
2433 }
2434
2435 let mut new_msgs: Vec<ConversationMessage> = Vec::new();
2436 self.append_streamed_user_message_to_history(user_message, &mut new_msgs)
2437 .await;
2438
2439 let effective_model = self.classify_model(user_message);
2440 let turn_started_at = Instant::now();
2441 let turn_id = Self::new_turn_id();
2442 let mut committed_response = String::new();
2443
2444 self.observer.record_event(&ObserverEvent::AgentStart {
2445 model_provider: self.model_provider_name.clone(),
2446 model: effective_model.clone(),
2447 channel: None,
2448 agent_alias: self.observer_agent_alias(),
2449 turn_id: Some(turn_id.clone()),
2450 });
2451
2452 let mut guard = TurnGuard {
2453 observer: Arc::clone(&self.observer),
2454 model_provider: self.model_provider_name.clone(),
2455 model: effective_model.clone(),
2456 turn_id: Some(turn_id.clone()),
2457 turn_started_at,
2458 agent_alias: self.observer_agent_alias(),
2459 total_input_tokens: 0,
2460 total_output_tokens: 0,
2461 saw_usage: false,
2462 done: false,
2463 };
2464
2465 for _ in 0..self.config.resolved.max_tool_iterations {
2467 if cancel_token
2469 .as_ref()
2470 .is_some_and(tokio_util::sync::CancellationToken::is_cancelled)
2471 {
2472 self.append_streamed_assistant_message_to_history(
2473 "[interrupted by user]".to_string(),
2474 &mut new_msgs,
2475 &mut committed_response,
2476 );
2477 return Err(StreamedTurnError {
2478 error: crate::agent::loop_::ToolLoopCancelled.into(),
2479 committed_response,
2480 new_messages: new_msgs,
2481 });
2482 }
2483
2484 for steering_message in Self::drain_steering_messages(&mut steering_rx) {
2485 self.append_streamed_user_message_to_history(&steering_message, &mut new_msgs)
2486 .await;
2487 }
2488
2489 let messages = self.tool_dispatcher.to_provider_messages(&self.history);
2490 let prepared_messages = match self.prepare_provider_messages(&messages).await {
2491 Ok(messages) => messages,
2492 Err(error) => {
2493 return Err(StreamedTurnError {
2494 error,
2495 committed_response,
2496 new_messages: new_msgs,
2497 });
2498 }
2499 };
2500
2501 let cache_key =
2503 self.response_cache_key_for_messages(&prepared_messages, &effective_model);
2504
2505 if let (Some(cache), Some(key)) = (&self.response_cache, &cache_key) {
2506 if let Ok(Some(cached)) = cache.get(key) {
2507 self.observer.record_event(&ObserverEvent::CacheHit {
2508 cache_type: "response".into(),
2509 tokens_saved: 0,
2510 });
2511 let cached_msg =
2512 ConversationMessage::Chat(ChatMessage::assistant(cached.clone()));
2513 new_msgs.push(cached_msg.clone());
2514 self.history.push(cached_msg);
2515 self.trim_history();
2516 self.observer.record_event(&ObserverEvent::TurnComplete);
2517 committed_response.push_str(&cached);
2518 return Ok(StreamedTurnSuccess {
2519 response: committed_response,
2520 new_messages: new_msgs,
2521 });
2522 }
2523 self.observer.record_event(&ObserverEvent::CacheMiss {
2524 cache_type: "response".into(),
2525 });
2526 }
2527
2528 {
2535 let msg_count = prepared_messages.len();
2536 let content_chars: usize = prepared_messages.iter().map(|m| m.content.len()).sum();
2537 ::zeroclaw_log::record!(
2538 DEBUG,
2539 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note,)
2540 .with_attrs(::serde_json::json!({
2541 "msg_count": msg_count,
2542 "content_chars": content_chars,
2543 "approx_tokens": content_chars / 4,
2544 "model": effective_model,
2545 })),
2546 "agent: outbound prompt size (streaming)"
2547 );
2548 }
2549
2550 use futures_util::StreamExt;
2554
2555 let llm_started_at = Instant::now();
2556 self.observer.record_event(&ObserverEvent::LlmRequest {
2557 model_provider: self.model_provider_name.clone(),
2558 model: effective_model.clone(),
2559 messages_count: messages.len(),
2560 channel: None,
2561 agent_alias: self.observer_agent_alias(),
2562 turn_id: Some(turn_id.clone()),
2563 });
2564
2565 let stream_opts = zeroclaw_providers::traits::StreamOptions::new(
2566 self.model_provider.supports_streaming(),
2567 );
2568 let mut stream = self.model_provider.stream_chat(
2569 zeroclaw_providers::ChatRequest {
2570 messages: &prepared_messages,
2571 tools: if self.should_send_tool_specs() {
2572 Some(&self.tool_specs)
2573 } else {
2574 None
2575 },
2576 thinking: None,
2577 },
2578 &effective_model,
2579 self.temperature,
2580 stream_opts,
2581 );
2582
2583 let mut streamed_text = String::new();
2584 let mut streamed_reasoning = String::new();
2585 let mut streamed_tool_calls: Vec<zeroclaw_providers::traits::ToolCall> = Vec::new();
2586 let mut streamed_usage: Option<zeroclaw_providers::traits::TokenUsage> = None;
2587 let mut got_stream = false;
2588 let mut visible_streamed_output = false;
2589 let mut stream_error: Option<String> = None;
2590 let mut pre_executed_call_ids: HashMap<String, VecDeque<String>> = HashMap::new();
2591 let mut was_cancelled = false;
2592
2593 loop {
2598 let next_item = stream.next();
2599 let item = if let Some(ref token) = cancel_token {
2600 tokio::select! {
2601 biased;
2602 () = token.cancelled() => {
2603 was_cancelled = true;
2604 ::zeroclaw_log::record!(
2605 INFO,
2606 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Cancel)
2607 .with_category(::zeroclaw_log::EventCategory::Agent)
2608 .with_outcome(::zeroclaw_log::EventOutcome::Success)
2609 .with_attrs(::serde_json::json!({
2610 "streamed_text_len": streamed_text.len(),
2611 "got_stream": got_stream,
2612 })),
2613 "turn: cancel token fired mid-stream — breaking consume loop, dropping stream to abort parser"
2614 );
2615 break;
2616 }
2617 item = next_item => item,
2618 }
2619 } else {
2620 next_item.await
2621 };
2622
2623 let Some(item) = item else { break };
2624 match item {
2625 Ok(event) => match event {
2626 zeroclaw_providers::traits::StreamEvent::TextDelta(chunk) => {
2627 if let Some(reasoning) = chunk.reasoning
2628 && !reasoning.is_empty()
2629 {
2630 streamed_reasoning.push_str(&reasoning);
2634 Self::send_turn_event(
2635 &event_tx,
2636 cancel_token.as_ref(),
2637 TurnEvent::Thinking { delta: reasoning },
2638 )
2639 .await;
2640 visible_streamed_output = true;
2641 }
2642 if !chunk.delta.is_empty() {
2643 got_stream = true;
2644 streamed_text.push_str(&chunk.delta);
2645 Self::send_turn_event(
2646 &event_tx,
2647 cancel_token.as_ref(),
2648 TurnEvent::Chunk { delta: chunk.delta },
2649 )
2650 .await;
2651 visible_streamed_output = true;
2652 }
2653 }
2654 zeroclaw_providers::traits::StreamEvent::ToolCall(tc) => {
2655 got_stream = true;
2656 streamed_tool_calls.push(tc);
2659 }
2660 zeroclaw_providers::traits::StreamEvent::PreExecutedToolCall {
2661 name,
2662 args,
2663 } => {
2664 let call_id = uuid::Uuid::new_v4().to_string();
2665 pre_executed_call_ids
2666 .entry(name.clone())
2667 .or_default()
2668 .push_back(call_id.clone());
2669 Self::send_turn_event(
2670 &event_tx,
2671 cancel_token.as_ref(),
2672 TurnEvent::ToolCall {
2673 id: call_id,
2674 name,
2675 args: serde_json::from_str(&args).unwrap_or_default(),
2676 },
2677 )
2678 .await;
2679 visible_streamed_output = true;
2680 }
2682 zeroclaw_providers::traits::StreamEvent::PreExecutedToolResult {
2683 name,
2684 output,
2685 } => {
2686 let result_id = pre_executed_call_ids
2687 .get_mut(&name)
2688 .and_then(|ids| ids.pop_front())
2689 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
2690 Self::send_turn_event(
2691 &event_tx,
2692 cancel_token.as_ref(),
2693 TurnEvent::ToolResult {
2694 id: result_id,
2695 name,
2696 output,
2697 },
2698 )
2699 .await;
2700 visible_streamed_output = true;
2701 }
2702 zeroclaw_providers::traits::StreamEvent::Usage(usage) => {
2703 streamed_usage = Some(usage);
2704 }
2705 zeroclaw_providers::traits::StreamEvent::Final => break,
2706 },
2707 Err(error) => {
2708 stream_error = Some(error.to_string());
2709 break;
2710 }
2711 }
2712 }
2713 drop(stream);
2715
2716 if was_cancelled {
2720 let partial =
2721 Self::marked_partial_response(&streamed_text, "[interrupted by user]");
2722 self.append_streamed_assistant_message_to_history(
2723 partial,
2724 &mut new_msgs,
2725 &mut committed_response,
2726 );
2727 self.observer.record_event(&ObserverEvent::LlmResponse {
2728 model_provider: self.model_provider_name.clone(),
2729 model: effective_model.clone(),
2730 duration: llm_started_at.elapsed(),
2731 success: false,
2732 error_message: Some("request cancelled by user".into()),
2733 input_tokens: None,
2734 output_tokens: None,
2735 channel: None,
2736 agent_alias: self.observer_agent_alias(),
2737 turn_id: Some(turn_id.clone()),
2738 });
2739 return Err(StreamedTurnError {
2740 error: crate::agent::loop_::ToolLoopCancelled.into(),
2741 committed_response,
2742 new_messages: new_msgs,
2743 });
2744 }
2745
2746 if stream_error.is_some() && visible_streamed_output {
2747 if !streamed_text.is_empty() {
2748 let partial =
2749 Self::marked_partial_response(&streamed_text, "[stream interrupted]");
2750 self.append_streamed_assistant_message_to_history(
2751 partial,
2752 &mut new_msgs,
2753 &mut committed_response,
2754 );
2755 }
2756 let safe_error = zeroclaw_providers::sanitize_api_error(
2757 stream_error.as_deref().unwrap_or_default(),
2758 );
2759 self.observer.record_event(&ObserverEvent::LlmResponse {
2760 model_provider: self.model_provider_name.clone(),
2761 model: effective_model.clone(),
2762 duration: llm_started_at.elapsed(),
2763 success: false,
2764 error_message: Some(safe_error),
2765 input_tokens: None,
2766 output_tokens: None,
2767 channel: None,
2768 agent_alias: self.observer_agent_alias(),
2769 turn_id: Some(turn_id.clone()),
2770 });
2771 return Err(StreamedTurnError {
2772 error: anyhow::Error::msg(stream_error.unwrap_or_default()),
2773 committed_response,
2774 new_messages: new_msgs,
2775 });
2776 }
2777
2778 let response = if got_stream && stream_error.is_none() {
2784 zeroclaw_providers::ChatResponse {
2790 text: Some(streamed_text),
2791 tool_calls: streamed_tool_calls,
2792 usage: streamed_usage.clone(),
2793 reasoning_content: if streamed_reasoning.is_empty() {
2794 None
2795 } else {
2796 Some(streamed_reasoning)
2797 },
2798 }
2799 } else {
2800 if let Some(error) = stream_error.as_ref() {
2801 ::zeroclaw_log::record!(
2802 WARN,
2803 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2804 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2805 .with_attrs(::serde_json::json!({
2806 "model": effective_model.as_str(),
2807 "error": zeroclaw_providers::sanitize_api_error(error),
2808 })),
2809 "turn_streamed provider stream failed; falling back to non-streaming chat"
2810 );
2811 }
2812 let chat_fut = self.model_provider.chat(
2814 ChatRequest {
2815 messages: &prepared_messages,
2816 tools: if self.should_send_tool_specs() {
2817 Some(&self.tool_specs)
2818 } else {
2819 None
2820 },
2821 thinking: None,
2822 },
2823 &effective_model,
2824 self.temperature,
2825 );
2826 let chat_result = if let Some(ref token) = cancel_token {
2827 tokio::select! {
2828 biased;
2829 () = token.cancelled() => {
2830 let partial = if streamed_text.is_empty() {
2831 "[interrupted by user]".to_string()
2832 } else {
2833 Self::marked_partial_response(
2834 &streamed_text,
2835 "[interrupted by user]",
2836 )
2837 };
2838 self.append_streamed_assistant_message_to_history(
2839 partial,
2840 &mut new_msgs,
2841 &mut committed_response,
2842 );
2843 self.observer.record_event(&ObserverEvent::LlmResponse {
2844 model_provider: self.model_provider_name.clone(),
2845 model: effective_model.clone(),
2846 duration: llm_started_at.elapsed(),
2847 success: false,
2848 error_message: Some("request cancelled by user".into()),
2849 input_tokens: None,
2850 output_tokens: None,
2851 channel: None,
2852 agent_alias: self.observer_agent_alias(),
2853 turn_id: Some(turn_id.clone()),
2854 });
2855 return Err(StreamedTurnError {
2856 error: crate::agent::loop_::ToolLoopCancelled.into(),
2857 committed_response,
2858 new_messages: new_msgs,
2859 });
2860 }
2861 result = chat_fut => result,
2862 }
2863 } else {
2864 chat_fut.await
2865 };
2866 match chat_result {
2867 Ok(resp) => resp,
2868 Err(error) => {
2869 let safe_error = zeroclaw_providers::sanitize_api_error(&error.to_string());
2870 self.observer.record_event(&ObserverEvent::LlmResponse {
2871 model_provider: self.model_provider_name.clone(),
2872 model: effective_model.clone(),
2873 duration: llm_started_at.elapsed(),
2874 success: false,
2875 error_message: Some(safe_error),
2876 input_tokens: None,
2877 output_tokens: None,
2878 channel: None,
2879 agent_alias: self.observer_agent_alias(),
2880 turn_id: Some(turn_id.clone()),
2881 });
2882 if got_stream && !streamed_text.is_empty() {
2883 let partial = Self::marked_partial_response(
2884 &streamed_text,
2885 "[stream interrupted]",
2886 );
2887 self.append_streamed_assistant_message_to_history(
2888 partial,
2889 &mut new_msgs,
2890 &mut committed_response,
2891 );
2892 }
2893 return Err(StreamedTurnError {
2894 error,
2895 committed_response,
2896 new_messages: new_msgs,
2897 });
2898 }
2899 }
2900 };
2901
2902 let (resp_input_tokens, resp_output_tokens) = response
2903 .usage
2904 .as_ref()
2905 .map(|u| (u.input_tokens, u.output_tokens))
2906 .unwrap_or((None, None));
2907 if let Some(input) = resp_input_tokens {
2908 guard.total_input_tokens = guard.total_input_tokens.saturating_add(input);
2909 guard.saw_usage = true;
2910 }
2911 if let Some(output) = resp_output_tokens {
2912 guard.total_output_tokens = guard.total_output_tokens.saturating_add(output);
2913 guard.saw_usage = true;
2914 }
2915 self.observer.record_event(&ObserverEvent::LlmResponse {
2916 model_provider: self.model_provider_name.clone(),
2917 model: effective_model.clone(),
2918 duration: llm_started_at.elapsed(),
2919 success: true,
2920 error_message: None,
2921 input_tokens: resp_input_tokens,
2922 output_tokens: resp_output_tokens,
2923 channel: None,
2924 agent_alias: self.observer_agent_alias(),
2925 turn_id: Some(turn_id.clone()),
2926 });
2927
2928 if let Some(ref usage) = response.usage {
2933 Self::send_turn_event(
2934 &event_tx,
2935 cancel_token.as_ref(),
2936 TurnEvent::Usage {
2937 input_tokens: usage.input_tokens,
2938 cached_input_tokens: usage.cached_input_tokens,
2939 output_tokens: usage.output_tokens,
2940 cost_usd: None,
2941 },
2942 )
2943 .await;
2944 }
2945
2946 let (text, mut calls) = self.parse_response_for_effective_tools(&response);
2947 if calls.is_empty() {
2948 let final_text = if text.is_empty() && !self.tool_specs.is_empty() {
2949 response.text.unwrap_or_default()
2950 } else {
2951 text
2952 };
2953
2954 let steering_messages = Self::drain_steering_messages(&mut steering_rx);
2955 if !steering_messages.is_empty() {
2956 if !final_text.is_empty() {
2957 let assistant_msg =
2958 ConversationMessage::Chat(ChatMessage::assistant(final_text.clone()));
2959 new_msgs.push(assistant_msg.clone());
2960 self.history.push(assistant_msg);
2961 committed_response.push_str(&final_text);
2962 self.trim_history();
2963 }
2964
2965 for steering_message in steering_messages {
2966 self.append_streamed_user_message_to_history(
2967 &steering_message,
2968 &mut new_msgs,
2969 )
2970 .await;
2971 }
2972 continue;
2973 }
2974
2975 if let (Some(cache), Some(key)) = (&self.response_cache, &cache_key) {
2977 let token_count = response
2978 .usage
2979 .as_ref()
2980 .and_then(|u| u.output_tokens)
2981 .unwrap_or(0);
2982 #[allow(clippy::cast_possible_truncation)]
2983 let _ = cache.put(key, &effective_model, &final_text, token_count as u32);
2984 }
2985
2986 if !got_stream && !final_text.is_empty() {
2988 Self::send_turn_event(
2989 &event_tx,
2990 cancel_token.as_ref(),
2991 TurnEvent::Chunk {
2992 delta: final_text.clone(),
2993 },
2994 )
2995 .await;
2996 }
2997
2998 new_msgs.push(ConversationMessage::Chat(ChatMessage::assistant(
2999 final_text.clone(),
3000 )));
3001 self.history
3002 .push(ConversationMessage::Chat(ChatMessage::assistant(
3003 final_text.clone(),
3004 )));
3005 committed_response.push_str(&final_text);
3006 self.trim_history();
3007 self.observer.record_event(&ObserverEvent::TurnComplete);
3008 return Ok(StreamedTurnSuccess {
3009 response: committed_response,
3010 new_messages: new_msgs,
3011 });
3012 }
3013
3014 for call in &mut calls {
3016 if call.tool_call_id.is_none() {
3017 call.tool_call_id = Some(uuid::Uuid::new_v4().to_string());
3018 }
3019 }
3020
3021 let tool_call_msg = ConversationMessage::AssistantToolCalls {
3023 text: response.text.clone(),
3024 tool_calls: response.tool_calls.clone(),
3025 reasoning_content: response.reasoning_content.clone(),
3026 };
3027 new_msgs.push(tool_call_msg.clone());
3028 let serial_dispatch =
3041 !self.config.resolved.parallel_tools || self.approval_manager.is_some();
3042
3043 let results = if serial_dispatch {
3044 let mut serial_results: Vec<ToolExecutionResult> = Vec::with_capacity(calls.len());
3045 for (idx, call) in calls.iter().enumerate() {
3046 if let Some(ref token) = cancel_token
3047 && token.is_cancelled()
3048 {
3049 self.history.push(tool_call_msg.clone());
3050 self.synthesize_cancelled_tool_results(
3051 vec![],
3052 &response.tool_calls,
3053 &mut new_msgs,
3054 );
3055 self.append_streamed_assistant_message_to_history(
3056 "[interrupted by user]".to_string(),
3057 &mut new_msgs,
3058 &mut committed_response,
3059 );
3060 return Err(StreamedTurnError {
3061 error: crate::agent::loop_::ToolLoopCancelled.into(),
3062 committed_response,
3063 new_messages: new_msgs,
3064 });
3065 }
3066
3067 let call_id = call.tool_call_id.as_ref().unwrap().clone();
3068 Self::send_turn_event(
3069 &event_tx,
3070 cancel_token.as_ref(),
3071 TurnEvent::ToolCall {
3072 id: call_id,
3073 name: call.name.clone(),
3074 args: call.arguments.clone(),
3075 },
3076 )
3077 .await;
3078
3079 let single = std::slice::from_ref(call);
3080 let result = if let Some(ref token) = cancel_token {
3081 tokio::select! {
3082 biased;
3083 () = token.cancelled() => {
3084 let completed: Vec<ToolResultMessage> = serial_results
3085 .iter()
3086 .map(|r| ToolResultMessage {
3087 tool_call_id: r.tool_call_id.clone().unwrap_or_default(),
3088 content: r.output.clone(),
3089 })
3090 .collect();
3091 self.history.push(tool_call_msg.clone());
3092 self.synthesize_cancelled_tool_results(
3093 completed,
3094 &response.tool_calls[idx..],
3095 &mut new_msgs,
3096 );
3097 self.append_streamed_assistant_message_to_history(
3098 "[interrupted by user]".to_string(),
3099 &mut new_msgs,
3100 &mut committed_response,
3101 );
3102 return Err(StreamedTurnError {
3103 error: crate::agent::loop_::ToolLoopCancelled.into(),
3104 committed_response,
3105 new_messages: new_msgs,
3106 });
3107 }
3108 mut r = self.execute_tools(single, &turn_id) => r.pop().expect("one call yields one result"),
3109 }
3110 } else {
3111 self.execute_tools(single, &turn_id)
3112 .await
3113 .pop()
3114 .expect("one call yields one result")
3115 };
3116
3117 let result_id = result.tool_call_id.as_ref().unwrap().clone();
3118 Self::send_turn_event(
3119 &event_tx,
3120 cancel_token.as_ref(),
3121 TurnEvent::ToolResult {
3122 id: result_id,
3123 name: result.name.clone(),
3124 output: result.output.clone(),
3125 },
3126 )
3127 .await;
3128
3129 serial_results.push(result);
3130 }
3131 serial_results
3132 } else {
3133 for call in &calls {
3134 let call_id = call.tool_call_id.as_ref().unwrap().clone();
3135 Self::send_turn_event(
3136 &event_tx,
3137 cancel_token.as_ref(),
3138 TurnEvent::ToolCall {
3139 id: call_id,
3140 name: call.name.clone(),
3141 args: call.arguments.clone(),
3142 },
3143 )
3144 .await;
3145 }
3146
3147 let results = if let Some(ref token) = cancel_token {
3148 tokio::select! {
3149 biased;
3150 () = token.cancelled() => {
3151 self.history.push(tool_call_msg.clone());
3152 self.synthesize_cancelled_tool_results(
3153 vec![],
3154 &response.tool_calls,
3155 &mut new_msgs,
3156 );
3157 self.append_streamed_assistant_message_to_history(
3158 "[interrupted by user]".to_string(),
3159 &mut new_msgs,
3160 &mut committed_response,
3161 );
3162 return Err(StreamedTurnError {
3163 error: crate::agent::loop_::ToolLoopCancelled.into(),
3164 committed_response,
3165 new_messages: new_msgs,
3166 });
3167 }
3168 results = self.execute_tools(&calls, &turn_id) => results,
3169 }
3170 } else {
3171 self.execute_tools(&calls, &turn_id).await
3172 };
3173
3174 for result in &results {
3175 let result_id = result.tool_call_id.as_ref().unwrap().clone();
3176 Self::send_turn_event(
3177 &event_tx,
3178 cancel_token.as_ref(),
3179 TurnEvent::ToolResult {
3180 id: result_id,
3181 name: result.name.clone(),
3182 output: result.output.clone(),
3183 },
3184 )
3185 .await;
3186 }
3187
3188 results
3189 };
3190
3191 let formatted = self.tool_dispatcher.format_results(&results);
3192 new_msgs.push(formatted.clone());
3193 self.history.push(tool_call_msg);
3194 self.history.push(formatted);
3195 self.trim_history();
3196 }
3197
3198 Err(StreamedTurnError {
3199 error: anyhow::Error::msg(format!(
3200 "Agent exceeded maximum tool iterations ({})",
3201 self.config.resolved.max_tool_iterations
3202 )),
3203 committed_response,
3204 new_messages: new_msgs,
3205 })
3206 }
3207
3208 pub async fn run_single(&mut self, message: &str) -> Result<String> {
3209 self.turn(message).await
3210 }
3211
3212 pub async fn run_interactive(&mut self) -> Result<()> {
3213 println!("🦀 ZeroClaw Interactive Mode");
3214 println!("Type /quit to exit.\n");
3215
3216 let (tx, mut rx) = tokio::sync::mpsc::channel(32);
3217 let cli = crate::agent::loop_::CLI_CHANNEL_FN
3218 .get()
3219 .expect("CLI channel factory not registered — call register_cli_channel_fn at startup")(
3220 );
3221
3222 let listen_handle = zeroclaw_spawn::spawn!(async move {
3223 let _ = zeroclaw_api::channel::Channel::listen(&*cli, tx).await;
3224 });
3225
3226 while let Some(msg) = rx.recv().await {
3227 let response = match self.turn(&msg.content).await {
3228 Ok(resp) => resp,
3229 Err(e) => {
3230 eprintln!("\nError: {e}\n");
3231 continue;
3232 }
3233 };
3234 println!("\n{response}\n");
3235 }
3236
3237 listen_handle.abort();
3238 Ok(())
3239 }
3240}
3241
3242pub async fn run(
3243 config: Config,
3244 agent_alias: &str,
3245 message: Option<String>,
3246 provider_override: Option<String>,
3247 model_override: Option<String>,
3248 temperature: Option<f64>,
3249) -> Result<()> {
3250 let start = Instant::now();
3251
3252 let mut effective_config = config;
3253 if let Some(ref p) = provider_override {
3254 let (type_key, alias_key) = p.split_once('.').unwrap_or((p.as_str(), agent_alias));
3257 effective_config
3258 .providers
3259 .models
3260 .ensure(type_key, alias_key);
3261 if let Some(agent_cfg) = effective_config.agents.get_mut(agent_alias) {
3262 agent_cfg.model_provider = format!("{type_key}.{alias_key}").into();
3263 }
3264 }
3265 if let Some(agent_cfg) = effective_config.agents.get(agent_alias)
3267 && let Some((fam, ali)) = agent_cfg.model_provider.split_once('.')
3268 && let Some(entry) = effective_config.providers.models.ensure(fam, ali)
3269 {
3270 if let Some(m) = model_override {
3271 entry.model = Some(m);
3272 }
3273 entry.temperature = temperature;
3274 }
3275
3276 let mut agent = Agent::from_config(&effective_config, agent_alias).await?;
3277
3278 let (provider_name, model_name) =
3279 match effective_config.resolved_model_provider_for_agent(agent_alias) {
3280 Some((ty, _alias, entry)) => {
3281 let model = entry
3282 .model
3283 .as_deref()
3284 .map(str::trim)
3285 .filter(|m| !m.is_empty())
3286 .map(ToString::to_string)
3287 .or_else(|| effective_config.resolve_default_model())
3288 .unwrap_or_else(|| "<unresolved>".to_string());
3289 (ty.to_string(), model)
3290 }
3291 None => (
3292 provider_override.unwrap_or_else(|| "unknown".to_string()),
3293 effective_config
3294 .resolve_default_model()
3295 .unwrap_or_else(|| "<unresolved>".to_string()),
3296 ),
3297 };
3298
3299 agent.observer.record_event(&ObserverEvent::AgentStart {
3300 model_provider: provider_name.clone(),
3301 model: model_name.clone(),
3302 channel: None,
3303 agent_alias: None,
3304 turn_id: None,
3305 });
3306
3307 let _run_guard = TurnGuard {
3308 observer: Arc::clone(&agent.observer),
3309 model_provider: provider_name,
3310 model: model_name,
3311 turn_id: None,
3312 turn_started_at: start,
3313 agent_alias: None,
3314 total_input_tokens: 0,
3315 total_output_tokens: 0,
3316 saw_usage: false,
3317 done: false,
3318 };
3319
3320 if let Some(msg) = message {
3321 let response = agent.run_single(&msg).await?;
3322 println!("{response}");
3323 } else {
3324 agent.run_interactive().await?;
3325 }
3326
3327 Ok(())
3328}
3329
3330#[cfg(test)]
3331mod tests {
3332 use super::*;
3333 use async_trait::async_trait;
3334 use parking_lot::Mutex;
3335 use std::collections::HashMap;
3336 use std::sync::atomic::{AtomicUsize, Ordering};
3337 use zeroclaw_api::observability_traits::ObserverMetric;
3338
3339 #[test]
3340 fn build_session_model_provider_rejects_undotted_ref() {
3341 let config = Config::default();
3342 let err = match build_session_model_provider(&config, "anthropic", Some("m")) {
3343 Ok(_) => panic!("undotted ref must error"),
3344 Err(e) => e,
3345 };
3346 assert!(err.to_string().contains("<type>.<alias>"), "got: {err}");
3347 }
3348
3349 #[test]
3350 fn build_session_model_provider_requires_a_model() {
3351 let config = Config::default();
3353 let err = match build_session_model_provider(&config, "anthropic.default", None) {
3354 Ok(_) => panic!("missing model must error"),
3355 Err(e) => e,
3356 };
3357 assert!(
3358 err.to_string().contains("no `model` configured"),
3359 "got: {err}"
3360 );
3361 }
3362
3363 zeroclaw_api::mock_tool_attribution!(
3364 CountingTool,
3365 NamedMockTool,
3366 MockTool,
3367 SlowTool,
3368 CapturingApprovalArgTool,
3369 );
3370
3371 struct MockModelProvider {
3372 responses: Mutex<Vec<zeroclaw_providers::ChatResponse>>,
3373 }
3374
3375 #[async_trait]
3376 impl ModelProvider for MockModelProvider {
3377 async fn chat_with_system(
3378 &self,
3379 _system_prompt: Option<&str>,
3380 _message: &str,
3381 _model: &str,
3382 _temperature: Option<f64>,
3383 ) -> Result<String> {
3384 Ok("ok".into())
3385 }
3386
3387 async fn chat(
3388 &self,
3389 _request: ChatRequest<'_>,
3390 _model: &str,
3391 _temperature: Option<f64>,
3392 ) -> Result<zeroclaw_providers::ChatResponse> {
3393 let mut guard = self.responses.lock();
3394 if guard.is_empty() {
3395 return Ok(zeroclaw_providers::ChatResponse {
3396 text: Some("done".into()),
3397 tool_calls: vec![],
3398 usage: None,
3399 reasoning_content: None,
3400 });
3401 }
3402 Ok(guard.remove(0))
3403 }
3404 }
3405 impl ::zeroclaw_api::attribution::Attributable for MockModelProvider {
3406 fn role(&self) -> ::zeroclaw_api::attribution::Role {
3407 ::zeroclaw_api::attribution::Role::Provider(
3408 ::zeroclaw_api::attribution::ProviderKind::Model(
3409 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
3410 ),
3411 )
3412 }
3413 fn alias(&self) -> &str {
3414 "MockModelProvider"
3415 }
3416 }
3417
3418 struct ModelCaptureModelProvider {
3419 responses: Mutex<Vec<zeroclaw_providers::ChatResponse>>,
3420 seen_models: Arc<Mutex<Vec<String>>>,
3421 }
3422
3423 #[async_trait]
3424 impl ModelProvider for ModelCaptureModelProvider {
3425 async fn chat_with_system(
3426 &self,
3427 _system_prompt: Option<&str>,
3428 _message: &str,
3429 _model: &str,
3430 _temperature: Option<f64>,
3431 ) -> Result<String> {
3432 Ok("ok".into())
3433 }
3434
3435 async fn chat(
3436 &self,
3437 _request: ChatRequest<'_>,
3438 model: &str,
3439 _temperature: Option<f64>,
3440 ) -> Result<zeroclaw_providers::ChatResponse> {
3441 self.seen_models.lock().push(model.to_string());
3442 let mut guard = self.responses.lock();
3443 if guard.is_empty() {
3444 return Ok(zeroclaw_providers::ChatResponse {
3445 text: Some("done".into()),
3446 tool_calls: vec![],
3447 usage: None,
3448 reasoning_content: None,
3449 });
3450 }
3451 Ok(guard.remove(0))
3452 }
3453 }
3454 impl ::zeroclaw_api::attribution::Attributable for ModelCaptureModelProvider {
3455 fn role(&self) -> ::zeroclaw_api::attribution::Role {
3456 ::zeroclaw_api::attribution::Role::Provider(
3457 ::zeroclaw_api::attribution::ProviderKind::Model(
3458 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
3459 ),
3460 )
3461 }
3462 fn alias(&self) -> &str {
3463 "ModelCaptureModelProvider"
3464 }
3465 }
3466
3467 struct TranscriptCaptureModelProvider {
3468 responses: Mutex<Vec<zeroclaw_providers::ChatResponse>>,
3469 seen_messages: Arc<Mutex<Vec<Vec<ChatMessage>>>>,
3470 }
3471
3472 #[async_trait]
3473 impl ModelProvider for TranscriptCaptureModelProvider {
3474 async fn chat_with_system(
3475 &self,
3476 _system_prompt: Option<&str>,
3477 _message: &str,
3478 _model: &str,
3479 _temperature: Option<f64>,
3480 ) -> Result<String> {
3481 Ok("ok".into())
3482 }
3483
3484 async fn chat(
3485 &self,
3486 request: ChatRequest<'_>,
3487 _model: &str,
3488 _temperature: Option<f64>,
3489 ) -> Result<zeroclaw_providers::ChatResponse> {
3490 self.seen_messages.lock().push(request.messages.to_vec());
3491 let mut responses = self.responses.lock();
3492 if responses.is_empty() {
3493 return Ok(zeroclaw_providers::ChatResponse {
3494 text: Some("done".into()),
3495 tool_calls: vec![],
3496 usage: None,
3497 reasoning_content: None,
3498 });
3499 }
3500 Ok(responses.remove(0))
3501 }
3502 }
3503
3504 impl ::zeroclaw_api::attribution::Attributable for TranscriptCaptureModelProvider {
3505 fn role(&self) -> ::zeroclaw_api::attribution::Role {
3506 ::zeroclaw_api::attribution::Role::Provider(
3507 ::zeroclaw_api::attribution::ProviderKind::Model(
3508 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
3509 ),
3510 )
3511 }
3512 fn alias(&self) -> &str {
3513 "TranscriptCaptureModelProvider"
3514 }
3515 }
3516
3517 struct StreamingSteeringModelProvider {
3518 seen_messages: Arc<Mutex<Vec<Vec<ChatMessage>>>>,
3519 call_count: AtomicUsize,
3520 fail_on_call: Option<usize>,
3521 fail_chat_on_call: Option<usize>,
3522 fail_after_delta_on_call: Option<usize>,
3523 delay_chat_on_call: Option<usize>,
3524 }
3525
3526 #[async_trait]
3527 impl ModelProvider for StreamingSteeringModelProvider {
3528 async fn chat_with_system(
3529 &self,
3530 _system_prompt: Option<&str>,
3531 _message: &str,
3532 _model: &str,
3533 _temperature: Option<f64>,
3534 ) -> Result<String> {
3535 Ok("ok".into())
3536 }
3537
3538 async fn chat(
3539 &self,
3540 request: ChatRequest<'_>,
3541 _model: &str,
3542 _temperature: Option<f64>,
3543 ) -> Result<zeroclaw_providers::ChatResponse> {
3544 let call = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
3545 self.seen_messages.lock().push(request.messages.to_vec());
3546 if self.delay_chat_on_call == Some(call) {
3547 tokio::time::sleep(std::time::Duration::from_secs(60)).await;
3548 }
3549 if self.fail_on_call == Some(call) {
3550 anyhow::bail!("synthetic provider failure on call {call}");
3551 }
3552 if self.fail_chat_on_call == Some(call) {
3553 anyhow::bail!("synthetic chat failure on call {call}");
3554 }
3555 if self.fail_after_delta_on_call == Some(call) {
3556 anyhow::bail!("synthetic provider failure after delta on call {call}");
3557 }
3558 Ok(zeroclaw_providers::ChatResponse {
3559 text: Some(if call == 1 { "draft" } else { "final" }.into()),
3560 tool_calls: vec![],
3561 usage: None,
3562 reasoning_content: None,
3563 })
3564 }
3565
3566 fn supports_streaming(&self) -> bool {
3567 true
3568 }
3569
3570 fn stream_chat(
3571 &self,
3572 request: ChatRequest<'_>,
3573 _model: &str,
3574 _temperature: Option<f64>,
3575 _options: zeroclaw_providers::traits::StreamOptions,
3576 ) -> futures_util::stream::BoxStream<
3577 'static,
3578 zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
3579 > {
3580 use futures_util::StreamExt as _;
3581
3582 let call = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
3583 self.seen_messages.lock().push(request.messages.to_vec());
3584 let should_fail = self.fail_on_call == Some(call);
3585 let should_fail_after_delta = self.fail_after_delta_on_call == Some(call);
3586 let delta = if call == 1 { "draft" } else { "final" }.to_string();
3587 futures_util::stream::unfold(0, move |step| {
3588 let delta = delta.clone();
3589 async move {
3590 match step {
3591 0 if should_fail => Some((
3592 Err(zeroclaw_providers::traits::StreamError::ModelProvider(
3593 "synthetic provider failure".into(),
3594 )),
3595 1,
3596 )),
3597 0 => Some((
3598 Ok(zeroclaw_providers::traits::StreamEvent::TextDelta(
3599 zeroclaw_providers::traits::StreamChunk {
3600 delta,
3601 is_final: false,
3602 reasoning: None,
3603 token_count: 0,
3604 },
3605 )),
3606 1,
3607 )),
3608 1 if should_fail_after_delta => Some((
3609 Err(zeroclaw_providers::traits::StreamError::ModelProvider(
3610 "synthetic provider failure after delta".into(),
3611 )),
3612 2,
3613 )),
3614 1 => {
3615 tokio::time::sleep(std::time::Duration::from_millis(150)).await;
3616 Some((Ok(zeroclaw_providers::traits::StreamEvent::Final), 2))
3617 }
3618 _ => None,
3619 }
3620 }
3621 })
3622 .boxed()
3623 }
3624 }
3625
3626 impl ::zeroclaw_api::attribution::Attributable for StreamingSteeringModelProvider {
3627 fn role(&self) -> ::zeroclaw_api::attribution::Role {
3628 ::zeroclaw_api::attribution::Role::Provider(
3629 ::zeroclaw_api::attribution::ProviderKind::Model(
3630 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
3631 ),
3632 )
3633 }
3634 fn alias(&self) -> &str {
3635 "StreamingSteeringModelProvider"
3636 }
3637 }
3638
3639 #[derive(Default)]
3640 struct CapturingObserver {
3641 events: parking_lot::Mutex<Vec<ObserverEvent>>,
3642 }
3643
3644 impl Observer for CapturingObserver {
3645 fn record_event(&self, event: &ObserverEvent) {
3646 self.events.lock().push(event.clone());
3647 }
3648 fn record_metric(&self, _metric: &ObserverMetric) {}
3649 fn name(&self) -> &str {
3650 "capturing"
3651 }
3652 fn as_any(&self) -> &dyn std::any::Any {
3653 self
3654 }
3655 fn flush(&self) {}
3656 }
3657
3658 struct MultimodalCaptureProvider {
3659 seen_user_messages: Arc<Mutex<Vec<String>>>,
3660 streamed: bool,
3661 }
3662
3663 #[async_trait]
3664 impl ModelProvider for MultimodalCaptureProvider {
3665 async fn chat_with_system(
3666 &self,
3667 _system_prompt: Option<&str>,
3668 _message: &str,
3669 _model: &str,
3670 _temperature: Option<f64>,
3671 ) -> Result<String> {
3672 Ok("ok".into())
3673 }
3674
3675 async fn chat(
3676 &self,
3677 request: ChatRequest<'_>,
3678 _model: &str,
3679 _temperature: Option<f64>,
3680 ) -> Result<zeroclaw_providers::ChatResponse> {
3681 if let Some(message) = request.messages.iter().rfind(|msg| msg.role == "user") {
3682 self.seen_user_messages.lock().push(message.content.clone());
3683 }
3684 Ok(zeroclaw_providers::ChatResponse {
3685 text: Some("done".into()),
3686 tool_calls: vec![],
3687 usage: None,
3688 reasoning_content: None,
3689 })
3690 }
3691
3692 fn stream_chat(
3693 &self,
3694 request: ChatRequest<'_>,
3695 _model: &str,
3696 _temperature: Option<f64>,
3697 _options: zeroclaw_providers::traits::StreamOptions,
3698 ) -> futures_util::stream::BoxStream<
3699 'static,
3700 zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
3701 > {
3702 use futures_util::stream::{self, StreamExt};
3703
3704 if let Some(message) = request.messages.iter().rfind(|msg| msg.role == "user") {
3705 self.seen_user_messages.lock().push(message.content.clone());
3706 }
3707
3708 if self.streamed {
3709 let chunk = zeroclaw_providers::traits::StreamEvent::TextDelta(
3710 zeroclaw_providers::traits::StreamChunk {
3711 delta: "stream-done".into(),
3712 is_final: false,
3713 reasoning: None,
3714 token_count: 0,
3715 },
3716 );
3717 stream::iter(vec![
3718 Ok(chunk),
3719 Ok(zeroclaw_providers::traits::StreamEvent::Final),
3720 ])
3721 .boxed()
3722 } else {
3723 stream::iter(vec![Ok(zeroclaw_providers::traits::StreamEvent::Final)]).boxed()
3724 }
3725 }
3726
3727 fn supports_vision(&self) -> bool {
3728 true
3729 }
3730 }
3731 impl ::zeroclaw_api::attribution::Attributable for MultimodalCaptureProvider {
3732 fn role(&self) -> ::zeroclaw_api::attribution::Role {
3733 ::zeroclaw_api::attribution::Role::Provider(
3734 ::zeroclaw_api::attribution::ProviderKind::Model(
3735 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
3736 ),
3737 )
3738 }
3739 fn alias(&self) -> &str {
3740 "MultimodalCaptureProvider"
3741 }
3742 }
3743
3744 struct MockTool;
3745
3746 #[async_trait]
3747 impl Tool for MockTool {
3748 fn name(&self) -> &str {
3749 "echo"
3750 }
3751
3752 fn description(&self) -> &str {
3753 "echo"
3754 }
3755
3756 fn parameters_schema(&self) -> serde_json::Value {
3757 serde_json::json!({"type": "object"})
3758 }
3759
3760 async fn execute(&self, _args: serde_json::Value) -> Result<crate::tools::ToolResult> {
3761 Ok(crate::tools::ToolResult {
3762 success: true,
3763 output: "tool-out".into(),
3764 error: None,
3765 })
3766 }
3767 }
3768
3769 struct FailingModelProvider;
3770
3771 #[async_trait]
3772 impl ModelProvider for FailingModelProvider {
3773 async fn chat_with_system(
3774 &self,
3775 _system_prompt: Option<&str>,
3776 _message: &str,
3777 _model: &str,
3778 _temperature: Option<f64>,
3779 ) -> Result<String> {
3780 Err(anyhow::Error::msg("provider unavailable"))
3781 }
3782
3783 async fn chat(
3784 &self,
3785 _request: ChatRequest<'_>,
3786 _model: &str,
3787 _temperature: Option<f64>,
3788 ) -> Result<zeroclaw_providers::ChatResponse> {
3789 Err(anyhow::Error::msg("provider unavailable"))
3790 }
3791 }
3792
3793 impl ::zeroclaw_api::attribution::Attributable for FailingModelProvider {
3794 fn role(&self) -> ::zeroclaw_api::attribution::Role {
3795 ::zeroclaw_api::attribution::Role::Provider(
3796 ::zeroclaw_api::attribution::ProviderKind::Model(
3797 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
3798 ),
3799 )
3800 }
3801 fn alias(&self) -> &str {
3802 "FailingModelProvider"
3803 }
3804 }
3805
3806 struct SlowTool;
3807
3808 #[async_trait]
3809 impl Tool for SlowTool {
3810 fn name(&self) -> &str {
3811 "echo"
3812 }
3813
3814 fn description(&self) -> &str {
3815 "echo"
3816 }
3817
3818 fn parameters_schema(&self) -> serde_json::Value {
3819 serde_json::json!({"type": "object"})
3820 }
3821
3822 async fn execute(&self, _args: serde_json::Value) -> Result<crate::tools::ToolResult> {
3823 tokio::time::sleep(std::time::Duration::from_secs(60)).await;
3824 Ok(crate::tools::ToolResult {
3825 success: true,
3826 output: "tool-out".into(),
3827 error: None,
3828 })
3829 }
3830 }
3831
3832 struct CountingTool {
3833 calls: Arc<AtomicUsize>,
3834 }
3835
3836 #[async_trait]
3837 impl Tool for CountingTool {
3838 fn name(&self) -> &str {
3839 "echo"
3840 }
3841
3842 fn description(&self) -> &str {
3843 "echo"
3844 }
3845
3846 fn parameters_schema(&self) -> serde_json::Value {
3847 serde_json::json!({"type": "object"})
3848 }
3849
3850 async fn execute(&self, _args: serde_json::Value) -> Result<crate::tools::ToolResult> {
3851 self.calls.fetch_add(1, Ordering::SeqCst);
3852 Ok(crate::tools::ToolResult {
3853 success: true,
3854 output: "tool-out".into(),
3855 error: None,
3856 })
3857 }
3858 }
3859
3860 struct CapturingApprovalArgTool {
3861 name: &'static str,
3862 output: &'static str,
3863 calls: Arc<AtomicUsize>,
3864 last_args: Arc<std::sync::Mutex<Option<serde_json::Value>>>,
3865 }
3866
3867 #[async_trait]
3868 impl Tool for CapturingApprovalArgTool {
3869 fn name(&self) -> &str {
3870 self.name
3871 }
3872
3873 fn description(&self) -> &str {
3874 self.name
3875 }
3876
3877 fn parameters_schema(&self) -> serde_json::Value {
3878 serde_json::json!({"type": "object"})
3879 }
3880
3881 async fn execute(&self, args: serde_json::Value) -> Result<crate::tools::ToolResult> {
3882 self.calls.fetch_add(1, Ordering::SeqCst);
3883 *self.last_args.lock().unwrap() = Some(args);
3884 Ok(crate::tools::ToolResult {
3885 success: true,
3886 output: self.output.into(),
3887 error: None,
3888 })
3889 }
3890 }
3891
3892 struct ApprovalChannel {
3893 response: zeroclaw_api::channel::ChannelApprovalResponse,
3894 requests: Arc<AtomicUsize>,
3895 }
3896
3897 impl ::zeroclaw_api::attribution::Attributable for ApprovalChannel {
3898 fn role(&self) -> ::zeroclaw_api::attribution::Role {
3899 ::zeroclaw_api::attribution::Role::Channel(
3900 ::zeroclaw_api::attribution::ChannelKind::AcpChannel,
3901 )
3902 }
3903 fn alias(&self) -> &str {
3904 "test"
3905 }
3906 }
3907
3908 #[async_trait]
3909 impl zeroclaw_api::channel::Channel for ApprovalChannel {
3910 fn name(&self) -> &str {
3911 "acp"
3912 }
3913
3914 async fn send(&self, _message: &zeroclaw_api::channel::SendMessage) -> anyhow::Result<()> {
3915 Ok(())
3916 }
3917
3918 async fn listen(
3919 &self,
3920 _tx: tokio::sync::mpsc::Sender<zeroclaw_api::channel::ChannelMessage>,
3921 ) -> anyhow::Result<()> {
3922 Ok(())
3923 }
3924
3925 async fn request_approval(
3926 &self,
3927 _recipient: &str,
3928 _request: &zeroclaw_api::channel::ChannelApprovalRequest,
3929 ) -> anyhow::Result<Option<zeroclaw_api::channel::ChannelApprovalResponse>> {
3930 self.requests.fetch_add(1, Ordering::SeqCst);
3931 Ok(Some(self.response.clone()))
3932 }
3933 }
3934
3935 #[tokio::test]
3936 async fn turn_without_tools_returns_text() {
3937 let model_provider = Box::new(MockModelProvider {
3938 responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
3939 text: Some("hello".into()),
3940 tool_calls: vec![],
3941 usage: None,
3942 reasoning_content: None,
3943 }]),
3944 });
3945
3946 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3947 backend: "none".into(),
3948 ..zeroclaw_config::schema::MemoryConfig::default()
3949 };
3950 let mem: Arc<dyn Memory> = Arc::from(
3951 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3952 .expect("memory creation should succeed with valid config"),
3953 );
3954
3955 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3956 let mut agent = Agent::builder()
3957 .model_provider(model_provider)
3958 .tools(vec![Box::new(MockTool)])
3959 .memory(mem)
3960 .observer(observer)
3961 .tool_dispatcher(Box::new(XmlToolDispatcher))
3962 .workspace_dir(std::path::PathBuf::from("/tmp"))
3963 .build()
3964 .expect("agent builder should succeed with valid config");
3965
3966 let response = agent.turn("hi").await.unwrap();
3967 assert_eq!(response, "hello");
3968 }
3969
3970 #[tokio::test]
3971 async fn direct_agent_strict_tool_parsing_ignores_xml_dispatcher_calls() {
3972 let provider = Box::new(MockModelProvider {
3973 responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
3974 text: Some(
3975 r#"<tool_call>{"name":"echo","arguments":{"value":"ignored"}}</tool_call>"#
3976 .into(),
3977 ),
3978 tool_calls: vec![],
3979 usage: None,
3980 reasoning_content: None,
3981 }]),
3982 });
3983
3984 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3985 backend: "none".into(),
3986 ..zeroclaw_config::schema::MemoryConfig::default()
3987 };
3988 let mem: Arc<dyn Memory> = Arc::from(
3989 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3990 .expect("memory creation should succeed with valid config"),
3991 );
3992 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3993 let calls = Arc::new(AtomicUsize::new(0));
3994 let agent_config = zeroclaw_config::schema::AliasedAgentConfig {
3995 resolved: zeroclaw_config::schema::ResolvedRuntime {
3996 strict_tool_parsing: true,
3997 ..Default::default()
3998 },
3999 ..zeroclaw_config::schema::AliasedAgentConfig::default()
4000 };
4001 let mut agent = Agent::builder()
4002 .model_provider(provider)
4003 .tools(vec![Box::new(CountingTool {
4004 calls: Arc::clone(&calls),
4005 })])
4006 .memory(mem)
4007 .observer(observer)
4008 .tool_dispatcher(Box::new(XmlToolDispatcher))
4009 .config(agent_config)
4010 .workspace_dir(std::path::PathBuf::from("/tmp"))
4011 .build()
4012 .expect("agent builder should succeed with valid config");
4013
4014 let system_prompt = agent
4015 .build_system_prompt()
4016 .expect("system prompt should render");
4017 assert!(
4018 !system_prompt.contains("## Tools"),
4019 "strict parsing should not advertise text tool instructions"
4020 );
4021 assert!(
4022 !system_prompt.contains("<tool_call"),
4023 "strict parsing should not advertise XML tool calls"
4024 );
4025
4026 let response = agent.turn("hi").await.unwrap();
4027
4028 assert_eq!(calls.load(Ordering::SeqCst), 0);
4029 assert!(response.contains("<tool_call>"));
4030 }
4031
4032 #[test]
4033 fn native_agent_prompt_omits_duplicate_tools_section() {
4034 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4035 backend: "none".into(),
4036 ..zeroclaw_config::schema::MemoryConfig::default()
4037 };
4038 let workspace = tempfile::TempDir::new().expect("temp dir");
4039 let mem: Arc<dyn Memory> = Arc::from(
4040 zeroclaw_memory::create_memory(&memory_cfg, workspace.path(), None)
4041 .expect("memory creation should succeed with valid config"),
4042 );
4043 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4044
4045 let native_agent = Agent::builder()
4046 .model_provider(Box::new(MockModelProvider {
4047 responses: Mutex::new(vec![]),
4048 }))
4049 .tools(vec![Box::new(MockTool)])
4050 .memory(Arc::clone(&mem))
4051 .observer(Arc::clone(&observer))
4052 .tool_dispatcher(Box::new(NativeToolDispatcher))
4053 .workspace_dir(workspace.path().to_path_buf())
4054 .build()
4055 .expect("agent builder should succeed with valid config");
4056 let native_prompt = native_agent.build_system_prompt().unwrap();
4057 assert!(!native_prompt.contains("## Tools"));
4058 assert!(!native_prompt.contains("echo"));
4059
4060 let xml_agent = Agent::builder()
4061 .model_provider(Box::new(MockModelProvider {
4062 responses: Mutex::new(vec![]),
4063 }))
4064 .tools(vec![Box::new(MockTool)])
4065 .memory(mem)
4066 .observer(observer)
4067 .tool_dispatcher(Box::new(XmlToolDispatcher))
4068 .workspace_dir(workspace.path().to_path_buf())
4069 .build()
4070 .expect("agent builder should succeed with valid config");
4071 let xml_prompt = xml_agent.build_system_prompt().unwrap();
4072 assert!(xml_prompt.contains("## Tools"));
4073 assert!(xml_prompt.contains("echo"));
4074 assert!(xml_prompt.contains("## Tool Use Protocol"));
4075 }
4076
4077 #[tokio::test]
4078 async fn direct_agent_tool_execution_requests_acp_approval() {
4079 let model_provider = Box::new(MockModelProvider {
4080 responses: Mutex::new(vec![]),
4081 });
4082 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4083 backend: "none".into(),
4084 ..zeroclaw_config::schema::MemoryConfig::default()
4085 };
4086 let mem: Arc<dyn Memory> = Arc::from(
4087 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4088 .expect("memory creation should succeed with valid config"),
4089 );
4090 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4091 let tool_calls = Arc::new(AtomicUsize::new(0));
4092 let approval_requests = Arc::new(AtomicUsize::new(0));
4093 let approval_cfg = zeroclaw_config::schema::RiskProfileConfig {
4094 always_ask: vec!["echo".into()],
4095 ..zeroclaw_config::schema::RiskProfileConfig::default()
4096 };
4097 let mut agent = Agent::builder()
4098 .model_provider(model_provider)
4099 .tools(vec![Box::new(CountingTool {
4100 calls: Arc::clone(&tool_calls),
4101 })])
4102 .memory(mem)
4103 .observer(observer)
4104 .tool_dispatcher(Box::new(NativeToolDispatcher))
4105 .workspace_dir(std::path::PathBuf::from("/tmp"))
4106 .approval_manager(Some(Arc::new(ApprovalManager::for_non_interactive(
4107 &approval_cfg,
4108 ))))
4109 .build()
4110 .expect("agent builder should succeed with valid config");
4111
4112 let handle: tools::PerToolChannelHandle =
4113 Arc::new(parking_lot::RwLock::new(HashMap::new()));
4114 agent.channel_handles.ask_user = Some(Arc::clone(&handle));
4115 let channel: Arc<dyn zeroclaw_api::channel::Channel> = Arc::new(ApprovalChannel {
4116 response: zeroclaw_api::channel::ChannelApprovalResponse::Approve,
4117 requests: Arc::clone(&approval_requests),
4118 });
4119 agent.channel_handles().register_channel("acp", channel);
4120
4121 let result = agent
4122 .execute_tool_call(
4123 &ParsedToolCall {
4124 name: "echo".into(),
4125 arguments: serde_json::json!({"message": "hi"}),
4126 tool_call_id: Some("tc1".into()),
4127 },
4128 "test-turn",
4129 )
4130 .await;
4131
4132 assert!(result.success);
4133 assert_eq!(result.output, "tool-out");
4134 assert_eq!(approval_requests.load(Ordering::SeqCst), 1);
4135 assert_eq!(tool_calls.load(Ordering::SeqCst), 1);
4136 }
4137
4138 #[tokio::test]
4139 async fn direct_agent_tool_execution_denies_when_acp_rejects() {
4140 let model_provider = Box::new(MockModelProvider {
4141 responses: Mutex::new(vec![]),
4142 });
4143 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4144 backend: "none".into(),
4145 ..zeroclaw_config::schema::MemoryConfig::default()
4146 };
4147 let mem: Arc<dyn Memory> = Arc::from(
4148 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4149 .expect("memory creation should succeed with valid config"),
4150 );
4151 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4152 let tool_calls = Arc::new(AtomicUsize::new(0));
4153 let approval_requests = Arc::new(AtomicUsize::new(0));
4154 let approval_cfg = zeroclaw_config::schema::RiskProfileConfig {
4155 always_ask: vec!["echo".into()],
4156 ..zeroclaw_config::schema::RiskProfileConfig::default()
4157 };
4158 let mut agent = Agent::builder()
4159 .model_provider(model_provider)
4160 .tools(vec![Box::new(CountingTool {
4161 calls: Arc::clone(&tool_calls),
4162 })])
4163 .memory(mem)
4164 .observer(observer)
4165 .tool_dispatcher(Box::new(NativeToolDispatcher))
4166 .workspace_dir(std::path::PathBuf::from("/tmp"))
4167 .approval_manager(Some(Arc::new(ApprovalManager::for_non_interactive(
4168 &approval_cfg,
4169 ))))
4170 .build()
4171 .expect("agent builder should succeed with valid config");
4172
4173 let handle: tools::PerToolChannelHandle =
4174 Arc::new(parking_lot::RwLock::new(HashMap::new()));
4175 agent.channel_handles.ask_user = Some(Arc::clone(&handle));
4176 let channel: Arc<dyn zeroclaw_api::channel::Channel> = Arc::new(ApprovalChannel {
4177 response: zeroclaw_api::channel::ChannelApprovalResponse::Deny,
4178 requests: Arc::clone(&approval_requests),
4179 });
4180 agent.channel_handles().register_channel("acp", channel);
4181
4182 let result = agent
4183 .execute_tool_call(
4184 &ParsedToolCall {
4185 name: "echo".into(),
4186 arguments: serde_json::json!({"message": "hi"}),
4187 tool_call_id: Some("tc1".into()),
4188 },
4189 "test-turn",
4190 )
4191 .await;
4192
4193 assert!(!result.success);
4194 assert_eq!(result.output, "Denied by user.");
4195 assert_eq!(approval_requests.load(Ordering::SeqCst), 1);
4196 assert_eq!(tool_calls.load(Ordering::SeqCst), 0);
4197 }
4198
4199 #[tokio::test]
4200 async fn direct_agent_shell_does_not_trust_model_supplied_approved_arg() {
4201 let provider = Box::new(MockModelProvider {
4202 responses: Mutex::new(vec![]),
4203 });
4204 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4205 backend: "none".into(),
4206 ..zeroclaw_config::schema::MemoryConfig::default()
4207 };
4208 let mem: Arc<dyn Memory> = Arc::from(
4209 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4210 .expect("memory creation should succeed with valid config"),
4211 );
4212 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4213 let tool_calls = Arc::new(AtomicUsize::new(0));
4214 let approval_requests = Arc::new(AtomicUsize::new(0));
4215 let captured_args = Arc::new(std::sync::Mutex::new(None));
4216 let approval_cfg = zeroclaw_config::schema::RiskProfileConfig::default();
4217 let mut agent = Agent::builder()
4218 .model_provider(provider)
4219 .tools(vec![Box::new(CapturingApprovalArgTool {
4220 name: "shell",
4221 output: "shell-out",
4222 calls: Arc::clone(&tool_calls),
4223 last_args: Arc::clone(&captured_args),
4224 })])
4225 .memory(mem)
4226 .observer(observer)
4227 .tool_dispatcher(Box::new(NativeToolDispatcher))
4228 .workspace_dir(std::path::PathBuf::from("/tmp"))
4229 .approval_manager(Some(Arc::new(
4230 ApprovalManager::for_non_interactive_backchannel(&approval_cfg),
4231 )))
4232 .build()
4233 .expect("agent builder should succeed with valid config");
4234
4235 let handle: tools::PerToolChannelHandle =
4236 Arc::new(parking_lot::RwLock::new(HashMap::new()));
4237 agent.channel_handles.ask_user = Some(Arc::clone(&handle));
4238 let channel: Arc<dyn zeroclaw_api::channel::Channel> = Arc::new(ApprovalChannel {
4239 response: zeroclaw_api::channel::ChannelApprovalResponse::Deny,
4240 requests: Arc::clone(&approval_requests),
4241 });
4242 agent.channel_handles().register_channel("acp", channel);
4243
4244 let result = agent
4245 .execute_tool_call(
4246 &ParsedToolCall {
4247 name: "shell".into(),
4248 arguments: serde_json::json!({
4249 "command": "touch should-not-run",
4250 "approved": true
4251 }),
4252 tool_call_id: Some("tc1".into()),
4253 },
4254 "test-turn",
4255 )
4256 .await;
4257
4258 assert!(!result.success);
4259 assert_eq!(result.output, "Denied by user.");
4260 assert_eq!(approval_requests.load(Ordering::SeqCst), 1);
4261 assert_eq!(tool_calls.load(Ordering::SeqCst), 0);
4262 assert!(captured_args.lock().unwrap().is_none());
4263 }
4264
4265 #[tokio::test]
4266 async fn direct_agent_shell_marks_args_approved_after_backchannel_approval() {
4267 let provider = Box::new(MockModelProvider {
4268 responses: Mutex::new(vec![]),
4269 });
4270 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4271 backend: "none".into(),
4272 ..zeroclaw_config::schema::MemoryConfig::default()
4273 };
4274 let mem: Arc<dyn Memory> = Arc::from(
4275 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4276 .expect("memory creation should succeed with valid config"),
4277 );
4278 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4279 let tool_calls = Arc::new(AtomicUsize::new(0));
4280 let approval_requests = Arc::new(AtomicUsize::new(0));
4281 let captured_args = Arc::new(std::sync::Mutex::new(None));
4282 let approval_cfg = zeroclaw_config::schema::RiskProfileConfig::default();
4283 let mut agent = Agent::builder()
4284 .model_provider(provider)
4285 .tools(vec![Box::new(CapturingApprovalArgTool {
4286 name: "shell",
4287 output: "shell-out",
4288 calls: Arc::clone(&tool_calls),
4289 last_args: Arc::clone(&captured_args),
4290 })])
4291 .memory(mem)
4292 .observer(observer)
4293 .tool_dispatcher(Box::new(NativeToolDispatcher))
4294 .workspace_dir(std::path::PathBuf::from("/tmp"))
4295 .approval_manager(Some(Arc::new(
4296 ApprovalManager::for_non_interactive_backchannel(&approval_cfg),
4297 )))
4298 .build()
4299 .expect("agent builder should succeed with valid config");
4300
4301 let handle: tools::PerToolChannelHandle =
4302 Arc::new(parking_lot::RwLock::new(HashMap::new()));
4303 agent.channel_handles.ask_user = Some(Arc::clone(&handle));
4304 let channel: Arc<dyn zeroclaw_api::channel::Channel> = Arc::new(ApprovalChannel {
4305 response: zeroclaw_api::channel::ChannelApprovalResponse::Approve,
4306 requests: Arc::clone(&approval_requests),
4307 });
4308 agent.channel_handles().register_channel("acp", channel);
4309
4310 let result = agent
4311 .execute_tool_call(
4312 &ParsedToolCall {
4313 name: "shell".into(),
4314 arguments: serde_json::json!({
4315 "command": "touch should-run-after-human-approval",
4316 "approved": false
4317 }),
4318 tool_call_id: Some("tc1".into()),
4319 },
4320 "test-turn",
4321 )
4322 .await;
4323
4324 assert!(result.success);
4325 assert_eq!(result.output, "shell-out");
4326 assert_eq!(approval_requests.load(Ordering::SeqCst), 1);
4327 assert_eq!(tool_calls.load(Ordering::SeqCst), 1);
4328 let args = captured_args
4329 .lock()
4330 .unwrap()
4331 .clone()
4332 .expect("shell tool should capture executed args");
4333 assert_eq!(args["approved"], true);
4334 }
4335
4336 #[tokio::test]
4337 async fn direct_agent_shell_keeps_runtime_approval_from_always_allowlist() {
4338 let provider = Box::new(MockModelProvider {
4339 responses: Mutex::new(vec![]),
4340 });
4341 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4342 backend: "none".into(),
4343 ..zeroclaw_config::schema::MemoryConfig::default()
4344 };
4345 let mem: Arc<dyn Memory> = Arc::from(
4346 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4347 .expect("memory creation should succeed with valid config"),
4348 );
4349 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4350 let tool_calls = Arc::new(AtomicUsize::new(0));
4351 let approval_requests = Arc::new(AtomicUsize::new(0));
4352 let captured_args = Arc::new(std::sync::Mutex::new(None));
4353 let approval_cfg = zeroclaw_config::schema::RiskProfileConfig::default();
4354 let mut agent = Agent::builder()
4355 .model_provider(provider)
4356 .tools(vec![Box::new(CapturingApprovalArgTool {
4357 name: "shell",
4358 output: "shell-out",
4359 calls: Arc::clone(&tool_calls),
4360 last_args: Arc::clone(&captured_args),
4361 })])
4362 .memory(mem)
4363 .observer(observer)
4364 .tool_dispatcher(Box::new(NativeToolDispatcher))
4365 .workspace_dir(std::path::PathBuf::from("/tmp"))
4366 .approval_manager(Some(Arc::new(
4367 ApprovalManager::for_non_interactive_backchannel(&approval_cfg),
4368 )))
4369 .build()
4370 .expect("agent builder should succeed with valid config");
4371
4372 let handle: tools::PerToolChannelHandle =
4373 Arc::new(parking_lot::RwLock::new(HashMap::new()));
4374 agent.channel_handles.ask_user = Some(Arc::clone(&handle));
4375 let channel: Arc<dyn zeroclaw_api::channel::Channel> = Arc::new(ApprovalChannel {
4376 response: zeroclaw_api::channel::ChannelApprovalResponse::AlwaysApprove,
4377 requests: Arc::clone(&approval_requests),
4378 });
4379 agent.channel_handles().register_channel("acp", channel);
4380
4381 let first_result = agent
4382 .execute_tool_call(
4383 &ParsedToolCall {
4384 name: "shell".into(),
4385 arguments: serde_json::json!({
4386 "command": "touch should-run-after-always-approval",
4387 "approved": false
4388 }),
4389 tool_call_id: Some("tc1".into()),
4390 },
4391 "test-turn",
4392 )
4393 .await;
4394 let second_result = agent
4395 .execute_tool_call(
4396 &ParsedToolCall {
4397 name: "shell".into(),
4398 arguments: serde_json::json!({
4399 "command": "touch should-run-from-allowlist",
4400 "approved": false
4401 }),
4402 tool_call_id: Some("tc2".into()),
4403 },
4404 "test-turn",
4405 )
4406 .await;
4407
4408 assert!(first_result.success);
4409 assert!(second_result.success);
4410 assert_eq!(approval_requests.load(Ordering::SeqCst), 1);
4411 assert_eq!(tool_calls.load(Ordering::SeqCst), 2);
4412 let args = captured_args
4413 .lock()
4414 .unwrap()
4415 .clone()
4416 .expect("shell tool should capture executed args");
4417 assert_eq!(args["approved"], true);
4418 }
4419
4420 #[tokio::test]
4421 async fn direct_agent_cron_add_does_not_trust_model_supplied_approved_arg() {
4422 let provider = Box::new(MockModelProvider {
4423 responses: Mutex::new(vec![]),
4424 });
4425 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4426 backend: "none".into(),
4427 ..zeroclaw_config::schema::MemoryConfig::default()
4428 };
4429 let mem: Arc<dyn Memory> = Arc::from(
4430 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4431 .expect("memory creation should succeed with valid config"),
4432 );
4433 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4434 let tool_calls = Arc::new(AtomicUsize::new(0));
4435 let captured_args = Arc::new(std::sync::Mutex::new(None));
4436 let agent = Agent::builder()
4437 .model_provider(provider)
4438 .tools(vec![Box::new(CapturingApprovalArgTool {
4439 name: "cron_add",
4440 output: "cron-out",
4441 calls: Arc::clone(&tool_calls),
4442 last_args: Arc::clone(&captured_args),
4443 })])
4444 .memory(mem)
4445 .observer(observer)
4446 .tool_dispatcher(Box::new(NativeToolDispatcher))
4447 .workspace_dir(std::path::PathBuf::from("/tmp"))
4448 .build()
4449 .expect("agent builder should succeed with valid config");
4450
4451 let result = agent
4452 .execute_tool_call(
4453 &ParsedToolCall {
4454 name: "cron_add".into(),
4455 arguments: serde_json::json!({
4456 "command": "echo should-not-be-model-approved",
4457 "approved": true
4458 }),
4459 tool_call_id: Some("tc1".into()),
4460 },
4461 "test-turn",
4462 )
4463 .await;
4464
4465 assert!(result.success);
4466 assert_eq!(result.output, "cron-out");
4467 assert_eq!(tool_calls.load(Ordering::SeqCst), 1);
4468 let args = captured_args
4469 .lock()
4470 .unwrap()
4471 .clone()
4472 .expect("cron_add tool should capture executed args");
4473 assert_eq!(args["approved"], false);
4474 }
4475
4476 #[tokio::test]
4477 async fn turn_with_native_dispatcher_handles_tool_results_variant() {
4478 let model_provider = Box::new(MockModelProvider {
4479 responses: Mutex::new(vec![
4480 zeroclaw_providers::ChatResponse {
4481 text: Some(String::new()),
4482 tool_calls: vec![zeroclaw_providers::ToolCall {
4483 id: "tc1".into(),
4484 name: "echo".into(),
4485 arguments: "{}".into(),
4486 extra_content: None,
4487 }],
4488 usage: None,
4489 reasoning_content: None,
4490 },
4491 zeroclaw_providers::ChatResponse {
4492 text: Some("done".into()),
4493 tool_calls: vec![],
4494 usage: None,
4495 reasoning_content: None,
4496 },
4497 ]),
4498 });
4499
4500 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4501 backend: "none".into(),
4502 ..zeroclaw_config::schema::MemoryConfig::default()
4503 };
4504 let mem: Arc<dyn Memory> = Arc::from(
4505 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4506 .expect("memory creation should succeed with valid config"),
4507 );
4508
4509 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4510 let mut agent = Agent::builder()
4511 .model_provider(model_provider)
4512 .tools(vec![Box::new(MockTool)])
4513 .memory(mem)
4514 .observer(observer)
4515 .tool_dispatcher(Box::new(NativeToolDispatcher))
4516 .workspace_dir(std::path::PathBuf::from("/tmp"))
4517 .build()
4518 .expect("agent builder should succeed with valid config");
4519
4520 let response = agent.turn("hi").await.unwrap();
4521 assert_eq!(response, "done");
4522 assert!(
4523 agent
4524 .history()
4525 .iter()
4526 .any(|msg| matches!(msg, ConversationMessage::ToolResults(_)))
4527 );
4528 }
4529
4530 #[tokio::test]
4531 async fn turn_routes_with_hint_when_query_classification_matches() {
4532 let seen_models = Arc::new(Mutex::new(Vec::new()));
4533 let model_provider = Box::new(ModelCaptureModelProvider {
4534 responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
4535 text: Some("classified".into()),
4536 tool_calls: vec![],
4537 usage: None,
4538 reasoning_content: None,
4539 }]),
4540 seen_models: seen_models.clone(),
4541 });
4542
4543 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4544 backend: "none".into(),
4545 ..zeroclaw_config::schema::MemoryConfig::default()
4546 };
4547 let mem: Arc<dyn Memory> = Arc::from(
4548 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4549 .expect("memory creation should succeed with valid config"),
4550 );
4551
4552 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4553 let mut route_model_by_hint = HashMap::new();
4554 route_model_by_hint.insert("fast".to_string(), "anthropic/claude-haiku-4-5".to_string());
4555 let mut agent = Agent::builder()
4556 .model_provider(model_provider)
4557 .tools(vec![Box::new(MockTool)])
4558 .memory(mem)
4559 .observer(observer)
4560 .tool_dispatcher(Box::new(NativeToolDispatcher))
4561 .workspace_dir(std::path::PathBuf::from("/tmp"))
4562 .classification_config(zeroclaw_config::schema::QueryClassificationConfig {
4563 enabled: true,
4564 rules: vec![zeroclaw_config::schema::ClassificationRule {
4565 hint: "fast".to_string(),
4566 keywords: vec!["quick".to_string()],
4567 patterns: vec![],
4568 min_length: None,
4569 max_length: None,
4570 priority: 10,
4571 }],
4572 })
4573 .available_hints(vec!["fast".to_string()])
4574 .route_model_by_hint(route_model_by_hint)
4575 .build()
4576 .expect("agent builder should succeed with valid config");
4577
4578 let response = agent.turn("quick summary please").await.unwrap();
4579 assert_eq!(response, "classified");
4580 let seen = seen_models.lock();
4581 assert_eq!(seen.as_slice(), &["hint:fast".to_string()]);
4582 }
4583
4584 #[tokio::test]
4585 async fn from_config_passes_extra_headers_to_custom_provider() {
4586 use axum::{Json, Router, http::HeaderMap, routing::post};
4587 use tempfile::TempDir;
4588 use tokio::net::TcpListener;
4589
4590 let captured_headers: Arc<std::sync::Mutex<Option<HashMap<String, String>>>> =
4591 Arc::new(std::sync::Mutex::new(None));
4592 let captured_headers_clone = captured_headers.clone();
4593
4594 let app = Router::new().route(
4595 "/chat/completions",
4596 post(
4597 move |headers: HeaderMap, Json(_body): Json<serde_json::Value>| {
4598 let captured_headers = captured_headers_clone.clone();
4599 async move {
4600 let collected = headers
4601 .iter()
4602 .filter_map(|(name, value)| {
4603 value
4604 .to_str()
4605 .ok()
4606 .map(|value| (name.as_str().to_string(), value.to_string()))
4607 })
4608 .collect();
4609 *captured_headers.lock().unwrap() = Some(collected);
4610 Json(serde_json::json!({
4611 "choices": [{
4612 "message": {
4613 "content": "hello from mock"
4614 }
4615 }]
4616 }))
4617 }
4618 },
4619 ),
4620 );
4621
4622 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
4623 let mock_addr = listener.local_addr().unwrap();
4624 let server_handle = zeroclaw_spawn::spawn!(async move {
4625 axum::serve(listener, app).await.unwrap();
4626 });
4627
4628 let tmp = TempDir::new().expect("temp dir");
4629 let workspace_dir = tmp.path().join("workspace");
4630 std::fs::create_dir_all(&workspace_dir).unwrap();
4631
4632 let mut config = zeroclaw_config::schema::Config {
4633 data_dir: workspace_dir,
4634 config_path: tmp.path().join("config.toml"),
4635 ..Default::default()
4636 };
4637 {
4638 let entry = config
4648 .providers
4649 .models
4650 .ensure("custom", "default")
4651 .expect("custom model_provider type slot");
4652 entry.api_key = Some("test-key".to_string());
4653 entry.model = Some("test-model".to_string());
4654 entry.uri = Some(format!("http://{mock_addr}"));
4655 entry.extra_headers.insert(
4656 "User-Agent".to_string(),
4657 "zeroclaw-web-test/1.0".to_string(),
4658 );
4659 entry
4660 .extra_headers
4661 .insert("X-Title".to_string(), "zeroclaw-web".to_string());
4662 }
4663 config.memory.backend = "none".to_string();
4664 config.memory.auto_save = false;
4665
4666 config.risk_profiles.insert(
4670 "test-profile".to_string(),
4671 zeroclaw_config::schema::RiskProfileConfig::default(),
4672 );
4673 let agent_cfg = zeroclaw_config::schema::AliasedAgentConfig {
4674 model_provider: "custom.default".into(),
4675 risk_profile: "test-profile".to_string(),
4676 ..zeroclaw_config::schema::AliasedAgentConfig::default()
4677 };
4678 config.agents.insert("test-agent".to_string(), agent_cfg);
4679
4680 let mut agent = Agent::from_config(&config, "test-agent")
4681 .await
4682 .expect("agent from config");
4683 let response = agent.turn("hello").await.expect("agent turn");
4684
4685 assert_eq!(response, "hello from mock");
4686
4687 let headers = captured_headers
4688 .lock()
4689 .unwrap()
4690 .clone()
4691 .expect("captured headers");
4692 assert_eq!(
4693 headers.get("user-agent").map(String::as_str),
4694 Some("zeroclaw-web-test/1.0")
4695 );
4696 assert_eq!(
4697 headers.get("x-title").map(String::as_str),
4698 Some("zeroclaw-web")
4699 );
4700
4701 server_handle.abort();
4702 }
4703
4704 #[tokio::test]
4705 async fn from_config_accepts_openai_alias_with_requires_openai_auth() {
4706 use tempfile::TempDir;
4707 use zeroclaw_config::schema::{
4708 AliasedAgentConfig, Config, ModelProviderConfig, OpenAIModelProviderConfig,
4709 RiskProfileConfig, WireApi,
4710 };
4711
4712 let tmp = TempDir::new().expect("temp dir");
4713 let workspace_dir = tmp.path().join("workspace");
4714 std::fs::create_dir_all(&workspace_dir).expect("workspace dir");
4715
4716 let mut config = Config {
4717 data_dir: workspace_dir,
4718 config_path: tmp.path().join("config.toml"),
4719 ..Default::default()
4720 };
4721 config.memory.backend = "none".to_string();
4722 config.memory.auto_save = false;
4723 config
4724 .risk_profiles
4725 .insert("test-profile".to_string(), RiskProfileConfig::default());
4726 config.providers.models.openai.insert(
4727 "codex".to_string(),
4728 OpenAIModelProviderConfig {
4729 base: ModelProviderConfig {
4730 model: Some("gpt-5.4".to_string()),
4731 requires_openai_auth: true,
4732 wire_api: Some(WireApi::Responses),
4733 ..ModelProviderConfig::default()
4734 },
4735 },
4736 );
4737 config.agents.insert(
4738 "test-agent".to_string(),
4739 AliasedAgentConfig {
4740 model_provider: "openai.codex".into(),
4741 risk_profile: "test-profile".to_string(),
4742 ..AliasedAgentConfig::default()
4743 },
4744 );
4745
4746 let result = Agent::from_config(&config, "test-agent").await;
4747
4748 assert!(
4749 result.is_ok(),
4750 "openai alias with requires_openai_auth should construct via Codex OAuth path: {}",
4751 result.err().unwrap()
4752 );
4753 }
4754
4755 #[test]
4756 fn builder_allowed_tools_none_keeps_all_tools() {
4757 let model_provider = Box::new(MockModelProvider {
4758 responses: Mutex::new(vec![]),
4759 });
4760
4761 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4762 backend: "none".into(),
4763 ..zeroclaw_config::schema::MemoryConfig::default()
4764 };
4765 let mem: Arc<dyn Memory> = Arc::from(
4766 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4767 .expect("memory creation should succeed with valid config"),
4768 );
4769
4770 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4771 let agent = Agent::builder()
4772 .model_provider(model_provider)
4773 .tools(vec![Box::new(MockTool)])
4774 .memory(mem)
4775 .observer(observer)
4776 .tool_dispatcher(Box::new(NativeToolDispatcher))
4777 .workspace_dir(std::path::PathBuf::from("/tmp"))
4778 .allowed_tools(None)
4779 .build()
4780 .expect("agent builder should succeed with valid config");
4781
4782 assert_eq!(agent.tool_specs.len(), 1);
4783 assert_eq!(agent.tool_specs[0].name, "echo");
4784 }
4785
4786 #[test]
4787 fn builder_allowed_tools_some_filters_tools() {
4788 let model_provider = Box::new(MockModelProvider {
4789 responses: Mutex::new(vec![]),
4790 });
4791
4792 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4793 backend: "none".into(),
4794 ..zeroclaw_config::schema::MemoryConfig::default()
4795 };
4796 let mem: Arc<dyn Memory> = Arc::from(
4797 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4798 .expect("memory creation should succeed with valid config"),
4799 );
4800
4801 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4802 let agent = Agent::builder()
4803 .model_provider(model_provider)
4804 .tools(vec![Box::new(MockTool)])
4805 .memory(mem)
4806 .observer(observer)
4807 .tool_dispatcher(Box::new(NativeToolDispatcher))
4808 .workspace_dir(std::path::PathBuf::from("/tmp"))
4809 .allowed_tools(Some(vec!["nonexistent".to_string()]))
4810 .build()
4811 .expect("agent builder should succeed with valid config");
4812
4813 assert!(
4814 agent.tool_specs.is_empty(),
4815 "No tools should match a non-existent allowlist entry"
4816 );
4817 }
4818
4819 #[test]
4823 fn session_cwd_keeps_workspace_in_allowed_roots() {
4824 let workspace = std::env::temp_dir().join("zeroclaw_test_session_cwd_workspace");
4825 let session = std::env::temp_dir().join("zeroclaw_test_session_cwd_session");
4826 let _ = std::fs::create_dir_all(&workspace);
4827 let _ = std::fs::create_dir_all(&session);
4828
4829 let skill_file = workspace.join("SKILL.md");
4830 let _ = std::fs::write(&skill_file, "body");
4831 let skill_resolved = std::fs::canonicalize(&skill_file).unwrap_or(skill_file);
4833
4834 let risk_profile = zeroclaw_config::schema::RiskProfileConfig::default();
4835
4836 let mut policy = SecurityPolicy::from_risk_profile(&risk_profile, &session);
4838 policy.allowed_roots.push(workspace.clone());
4839 assert!(
4840 policy.is_resolved_path_allowed(&skill_resolved),
4841 "workspace skills must remain readable when session_cwd differs"
4842 );
4843
4844 let policy_no_push = SecurityPolicy::from_risk_profile(&risk_profile, &session);
4847 assert!(
4848 !policy_no_push.is_resolved_path_allowed(&skill_resolved),
4849 "without allowed_roots.push, workspace files must be outside the sandbox"
4850 );
4851 }
4852
4853 #[test]
4854 fn seed_history_prepends_system_and_skips_system_from_seed() {
4855 let model_provider = Box::new(MockModelProvider {
4856 responses: Mutex::new(vec![]),
4857 });
4858
4859 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4860 backend: "none".into(),
4861 ..zeroclaw_config::schema::MemoryConfig::default()
4862 };
4863 let mem: Arc<dyn Memory> = Arc::from(
4864 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4865 .expect("memory creation should succeed with valid config"),
4866 );
4867
4868 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4869 let mut agent = Agent::builder()
4870 .model_provider(model_provider)
4871 .tools(vec![Box::new(MockTool)])
4872 .memory(mem)
4873 .observer(observer)
4874 .tool_dispatcher(Box::new(NativeToolDispatcher))
4875 .workspace_dir(std::path::PathBuf::from("/tmp"))
4876 .build()
4877 .expect("agent builder should succeed with valid config");
4878
4879 let seed = vec![
4880 ChatMessage::system("old system prompt"),
4881 ChatMessage::user("hello"),
4882 ChatMessage::assistant("hi there"),
4883 ];
4884 agent.seed_history(&seed);
4885
4886 let history = agent.history();
4887 assert!(matches!(&history[0], ConversationMessage::Chat(m) if m.role == "system"));
4889 assert!(
4891 matches!(&history[1], ConversationMessage::Chat(m) if m.role == "user" && m.content == "hello")
4892 );
4893 assert!(
4894 matches!(&history[2], ConversationMessage::Chat(m) if m.role == "assistant" && m.content == "hi there")
4895 );
4896 assert_eq!(history.len(), 3);
4897 }
4898
4899 #[test]
4900 fn seed_conversation_history_preserves_tool_call_variants() {
4901 use zeroclaw_api::model_provider::{
4902 ChatMessage, ConversationMessage, ToolCall, ToolResultMessage,
4903 };
4904
4905 let provider = Box::new(MockModelProvider {
4906 responses: Mutex::new(vec![]),
4907 });
4908
4909 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4910 backend: "none".into(),
4911 ..zeroclaw_config::schema::MemoryConfig::default()
4912 };
4913 let mem: Arc<dyn Memory> = Arc::from(
4914 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4915 .expect("memory creation should succeed with valid config"),
4916 );
4917
4918 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4919 let mut agent = Agent::builder()
4920 .model_provider(provider)
4921 .tools(vec![Box::new(MockTool)])
4922 .memory(mem)
4923 .observer(observer)
4924 .tool_dispatcher(Box::new(NativeToolDispatcher))
4925 .workspace_dir(std::path::PathBuf::from("/tmp"))
4926 .build()
4927 .expect("agent builder should succeed with valid config");
4928
4929 let messages = vec![
4930 ConversationMessage::Chat(ChatMessage::user("run it")),
4931 ConversationMessage::AssistantToolCalls {
4932 text: None,
4933 tool_calls: vec![ToolCall {
4934 id: "tc-1".into(),
4935 name: "shell".into(),
4936 arguments: r#"{"command":"ls"}"#.into(),
4937 extra_content: None,
4938 }],
4939 reasoning_content: None,
4940 },
4941 ConversationMessage::ToolResults(vec![ToolResultMessage {
4942 tool_call_id: "tc-1".into(),
4943 content: "ok".into(),
4944 }]),
4945 ConversationMessage::Chat(ChatMessage::assistant("done")),
4946 ];
4947
4948 agent.seed_conversation_history(messages);
4949
4950 let non_system: Vec<_> = agent
4952 .history()
4953 .iter()
4954 .filter(|m| !matches!(m, ConversationMessage::Chat(c) if c.role == "system"))
4955 .collect();
4956
4957 assert_eq!(non_system.len(), 4);
4958 assert!(
4959 matches!(non_system[1], ConversationMessage::AssistantToolCalls { tool_calls, .. } if tool_calls[0].id == "tc-1")
4960 );
4961 assert!(
4962 matches!(non_system[2], ConversationMessage::ToolResults(r) if r[0].tool_call_id == "tc-1")
4963 );
4964 }
4965
4966 struct StreamToolCaptureModelProvider {
4969 tools_received: Arc<Mutex<Vec<bool>>>,
4970 call_count: Arc<Mutex<usize>>,
4971 }
4972
4973 #[async_trait]
4974 impl ModelProvider for StreamToolCaptureModelProvider {
4975 async fn chat_with_system(
4976 &self,
4977 _system_prompt: Option<&str>,
4978 _message: &str,
4979 _model: &str,
4980 _temperature: Option<f64>,
4981 ) -> Result<String> {
4982 Ok("ok".into())
4983 }
4984
4985 async fn chat(
4986 &self,
4987 request: ChatRequest<'_>,
4988 _model: &str,
4989 _temperature: Option<f64>,
4990 ) -> Result<zeroclaw_providers::ChatResponse> {
4991 self.tools_received.lock().push(request.tools.is_some());
4992 let mut count = self.call_count.lock();
4993 *count += 1;
4994 if *count == 1 {
4995 Ok(zeroclaw_providers::ChatResponse {
4996 text: Some(String::new()),
4997 tool_calls: vec![zeroclaw_providers::ToolCall {
4998 id: "00000000-0000-0000-0000-000000000001".into(),
4999 name: "echo".into(),
5000 arguments: "{}".into(),
5001 extra_content: None,
5002 }],
5003 usage: None,
5004 reasoning_content: None,
5005 })
5006 } else {
5007 Ok(zeroclaw_providers::ChatResponse {
5008 text: Some("stream-done".into()),
5009 tool_calls: vec![],
5010 usage: None,
5011 reasoning_content: None,
5012 })
5013 }
5014 }
5015
5016 fn supports_native_tools(&self) -> bool {
5017 true
5018 }
5019
5020 fn stream_chat(
5021 &self,
5022 request: ChatRequest<'_>,
5023 _model: &str,
5024 _temperature: Option<f64>,
5025 _options: zeroclaw_providers::traits::StreamOptions,
5026 ) -> futures_util::stream::BoxStream<
5027 'static,
5028 zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
5029 > {
5030 use futures_util::stream::{self, StreamExt};
5031 self.tools_received.lock().push(request.tools.is_some());
5032 let mut count = self.call_count.lock();
5033 *count += 1;
5034 if *count == 1 {
5035 let tc = zeroclaw_providers::traits::StreamEvent::ToolCall(
5036 zeroclaw_providers::ToolCall {
5037 id: "00000000-0000-0000-0000-000000000001".into(),
5038 name: "echo".into(),
5039 arguments: "{}".into(),
5040 extra_content: None,
5041 },
5042 );
5043 stream::iter(vec![
5044 Ok(tc),
5045 Ok(zeroclaw_providers::traits::StreamEvent::Final),
5046 ])
5047 .boxed()
5048 } else {
5049 let chunk = zeroclaw_providers::traits::StreamEvent::TextDelta(
5050 zeroclaw_providers::traits::StreamChunk {
5051 delta: "stream-done".into(),
5052 is_final: false,
5053 reasoning: None,
5054 token_count: 0,
5055 },
5056 );
5057 stream::iter(vec![
5058 Ok(chunk),
5059 Ok(zeroclaw_providers::traits::StreamEvent::Final),
5060 ])
5061 .boxed()
5062 }
5063 }
5064 }
5065 impl ::zeroclaw_api::attribution::Attributable for StreamToolCaptureModelProvider {
5066 fn role(&self) -> ::zeroclaw_api::attribution::Role {
5067 ::zeroclaw_api::attribution::Role::Provider(
5068 ::zeroclaw_api::attribution::ProviderKind::Model(
5069 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
5070 ),
5071 )
5072 }
5073 fn alias(&self) -> &str {
5074 "StreamToolCaptureModelProvider"
5075 }
5076 }
5077
5078 #[tokio::test]
5079 async fn turn_streamed_passes_tool_specs_to_provider() {
5080 let tools_received = Arc::new(Mutex::new(Vec::new()));
5081 let model_provider = Box::new(StreamToolCaptureModelProvider {
5082 tools_received: tools_received.clone(),
5083 call_count: Arc::new(Mutex::new(0)),
5084 });
5085
5086 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5087 backend: "none".into(),
5088 ..zeroclaw_config::schema::MemoryConfig::default()
5089 };
5090 let mem: Arc<dyn Memory> = Arc::from(
5091 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5092 .expect("memory creation should succeed with valid config"),
5093 );
5094
5095 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5096 let mut agent = Agent::builder()
5097 .model_provider(model_provider)
5098 .tools(vec![Box::new(MockTool)])
5099 .memory(mem)
5100 .observer(observer)
5101 .tool_dispatcher(Box::new(NativeToolDispatcher))
5102 .workspace_dir(std::path::PathBuf::from("/tmp"))
5103 .build()
5104 .expect("agent builder should succeed with valid config");
5105
5106 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
5107 let (response, _) = agent
5108 .turn_streamed("use the echo tool", event_tx, None)
5109 .await
5110 .unwrap();
5111 assert_eq!(response, "stream-done");
5112
5113 let received = tools_received.lock();
5115 assert!(
5116 received.len() >= 2,
5117 "Expected at least 2 stream_chat calls, got {}",
5118 received.len()
5119 );
5120 assert!(
5121 received[0],
5122 "First stream_chat call should have received tool specs"
5123 );
5124 assert!(
5125 received[1],
5126 "Second stream_chat call should have received tool specs"
5127 );
5128
5129 let mut events = Vec::new();
5131 while let Ok(ev) = event_rx.try_recv() {
5132 events.push(ev);
5133 }
5134 let has_tool_call = events
5135 .iter()
5136 .any(|e| matches!(e, TurnEvent::ToolCall { name, .. } if name == "echo"));
5137 let has_tool_result = events
5138 .iter()
5139 .any(|e| matches!(e, TurnEvent::ToolResult { name, .. } if name == "echo"));
5140 assert!(
5141 has_tool_call,
5142 "Should have emitted a ToolCall event for 'echo'"
5143 );
5144 assert!(
5145 has_tool_result,
5146 "Should have emitted a ToolResult event for 'echo'"
5147 );
5148
5149 let call_id = events
5151 .iter()
5152 .find_map(|e| {
5153 if let TurnEvent::ToolCall { id, .. } = e {
5154 Some(id.clone())
5155 } else {
5156 None
5157 }
5158 })
5159 .expect("ToolCall should have an ID");
5160
5161 let result_id = events
5162 .iter()
5163 .find_map(|e| {
5164 if let TurnEvent::ToolResult { id, .. } = e {
5165 Some(id.clone())
5166 } else {
5167 None
5168 }
5169 })
5170 .expect("ToolResult should have an ID");
5171
5172 assert_eq!(
5173 call_id, result_id,
5174 "ToolCall and ToolResult should share the same ID for correlation"
5175 );
5176
5177 assert!(
5179 uuid::Uuid::parse_str(&call_id).is_ok(),
5180 "Generated ID should be a valid UUID: got '{}'",
5181 call_id
5182 );
5183 }
5184
5185 struct TwoToolCallStreamModelProvider {
5188 call_count: Arc<Mutex<usize>>,
5189 }
5190
5191 #[async_trait]
5192 impl ModelProvider for TwoToolCallStreamModelProvider {
5193 async fn chat_with_system(
5194 &self,
5195 _system_prompt: Option<&str>,
5196 _message: &str,
5197 _model: &str,
5198 _temperature: Option<f64>,
5199 ) -> Result<String> {
5200 Ok("ok".into())
5201 }
5202
5203 async fn chat(
5204 &self,
5205 _request: ChatRequest<'_>,
5206 _model: &str,
5207 _temperature: Option<f64>,
5208 ) -> Result<zeroclaw_providers::ChatResponse> {
5209 Ok(zeroclaw_providers::ChatResponse {
5210 text: Some("done".into()),
5211 tool_calls: vec![],
5212 usage: None,
5213 reasoning_content: None,
5214 })
5215 }
5216
5217 fn supports_native_tools(&self) -> bool {
5218 true
5219 }
5220
5221 fn stream_chat(
5222 &self,
5223 _request: ChatRequest<'_>,
5224 _model: &str,
5225 _temperature: Option<f64>,
5226 _options: zeroclaw_providers::traits::StreamOptions,
5227 ) -> futures_util::stream::BoxStream<
5228 'static,
5229 zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
5230 > {
5231 use futures_util::stream::{self, StreamExt};
5232 let mut count = self.call_count.lock();
5233 *count += 1;
5234 if *count == 1 {
5235 stream::iter(vec![
5236 Ok(zeroclaw_providers::traits::StreamEvent::ToolCall(
5237 zeroclaw_providers::ToolCall {
5238 id: "00000000-0000-0000-0000-000000000001".into(),
5239 name: "echo".into(),
5240 arguments: "{}".into(),
5241 extra_content: None,
5242 },
5243 )),
5244 Ok(zeroclaw_providers::traits::StreamEvent::ToolCall(
5245 zeroclaw_providers::ToolCall {
5246 id: "00000000-0000-0000-0000-000000000002".into(),
5247 name: "echo".into(),
5248 arguments: "{}".into(),
5249 extra_content: None,
5250 },
5251 )),
5252 Ok(zeroclaw_providers::traits::StreamEvent::Final),
5253 ])
5254 .boxed()
5255 } else {
5256 stream::iter(vec![
5257 Ok(zeroclaw_providers::traits::StreamEvent::TextDelta(
5258 zeroclaw_providers::traits::StreamChunk {
5259 delta: "stream-done".into(),
5260 is_final: false,
5261 reasoning: None,
5262 token_count: 0,
5263 },
5264 )),
5265 Ok(zeroclaw_providers::traits::StreamEvent::Final),
5266 ])
5267 .boxed()
5268 }
5269 }
5270 }
5271 impl ::zeroclaw_api::attribution::Attributable for TwoToolCallStreamModelProvider {
5272 fn role(&self) -> ::zeroclaw_api::attribution::Role {
5273 ::zeroclaw_api::attribution::Role::Provider(
5274 ::zeroclaw_api::attribution::ProviderKind::Model(
5275 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
5276 ),
5277 )
5278 }
5279 fn alias(&self) -> &str {
5280 "TwoToolCallStreamModelProvider"
5281 }
5282 }
5283
5284 #[tokio::test]
5291 async fn turn_streamed_dispatches_multiple_tools_serially_when_parallel_disabled() {
5292 let model_provider = Box::new(TwoToolCallStreamModelProvider {
5293 call_count: Arc::new(Mutex::new(0)),
5294 });
5295
5296 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5297 backend: "none".into(),
5298 ..zeroclaw_config::schema::MemoryConfig::default()
5299 };
5300 let mem: Arc<dyn Memory> = Arc::from(
5301 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5302 .expect("memory creation should succeed with valid config"),
5303 );
5304
5305 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5306 let mut agent = Agent::builder()
5307 .model_provider(model_provider)
5308 .tools(vec![Box::new(MockTool)])
5309 .memory(mem)
5310 .observer(observer)
5311 .tool_dispatcher(Box::new(NativeToolDispatcher))
5312 .workspace_dir(std::path::PathBuf::from("/tmp"))
5313 .build()
5314 .expect("agent builder should succeed with valid config");
5315
5316 assert!(
5319 !agent.config.resolved.parallel_tools,
5320 "test precondition: parallel_tools must be disabled"
5321 );
5322
5323 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
5324 let (response, _) = agent
5325 .turn_streamed("use echo twice", event_tx, None)
5326 .await
5327 .unwrap();
5328 assert_eq!(response, "stream-done");
5329
5330 let mut seq: Vec<(&'static str, String)> = Vec::new();
5332 while let Ok(ev) = event_rx.try_recv() {
5333 match ev {
5334 TurnEvent::ToolCall { id, .. } => seq.push(("call", id)),
5335 TurnEvent::ToolResult { id, .. } => seq.push(("result", id)),
5336 _ => {}
5337 }
5338 }
5339
5340 let id1 = "00000000-0000-0000-0000-000000000001";
5341 let id2 = "00000000-0000-0000-0000-000000000002";
5342 assert_eq!(
5343 seq,
5344 vec![
5345 ("call", id1.to_string()),
5346 ("result", id1.to_string()),
5347 ("call", id2.to_string()),
5348 ("result", id2.to_string()),
5349 ],
5350 "serial dispatch must interleave call->result per tool, not batch all \
5351 starts then all results; got {seq:?}"
5352 );
5353 }
5354
5355 struct PreExecutedToolModelProvider;
5356
5357 #[async_trait]
5358 impl ModelProvider for PreExecutedToolModelProvider {
5359 async fn chat_with_system(
5360 &self,
5361 _system_prompt: Option<&str>,
5362 _message: &str,
5363 _model: &str,
5364 _temperature: Option<f64>,
5365 ) -> Result<String> {
5366 Ok(String::new())
5367 }
5368
5369 async fn chat(
5370 &self,
5371 _request: ChatRequest<'_>,
5372 _model: &str,
5373 _temperature: Option<f64>,
5374 ) -> Result<zeroclaw_providers::ChatResponse> {
5375 Ok(zeroclaw_providers::ChatResponse {
5376 text: Some(String::new()),
5377 tool_calls: vec![],
5378 usage: None,
5379 reasoning_content: None,
5380 })
5381 }
5382
5383 fn stream_chat(
5384 &self,
5385 _request: ChatRequest<'_>,
5386 _model: &str,
5387 _temperature: Option<f64>,
5388 _options: zeroclaw_providers::traits::StreamOptions,
5389 ) -> futures_util::stream::BoxStream<
5390 'static,
5391 zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
5392 > {
5393 use futures_util::stream::{self, StreamExt};
5394
5395 stream::iter(vec![
5396 Ok(
5397 zeroclaw_providers::traits::StreamEvent::PreExecutedToolCall {
5398 name: "file_read".into(),
5399 args: "{\"path\":\"a.txt\"}".into(),
5400 },
5401 ),
5402 Ok(
5403 zeroclaw_providers::traits::StreamEvent::PreExecutedToolCall {
5404 name: "shell".into(),
5405 args: "{\"command\":\"pwd\"}".into(),
5406 },
5407 ),
5408 Ok(
5409 zeroclaw_providers::traits::StreamEvent::PreExecutedToolResult {
5410 name: "file_read".into(),
5411 output: "a".into(),
5412 },
5413 ),
5414 Ok(
5415 zeroclaw_providers::traits::StreamEvent::PreExecutedToolResult {
5416 name: "shell".into(),
5417 output: "b".into(),
5418 },
5419 ),
5420 Ok(zeroclaw_providers::traits::StreamEvent::Final),
5421 ])
5422 .boxed()
5423 }
5424 }
5425 impl ::zeroclaw_api::attribution::Attributable for PreExecutedToolModelProvider {
5426 fn role(&self) -> ::zeroclaw_api::attribution::Role {
5427 ::zeroclaw_api::attribution::Role::Provider(
5428 ::zeroclaw_api::attribution::ProviderKind::Model(
5429 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
5430 ),
5431 )
5432 }
5433 fn alias(&self) -> &str {
5434 "PreExecutedToolModelProvider"
5435 }
5436 }
5437
5438 #[tokio::test]
5439 async fn pre_executed_tool_results_keep_ids_when_calls_overlap() {
5440 let model_provider = Box::new(PreExecutedToolModelProvider);
5441
5442 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5443 backend: "none".into(),
5444 ..zeroclaw_config::schema::MemoryConfig::default()
5445 };
5446 let mem: Arc<dyn Memory> = Arc::from(
5447 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5448 .expect("memory creation should succeed with valid config"),
5449 );
5450
5451 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5452 let mut agent = Agent::builder()
5453 .model_provider(model_provider)
5454 .tools(vec![Box::new(MockTool)])
5455 .memory(mem)
5456 .observer(observer)
5457 .tool_dispatcher(Box::new(NativeToolDispatcher))
5458 .workspace_dir(std::path::PathBuf::from("/tmp"))
5459 .build()
5460 .expect("agent builder should succeed with valid config");
5461
5462 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
5463 let _ = agent
5464 .turn_streamed("use pre-executed tools", event_tx, None)
5465 .await
5466 .unwrap();
5467
5468 let mut call_ids = HashMap::new();
5469 let mut result_ids = HashMap::new();
5470 while let Ok(event) = event_rx.try_recv() {
5471 match event {
5472 TurnEvent::ToolCall { id, name, .. } => {
5473 call_ids.insert(name, id);
5474 }
5475 TurnEvent::ToolResult { id, name, .. } => {
5476 result_ids.insert(name, id);
5477 }
5478 _ => {}
5479 }
5480 }
5481
5482 assert_eq!(call_ids.len(), 2, "expected two pre-executed tool calls");
5483 assert_eq!(
5484 result_ids.len(),
5485 2,
5486 "expected two pre-executed tool results"
5487 );
5488 assert_eq!(call_ids.get("file_read"), result_ids.get("file_read"));
5489 assert_eq!(call_ids.get("shell"), result_ids.get("shell"));
5490 }
5491
5492 #[tokio::test]
5493 async fn turn_normalizes_user_image_markers_before_provider_call() {
5494 let seen_user_messages = Arc::new(Mutex::new(Vec::new()));
5495 let provider = Box::new(MultimodalCaptureProvider {
5496 seen_user_messages: seen_user_messages.clone(),
5497 streamed: false,
5498 });
5499
5500 let temp = tempfile::tempdir().expect("tempdir");
5501 let image_path = temp.path().join("agent-turn.png");
5502 std::fs::write(
5503 &image_path,
5504 [0x89, b'P', b'N', b'G', b'\r', b'\n', 0x1a, b'\n'],
5505 )
5506 .expect("write fixture");
5507
5508 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5509 backend: "none".into(),
5510 ..zeroclaw_config::schema::MemoryConfig::default()
5511 };
5512 let mem: Arc<dyn Memory> = Arc::from(
5513 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5514 .expect("memory creation should succeed with valid config"),
5515 );
5516
5517 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5518 let mut agent = Agent::builder()
5519 .model_provider(provider)
5520 .tools(vec![Box::new(MockTool)])
5521 .memory(mem)
5522 .observer(observer)
5523 .tool_dispatcher(Box::new(NativeToolDispatcher))
5524 .workspace_dir(std::path::PathBuf::from("/tmp"))
5525 .multimodal_config(zeroclaw_config::schema::MultimodalConfig::default())
5526 .build()
5527 .expect("agent builder should succeed with valid config");
5528
5529 agent
5530 .turn(&format!(
5531 "inspect [IMAGE:{}]",
5532 image_path.display().to_string()
5533 ))
5534 .await
5535 .expect("turn should succeed");
5536
5537 let seen = seen_user_messages.lock();
5538 let last = seen.last().expect("provider should receive a user message");
5539 assert!(
5540 last.contains("data:image/png;base64,"),
5541 "expected normalized data URI in provider request, got: {last}"
5542 );
5543 }
5544
5545 #[tokio::test]
5546 async fn turn_streamed_normalizes_user_image_markers_before_provider_call() {
5547 let seen_user_messages = Arc::new(Mutex::new(Vec::new()));
5548 let provider = Box::new(MultimodalCaptureProvider {
5549 seen_user_messages: seen_user_messages.clone(),
5550 streamed: true,
5551 });
5552
5553 let temp = tempfile::tempdir().expect("tempdir");
5554 let image_path = temp.path().join("agent-stream.png");
5555 std::fs::write(
5556 &image_path,
5557 [0x89, b'P', b'N', b'G', b'\r', b'\n', 0x1a, b'\n'],
5558 )
5559 .expect("write fixture");
5560
5561 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5562 backend: "none".into(),
5563 ..zeroclaw_config::schema::MemoryConfig::default()
5564 };
5565 let mem: Arc<dyn Memory> = Arc::from(
5566 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5567 .expect("memory creation should succeed with valid config"),
5568 );
5569
5570 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5571 let mut agent = Agent::builder()
5572 .model_provider(provider)
5573 .tools(vec![Box::new(MockTool)])
5574 .memory(mem)
5575 .observer(observer)
5576 .tool_dispatcher(Box::new(NativeToolDispatcher))
5577 .workspace_dir(std::path::PathBuf::from("/tmp"))
5578 .multimodal_config(zeroclaw_config::schema::MultimodalConfig::default())
5579 .build()
5580 .expect("agent builder should succeed with valid config");
5581
5582 let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(8);
5583 agent
5584 .turn_streamed(
5585 &format!("inspect [IMAGE:{}]", image_path.display().to_string()),
5586 event_tx,
5587 None,
5588 )
5589 .await
5590 .expect("turn_streamed should succeed");
5591
5592 let seen = seen_user_messages.lock();
5593 let last = seen.last().expect("provider should receive a user message");
5594 assert!(
5595 last.contains("data:image/png;base64,"),
5596 "expected normalized data URI in provider request, got: {last}"
5597 );
5598 }
5599
5600 #[test]
5616 fn trim_history_does_not_leave_orphan_tool_results() {
5617 use zeroclaw_providers::{ToolCall, ToolResultMessage};
5618
5619 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5620 backend: "none".into(),
5621 ..zeroclaw_config::schema::MemoryConfig::default()
5622 };
5623 let mem: Arc<dyn Memory> = Arc::from(
5624 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5625 .expect("memory creation should succeed with valid config"),
5626 );
5627
5628 let agent_config = zeroclaw_config::schema::AliasedAgentConfig {
5632 resolved: zeroclaw_config::schema::ResolvedRuntime {
5633 max_history_messages: 4,
5634 ..Default::default()
5635 },
5636 ..zeroclaw_config::schema::AliasedAgentConfig::default()
5637 };
5638
5639 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5640 let mut agent = Agent::builder()
5641 .model_provider(Box::new(MockModelProvider {
5642 responses: Mutex::new(vec![]),
5643 }))
5644 .tools(vec![Box::new(MockTool)])
5645 .memory(mem)
5646 .observer(observer)
5647 .tool_dispatcher(Box::new(NativeToolDispatcher))
5648 .workspace_dir(std::path::PathBuf::from("/tmp"))
5649 .config(agent_config)
5650 .build()
5651 .expect("agent builder should succeed with valid config");
5652
5653 for i in 1..=3 {
5655 agent.history.push(ConversationMessage::AssistantToolCalls {
5656 text: Some(format!("Calling tool {i}")),
5657 tool_calls: vec![ToolCall {
5658 id: format!("tc{i}"),
5659 name: format!("tool{i}"),
5660 arguments: "{}".into(),
5661 extra_content: None,
5662 }],
5663 reasoning_content: None,
5664 });
5665 if i < 3 {
5669 agent
5670 .history
5671 .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
5672 tool_call_id: format!("tc{i}"),
5673 content: format!("result{i}"),
5674 }]));
5675 }
5676 }
5677
5678 assert_eq!(agent.history.len(), 5);
5679 agent.trim_history();
5680
5681 if let Some(first) = agent.history.first() {
5685 assert!(
5686 !matches!(first, ConversationMessage::ToolResults(_)),
5687 "trim_history left an orphan ToolResults at the head of the \
5688 history; this would cause Anthropic to reject the next \
5689 request with 'unexpected tool_use_id found in tool_result \
5690 blocks'"
5691 );
5692 }
5693
5694 for window in agent.history.windows(2) {
5697 if matches!(&window[1], ConversationMessage::ToolResults(_)) {
5698 assert!(
5699 matches!(&window[0], ConversationMessage::AssistantToolCalls { .. }),
5700 "ToolResults entry is not preceded by an AssistantToolCalls \
5701 entry — pair was split during trim"
5702 );
5703 }
5704 }
5705 }
5706
5707 #[test]
5708 fn trim_history_does_not_leave_orphan_assistant_tool_calls() {
5709 use zeroclaw_providers::{ToolCall, ToolResultMessage};
5710
5711 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5712 backend: "none".into(),
5713 ..zeroclaw_config::schema::MemoryConfig::default()
5714 };
5715 let mem: Arc<dyn Memory> = Arc::from(
5716 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5717 .expect("memory creation should succeed with valid config"),
5718 );
5719
5720 let agent_config = zeroclaw_config::schema::AliasedAgentConfig {
5737 resolved: zeroclaw_config::schema::ResolvedRuntime {
5738 max_history_messages: 3,
5739 ..Default::default()
5740 },
5741 ..zeroclaw_config::schema::AliasedAgentConfig::default()
5742 };
5743
5744 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5745 let mut agent = Agent::builder()
5746 .model_provider(Box::new(MockModelProvider {
5747 responses: Mutex::new(vec![]),
5748 }))
5749 .tools(vec![Box::new(MockTool)])
5750 .memory(mem)
5751 .observer(observer)
5752 .tool_dispatcher(Box::new(NativeToolDispatcher))
5753 .workspace_dir(std::path::PathBuf::from("/tmp"))
5754 .config(agent_config)
5755 .build()
5756 .expect("agent builder should succeed with valid config");
5757
5758 agent.history.push(ConversationMessage::Chat(ChatMessage {
5760 role: "user".into(),
5761 content: "hello".into(),
5762 }));
5763 agent.history.push(ConversationMessage::AssistantToolCalls {
5765 text: Some("Calling tool 1".into()),
5766 tool_calls: vec![ToolCall {
5767 id: "tc1".into(),
5768 name: "tool1".into(),
5769 arguments: "{}".into(),
5770 extra_content: None,
5771 }],
5772 reasoning_content: None,
5773 });
5774 agent
5775 .history
5776 .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
5777 tool_call_id: "tc1".into(),
5778 content: "result1".into(),
5779 }]));
5780 agent.history.push(ConversationMessage::AssistantToolCalls {
5782 text: Some("Calling tool 2".into()),
5783 tool_calls: vec![ToolCall {
5784 id: "tc2".into(),
5785 name: "tool2".into(),
5786 arguments: "{}".into(),
5787 extra_content: None,
5788 }],
5789 reasoning_content: None,
5790 });
5791 agent
5792 .history
5793 .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
5794 tool_call_id: "tc2".into(),
5795 content: "result2".into(),
5796 }]));
5797 agent.history.push(ConversationMessage::AssistantToolCalls {
5799 text: Some("Calling tool 3".into()),
5800 tool_calls: vec![ToolCall {
5801 id: "tc3".into(),
5802 name: "tool3".into(),
5803 arguments: "{}".into(),
5804 extra_content: None,
5805 }],
5806 reasoning_content: None,
5807 });
5808 agent
5809 .history
5810 .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
5811 tool_call_id: "tc3".into(),
5812 content: "result3".into(),
5813 }]));
5814
5815 assert_eq!(agent.history.len(), 7);
5816 agent.trim_history();
5817
5818 if let Some(first) = agent.history.first() {
5820 assert!(
5821 !matches!(first, ConversationMessage::AssistantToolCalls { .. }),
5822 "trim_history left an orphan AssistantToolCalls at the head of \
5823 the history; the model would see tool calls with no results"
5824 );
5825 }
5826
5827 for window in agent.history.windows(2) {
5830 if matches!(&window[1], ConversationMessage::ToolResults(_)) {
5831 assert!(
5832 matches!(&window[0], ConversationMessage::AssistantToolCalls { .. }),
5833 "ToolResults entry is not preceded by an AssistantToolCalls \
5834 entry — pair was split during trim"
5835 );
5836 }
5837 }
5838
5839 for window in agent.history.windows(2) {
5842 if matches!(&window[0], ConversationMessage::AssistantToolCalls { .. }) {
5843 assert!(
5844 matches!(&window[1], ConversationMessage::ToolResults(_)),
5845 "AssistantToolCalls entry is not followed by a ToolResults \
5846 entry — orphan tool call would confuse the model"
5847 );
5848 }
5849 }
5850 }
5851
5852 #[test]
5876 fn trim_history_does_not_empty_all_messages_on_full_cascade() {
5877 use zeroclaw_providers::{ToolCall, ToolResultMessage};
5878
5879 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5880 backend: "none".into(),
5881 ..zeroclaw_config::schema::MemoryConfig::default()
5882 };
5883 let mem: Arc<dyn Memory> = Arc::from(
5884 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5885 .expect("memory creation should succeed with valid config"),
5886 );
5887
5888 let agent_config = zeroclaw_config::schema::AliasedAgentConfig {
5889 resolved: zeroclaw_config::schema::ResolvedRuntime {
5890 max_history_messages: 4,
5891 ..Default::default()
5892 },
5893 ..zeroclaw_config::schema::AliasedAgentConfig::default()
5894 };
5895
5896 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5897 let mut agent = Agent::builder()
5898 .model_provider(Box::new(MockModelProvider {
5899 responses: Mutex::new(vec![]),
5900 }))
5901 .tools(vec![Box::new(MockTool)])
5902 .memory(mem)
5903 .observer(observer)
5904 .tool_dispatcher(Box::new(NativeToolDispatcher))
5905 .workspace_dir(std::path::PathBuf::from("/tmp"))
5906 .config(agent_config)
5907 .build()
5908 .expect("agent builder should succeed with valid config");
5909
5910 agent.history.push(ConversationMessage::Chat(ChatMessage {
5912 role: "user".into(),
5913 content: "kick off a long tool loop".into(),
5914 }));
5915 agent.history.push(ConversationMessage::AssistantToolCalls {
5917 text: Some("Calling tool 1".into()),
5918 tool_calls: vec![ToolCall {
5919 id: "tc1".into(),
5920 name: "tool1".into(),
5921 arguments: "{}".into(),
5922 extra_content: None,
5923 }],
5924 reasoning_content: None,
5925 });
5926 agent
5927 .history
5928 .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
5929 tool_call_id: "tc1".into(),
5930 content: "result1".into(),
5931 }]));
5932 agent.history.push(ConversationMessage::AssistantToolCalls {
5934 text: Some("Calling tool 2".into()),
5935 tool_calls: vec![ToolCall {
5936 id: "tc2".into(),
5937 name: "tool2".into(),
5938 arguments: "{}".into(),
5939 extra_content: None,
5940 }],
5941 reasoning_content: None,
5942 });
5943 agent
5944 .history
5945 .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
5946 tool_call_id: "tc2".into(),
5947 content: "result2".into(),
5948 }]));
5949
5950 assert_eq!(agent.history.len(), 5);
5951 let before = agent.history.clone();
5952
5953 agent.trim_history();
5954
5955 assert!(
5959 !agent.history.is_empty(),
5960 "trim_history drained every non-system message; the next \
5961 provider call would fail with 'messages: at least one message \
5962 is required'"
5963 );
5964
5965 assert_eq!(
5971 agent.history.len(),
5972 before.len(),
5973 "trim_history dropped messages despite the orphan cascade \
5974 reaching other_messages.len(); the guard's contract is to \
5975 preserve the conversation untouched in this case"
5976 );
5977
5978 assert!(
5982 agent.history.len() > agent.config.resolved.max_history_messages,
5983 "expected history to remain over max_history_messages after the \
5984 guard fires (that is the documented trade-off); got len={} max={}",
5985 agent.history.len(),
5986 agent.config.resolved.max_history_messages,
5987 );
5988 }
5989
5990 #[test]
6001 fn trim_history_full_cascade_with_system_message_preserves_full_history() {
6002 use zeroclaw_providers::{ToolCall, ToolResultMessage};
6003
6004 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6005 backend: "none".into(),
6006 ..zeroclaw_config::schema::MemoryConfig::default()
6007 };
6008 let mem: Arc<dyn Memory> = Arc::from(
6009 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6010 .expect("memory creation should succeed with valid config"),
6011 );
6012
6013 let agent_config = zeroclaw_config::schema::AliasedAgentConfig {
6016 resolved: zeroclaw_config::schema::ResolvedRuntime {
6017 max_history_messages: 4,
6018 ..Default::default()
6019 },
6020 ..zeroclaw_config::schema::AliasedAgentConfig::default()
6021 };
6022
6023 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6024 let mut agent = Agent::builder()
6025 .model_provider(Box::new(MockModelProvider {
6026 responses: Mutex::new(vec![]),
6027 }))
6028 .tools(vec![Box::new(MockTool)])
6029 .memory(mem)
6030 .observer(observer)
6031 .tool_dispatcher(Box::new(NativeToolDispatcher))
6032 .workspace_dir(std::path::PathBuf::from("/tmp"))
6033 .config(agent_config)
6034 .build()
6035 .expect("agent builder should succeed with valid config");
6036
6037 agent.history.push(ConversationMessage::Chat(ChatMessage {
6039 role: "system".into(),
6040 content: "you are a helpful agent".into(),
6041 }));
6042 agent.history.push(ConversationMessage::Chat(ChatMessage {
6044 role: "user".into(),
6045 content: "kick off a long tool loop".into(),
6046 }));
6047 agent.history.push(ConversationMessage::AssistantToolCalls {
6049 text: Some("Calling tool 1".into()),
6050 tool_calls: vec![ToolCall {
6051 id: "tc1".into(),
6052 name: "tool1".into(),
6053 arguments: "{}".into(),
6054 extra_content: None,
6055 }],
6056 reasoning_content: None,
6057 });
6058 agent
6059 .history
6060 .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
6061 tool_call_id: "tc1".into(),
6062 content: "result1".into(),
6063 }]));
6064 agent.history.push(ConversationMessage::AssistantToolCalls {
6066 text: Some("Calling tool 2".into()),
6067 tool_calls: vec![ToolCall {
6068 id: "tc2".into(),
6069 name: "tool2".into(),
6070 arguments: "{}".into(),
6071 extra_content: None,
6072 }],
6073 reasoning_content: None,
6074 });
6075 agent
6076 .history
6077 .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
6078 tool_call_id: "tc2".into(),
6079 content: "result2".into(),
6080 }]));
6081
6082 assert_eq!(agent.history.len(), 6);
6083 let before_len = agent.history.len();
6084
6085 agent.trim_history();
6086
6087 match agent.history.first() {
6090 Some(ConversationMessage::Chat(chat)) => assert_eq!(
6091 chat.role, "system",
6092 "expected system message at head after restore; got role={:?}",
6093 chat.role
6094 ),
6095 other => panic!(
6096 "expected Chat(system) at head of restored history, got {:?}",
6097 other
6098 ),
6099 }
6100
6101 assert_eq!(
6105 agent.history.len(),
6106 before_len,
6107 "trim_history dropped messages from the non-system half despite \
6108 the orphan cascade reaching other_messages.len(); guard must \
6109 preserve every entry when it fires"
6110 );
6111
6112 let non_system_remaining = agent
6116 .history
6117 .iter()
6118 .filter(|m| !matches!(m, ConversationMessage::Chat(c) if c.role == "system"))
6119 .count();
6120 assert!(
6121 non_system_remaining > 0,
6122 "trim_history left only the system message; convert_messages \
6123 would produce messages: [] and the provider call would 400"
6124 );
6125 }
6126
6127 #[test]
6128 fn cancel_synthesizes_paired_tool_results_for_orphaned_calls() {
6129 use zeroclaw_providers::ToolCall;
6130
6131 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6132 backend: "none".into(),
6133 ..zeroclaw_config::schema::MemoryConfig::default()
6134 };
6135 let mem: Arc<dyn Memory> = Arc::from(
6136 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6137 .expect("memory creation should succeed with valid config"),
6138 );
6139 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6140 let mut agent = Agent::builder()
6141 .model_provider(Box::new(MockModelProvider {
6142 responses: Mutex::new(vec![]),
6143 }))
6144 .tools(vec![Box::new(MockTool)])
6145 .memory(mem)
6146 .observer(observer)
6147 .tool_dispatcher(Box::new(NativeToolDispatcher))
6148 .workspace_dir(std::path::PathBuf::from("/tmp"))
6149 .config(zeroclaw_config::schema::AliasedAgentConfig::default())
6150 .build()
6151 .expect("agent builder should succeed with valid config");
6152
6153 let tool_calls = vec![
6158 ToolCall {
6159 id: "tc-cancel-1".into(),
6160 name: "shell".into(),
6161 arguments: "{}".into(),
6162 extra_content: None,
6163 },
6164 ToolCall {
6165 id: "tc-cancel-2".into(),
6166 name: "shell".into(),
6167 arguments: "{}".into(),
6168 extra_content: None,
6169 },
6170 ];
6171 agent.history.push(ConversationMessage::AssistantToolCalls {
6172 text: None,
6173 tool_calls: tool_calls.clone(),
6174 reasoning_content: None,
6175 });
6176
6177 let mut new_msgs = Vec::new();
6178 agent.synthesize_cancelled_tool_results(vec![], &tool_calls, &mut new_msgs);
6179
6180 let last = agent.history.last().expect("history not empty");
6183 match last {
6184 ConversationMessage::ToolResults(results) => {
6185 assert_eq!(results.len(), 2);
6186 assert_eq!(results[0].tool_call_id, "tc-cancel-1");
6187 assert_eq!(results[1].tool_call_id, "tc-cancel-2");
6188 }
6189 other => panic!("expected ToolResults, got {other:?}"),
6190 }
6191 assert!(matches!(
6192 new_msgs.last(),
6193 Some(ConversationMessage::ToolResults(r)) if r.len() == 2
6194 ));
6195
6196 for window in agent.history.windows(2) {
6199 if matches!(&window[0], ConversationMessage::AssistantToolCalls { .. }) {
6200 assert!(
6201 matches!(&window[1], ConversationMessage::ToolResults(_)),
6202 "orphaned AssistantToolCalls after cancel synthesis"
6203 );
6204 }
6205 }
6206 }
6207
6208 #[tokio::test]
6216 async fn narration_with_tool_calls_produces_no_consecutive_assistant_entries() {
6217 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6218 backend: "none".into(),
6219 ..zeroclaw_config::schema::MemoryConfig::default()
6220 };
6221 let mem: Arc<dyn Memory> = Arc::from(
6222 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6223 .expect("memory creation should succeed with valid config"),
6224 );
6225
6226 let model_provider = Box::new(MockModelProvider {
6227 responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
6228 text: Some("I will echo the message.".into()),
6229 tool_calls: vec![zeroclaw_providers::ToolCall {
6230 id: "tc1".into(),
6231 name: "echo".into(),
6232 arguments: "{}".into(),
6233 extra_content: None,
6234 }],
6235 usage: None,
6236 reasoning_content: None,
6237 }]),
6238 });
6239
6240 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6241 let mut agent = Agent::builder()
6242 .model_provider(model_provider)
6243 .tools(vec![Box::new(MockTool)])
6244 .memory(mem)
6245 .observer(observer)
6246 .tool_dispatcher(Box::new(NativeToolDispatcher))
6247 .workspace_dir(std::path::PathBuf::from("/tmp"))
6248 .build()
6249 .expect("agent builder should succeed with valid config");
6250
6251 agent.turn("hi").await.unwrap();
6252
6253 let history = agent.history();
6254 for window in history.windows(2) {
6255 let prev_is_assistant_chat = matches!(
6256 &window[0],
6257 ConversationMessage::Chat(m) if m.role == "assistant"
6258 );
6259 let next_is_tool_calls =
6260 matches!(&window[1], ConversationMessage::AssistantToolCalls { .. });
6261 assert!(
6262 !(prev_is_assistant_chat && next_is_tool_calls),
6263 "history contains Chat(assistant) immediately before AssistantToolCalls — \
6264 duplicate narration push was not removed"
6265 );
6266 }
6267 }
6268
6269 struct NarrationStreamModelProvider {
6273 call_count: Arc<Mutex<usize>>,
6274 }
6275
6276 #[async_trait]
6277 impl ModelProvider for NarrationStreamModelProvider {
6278 async fn chat_with_system(
6279 &self,
6280 _system_prompt: Option<&str>,
6281 _message: &str,
6282 _model: &str,
6283 _temperature: Option<f64>,
6284 ) -> Result<String> {
6285 Ok("ok".into())
6286 }
6287
6288 async fn chat(
6289 &self,
6290 _request: ChatRequest<'_>,
6291 _model: &str,
6292 _temperature: Option<f64>,
6293 ) -> Result<zeroclaw_providers::ChatResponse> {
6294 Ok(zeroclaw_providers::ChatResponse {
6295 text: Some("done".into()),
6296 tool_calls: vec![],
6297 usage: None,
6298 reasoning_content: None,
6299 })
6300 }
6301
6302 fn supports_native_tools(&self) -> bool {
6303 true
6304 }
6305
6306 fn stream_chat(
6307 &self,
6308 _request: ChatRequest<'_>,
6309 _model: &str,
6310 _temperature: Option<f64>,
6311 _options: zeroclaw_providers::traits::StreamOptions,
6312 ) -> futures_util::stream::BoxStream<
6313 'static,
6314 zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
6315 > {
6316 use futures_util::stream::{self, StreamExt};
6317 let mut count = self.call_count.lock();
6318 *count += 1;
6319 if *count == 1 {
6320 stream::iter(vec![
6321 Ok(zeroclaw_providers::traits::StreamEvent::TextDelta(
6322 zeroclaw_providers::traits::StreamChunk {
6323 delta: "I will echo the message.".into(),
6324 is_final: false,
6325 reasoning: None,
6326 token_count: 0,
6327 },
6328 )),
6329 Ok(zeroclaw_providers::traits::StreamEvent::ToolCall(
6330 zeroclaw_providers::ToolCall {
6331 id: "tc1".into(),
6332 name: "echo".into(),
6333 arguments: "{}".into(),
6334 extra_content: None,
6335 },
6336 )),
6337 Ok(zeroclaw_providers::traits::StreamEvent::Final),
6338 ])
6339 .boxed()
6340 } else {
6341 stream::iter(vec![
6342 Ok(zeroclaw_providers::traits::StreamEvent::TextDelta(
6343 zeroclaw_providers::traits::StreamChunk {
6344 delta: "done".into(),
6345 is_final: false,
6346 reasoning: None,
6347 token_count: 0,
6348 },
6349 )),
6350 Ok(zeroclaw_providers::traits::StreamEvent::Final),
6351 ])
6352 .boxed()
6353 }
6354 }
6355 }
6356 impl ::zeroclaw_api::attribution::Attributable for NarrationStreamModelProvider {
6357 fn role(&self) -> ::zeroclaw_api::attribution::Role {
6358 ::zeroclaw_api::attribution::Role::Provider(
6359 ::zeroclaw_api::attribution::ProviderKind::Model(
6360 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
6361 ),
6362 )
6363 }
6364 fn alias(&self) -> &str {
6365 "NarrationStreamModelProvider"
6366 }
6367 }
6368
6369 #[tokio::test]
6370 async fn streaming_narration_with_tool_calls_produces_no_consecutive_assistant_entries() {
6371 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6372 backend: "none".into(),
6373 ..zeroclaw_config::schema::MemoryConfig::default()
6374 };
6375 let mem: Arc<dyn Memory> = Arc::from(
6376 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6377 .expect("memory creation should succeed with valid config"),
6378 );
6379
6380 let model_provider = Box::new(NarrationStreamModelProvider {
6381 call_count: Arc::new(Mutex::new(0)),
6382 });
6383
6384 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6385 let mut agent = Agent::builder()
6386 .model_provider(model_provider)
6387 .tools(vec![Box::new(MockTool)])
6388 .memory(mem)
6389 .observer(observer)
6390 .tool_dispatcher(Box::new(NativeToolDispatcher))
6391 .workspace_dir(std::path::PathBuf::from("/tmp"))
6392 .build()
6393 .expect("agent builder should succeed with valid config");
6394
6395 let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
6396 agent.turn_streamed("hi", event_tx, None).await.unwrap();
6397
6398 let history = agent.history();
6399 for window in history.windows(2) {
6400 let prev_is_assistant_chat = matches!(
6401 &window[0],
6402 ConversationMessage::Chat(m) if m.role == "assistant"
6403 );
6404 let next_is_tool_calls =
6405 matches!(&window[1], ConversationMessage::AssistantToolCalls { .. });
6406 assert!(
6407 !(prev_is_assistant_chat && next_is_tool_calls),
6408 "streaming path: history contains Chat(assistant) immediately before \
6409 AssistantToolCalls — duplicate narration push was not removed"
6410 );
6411 }
6412 }
6413
6414 #[tokio::test]
6415 async fn response_cache_key_uses_full_provider_visible_transcript() {
6416 let tmp = tempfile::tempdir().expect("temp response cache dir");
6417 let cache = Arc::new(
6418 zeroclaw_memory::response_cache::ResponseCache::new(tmp.path(), 60, 100)
6419 .expect("response cache should initialize"),
6420 );
6421
6422 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6423 backend: "none".into(),
6424 ..zeroclaw_config::schema::MemoryConfig::default()
6425 };
6426 let mem_a: Arc<dyn Memory> = Arc::from(
6427 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6428 .expect("memory creation should succeed with valid config"),
6429 );
6430 let mem_b: Arc<dyn Memory> = Arc::from(
6431 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6432 .expect("memory creation should succeed with valid config"),
6433 );
6434
6435 let seen_a = Arc::new(Mutex::new(Vec::new()));
6436 let seen_b = Arc::new(Mutex::new(Vec::new()));
6437 let provider_a = Box::new(TranscriptCaptureModelProvider {
6438 responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
6439 text: Some("from prior transcript".into()),
6440 tool_calls: vec![],
6441 usage: None,
6442 reasoning_content: None,
6443 }]),
6444 seen_messages: seen_a.clone(),
6445 });
6446 let provider_b = Box::new(TranscriptCaptureModelProvider {
6447 responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
6448 text: Some("from fresh transcript".into()),
6449 tool_calls: vec![],
6450 usage: None,
6451 reasoning_content: None,
6452 }]),
6453 seen_messages: seen_b.clone(),
6454 });
6455
6456 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6457 let mut agent_a = Agent::builder()
6458 .model_provider(provider_a)
6459 .tools(vec![Box::new(MockTool)])
6460 .memory(mem_a)
6461 .observer(observer.clone())
6462 .response_cache(Some(cache.clone()))
6463 .tool_dispatcher(Box::new(NativeToolDispatcher))
6464 .workspace_dir(std::path::PathBuf::from("/tmp"))
6465 .model_name("test-model".into())
6466 .temperature(Some(0.0))
6467 .build()
6468 .expect("agent builder should succeed with valid config");
6469 agent_a.seed_history(&[
6470 ChatMessage::user("earlier turn"),
6471 ChatMessage::assistant("earlier answer"),
6472 ]);
6473
6474 let mut agent_b = Agent::builder()
6475 .model_provider(provider_b)
6476 .tools(vec![Box::new(MockTool)])
6477 .memory(mem_b)
6478 .observer(observer)
6479 .response_cache(Some(cache))
6480 .tool_dispatcher(Box::new(NativeToolDispatcher))
6481 .workspace_dir(std::path::PathBuf::from("/tmp"))
6482 .model_name("test-model".into())
6483 .temperature(Some(0.0))
6484 .build()
6485 .expect("agent builder should succeed with valid config");
6486
6487 assert_eq!(
6488 agent_a.turn("same final prompt").await.unwrap(),
6489 "from prior transcript"
6490 );
6491 assert_eq!(
6492 agent_b.turn("same final prompt").await.unwrap(),
6493 "from fresh transcript"
6494 );
6495 assert_eq!(seen_a.lock().len(), 1);
6496 assert_eq!(
6497 seen_b.lock().len(),
6498 1,
6499 "fresh transcript must not reuse a cache entry written for a different prior transcript"
6500 );
6501 }
6502
6503 #[tokio::test]
6504 async fn turn_streamed_with_steering_commits_streamed_output_before_continuing() {
6505 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6506 backend: "none".into(),
6507 ..zeroclaw_config::schema::MemoryConfig::default()
6508 };
6509 let mem: Arc<dyn Memory> = Arc::from(
6510 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6511 .expect("memory creation should succeed with valid config"),
6512 );
6513
6514 let seen_messages = Arc::new(Mutex::new(Vec::new()));
6515 let model_provider = Box::new(StreamingSteeringModelProvider {
6516 seen_messages: seen_messages.clone(),
6517 call_count: AtomicUsize::new(0),
6518 fail_on_call: None,
6519 fail_chat_on_call: None,
6520 fail_after_delta_on_call: None,
6521 delay_chat_on_call: None,
6522 });
6523 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6524 let mut agent = Agent::builder()
6525 .model_provider(model_provider)
6526 .tools(vec![Box::new(MockTool)])
6527 .memory(mem)
6528 .observer(observer)
6529 .tool_dispatcher(Box::new(NativeToolDispatcher))
6530 .workspace_dir(std::path::PathBuf::from("/tmp"))
6531 .build()
6532 .expect("agent builder should succeed with valid config");
6533
6534 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
6535 let (steering_tx, mut steering_rx) = tokio::sync::mpsc::channel::<String>(4);
6536 let handle = zeroclaw_spawn::spawn!(async move {
6537 agent
6538 .turn_streamed_with_steering_state("first", event_tx, None, Some(&mut steering_rx))
6539 .await
6540 });
6541
6542 loop {
6543 match event_rx.recv().await.expect("turn event should arrive") {
6544 TurnEvent::Chunk { delta } if delta == "draft" => {
6545 steering_tx
6546 .send("second".into())
6547 .await
6548 .expect("steering message should enqueue");
6549 break;
6550 }
6551 _ => {}
6552 }
6553 }
6554
6555 let outcome = handle
6556 .await
6557 .expect("turn task should finish")
6558 .expect("steered turn should succeed");
6559 assert_eq!(outcome.response, "draftfinal");
6560
6561 let new_chat_messages: Vec<_> = outcome
6562 .new_messages
6563 .iter()
6564 .filter_map(|msg| match msg {
6565 ConversationMessage::Chat(message) => {
6566 Some((message.role.as_str(), message.content.as_str()))
6567 }
6568 _ => None,
6569 })
6570 .collect();
6571 assert!(
6572 new_chat_messages
6573 .iter()
6574 .any(|(role, content)| { *role == "assistant" && *content == "draft" }),
6575 "already streamed output must be committed before the steering continuation"
6576 );
6577 assert!(
6578 new_chat_messages
6579 .iter()
6580 .any(|(role, content)| { *role == "user" && content.contains("second") }),
6581 "accepted steering must be retained as its own user turn"
6582 );
6583
6584 let seen = seen_messages.lock();
6585 assert_eq!(seen.len(), 2);
6586 let second_call = &seen[1];
6587 assert!(
6588 second_call
6589 .iter()
6590 .any(|msg| msg.role == "assistant" && msg.content == "draft"),
6591 "second provider call must see the committed streamed assistant text"
6592 );
6593 assert!(
6594 second_call
6595 .iter()
6596 .filter(|msg| msg.role == "user")
6597 .any(|msg| msg.content.contains("second")),
6598 "second provider call must include the accepted steering user message"
6599 );
6600 }
6601
6602 #[tokio::test]
6603 async fn turn_streamed_with_steering_error_returns_committed_partial_output() {
6604 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6605 backend: "none".into(),
6606 ..zeroclaw_config::schema::MemoryConfig::default()
6607 };
6608 let mem: Arc<dyn Memory> = Arc::from(
6609 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6610 .expect("memory creation should succeed with valid config"),
6611 );
6612
6613 let model_provider = Box::new(StreamingSteeringModelProvider {
6614 seen_messages: Arc::new(Mutex::new(Vec::new())),
6615 call_count: AtomicUsize::new(0),
6616 fail_on_call: Some(2),
6617 fail_chat_on_call: Some(3),
6618 fail_after_delta_on_call: None,
6619 delay_chat_on_call: None,
6620 });
6621 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6622 let mut agent = Agent::builder()
6623 .model_provider(model_provider)
6624 .tools(vec![Box::new(MockTool)])
6625 .memory(mem)
6626 .observer(observer)
6627 .tool_dispatcher(Box::new(NativeToolDispatcher))
6628 .workspace_dir(std::path::PathBuf::from("/tmp"))
6629 .build()
6630 .expect("agent builder should succeed with valid config");
6631
6632 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
6633 let (steering_tx, mut steering_rx) = tokio::sync::mpsc::channel::<String>(4);
6634 let handle = zeroclaw_spawn::spawn!(async move {
6635 agent
6636 .turn_streamed_with_steering_state("first", event_tx, None, Some(&mut steering_rx))
6637 .await
6638 });
6639
6640 loop {
6641 match event_rx.recv().await.expect("turn event should arrive") {
6642 TurnEvent::Chunk { delta } if delta == "draft" => {
6643 steering_tx
6644 .send("second".into())
6645 .await
6646 .expect("steering message should enqueue");
6647 break;
6648 }
6649 _ => {}
6650 }
6651 }
6652
6653 let err = handle
6654 .await
6655 .expect("turn task should finish")
6656 .expect_err("second provider call should fail");
6657 assert_eq!(err.committed_response, "draft");
6658 assert!(
6659 err.new_messages.iter().any(|msg| {
6660 matches!(msg, ConversationMessage::Chat(message) if message.role == "assistant" && message.content == "draft")
6661 }),
6662 "committed partial assistant output should be returned for persistence after continuation failure"
6663 );
6664 assert!(
6665 err.new_messages.iter().any(|msg| {
6666 matches!(msg, ConversationMessage::Chat(message) if message.role == "user" && message.content.contains("second"))
6667 }),
6668 "accepted steering user message should still be returned after continuation failure"
6669 );
6670 }
6671
6672 #[tokio::test]
6673 async fn turn_streamed_error_before_visible_output_falls_back_to_chat() {
6674 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6675 backend: "none".into(),
6676 ..zeroclaw_config::schema::MemoryConfig::default()
6677 };
6678 let mem: Arc<dyn Memory> = Arc::from(
6679 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6680 .expect("memory creation should succeed with valid config"),
6681 );
6682
6683 let seen_messages = Arc::new(Mutex::new(Vec::new()));
6684 let model_provider = Box::new(StreamingSteeringModelProvider {
6685 seen_messages: seen_messages.clone(),
6686 call_count: AtomicUsize::new(0),
6687 fail_on_call: Some(1),
6688 fail_chat_on_call: None,
6689 fail_after_delta_on_call: None,
6690 delay_chat_on_call: None,
6691 });
6692 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6693 let mut agent = Agent::builder()
6694 .model_provider(model_provider)
6695 .tools(vec![Box::new(MockTool)])
6696 .memory(mem)
6697 .observer(observer)
6698 .tool_dispatcher(Box::new(NativeToolDispatcher))
6699 .workspace_dir(std::path::PathBuf::from("/tmp"))
6700 .build()
6701 .expect("agent builder should succeed with valid config");
6702
6703 let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
6704 let handle = zeroclaw_spawn::spawn!(async move {
6705 agent
6706 .turn_streamed_with_steering_state("first", event_tx, None, None)
6707 .await
6708 });
6709
6710 let outcome = handle
6711 .await
6712 .expect("turn task should finish")
6713 .expect("pre-output stream failure should fall back to non-streaming chat");
6714 assert_eq!(outcome.response, "final");
6715 assert!(
6716 outcome.new_messages.iter().any(|msg| {
6717 matches!(msg, ConversationMessage::Chat(message) if message.role == "assistant" && message.content == "final")
6718 }),
6719 "new messages should carry the fallback assistant answer"
6720 );
6721 assert!(
6722 !outcome.new_messages.iter().any(|msg| {
6723 matches!(msg, ConversationMessage::Chat(message) if message.role == "assistant" && message.content.contains("[stream interrupted]"))
6724 }),
6725 "successful fallback should not persist interrupted stream text"
6726 );
6727
6728 let seen = seen_messages.lock();
6729 assert_eq!(seen.len(), 2);
6730 assert!(
6731 !seen[1]
6732 .iter()
6733 .any(|msg| { msg.role == "assistant" && msg.content.contains("draft") }),
6734 "fallback chat must not receive the abandoned stream attempt as prior assistant text"
6735 );
6736 }
6737
6738 #[tokio::test]
6739 async fn turn_streamed_error_after_delta_preserves_visible_partial() {
6740 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6741 backend: "none".into(),
6742 ..zeroclaw_config::schema::MemoryConfig::default()
6743 };
6744 let mem: Arc<dyn Memory> = Arc::from(
6745 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6746 .expect("memory creation should succeed with valid config"),
6747 );
6748
6749 let model_provider = Box::new(StreamingSteeringModelProvider {
6750 seen_messages: Arc::new(Mutex::new(Vec::new())),
6751 call_count: AtomicUsize::new(0),
6752 fail_on_call: None,
6753 fail_chat_on_call: None,
6754 fail_after_delta_on_call: Some(1),
6755 delay_chat_on_call: None,
6756 });
6757 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6758 let mut agent = Agent::builder()
6759 .model_provider(model_provider)
6760 .tools(vec![Box::new(MockTool)])
6761 .memory(mem)
6762 .observer(observer)
6763 .tool_dispatcher(Box::new(NativeToolDispatcher))
6764 .workspace_dir(std::path::PathBuf::from("/tmp"))
6765 .build()
6766 .expect("agent builder should succeed with valid config");
6767
6768 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
6769 let handle = zeroclaw_spawn::spawn!(async move {
6770 agent
6771 .turn_streamed_with_steering_state("first", event_tx, None, None)
6772 .await
6773 });
6774
6775 assert!(
6776 matches!(
6777 event_rx.recv().await,
6778 Some(TurnEvent::Chunk { delta }) if delta == "draft"
6779 ),
6780 "the client should see the streamed text before the provider error"
6781 );
6782
6783 let err = handle
6784 .await
6785 .expect("turn task should finish")
6786 .expect_err("post-output stream failure should return an error with partial output");
6787 assert!(
6788 err.error
6789 .to_string()
6790 .contains("synthetic provider failure after delta"),
6791 "unexpected error: {}",
6792 err.error
6793 );
6794 assert!(
6795 err.committed_response.contains("[stream interrupted]"),
6796 "persisted partial text should mark that the visible stream was interrupted"
6797 );
6798 assert!(
6799 err.new_messages.iter().any(|msg| {
6800 matches!(msg, ConversationMessage::Chat(message) if message.role == "assistant" && message.content.contains("draft"))
6801 }),
6802 "new messages should carry the visible assistant partial for gateway persistence"
6803 );
6804 }
6805
6806 #[tokio::test]
6807 async fn turn_streamed_error_before_visible_output_fallback_can_be_cancelled() {
6808 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6809 backend: "none".into(),
6810 ..zeroclaw_config::schema::MemoryConfig::default()
6811 };
6812 let mem: Arc<dyn Memory> = Arc::from(
6813 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6814 .expect("memory creation should succeed with valid config"),
6815 );
6816
6817 let model_provider = Box::new(StreamingSteeringModelProvider {
6818 seen_messages: Arc::new(Mutex::new(Vec::new())),
6819 call_count: AtomicUsize::new(0),
6820 fail_on_call: Some(1),
6821 fail_chat_on_call: None,
6822 fail_after_delta_on_call: None,
6823 delay_chat_on_call: Some(2),
6824 });
6825 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6826 let mut agent = Agent::builder()
6827 .model_provider(model_provider)
6828 .tools(vec![Box::new(MockTool)])
6829 .memory(mem)
6830 .observer(observer)
6831 .tool_dispatcher(Box::new(NativeToolDispatcher))
6832 .workspace_dir(std::path::PathBuf::from("/tmp"))
6833 .build()
6834 .expect("agent builder should succeed with valid config");
6835
6836 let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
6837 let cancel_token = tokio_util::sync::CancellationToken::new();
6838 let cancel_for_task = cancel_token.clone();
6839 let handle = zeroclaw_spawn::spawn!(async move {
6840 agent
6841 .turn_streamed_with_steering_state("first", event_tx, Some(cancel_for_task), None)
6842 .await
6843 });
6844
6845 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
6846 cancel_token.cancel();
6847
6848 let err = handle
6849 .await
6850 .expect("turn task should finish")
6851 .expect_err("cancelled fallback should return cancellation");
6852 assert!(
6853 crate::agent::loop_::is_tool_loop_cancelled(&err.error),
6854 "unexpected error: {}",
6855 err.error
6856 );
6857 assert_eq!(err.committed_response, "[interrupted by user]");
6858 assert!(
6859 err.new_messages.iter().any(|msg| {
6860 matches!(msg, ConversationMessage::Chat(message) if message.role == "assistant" && message.content == "[interrupted by user]")
6861 }),
6862 "pre-output fallback cancellation should include an interruption marker"
6863 );
6864 }
6865
6866 #[tokio::test]
6867 async fn turn_streamed_cancel_before_output_returns_interruption_message() {
6868 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6869 backend: "none".into(),
6870 ..zeroclaw_config::schema::MemoryConfig::default()
6871 };
6872 let mem: Arc<dyn Memory> = Arc::from(
6873 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6874 .expect("memory creation should succeed with valid config"),
6875 );
6876
6877 let model_provider = Box::new(StreamingSteeringModelProvider {
6878 seen_messages: Arc::new(Mutex::new(Vec::new())),
6879 call_count: AtomicUsize::new(0),
6880 fail_on_call: None,
6881 fail_chat_on_call: None,
6882 fail_after_delta_on_call: None,
6883 delay_chat_on_call: None,
6884 });
6885 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6886 let mut agent = Agent::builder()
6887 .model_provider(model_provider)
6888 .tools(vec![Box::new(MockTool)])
6889 .memory(mem)
6890 .observer(observer)
6891 .tool_dispatcher(Box::new(NativeToolDispatcher))
6892 .workspace_dir(std::path::PathBuf::from("/tmp"))
6893 .build()
6894 .expect("agent builder should succeed with valid config");
6895
6896 let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
6897 let cancel_token = tokio_util::sync::CancellationToken::new();
6898 cancel_token.cancel();
6899
6900 let err = agent
6901 .turn_streamed_with_steering_state("first", event_tx, Some(cancel_token), None)
6902 .await
6903 .expect_err("pre-cancelled turn should return cancellation");
6904
6905 assert!(
6906 crate::agent::loop_::is_tool_loop_cancelled(&err.error),
6907 "unexpected error: {}",
6908 err.error
6909 );
6910 assert_eq!(err.committed_response, "[interrupted by user]");
6911 assert!(
6912 err.new_messages.iter().any(|msg| {
6913 matches!(msg, ConversationMessage::Chat(message) if message.role == "assistant" && message.content == "[interrupted by user]")
6914 }),
6915 "cancelled turn should include an assistant interruption marker for persistence"
6916 );
6917 }
6918
6919 #[tokio::test]
6920 async fn turn_streamed_stream_error_after_delta_emits_llm_response_failure() {
6921 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6922 backend: "none".into(),
6923 ..zeroclaw_config::schema::MemoryConfig::default()
6924 };
6925 let mem: Arc<dyn Memory> = Arc::from(
6926 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6927 .expect("memory creation should succeed with valid config"),
6928 );
6929
6930 let model_provider = Box::new(StreamingSteeringModelProvider {
6931 seen_messages: Arc::new(Mutex::new(Vec::new())),
6932 call_count: AtomicUsize::new(0),
6933 fail_on_call: None,
6934 fail_chat_on_call: None,
6935 fail_after_delta_on_call: Some(1),
6936 delay_chat_on_call: None,
6937 });
6938 let capturing = Arc::new(CapturingObserver::default());
6939 let observer: Arc<dyn Observer> = capturing.clone();
6940 let mut agent = Agent::builder()
6941 .model_provider(model_provider)
6942 .tools(vec![Box::new(MockTool)])
6943 .memory(mem)
6944 .observer(observer)
6945 .tool_dispatcher(Box::new(NativeToolDispatcher))
6946 .workspace_dir(std::path::PathBuf::from("/tmp"))
6947 .build()
6948 .expect("agent builder should succeed with valid config");
6949
6950 let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
6951 let err = agent
6952 .turn_streamed_with_steering_state("test", event_tx, None, None)
6953 .await
6954 .expect_err("provider stream failure should be returned");
6955
6956 assert!(
6957 err.committed_response.contains("draft")
6958 && err.committed_response.contains("[stream interrupted]"),
6959 "unexpected committed_response: {}",
6960 err.committed_response
6961 );
6962
6963 let events = capturing.events.lock();
6964 let request = events
6965 .iter()
6966 .find(|e| matches!(e, ObserverEvent::LlmRequest { .. }))
6967 .expect("LlmRequest should have been recorded");
6968 let response = events
6969 .iter()
6970 .find(|e| matches!(e, ObserverEvent::LlmResponse { .. }))
6971 .expect("LlmResponse should have been recorded");
6972
6973 assert_eq!(
6974 events
6975 .iter()
6976 .filter(|e| matches!(e, ObserverEvent::LlmRequest { .. }))
6977 .count(),
6978 1,
6979 "exactly one LlmRequest expected"
6980 );
6981 assert_eq!(
6982 events
6983 .iter()
6984 .filter(|e| matches!(e, ObserverEvent::LlmResponse { .. }))
6985 .count(),
6986 1,
6987 "exactly one LlmResponse expected"
6988 );
6989
6990 let (
6991 ObserverEvent::LlmRequest {
6992 model_provider: req_provider,
6993 model: req_model,
6994 ..
6995 },
6996 ObserverEvent::LlmResponse {
6997 model_provider: resp_provider,
6998 model: resp_model,
6999 success,
7000 error_message,
7001 ..
7002 },
7003 ) = (request, response)
7004 else {
7005 panic!("matched event variants should be LlmRequest and LlmResponse");
7006 };
7007
7008 assert!(!success, "LlmResponse on stream error must be a failure");
7009 assert!(
7010 error_message.as_deref().is_some_and(|m| !m.is_empty()),
7011 "failure LlmResponse must carry a non-empty error_message"
7012 );
7013 assert_eq!(req_provider, resp_provider, "provider should match");
7014 assert_eq!(req_model, resp_model, "model should match");
7015 }
7016
7017 #[tokio::test]
7018 async fn turn_streamed_cancel_during_stream_emits_llm_response_failure() {
7019 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
7020 backend: "none".into(),
7021 ..zeroclaw_config::schema::MemoryConfig::default()
7022 };
7023 let mem: Arc<dyn Memory> = Arc::from(
7024 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
7025 .expect("memory creation should succeed with valid config"),
7026 );
7027
7028 let model_provider = Box::new(StreamingSteeringModelProvider {
7029 seen_messages: Arc::new(Mutex::new(Vec::new())),
7030 call_count: AtomicUsize::new(0),
7031 fail_on_call: None,
7032 fail_chat_on_call: None,
7033 fail_after_delta_on_call: None,
7034 delay_chat_on_call: None,
7035 });
7036 let capturing = Arc::new(CapturingObserver::default());
7037 let observer: Arc<dyn Observer> = capturing.clone();
7038 let mut agent = Agent::builder()
7039 .model_provider(model_provider)
7040 .tools(vec![Box::new(MockTool)])
7041 .memory(mem)
7042 .observer(observer)
7043 .tool_dispatcher(Box::new(NativeToolDispatcher))
7044 .workspace_dir(std::path::PathBuf::from("/tmp"))
7045 .build()
7046 .expect("agent builder should succeed with valid config");
7047
7048 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
7049 let cancel_token = tokio_util::sync::CancellationToken::new();
7050 let cancel_for_task = cancel_token.clone();
7051
7052 let canceller = zeroclaw_spawn::spawn!(async move {
7053 while let Some(event) = event_rx.recv().await {
7054 if matches!(event, TurnEvent::Chunk { ref delta } if delta == "draft") {
7055 cancel_for_task.cancel();
7056 break;
7057 }
7058 }
7059 while event_rx.recv().await.is_some() {}
7060 });
7061
7062 let err = agent
7063 .turn_streamed_with_steering_state("test", event_tx, Some(cancel_token), None)
7064 .await
7065 .expect_err("cancelled turn should return cancellation");
7066
7067 canceller.await.expect("canceller task should finish");
7068
7069 assert!(
7070 crate::agent::loop_::is_tool_loop_cancelled(&err.error),
7071 "cancelled turn should carry the cancellation error: {}",
7072 err.error
7073 );
7074
7075 let events = capturing.events.lock();
7076 assert_eq!(
7077 events
7078 .iter()
7079 .filter(|e| matches!(e, ObserverEvent::LlmRequest { .. }))
7080 .count(),
7081 1,
7082 "exactly one LlmRequest expected"
7083 );
7084 assert_eq!(
7085 events
7086 .iter()
7087 .filter(|e| matches!(e, ObserverEvent::LlmResponse { .. }))
7088 .count(),
7089 1,
7090 "exactly one LlmResponse expected"
7091 );
7092
7093 let request = events
7094 .iter()
7095 .find(|e| matches!(e, ObserverEvent::LlmRequest { .. }))
7096 .expect("LlmRequest should have been recorded");
7097 let response = events
7098 .iter()
7099 .find(|e| matches!(e, ObserverEvent::LlmResponse { .. }))
7100 .expect("LlmResponse should have been recorded");
7101
7102 let (
7103 ObserverEvent::LlmRequest {
7104 model_provider: req_provider,
7105 model: req_model,
7106 ..
7107 },
7108 ObserverEvent::LlmResponse {
7109 model_provider: resp_provider,
7110 model: resp_model,
7111 success,
7112 error_message,
7113 ..
7114 },
7115 ) = (request, response)
7116 else {
7117 panic!("matched event variants should be LlmRequest and LlmResponse");
7118 };
7119
7120 assert!(!success, "cancellation LlmResponse must be a failure");
7121 assert_eq!(
7122 error_message.as_deref(),
7123 Some("request cancelled by user"),
7124 "cancellation LlmResponse must carry the fixed cancel message"
7125 );
7126 assert_eq!(req_provider, resp_provider, "provider should match");
7127 assert_eq!(req_model, resp_model, "model should match");
7128 }
7129
7130 struct NamedMockTool {
7135 tool_name: String,
7136 }
7137
7138 impl NamedMockTool {
7139 fn new(name: &str) -> Self {
7140 Self {
7141 tool_name: name.to_string(),
7142 }
7143 }
7144 }
7145
7146 #[async_trait]
7147 impl Tool for NamedMockTool {
7148 fn name(&self) -> &str {
7149 &self.tool_name
7150 }
7151
7152 fn description(&self) -> &str {
7153 "mock"
7154 }
7155
7156 fn parameters_schema(&self) -> serde_json::Value {
7157 serde_json::json!({"type": "object"})
7158 }
7159
7160 async fn execute(&self, _args: serde_json::Value) -> Result<crate::tools::ToolResult> {
7161 Ok(crate::tools::ToolResult {
7162 success: true,
7163 output: "ok".into(),
7164 error: None,
7165 })
7166 }
7167 }
7168
7169 fn make_skill(name: &str, tool_names: &[&str]) -> crate::skills::Skill {
7170 crate::skills::Skill {
7171 name: name.to_string(),
7172 description: format!("{name} skill"),
7173 version: "0.1.0".to_string(),
7174 author: None,
7175 tags: vec![],
7176 tools: tool_names
7177 .iter()
7178 .map(|t| crate::skills::SkillTool {
7179 name: t.to_string(),
7180 description: format!("{t} tool"),
7181 kind: "shell".to_string(),
7182 command: format!("echo {t}"),
7183 args: std::collections::HashMap::new(),
7184 target: None,
7185 locked_args: std::collections::HashMap::new(),
7186 })
7187 .collect(),
7188 prompts: vec![],
7189 location: None,
7190 }
7191 }
7192
7193 #[test]
7194 fn register_skill_tools_adds_skill_tools_to_registry() {
7195 let security = Arc::new(crate::security::SecurityPolicy::default());
7196 let mut tools: Vec<Box<dyn Tool>> = vec![Box::new(NamedMockTool::new("builtin_a"))];
7197
7198 let skills = vec![make_skill("deploy", &["run", "status"])];
7199 tools::register_skill_tools(&mut tools, &skills, security);
7200
7201 let names: Vec<&str> = tools.iter().map(|t| t.name()).collect();
7202 assert_eq!(names, &["builtin_a", "deploy__run", "deploy__status"]);
7203 }
7204
7205 #[test]
7206 fn register_skill_tools_skips_shadowed_builtins() {
7207 let security = Arc::new(crate::security::SecurityPolicy::default());
7208 let mut tools: Vec<Box<dyn Tool>> = vec![Box::new(NamedMockTool::new("my_skill__run"))];
7210
7211 let skills = vec![make_skill("my_skill", &["run"])];
7212 tools::register_skill_tools(&mut tools, &skills, security);
7213
7214 assert_eq!(tools.len(), 1);
7216 assert_eq!(tools[0].name(), "my_skill__run");
7217 }
7218
7219 #[test]
7220 fn from_config_policy_filter_blocks_raw_target_but_keeps_scoped_wrapper() {
7221 use crate::skills::{Skill, SkillTool};
7230
7231 let shell: Arc<dyn Tool> = Arc::new(NamedMockTool::new("shell"));
7232 let file_read: Arc<dyn Tool> = Arc::new(NamedMockTool::new("file_read"));
7233 let resolution: Vec<Arc<dyn Tool>> = vec![Arc::clone(&shell), Arc::clone(&file_read)];
7236
7237 let mut tools: Vec<Box<dyn Tool>> = vec![
7238 Box::new(crate::tools::ArcToolRef(Arc::clone(&shell))),
7239 Box::new(crate::tools::ArcToolRef(Arc::clone(&file_read))),
7240 ];
7241
7242 let policy = crate::security::SecurityPolicy {
7246 allowed_tools: Some(vec!["file_read".to_string()]),
7247 workspace_dir: std::env::temp_dir(),
7248 ..crate::security::SecurityPolicy::default()
7249 };
7250 crate::agent::loop_::apply_policy_tool_filter(&mut tools, Some(&policy), None);
7251 assert!(
7252 !tools.iter().any(|t| t.name() == "shell"),
7253 "raw shell must be removed by the allowlist on the from_config path"
7254 );
7255 assert!(
7256 tools.iter().any(|t| t.name() == "file_read"),
7257 "allowlisted file_read must survive the filter"
7258 );
7259
7260 let skill = Skill {
7261 name: "ops".to_string(),
7262 description: "d".to_string(),
7263 version: "1".to_string(),
7264 author: None,
7265 tags: vec![],
7266 tools: vec![SkillTool {
7267 name: "use_shell".to_string(),
7268 description: "scoped shell".to_string(),
7269 kind: "builtin".to_string(),
7270 command: String::new(),
7271 args: std::collections::HashMap::new(),
7272 target: Some("shell".to_string()),
7273 locked_args: std::collections::HashMap::new(),
7274 }],
7275 prompts: vec![],
7276 location: None,
7277 };
7278 tools::register_skill_tools_with_context(
7279 &mut tools,
7280 &[skill],
7281 Arc::new(crate::security::SecurityPolicy::default()),
7282 &resolution,
7283 );
7284
7285 assert!(
7286 !tools.iter().any(|t| t.name() == "shell"),
7287 "raw shell must STILL be unavailable after skill registration"
7288 );
7289 assert!(
7290 tools.iter().any(|t| t.name() == "ops__use_shell"),
7291 "the scoped elevation wrapper must remain the only callable path to shell"
7292 );
7293 }
7294
7295 #[test]
7296 fn excluded_tools_filters_matching_tools() {
7297 let mut tools: Vec<Box<dyn Tool>> = vec![
7298 Box::new(NamedMockTool::new("shell")),
7299 Box::new(NamedMockTool::new("file_write")),
7300 Box::new(NamedMockTool::new("web_search")),
7301 ];
7302
7303 let excluded = ["shell".to_string(), "file_write".to_string()];
7304 tools.retain(|t| !excluded.iter().any(|ex| ex == t.name()));
7305
7306 let names: Vec<&str> = tools.iter().map(|t| t.name()).collect();
7307 assert_eq!(names, &["web_search"]);
7308 }
7309
7310 #[test]
7311 fn excluded_tools_preserves_non_excluded() {
7312 let mut tools: Vec<Box<dyn Tool>> = vec![
7313 Box::new(NamedMockTool::new("shell")),
7314 Box::new(NamedMockTool::new("file_read")),
7315 Box::new(NamedMockTool::new("web_fetch")),
7316 ];
7317
7318 let excluded = ["shell".to_string()];
7320 tools.retain(|t| !excluded.iter().any(|ex| ex == t.name()));
7321
7322 let names: Vec<&str> = tools.iter().map(|t| t.name()).collect();
7323 assert_eq!(names, &["file_read", "web_fetch"]);
7324 }
7325
7326 #[test]
7327 fn empty_excluded_tools_preserves_all() {
7328 let mut tools: Vec<Box<dyn Tool>> = vec![
7329 Box::new(NamedMockTool::new("shell")),
7330 Box::new(NamedMockTool::new("file_read")),
7331 ];
7332
7333 let excluded: Vec<String> = vec![];
7334 if !excluded.is_empty() {
7335 tools.retain(|t| !excluded.iter().any(|ex| ex == t.name()));
7336 }
7337
7338 assert_eq!(tools.len(), 2);
7339 }
7340
7341 #[tokio::test]
7347 async fn turn_streamed_returns_new_messages_at_history_limit() {
7348 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
7349 backend: "none".into(),
7350 ..zeroclaw_config::schema::MemoryConfig::default()
7351 };
7352 let mem: Arc<dyn Memory> = Arc::from(
7353 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
7354 .expect("memory creation should succeed with valid config"),
7355 );
7356
7357 let agent_config = zeroclaw_config::schema::AliasedAgentConfig {
7360 resolved: zeroclaw_config::schema::ResolvedRuntime {
7361 max_history_messages: 4,
7362 ..Default::default()
7363 },
7364 ..zeroclaw_config::schema::AliasedAgentConfig::default()
7365 };
7366
7367 let provider = Box::new(NarrationStreamModelProvider {
7369 call_count: Arc::new(Mutex::new(0)),
7370 });
7371
7372 let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
7373 let mut agent = Agent::builder()
7374 .model_provider(provider)
7375 .tools(vec![Box::new(MockTool)])
7376 .memory(mem)
7377 .observer(observer)
7378 .tool_dispatcher(Box::new(NativeToolDispatcher))
7379 .workspace_dir(std::path::PathBuf::from("/tmp"))
7380 .config(agent_config)
7381 .build()
7382 .expect("agent builder should succeed with valid config");
7383
7384 agent
7389 .history
7390 .push(ConversationMessage::Chat(ChatMessage::system("sys")));
7391 for i in 0..2 {
7392 agent
7393 .history
7394 .push(ConversationMessage::Chat(ChatMessage::user(format!(
7395 "old {i}"
7396 ))));
7397 agent
7398 .history
7399 .push(ConversationMessage::Chat(ChatMessage::assistant(format!(
7400 "old reply {i}"
7401 ))));
7402 }
7403 let (event_tx, _rx) = tokio::sync::mpsc::channel::<TurnEvent>(8);
7408 let (_, new_msgs) = agent
7409 .turn_streamed("new question", event_tx, None)
7410 .await
7411 .expect("turn_streamed should succeed");
7412
7413 let has_user = new_msgs
7415 .iter()
7416 .any(|m| matches!(m, ConversationMessage::Chat(c) if c.role == "user"));
7417 assert!(
7418 has_user,
7419 "new_msgs must include the user message even after trim; got: {new_msgs:?}"
7420 );
7421
7422 let has_assistant = new_msgs
7424 .iter()
7425 .any(|m| matches!(m, ConversationMessage::Chat(c) if c.role == "assistant"));
7426 assert!(
7427 has_assistant,
7428 "new_msgs must include the assistant reply even after trim; got: {new_msgs:?}"
7429 );
7430 }
7431
7432 #[test]
7433 fn excluded_tools_then_skill_registration_end_to_end() {
7434 let security = Arc::new(crate::security::SecurityPolicy::default());
7435 let mut tools: Vec<Box<dyn Tool>> = vec![
7436 Box::new(NamedMockTool::new("shell")),
7437 Box::new(NamedMockTool::new("file_read")),
7438 Box::new(NamedMockTool::new("web_fetch")),
7439 ];
7440
7441 let excluded = ["shell".to_string()];
7443 tools.retain(|t| !excluded.iter().any(|ex| ex == t.name()));
7444
7445 let skills = vec![make_skill("ops", &["deploy", "rollback"])];
7447 tools::register_skill_tools(&mut tools, &skills, security);
7448
7449 let names: Vec<&str> = tools.iter().map(|t| t.name()).collect();
7450 assert_eq!(
7451 names,
7452 &["file_read", "web_fetch", "ops__deploy", "ops__rollback"]
7453 );
7454 }
7455
7456 fn observer_event_turn_id(event: &ObserverEvent) -> Option<&str> {
7457 match event {
7458 ObserverEvent::AgentStart { turn_id, .. }
7459 | ObserverEvent::LlmRequest { turn_id, .. }
7460 | ObserverEvent::LlmResponse { turn_id, .. }
7461 | ObserverEvent::AgentEnd { turn_id, .. }
7462 | ObserverEvent::ToolCall { turn_id, .. } => turn_id.as_deref(),
7463 _ => None,
7464 }
7465 }
7466
7467 fn assert_single_agent_lifecycle(events: &[ObserverEvent]) -> (usize, usize) {
7468 let starts: Vec<_> = events
7469 .iter()
7470 .enumerate()
7471 .filter(|(_, event)| matches!(event, ObserverEvent::AgentStart { .. }))
7472 .collect();
7473 let ends: Vec<_> = events
7474 .iter()
7475 .enumerate()
7476 .filter(|(_, event)| matches!(event, ObserverEvent::AgentEnd { .. }))
7477 .collect();
7478
7479 assert_eq!(starts.len(), 1, "expected exactly one AgentStart");
7480 assert_eq!(ends.len(), 1, "expected exactly one AgentEnd");
7481 assert!(starts[0].0 < ends[0].0, "AgentEnd must follow AgentStart");
7482 assert_eq!(
7483 observer_event_turn_id(starts[0].1),
7484 observer_event_turn_id(ends[0].1),
7485 "AgentEnd turn_id must match AgentStart turn_id"
7486 );
7487
7488 (starts[0].0, ends[0].0)
7489 }
7490
7491 fn agent_end_tokens(
7492 event: &ObserverEvent,
7493 ) -> Option<zeroclaw_api::observability_traits::TurnTokenUsage> {
7494 match event {
7495 ObserverEvent::AgentEnd { tokens_used, .. } => tokens_used.clone(),
7496 _ => None,
7497 }
7498 }
7499
7500 #[tokio::test]
7501 async fn turn_cache_hit_emits_agent_end_with_none_tokens() {
7502 let tmp = tempfile::tempdir().expect("temp response cache dir");
7503 let cache = Arc::new(
7504 zeroclaw_memory::response_cache::ResponseCache::new(tmp.path(), 60, 100)
7505 .expect("response cache should initialize"),
7506 );
7507 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
7508 backend: "none".into(),
7509 ..zeroclaw_config::schema::MemoryConfig::default()
7510 };
7511 let mem_a: Arc<dyn Memory> = Arc::from(
7512 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
7513 .expect("memory creation should succeed with valid config"),
7514 );
7515 let mem_b: Arc<dyn Memory> = Arc::from(
7516 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
7517 .expect("memory creation should succeed with valid config"),
7518 );
7519
7520 let mut agent_a = Agent::builder()
7521 .model_provider(Box::new(MockModelProvider {
7522 responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
7523 text: Some("cached answer".into()),
7524 tool_calls: vec![],
7525 usage: Some(zeroclaw_providers::traits::TokenUsage {
7526 input_tokens: Some(10),
7527 cached_input_tokens: None,
7528 output_tokens: Some(5),
7529 }),
7530 reasoning_content: None,
7531 }]),
7532 }))
7533 .tools(vec![Box::new(MockTool)])
7534 .memory(mem_a)
7535 .observer(Arc::from(crate::observability::NoopObserver {}) as Arc<dyn Observer>)
7536 .response_cache(Some(cache.clone()))
7537 .tool_dispatcher(Box::new(NativeToolDispatcher))
7538 .workspace_dir(std::path::PathBuf::from("/tmp"))
7539 .model_name("test-model".into())
7540 .temperature(Some(0.0))
7541 .build()
7542 .expect("agent builder should succeed with valid config");
7543
7544 assert_eq!(agent_a.turn("seed").await.unwrap(), "cached answer");
7545
7546 let capturing = Arc::new(CapturingObserver::default());
7547 let observer: Arc<dyn Observer> = capturing.clone();
7548 let mut agent_b = Agent::builder()
7549 .model_provider(Box::new(MockModelProvider {
7550 responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
7551 text: Some("uncached answer".into()),
7552 tool_calls: vec![],
7553 usage: None,
7554 reasoning_content: None,
7555 }]),
7556 }))
7557 .tools(vec![Box::new(MockTool)])
7558 .memory(mem_b)
7559 .observer(observer)
7560 .response_cache(Some(cache))
7561 .tool_dispatcher(Box::new(NativeToolDispatcher))
7562 .workspace_dir(std::path::PathBuf::from("/tmp"))
7563 .model_name("test-model".into())
7564 .temperature(Some(0.0))
7565 .build()
7566 .expect("agent builder should succeed with valid config");
7567
7568 assert_eq!(agent_b.turn("seed").await.unwrap(), "cached answer");
7569
7570 let events = capturing.events.lock();
7571 let (_, end_idx) = assert_single_agent_lifecycle(&events);
7572 assert!(agent_end_tokens(&events[end_idx]).is_none());
7573 assert!(
7574 !events
7575 .iter()
7576 .any(|event| matches!(event, ObserverEvent::LlmRequest { .. })),
7577 "cache hit should not call the LLM"
7578 );
7579 }
7580
7581 #[tokio::test]
7582 async fn turn_streamed_cancel_during_tool_execution_emits_agent_end_with_tokens() {
7583 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
7584 backend: "none".into(),
7585 ..zeroclaw_config::schema::MemoryConfig::default()
7586 };
7587 let mem: Arc<dyn Memory> = Arc::from(
7588 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
7589 .expect("memory creation should succeed with valid config"),
7590 );
7591 let capturing = Arc::new(CapturingObserver::default());
7592 let observer: Arc<dyn Observer> = capturing.clone();
7593 let mut agent = Agent::builder()
7594 .model_provider(Box::new(MockModelProvider {
7595 responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
7596 text: Some("I will echo.".into()),
7597 tool_calls: vec![zeroclaw_providers::ToolCall {
7598 id: "tc1".into(),
7599 name: "echo".into(),
7600 arguments: "{}".into(),
7601 extra_content: None,
7602 }],
7603 usage: Some(zeroclaw_providers::traits::TokenUsage {
7604 input_tokens: Some(10),
7605 cached_input_tokens: None,
7606 output_tokens: Some(5),
7607 }),
7608 reasoning_content: None,
7609 }]),
7610 }))
7611 .tools(vec![Box::new(SlowTool)])
7612 .memory(mem)
7613 .observer(observer)
7614 .tool_dispatcher(Box::new(NativeToolDispatcher))
7615 .workspace_dir(std::path::PathBuf::from("/tmp"))
7616 .build()
7617 .expect("agent builder should succeed with valid config");
7618
7619 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
7620 let cancel_token = tokio_util::sync::CancellationToken::new();
7621 let cancel_for_task = cancel_token.clone();
7622 let handle = zeroclaw_spawn::spawn!(async move {
7623 agent
7624 .turn_streamed_with_steering_state(
7625 "use echo",
7626 event_tx,
7627 Some(cancel_for_task),
7628 None,
7629 )
7630 .await
7631 });
7632
7633 while let Some(event) = event_rx.recv().await {
7634 if matches!(event, TurnEvent::Usage { .. }) {
7635 cancel_token.cancel();
7636 break;
7637 }
7638 }
7639
7640 handle
7641 .await
7642 .expect("turn task should finish")
7643 .expect_err("turn should be cancelled before tool execution completes");
7644
7645 let events = capturing.events.lock();
7646 let (_, end_idx) = assert_single_agent_lifecycle(&events);
7647 let tokens = agent_end_tokens(&events[end_idx]).expect("AgentEnd should include tokens");
7648 assert_eq!(tokens.input_tokens, 10);
7649 assert_eq!(tokens.output_tokens, 5);
7650 let llm_response_idx = events
7651 .iter()
7652 .position(|event| matches!(event, ObserverEvent::LlmResponse { success: true, .. }))
7653 .expect("successful LlmResponse should be recorded");
7654 assert!(
7655 llm_response_idx < end_idx,
7656 "AgentEnd must follow LlmResponse"
7657 );
7658 }
7659
7660 #[tokio::test]
7661 async fn turn_llm_error_emits_agent_end() {
7662 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
7663 backend: "none".into(),
7664 ..zeroclaw_config::schema::MemoryConfig::default()
7665 };
7666 let mem: Arc<dyn Memory> = Arc::from(
7667 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
7668 .expect("memory creation should succeed with valid config"),
7669 );
7670 let capturing = Arc::new(CapturingObserver::default());
7671 let observer: Arc<dyn Observer> = capturing.clone();
7672 let mut agent = Agent::builder()
7673 .model_provider(Box::new(FailingModelProvider))
7674 .tools(vec![Box::new(MockTool)])
7675 .memory(mem)
7676 .observer(observer)
7677 .tool_dispatcher(Box::new(NativeToolDispatcher))
7678 .workspace_dir(std::path::PathBuf::from("/tmp"))
7679 .model_name("test-model".into())
7680 .temperature(Some(0.0))
7681 .build()
7682 .expect("agent builder should succeed with valid config");
7683
7684 let result = agent.turn("hello").await;
7685 assert!(
7686 result.is_err(),
7687 "turn should fail when provider is unavailable"
7688 );
7689
7690 let events = capturing.events.lock();
7691 let (_, end_idx) = assert_single_agent_lifecycle(&events);
7692 assert!(
7693 agent_end_tokens(&events[end_idx]).is_none(),
7694 "AgentEnd should have tokens_used: None on LLM error"
7695 );
7696 }
7697
7698 #[tokio::test]
7699 async fn turn_events_share_consistent_turn_id() {
7700 let memory_cfg = zeroclaw_config::schema::MemoryConfig {
7701 backend: "none".into(),
7702 ..zeroclaw_config::schema::MemoryConfig::default()
7703 };
7704 let mem: Arc<dyn Memory> = Arc::from(
7705 zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
7706 .expect("memory creation should succeed with valid config"),
7707 );
7708
7709 let model_provider = Box::new(MockModelProvider {
7710 responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
7711 text: Some("done".into()),
7712 tool_calls: vec![],
7713 usage: None,
7714 reasoning_content: None,
7715 }]),
7716 });
7717 let capturing = Arc::new(CapturingObserver::default());
7718 let observer: Arc<dyn Observer> = capturing.clone();
7719 let mut agent = Agent::builder()
7720 .model_provider(model_provider)
7721 .tools(vec![Box::new(MockTool)])
7722 .memory(mem)
7723 .observer(observer)
7724 .tool_dispatcher(Box::new(NativeToolDispatcher))
7725 .workspace_dir(std::path::PathBuf::from("/tmp"))
7726 .build()
7727 .expect("agent builder should succeed with valid config");
7728
7729 let _ = agent.turn("test").await.expect("turn should succeed");
7730
7731 let events = capturing.events.lock();
7732 let turn_ids: Vec<&str> = events.iter().filter_map(observer_event_turn_id).collect();
7733 assert!(!turn_ids.is_empty(), "turn events should carry turn_id");
7734 let first = turn_ids[0];
7735 assert!(
7736 turn_ids.iter().all(|turn_id| *turn_id == first),
7737 "all turn_ids should be consistent"
7738 );
7739 }
7740}