Skip to main content

zeroclaw_runtime/agent/
agent.rs

1use crate::agent::dispatcher::{
2    NativeToolDispatcher, ParsedToolCall, ToolDispatcher, ToolExecutionResult, XmlToolDispatcher,
3};
4use crate::agent::eval::AutoClassifyExt;
5use crate::agent::prompt::{PromptContext, SystemPromptBuilder};
6use crate::approval::{ApprovalManager, ApprovalRequest, ApprovalRequirement, ApprovalResponse};
7use crate::observability::{self, Observer, ObserverEvent};
8use crate::platform;
9use crate::security::SecurityPolicy;
10use crate::tools::{self, Tool, ToolSpec};
11use anyhow::{Context, Result};
12use chrono::{Datelike, Timelike};
13use std::collections::{HashMap, VecDeque};
14use std::io::Write as IoWrite;
15use std::path::Path;
16use std::sync::Arc;
17use std::time::Instant;
18use zeroclaw_config::schema::Config;
19use zeroclaw_memory::{self, Memory, MemoryCategory};
20use zeroclaw_providers::{
21    self, ChatMessage, ChatRequest, ConversationMessage, ModelProvider, ToolResultMessage,
22};
23use zeroclaw_tool_call_parser::strip_think_tags;
24
// Re-export `TurnEvent` from `zeroclaw_api::agent` for backwards compatibility.
26pub use zeroclaw_api::agent::TurnEvent;
27
28/// Build a fresh `ModelProvider` box for a dotted `<type>.<alias>` reference,
29/// resolving the model from the override (when supplied) or the configured
30/// entry. Mirrors the model_provider-construction path in [`Agent::from_config`]
31/// so a live session switch produces the same wiring a fresh agent would.
32/// Returns the built box plus the resolved `(model_provider_name, model_name)`
33/// for attribution.
34pub fn build_session_model_provider(
35    config: &Config,
36    model_provider_ref: &str,
37    model_override: Option<&str>,
38) -> Result<(Box<dyn ModelProvider>, String, String)> {
39    let (model_provider_name, model_provider_alias) = model_provider_ref
40        .split_once('.')
41        .map(|(t, a)| (t.to_string(), a.to_string()))
42        .ok_or_else(|| {
43            anyhow::Error::msg(format!(
44                "model_provider reference `{model_provider_ref}` must be `<type>.<alias>`"
45            ))
46        })?;
47
48    let entry = config
49        .providers
50        .models
51        .find(&model_provider_name, &model_provider_alias);
52    let model_name = model_override
53        .map(str::trim)
54        .filter(|m| !m.is_empty())
55        .map(str::to_string)
56        .or_else(|| {
57            entry
58                .and_then(|e| e.model.as_deref())
59                .map(str::trim)
60                .filter(|m| !m.is_empty())
61                .map(str::to_string)
62        })
63        .ok_or_else(|| {
64            anyhow::Error::msg(format!(
65                "model_provider `{model_provider_ref}` has no `model` configured and no model \
66                 override was supplied"
67            ))
68        })?;
69
70    let model_provider_runtime_options = zeroclaw_providers::provider_runtime_options_for_alias(
71        config,
72        &model_provider_name,
73        &model_provider_alias,
74    );
75
76    let model_provider = zeroclaw_providers::create_routed_model_provider_with_options(
77        config,
78        model_provider_ref,
79        entry.and_then(|e| e.api_key.as_deref()),
80        entry.and_then(|e| e.uri.as_deref()),
81        &config.reliability,
82        &config.model_routes,
83        &model_name,
84        &model_provider_runtime_options,
85    )?;
86
87    Ok((model_provider, model_provider_name, model_name))
88}
89
/// Guard that reports the end of a turn to the observer.
///
/// `fire` emits a single `ObserverEvent::AgentEnd`; the `Drop` impl calls
/// `fire`, so the event is also sent when the guard goes out of scope without
/// an explicit call.
struct TurnGuard {
    /// Receiver of the `AgentEnd` event.
    observer: Arc<dyn Observer>,
    /// Provider name copied into the event.
    model_provider: String,
    /// Model name copied into the event.
    model: String,
    /// Turn identifier copied into the event, if any.
    turn_id: Option<String>,
    /// Start instant; the event's `duration` is the time elapsed since this.
    turn_started_at: Instant,
    /// Agent alias copied into the event, if any.
    agent_alias: Option<String>,
    /// Input-token total reported when `saw_usage` is set.
    total_input_tokens: u64,
    /// Output-token total reported when `saw_usage` is set.
    total_output_tokens: u64,
    /// When `false`, the event carries `tokens_used: None`.
    saw_usage: bool,
    /// Set once the event has been emitted; prevents a second emission.
    done: bool,
}
102
103impl TurnGuard {
104    fn fire(&mut self) {
105        if self.done {
106            return;
107        }
108        self.done = true;
109        self.observer.record_event(&ObserverEvent::AgentEnd {
110            model_provider: self.model_provider.clone(),
111            model: self.model.clone(),
112            duration: self.turn_started_at.elapsed(),
113            tokens_used: self.saw_usage.then_some(
114                zeroclaw_api::observability_traits::TurnTokenUsage {
115                    input_tokens: self.total_input_tokens,
116                    output_tokens: self.total_output_tokens,
117                },
118            ),
119            cost_usd: None,
120            channel: None,
121            agent_alias: self.agent_alias.clone(),
122            turn_id: self.turn_id.clone(),
123        });
124    }
125}
126
/// Dropping the guard emits the end-of-turn event if it has not been emitted
/// yet (`fire` is a no-op once `done` is set).
impl Drop for TurnGuard {
    fn drop(&mut self) {
        self.fire();
    }
}
132
/// A configured agent instance: the model provider, tool set, memory backend,
/// observer, prompt settings and accumulated conversation history it works
/// with. Instances are produced by [`AgentBuilder::build`].
pub struct Agent {
    model_provider: Box<dyn ModelProvider>,
    /// Tool set remaining after the builder applied its allow-list and
    /// memory-exclusion filters.
    tools: Vec<Box<dyn Tool>>,
    /// One spec per entry in `tools`, collected at build time.
    tool_specs: Vec<ToolSpec>,
    memory: Arc<dyn Memory>,
    observer: Arc<dyn Observer>,
    prompt_builder: SystemPromptBuilder,
    tool_dispatcher: Box<dyn ToolDispatcher>,
    memory_strategy: Arc<dyn zeroclaw_api::memory_traits::MemoryStrategy>,
    config: zeroclaw_config::schema::AliasedAgentConfig,
    multimodal_config: zeroclaw_config::schema::MultimodalConfig,
    /// Model name; the builder stores the `<unconfigured>` sentinel when none
    /// was supplied.
    model_name: String,
    /// Provider name; the builder stores the `<unconfigured>` sentinel when
    /// none was supplied.
    model_provider_name: String,
    /// Response-cache keys are only produced when this is `Some(0.0)`.
    temperature: Option<f64>,
    /// Defaults to `.` when the builder was not given a workspace directory.
    workspace_dir: std::path::PathBuf,
    /// Per-agent persona workspace (`<install>/agents/<alias>/workspace/`).
    /// Holds IDENTITY.md / SOUL.md / USER.md / AGENTS.md. Distinct from
    /// `workspace_dir`, which is the security sandbox root and can be the
    /// session cwd for IDE-driven sessions (ACP, gateway WS).
    agent_workspace_dir: std::path::PathBuf,
    identity_config: zeroclaw_config::schema::IdentityConfig,
    skills: Vec<crate::skills::Skill>,
    skills_prompt_mode: zeroclaw_config::schema::SkillsPromptInjectionMode,
    /// When set, incoming streamed user messages are stored to `memory`.
    /// The builder forces this off when memory is excluded.
    auto_save: bool,
    memory_session_id: Option<String>,
    /// Conversation messages accumulated so far; emptied by `clear_history`.
    history: Vec<ConversationMessage>,
    classification_config: zeroclaw_config::schema::QueryClassificationConfig,
    available_hints: Vec<String>,
    route_model_by_hint: HashMap<String, String>,
    #[allow(dead_code)] // WIP: stored for future runtime tool filtering
    allowed_tools: Option<Vec<String>>,
    response_cache: Option<Arc<zeroclaw_memory::response_cache::ResponseCache>>,
    /// Pre-rendered security policy summary injected into the system prompt
    /// so the LLM knows the concrete constraints before making tool calls.
    security_summary: Option<String>,
    /// Autonomy level from config; controls safety prompt instructions.
    autonomy_level: crate::security::AutonomyLevel,
    /// Activated MCP tools for deferred loading mode.
    /// When MCP deferred loading is enabled, tools are activated via `tool_search`
    /// and stored here for lookup during tool execution.
    activated_tools: Option<Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
    /// Hook runner for tool-call auditing and lifecycle side effects.
    /// See issue #5462.
    hook_runner: Option<Arc<crate::hooks::HookRunner>>,
    /// Approval manager for direct Agent execution paths such as ACP.
    approval_manager: Option<Arc<ApprovalManager>>,
    /// Agent alias, retained for opening attribution spans at external turn
    /// call sites (ACP, gateway WS) where the alias is otherwise unavailable.
    /// An empty string means no alias was configured.
    agent_alias: String,
    /// Late-bound channel maps for the four channel-driven tools
    /// (`ask_user`, `reaction`, `escalate_to_human`, `poll`). Held so that
    /// per-session callers (e.g. the ACP server) can register a back-channel
    /// after agent construction. Production paths populate via
    /// `start_channels`; this is the alternate path for environments that
    /// build an Agent directly without `start_channels`.
    channel_handles: AgentChannelHandles,
    /// When `true`, the agent was constructed without persistent memory.
    /// Memory backend is `NoneMemory`, auto-save is off, and memory tools
    /// are stripped from the tool set. Used by ACP sessions.
    #[allow(dead_code)]
    exclude_memory: bool,
    /// Per-session cache for resolved local image data URIs.
    /// Avoids re-reading the same image file on every turn/tool-iteration
    /// when the multimodal pipeline re-walks the full conversation history.
    image_cache: zeroclaw_providers::multimodal::LocalImageCache,
}
199
/// Logs an INFO record when an agent is dropped, carrying the provider name,
/// the model name and the number of history messages released with it.
impl Drop for Agent {
    fn drop(&mut self) {
        ::zeroclaw_log::record!(
            INFO,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                .with_category(::zeroclaw_log::EventCategory::Agent)
                .with_attrs(::serde_json::json!({
                    "model_provider": self.model_provider_name,
                    "model": self.model_name,
                    "history_messages_freed": self.history.len(),
                })),
            "Agent dropped; conversation history and per-session state freed"
        );
    }
}
215
/// Success payload of a streamed turn.
// NOTE(review): the producing code is outside this view; field meanings below
// are read from the names and should be confirmed against it.
#[derive(Debug)]
pub struct StreamedTurnSuccess {
    /// Response text of the turn.
    pub response: String,
    /// Conversation messages produced by the turn (presumably those appended
    /// to the agent history during it).
    pub new_messages: Vec<ConversationMessage>,
}
221
/// Failure payload of a streamed turn.
// NOTE(review): the producing code is outside this view; field meanings below
// are read from the names and should be confirmed against it.
#[derive(Debug)]
pub struct StreamedTurnError {
    /// The error that ended the turn.
    pub error: anyhow::Error,
    /// Response text carried alongside the error (presumably what had already
    /// been committed before the failure).
    pub committed_response: String,
    /// Conversation messages produced before the failure.
    pub new_messages: Vec<ConversationMessage>,
}
228
/// Bundle of late-bound channel-map handles owned by an Agent. Cloning is
/// cheap (Arc clones); the underlying maps are shared with the live tools.
///
/// Three of the four handles are optional; `reaction` is always present.
/// The derived `Default` therefore yields `None` for the optional handles
/// and a default-constructed `reaction` handle.
#[derive(Clone, Default)]
pub struct AgentChannelHandles {
    /// Channel map for the `ask_user` tool, when wired.
    pub ask_user: Option<tools::PerToolChannelHandle>,
    /// Channel map for the `reaction` tool.
    pub reaction: tools::PerToolChannelHandle,
    /// Channel map for the `poll` tool, when wired.
    pub poll: Option<tools::PerToolChannelHandle>,
    /// Channel map for the escalation tool, when wired.
    pub escalate: Option<tools::PerToolChannelHandle>,
}
238
239impl AgentChannelHandles {
240    /// Return references to all populated per-tool channel handles.
241    fn populated_handles(&self) -> Vec<Option<&tools::PerToolChannelHandle>> {
242        vec![
243            self.ask_user.as_ref(),
244            Some(&self.reaction),
245            self.poll.as_ref(),
246            self.escalate.as_ref(),
247        ]
248    }
249
250    /// Register a channel into every populated handle so all channel-driven
251    /// tools can resolve it by name.
252    pub fn register_channel(
253        &self,
254        name: impl Into<String>,
255        channel: Arc<dyn zeroclaw_api::channel::Channel>,
256    ) {
257        let name = name.into();
258        for handle in self.populated_handles().into_iter().flatten() {
259            handle.write().insert(name.clone(), Arc::clone(&channel));
260        }
261    }
262
263    /// Remove a channel from every populated handle (used on session/stop).
264    pub fn unregister_channel(&self, name: &str) {
265        for handle in self.populated_handles().into_iter().flatten() {
266            handle.write().remove(name);
267        }
268    }
269
270    /// Look up a registered channel by name from any populated channel map.
271    pub fn get_channel(&self, name: &str) -> Option<Arc<dyn zeroclaw_api::channel::Channel>> {
272        for handle in self.populated_handles().into_iter().flatten() {
273            if let Some(channel) = handle.read().get(name) {
274                return Some(Arc::clone(channel));
275            }
276        }
277        None
278    }
279}
280
/// Builder for [`Agent`].
///
/// Each field mirrors an `Agent` field and starts unset. `build` requires
/// `tools`, `model_provider`, `observer` and `tool_dispatcher`, plus `memory`
/// unless `exclude_memory` is set; everything else falls back to a default.
pub struct AgentBuilder {
    model_provider: Option<Box<dyn ModelProvider>>,
    tools: Option<Vec<Box<dyn Tool>>>,
    memory: Option<Arc<dyn Memory>>,
    observer: Option<Arc<dyn Observer>>,
    prompt_builder: Option<SystemPromptBuilder>,
    tool_dispatcher: Option<Box<dyn ToolDispatcher>>,
    memory_strategy: Option<Arc<dyn zeroclaw_api::memory_traits::MemoryStrategy>>,
    config: Option<zeroclaw_config::schema::AliasedAgentConfig>,
    multimodal_config: Option<zeroclaw_config::schema::MultimodalConfig>,
    model_name: Option<String>,
    model_provider_name: Option<String>,
    temperature: Option<f64>,
    workspace_dir: Option<std::path::PathBuf>,
    agent_workspace_dir: Option<std::path::PathBuf>,
    identity_config: Option<zeroclaw_config::schema::IdentityConfig>,
    skills: Option<Vec<crate::skills::Skill>>,
    skills_prompt_mode: Option<zeroclaw_config::schema::SkillsPromptInjectionMode>,
    auto_save: Option<bool>,
    memory_session_id: Option<String>,
    classification_config: Option<zeroclaw_config::schema::QueryClassificationConfig>,
    available_hints: Option<Vec<String>>,
    route_model_by_hint: Option<HashMap<String, String>>,
    // When set, `build` keeps only tools whose name appears in this list.
    allowed_tools: Option<Vec<String>>,
    response_cache: Option<Arc<zeroclaw_memory::response_cache::ResponseCache>>,
    security_summary: Option<String>,
    autonomy_level: Option<crate::security::AutonomyLevel>,
    activated_tools: Option<Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
    hook_runner: Option<Arc<crate::hooks::HookRunner>>,
    approval_manager: Option<Arc<ApprovalManager>>,
    agent_alias: Option<String>,
    // When true, `build` swaps in `NoneMemory`, strips memory tools and
    // forces auto-save off.
    exclude_memory: bool,
}
314
/// `AgentBuilder::default()` is equivalent to `AgentBuilder::new()`.
impl Default for AgentBuilder {
    fn default() -> Self {
        Self::new()
    }
}
320
321impl AgentBuilder {
322    pub fn new() -> Self {
323        Self {
324            model_provider: None,
325            tools: None,
326            memory: None,
327            observer: None,
328            prompt_builder: None,
329            tool_dispatcher: None,
330            memory_strategy: None,
331            config: None,
332            multimodal_config: None,
333            model_name: None,
334            model_provider_name: None,
335            temperature: None,
336            workspace_dir: None,
337            agent_workspace_dir: None,
338            identity_config: None,
339            skills: None,
340            skills_prompt_mode: None,
341            auto_save: None,
342            memory_session_id: None,
343            classification_config: None,
344            available_hints: None,
345            route_model_by_hint: None,
346            allowed_tools: None,
347            response_cache: None,
348            security_summary: None,
349            autonomy_level: None,
350            activated_tools: None,
351            hook_runner: None,
352            approval_manager: None,
353            agent_alias: None,
354            exclude_memory: false,
355        }
356    }
357
358    pub fn model_provider(mut self, model_provider: Box<dyn ModelProvider>) -> Self {
359        self.model_provider = Some(model_provider);
360        self
361    }
362
363    pub fn tools(mut self, tools: Vec<Box<dyn Tool>>) -> Self {
364        self.tools = Some(tools);
365        self
366    }
367
368    pub fn memory(mut self, memory: Arc<dyn Memory>) -> Self {
369        self.memory = Some(memory);
370        self
371    }
372
373    pub fn observer(mut self, observer: Arc<dyn Observer>) -> Self {
374        self.observer = Some(observer);
375        self
376    }
377
378    pub fn prompt_builder(mut self, prompt_builder: SystemPromptBuilder) -> Self {
379        self.prompt_builder = Some(prompt_builder);
380        self
381    }
382
383    pub fn tool_dispatcher(mut self, tool_dispatcher: Box<dyn ToolDispatcher>) -> Self {
384        self.tool_dispatcher = Some(tool_dispatcher);
385        self
386    }
387
388    pub fn memory_strategy(
389        mut self,
390        memory_strategy: Arc<dyn zeroclaw_api::memory_traits::MemoryStrategy>,
391    ) -> Self {
392        self.memory_strategy = Some(memory_strategy);
393        self
394    }
395
396    pub fn config(mut self, config: zeroclaw_config::schema::AliasedAgentConfig) -> Self {
397        self.config = Some(config);
398        self
399    }
400
401    pub fn multimodal_config(
402        mut self,
403        multimodal_config: zeroclaw_config::schema::MultimodalConfig,
404    ) -> Self {
405        self.multimodal_config = Some(multimodal_config);
406        self
407    }
408
409    pub fn model_name(mut self, model_name: String) -> Self {
410        self.model_name = Some(model_name);
411        self
412    }
413
414    pub fn model_provider_name(mut self, name: String) -> Self {
415        self.model_provider_name = Some(name);
416        self
417    }
418
419    pub fn temperature(mut self, temperature: Option<f64>) -> Self {
420        self.temperature = temperature;
421        self
422    }
423
424    pub fn workspace_dir(mut self, workspace_dir: std::path::PathBuf) -> Self {
425        self.workspace_dir = Some(workspace_dir);
426        self
427    }
428
429    pub fn agent_workspace_dir(mut self, agent_workspace_dir: std::path::PathBuf) -> Self {
430        self.agent_workspace_dir = Some(agent_workspace_dir);
431        self
432    }
433
434    pub fn identity_config(
435        mut self,
436        identity_config: zeroclaw_config::schema::IdentityConfig,
437    ) -> Self {
438        self.identity_config = Some(identity_config);
439        self
440    }
441
442    pub fn skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
443        self.skills = Some(skills);
444        self
445    }
446
447    pub fn skills_prompt_mode(
448        mut self,
449        skills_prompt_mode: zeroclaw_config::schema::SkillsPromptInjectionMode,
450    ) -> Self {
451        self.skills_prompt_mode = Some(skills_prompt_mode);
452        self
453    }
454
455    pub fn auto_save(mut self, auto_save: bool) -> Self {
456        self.auto_save = Some(auto_save);
457        self
458    }
459
460    pub fn memory_session_id(mut self, memory_session_id: Option<String>) -> Self {
461        self.memory_session_id = memory_session_id;
462        self
463    }
464
465    pub fn classification_config(
466        mut self,
467        classification_config: zeroclaw_config::schema::QueryClassificationConfig,
468    ) -> Self {
469        self.classification_config = Some(classification_config);
470        self
471    }
472
473    pub fn available_hints(mut self, available_hints: Vec<String>) -> Self {
474        self.available_hints = Some(available_hints);
475        self
476    }
477
478    pub fn route_model_by_hint(mut self, route_model_by_hint: HashMap<String, String>) -> Self {
479        self.route_model_by_hint = Some(route_model_by_hint);
480        self
481    }
482
483    pub fn allowed_tools(mut self, allowed_tools: Option<Vec<String>>) -> Self {
484        self.allowed_tools = allowed_tools;
485        self
486    }
487
488    pub fn response_cache(
489        mut self,
490        cache: Option<Arc<zeroclaw_memory::response_cache::ResponseCache>>,
491    ) -> Self {
492        self.response_cache = cache;
493        self
494    }
495
496    pub fn security_summary(mut self, summary: Option<String>) -> Self {
497        self.security_summary = summary;
498        self
499    }
500
501    pub fn autonomy_level(mut self, level: crate::security::AutonomyLevel) -> Self {
502        self.autonomy_level = Some(level);
503        self
504    }
505
506    pub fn activated_tools(
507        mut self,
508        activated: Option<Arc<std::sync::Mutex<tools::ActivatedToolSet>>>,
509    ) -> Self {
510        self.activated_tools = activated;
511        self
512    }
513
514    pub fn hook_runner(mut self, runner: Option<Arc<crate::hooks::HookRunner>>) -> Self {
515        self.hook_runner = runner;
516        self
517    }
518
519    pub fn approval_manager(mut self, manager: Option<Arc<ApprovalManager>>) -> Self {
520        self.approval_manager = manager;
521        self
522    }
523
524    /// Set the agent alias used for turn-span attribution.
525    pub fn agent_alias(mut self, alias: String) -> Self {
526        self.agent_alias = Some(alias);
527        self
528    }
529
530    /// Exclude persistent memory from this agent. When set, the memory
531    /// backend is replaced with `NoneMemory`, auto-save is forced off, and
532    /// all `memory_*` tools are stripped from the tool set. Used by ACP
533    /// sessions, which rely on session history for context rather than the
534    /// agent's long-term memory.
535    pub fn exclude_memory(mut self, exclude: bool) -> Self {
536        self.exclude_memory = exclude;
537        self
538    }
539
540    pub fn build(self) -> Result<Agent> {
541        let mut tools = self.tools.ok_or_else(|| {
542            ::zeroclaw_log::record!(
543                ERROR,
544                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
545                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
546                    .with_attrs(::serde_json::json!({"missing_field": "tools"})),
547                "AgentBuilder::build missing required field"
548            );
549            anyhow::Error::msg("tools are required")
550        })?;
551        let allowed = self.allowed_tools.clone();
552        if let Some(ref allow_list) = allowed {
553            tools.retain(|t| allow_list.iter().any(|name| name == t.name()));
554        }
555
556        // ACP sessions exclude persistent memory: strip memory tools,
557        // replace the backend with NoneMemory, and force auto_save off.
558        let exclude_memory = self.exclude_memory;
559        if exclude_memory {
560            tools.retain(|t| !zeroclaw_tools::MEMORY_TOOL_NAMES.contains(&t.name()));
561        }
562
563        let tool_specs = tools.iter().map(|tool| tool.spec()).collect();
564        let workspace_dir = self
565            .workspace_dir
566            .clone()
567            .unwrap_or_else(|| std::path::PathBuf::from("."));
568
569        let memory: Arc<dyn Memory> = if exclude_memory {
570            Arc::new(zeroclaw_memory::NoneMemory::new("none"))
571        } else {
572            self.memory.ok_or_else(|| {
573                ::zeroclaw_log::record!(
574                    ERROR,
575                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
576                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
577                        .with_attrs(::serde_json::json!({"missing_field": "memory"})),
578                    "AgentBuilder::build missing required field"
579                );
580                anyhow::Error::msg("memory is required")
581            })?
582        };
583        // No-memory sessions must not retain a caller-provided strategy that
584        // still closes over persistent memory.
585        let memory_strategy = if exclude_memory {
586            None
587        } else {
588            self.memory_strategy
589        };
590
591        Ok(Agent {
592            model_provider: self.model_provider.ok_or_else(|| {
593                ::zeroclaw_log::record!(
594                    ERROR,
595                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
596                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
597                        .with_attrs(::serde_json::json!({"missing_field": "model_provider"})),
598                    "AgentBuilder::build missing required field"
599                );
600                anyhow::Error::msg("model_provider is required")
601            })?,
602            tools,
603            tool_specs,
604            memory: memory.clone(),
605            observer: self.observer.ok_or_else(|| {
606                ::zeroclaw_log::record!(
607                    ERROR,
608                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
609                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
610                        .with_attrs(::serde_json::json!({"missing_field": "observer"})),
611                    "AgentBuilder::build missing required field"
612                );
613                anyhow::Error::msg("observer is required")
614            })?,
615            prompt_builder: self
616                .prompt_builder
617                .unwrap_or_else(SystemPromptBuilder::with_defaults),
618            tool_dispatcher: self.tool_dispatcher.ok_or_else(|| {
619                ::zeroclaw_log::record!(
620                    ERROR,
621                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
622                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
623                        .with_attrs(::serde_json::json!({"missing_field": "tool_dispatcher"})),
624                    "AgentBuilder::build missing required field"
625                );
626                anyhow::Error::msg("tool_dispatcher is required")
627            })?,
628            memory_strategy: memory_strategy.unwrap_or_else(|| {
629                Arc::new(
630                    crate::agent::memory_strategy::DefaultMemoryStrategy::with_config(
631                        memory.clone(),
632                        zeroclaw_config::schema::MemoryConfig::default(),
633                        workspace_dir.clone(),
634                    ),
635                )
636            }),
637            config: self.config.unwrap_or_default(),
638            multimodal_config: self.multimodal_config.unwrap_or_default(),
639            // No silent vendor-default model. Callers that construct `Agent` via the
640            // builder must set `model_name` explicitly (via `.model_name(...)` or via
641            // `Agent::from_config`, which resolves from `[model_providers]`). The sentinel
642            // keeps the field non-empty so accidental dispatch surfaces a clear 4xx
643            // rather than misrouting to a real vendor model.
644            model_name: self.model_name.unwrap_or_else(|| "<unconfigured>".into()),
645            model_provider_name: self
646                .model_provider_name
647                .unwrap_or_else(|| "<unconfigured>".into()),
648            temperature: self.temperature,
649            // Default for test callers that don't call workspace_dir().
650            workspace_dir: self
651                .workspace_dir
652                .clone()
653                .unwrap_or_else(|| std::path::PathBuf::from(".")),
654            agent_workspace_dir: self.agent_workspace_dir.unwrap_or_else(|| {
655                self.workspace_dir
656                    .clone()
657                    .unwrap_or_else(|| std::path::PathBuf::from("."))
658            }),
659            identity_config: self.identity_config.unwrap_or_default(),
660            skills: self.skills.unwrap_or_default(),
661            skills_prompt_mode: self.skills_prompt_mode.unwrap_or_default(),
662            auto_save: if exclude_memory {
663                false
664            } else {
665                self.auto_save.unwrap_or(false)
666            },
667            memory_session_id: self.memory_session_id,
668            history: Vec::new(),
669            classification_config: self.classification_config.unwrap_or_default(),
670            available_hints: self.available_hints.unwrap_or_default(),
671            route_model_by_hint: self.route_model_by_hint.unwrap_or_default(),
672            allowed_tools: allowed,
673            response_cache: self.response_cache,
674            security_summary: self.security_summary,
675            autonomy_level: self
676                .autonomy_level
677                .unwrap_or(crate::security::AutonomyLevel::Supervised),
678            activated_tools: self.activated_tools,
679            hook_runner: self.hook_runner,
680            approval_manager: self.approval_manager,
681            agent_alias: self.agent_alias.unwrap_or_default(),
682            channel_handles: AgentChannelHandles::default(),
683            exclude_memory,
684            image_cache: zeroclaw_providers::multimodal::LocalImageCache::new(),
685        })
686    }
687}
688
689impl Agent {
    /// Start building an agent; shorthand for `AgentBuilder::new()`.
    pub fn builder() -> AgentBuilder {
        AgentBuilder::new()
    }
693
    /// Generate a fresh turn identifier: a version-4 UUID rendered as a string.
    fn new_turn_id() -> String {
        uuid::Uuid::new_v4().to_string()
    }
697
698    fn observer_agent_alias(&self) -> Option<String> {
699        if self.agent_alias.is_empty() {
700            None
701        } else {
702            Some(self.agent_alias.clone())
703        }
704    }
705
    /// Borrow the conversation history accumulated by this agent.
    pub fn history(&self) -> &[ConversationMessage] {
        &self.history
    }
709
    /// Late-bound channel-map handles for the four channel-driven tools.
    /// Populated by `from_config_with_session_cwd`; empty when an Agent is
    /// constructed via the builder directly. Callers (e.g. the ACP server)
    /// use `channel_handles().register_channel(...)` to wire a back-channel
    /// into every populated tool map in one shot.
    pub fn channel_handles(&self) -> &AgentChannelHandles {
        &self.channel_handles
    }
718
719    /// Populate late-bound channel-map handles with configured channels.
720    ///
721    /// Seeds `ask_user`, `reaction`, `poll`, and `escalate`
722    /// handles from the provided map. Called by CLI and orchestrator paths
723    /// after agent construction but before the agent loop starts.
724    ///
725    /// Returns the list of registered channel names for logging.
726    pub fn populate_channels(
727        &self,
728        channel_map: &std::collections::HashMap<String, Arc<dyn zeroclaw_api::channel::Channel>>,
729    ) -> Vec<String> {
730        let mut names = Vec::new();
731        for (name, ch) in channel_map {
732            self.channel_handles.register_channel(name, Arc::clone(ch));
733            names.push(name.clone());
734        }
735        names
736    }
737
738    /// Attribution fields for opening a turn span at external call sites
739    /// (ACP, gateway WS) so every record inside a streamed turn carries the
740    /// same `agent_alias`/`model_provider`/`model` the RPC dispatch path sets.
741    /// Returns `(agent_alias, model_provider, model)`.
742    pub fn attribution_fields(&self) -> (String, String, String) {
743        (
744            self.agent_alias.clone(),
745            self.model_provider_name.clone(),
746            self.model_name.clone(),
747        )
748    }
749
    /// Remove every message from the conversation history.
    pub fn clear_history(&mut self) {
        self.history.clear();
    }
753
754    fn encode_response_cache_transcript(messages: &[ChatMessage]) -> String {
755        let mut transcript = String::new();
756        for message in messages.iter().filter(|message| message.role != "system") {
757            transcript.push_str("role=");
758            transcript.push_str(&message.role.len().to_string());
759            transcript.push(':');
760            transcript.push_str(&message.role);
761            transcript.push_str(";content=");
762            transcript.push_str(&message.content.len().to_string());
763            transcript.push(':');
764            transcript.push_str(&message.content);
765            transcript.push('\n');
766        }
767        transcript
768    }
769
770    fn response_cache_key_for_messages(
771        &self,
772        messages: &[ChatMessage],
773        effective_model: &str,
774    ) -> Option<String> {
775        if self.temperature != Some(0.0) || self.response_cache.is_none() {
776            return None;
777        }
778
779        let system = messages
780            .iter()
781            .find(|message| message.role == "system")
782            .map(|message| message.content.as_str());
783        let transcript = Self::encode_response_cache_transcript(messages);
784
785        Some(zeroclaw_memory::response_cache::ResponseCache::cache_key(
786            effective_model,
787            system,
788            &transcript,
789        ))
790    }
791
792    fn drain_steering_messages(
793        steering_rx: &mut Option<&mut tokio::sync::mpsc::Receiver<String>>,
794    ) -> Vec<String> {
795        let Some(rx) = steering_rx.as_deref_mut() else {
796            return Vec::new();
797        };
798
799        let mut messages = Vec::new();
800        loop {
801            match rx.try_recv() {
802                Ok(message) => messages.push(message),
803                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
804                Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
805            }
806        }
807        messages
808    }
809
810    async fn append_streamed_user_message_to_history(
811        &mut self,
812        user_message: &str,
813        new_msgs: &mut Vec<ConversationMessage>,
814    ) {
815        let context = self
816            .memory_strategy
817            .load_context(user_message, self.memory_session_id.as_deref())
818            .await
819            .unwrap_or_default();
820
821        if self.auto_save {
822            let _ = self
823                .memory
824                .store(
825                    "user_msg",
826                    user_message,
827                    MemoryCategory::Conversation,
828                    self.memory_session_id.as_deref(),
829                )
830                .await;
831        }
832
833        let now = chrono::Local::now().format("%Y-%m-%d %H:%M:%S %Z");
834        let enriched = if context.is_empty() {
835            format!("[{now}] {user_message}")
836        } else {
837            format!("{context}[{now}] {user_message}")
838        };
839
840        let user_msg = ConversationMessage::Chat(ChatMessage::user(enriched));
841        new_msgs.push(user_msg.clone());
842        self.history.push(user_msg);
843    }
844
845    fn marked_partial_response(partial: &str, marker: &str) -> String {
846        if partial.is_empty() {
847            marker.to_string()
848        } else {
849            format!("{partial}\n\n{marker}")
850        }
851    }
852
853    /// Forward a turn event to the consumer without ever blocking the turn on a
854    /// stalled receiver. The streamed `event_tx` is bounded; if the consumer
855    /// stops draining (e.g. it is itself blocked on a contended write) a bare
856    /// `send().await` would park here forever, and because these sends sit
857    /// outside the turn's `select!` arms a fired cancel token could never
858    /// interrupt them — the turn would wedge on "working" with no cancel path.
859    /// Racing the send against the cancel token guarantees the producer yields
860    /// the moment cancellation fires. Returns `false` when cancellation won the
861    /// race (the event was not delivered); the send error itself is intentionally
862    /// ignored — a gone consumer is not a turn failure.
863    async fn send_turn_event(
864        event_tx: &tokio::sync::mpsc::Sender<TurnEvent>,
865        cancel_token: Option<&tokio_util::sync::CancellationToken>,
866        event: TurnEvent,
867    ) -> bool {
868        match cancel_token {
869            Some(token) => {
870                tokio::select! {
871                    biased;
872                    () = token.cancelled() => false,
873                    _ = event_tx.send(event) => true,
874                }
875            }
876            None => {
877                let _ = event_tx.send(event).await;
878                true
879            }
880        }
881    }
882
883    fn append_streamed_assistant_message_to_history(
884        &mut self,
885        content: String,
886        new_msgs: &mut Vec<ConversationMessage>,
887        committed_response: &mut String,
888    ) {
889        let assistant_msg = ConversationMessage::Chat(ChatMessage::assistant(content.clone()));
890        new_msgs.push(assistant_msg.clone());
891        self.history.push(assistant_msg);
892        committed_response.push_str(&content);
893    }
894
895    fn synthesize_cancelled_tool_results(
896        &mut self,
897        completed: Vec<ToolResultMessage>,
898        remaining: &[zeroclaw_providers::ToolCall],
899        new_msgs: &mut Vec<ConversationMessage>,
900    ) {
901        let mut results = completed;
902        results.extend(remaining.iter().map(|call| ToolResultMessage {
903            tool_call_id: call.id.clone(),
904            content: "[interrupted by user before this tool produced a result]".to_string(),
905        }));
906        if results.is_empty() {
907            return;
908        }
909        let msg = ConversationMessage::ToolResults(results);
910        new_msgs.push(msg.clone());
911        self.history.push(msg);
912    }
913
914    fn should_send_tool_specs(&self) -> bool {
915        self.tool_dispatcher.should_send_tool_specs() && !self.tool_specs.is_empty()
916    }
917
918    fn parse_response_for_effective_tools(
919        &self,
920        response: &zeroclaw_providers::ChatResponse,
921    ) -> (String, Vec<ParsedToolCall>) {
922        if self.tool_specs.is_empty() {
923            return (strip_think_tags(response.text_or_empty()), Vec::new());
924        }
925
926        if self.config.resolved.strict_tool_parsing && response.tool_calls.is_empty() {
927            return (strip_think_tags(response.text_or_empty()), Vec::new());
928        }
929
930        self.tool_dispatcher.parse_response(response)
931    }
932
933    pub fn set_memory_session_id(&mut self, session_id: Option<String>) {
934        self.memory_session_id = session_id;
935    }
936
937    pub fn set_temperature(&mut self, temperature: Option<f64>) {
938        self.temperature = temperature;
939    }
940
941    #[cfg(test)]
942    pub fn temperature_for_test(&self) -> Option<f64> {
943        self.temperature
944    }
945
946    pub fn set_model_name(&mut self, model_name: String) {
947        self.model_name = model_name;
948    }
949
950    pub fn set_model_provider(&mut self, model_provider: Box<dyn ModelProvider>) {
951        self.model_provider = model_provider;
952    }
953
954    pub fn set_model_provider_name(&mut self, model_provider_name: String) {
955        self.model_provider_name = model_provider_name;
956    }
957
958    /// Return the names of all registered tools.  Test-only — avoids
959    /// exposing `Box<dyn Tool>` across the crate boundary.
960    #[cfg(test)]
961    pub fn tool_names(&self) -> Vec<&str> {
962        self.tools.iter().map(|t| t.name()).collect()
963    }
964
965    /// Hydrate the agent with prior chat messages (e.g. from a session backend).
966    ///
967    /// Ensures a system prompt is prepended if history is empty, then appends all
968    /// non-system messages from the seed. System messages in the seed are skipped
969    /// to avoid duplicating the system prompt.
970    pub fn seed_history(&mut self, messages: &[ChatMessage]) {
971        if self.history.is_empty()
972            && let Ok(sys) = self.build_system_prompt()
973        {
974            self.history
975                .push(ConversationMessage::Chat(ChatMessage::system(sys)));
976        }
977        for msg in messages {
978            if msg.role != "system" {
979                self.history.push(ConversationMessage::Chat(msg.clone()));
980            }
981        }
982    }
983
984    /// Hydrate the agent with a full `ConversationMessage` history (e.g. restored
985    /// from an ACP session store). Preserves all variants including `AssistantToolCalls`
986    /// and `ToolResults` — use this for ACP restore; use `seed_history` for flat
987    /// channel session hydration.
988    pub fn seed_conversation_history(&mut self, messages: Vec<ConversationMessage>) {
989        if self.history.is_empty()
990            && let Ok(sys) = self.build_system_prompt()
991        {
992            self.history
993                .push(ConversationMessage::Chat(ChatMessage::system(sys)));
994        }
995        for msg in messages {
996            // Skip system messages from the seed — the system prompt is already prepended above.
997            if matches!(&msg, ConversationMessage::Chat(m) if m.role == "system") {
998                continue;
999            }
1000            self.history.push(msg);
1001        }
1002        // Trim immediately so pre_len snapshots (taken before the first turn)
1003        // are always within the configured limit; otherwise a long restored
1004        // history would cause history[pre_len..] to panic after trim_history
1005        // shrinks the vec below pre_len during the turn.
1006        self.trim_history();
1007    }
1008
1009    pub async fn from_config(config: &Config, agent_alias: &str) -> Result<Self> {
1010        Self::from_config_with_session_cwd(config, agent_alias, None).await
1011    }
1012
1013    /// Build an Agent with an optional per-session working directory override.
1014    ///
1015    /// `session_cwd`, when supplied, becomes [`SecurityPolicy::workspace_dir`]
1016    /// for this agent — i.e. the boundary used by file_read/write/edit and the
1017    /// cwd used by the shell tool. Memory storage, identity files, scheduled
1018    /// task DBs, and other on-disk state continue to live under
1019    /// `config.data_dir`.
1020    ///
1021    /// This is what ACP sessions use to pin tool path resolution to the
1022    /// IDE-provided `cwd` without relocating the agent's data directory.
1023    pub async fn from_config_with_session_cwd(
1024        config: &Config,
1025        agent_alias: &str,
1026        session_cwd: Option<&Path>,
1027    ) -> Result<Self> {
1028        Self::from_config_with_session_cwd_and_mcp(config, agent_alias, session_cwd, true).await
1029    }
1030
1031    /// Build an Agent while optionally skipping eager MCP initialization.
1032    ///
1033    /// ACP clients expect `session/new` to return promptly. User-configured
1034    /// MCP servers are external processes/services and can block startup while
1035    /// they time out, so ACP uses this with `initialize_mcp = false`.
1036    pub async fn from_config_with_session_cwd_and_mcp(
1037        config: &Config,
1038        agent_alias: &str,
1039        session_cwd: Option<&Path>,
1040        initialize_mcp: bool,
1041    ) -> Result<Self> {
1042        Self::from_config_with_session_cwd_and_mcp_approval_mode(
1043            config,
1044            agent_alias,
1045            session_cwd,
1046            initialize_mcp,
1047            false,
1048            false,
1049            None,
1050        )
1051        .await
1052    }
1053
1054    /// Build an Agent for direct ACP/WS sessions that have a client approval
1055    /// back-channel. This keeps shell approval on the runtime-controlled path.
1056    ///
1057    /// When `exclude_memory` is `true`, the agent is constructed without
1058    /// persistent memory: `NoneMemory` backend, auto-save off, and all
1059    /// `memory_*` tools stripped. ACP sessions pass `true`.
1060    pub async fn from_config_with_session_cwd_and_mcp_backchannel(
1061        config: &Config,
1062        agent_alias: &str,
1063        session_cwd: Option<&Path>,
1064        initialize_mcp: bool,
1065        exclude_memory: bool,
1066    ) -> Result<Self> {
1067        Self::from_config_with_session_cwd_and_mcp_approval_mode(
1068            config,
1069            agent_alias,
1070            session_cwd,
1071            initialize_mcp,
1072            true,
1073            exclude_memory,
1074            None,
1075        )
1076        .await
1077    }
1078
1079    /// Like [`Self::from_config_with_session_cwd_and_mcp_backchannel`] but also
1080    /// injects the TUI's captured shell environment so that tools like
1081    /// `ShellTool` inherit the user's real `PATH`, `SSH_AUTH_SOCK`, etc.
1082    /// rather than the daemon's stripped-down process environment.
1083    pub async fn from_config_with_tui_env(
1084        config: &Config,
1085        agent_alias: &str,
1086        session_cwd: Option<&Path>,
1087        initialize_mcp: bool,
1088        exclude_memory: bool,
1089        tui_env: Option<std::collections::HashMap<String, String>>,
1090    ) -> Result<Self> {
1091        Self::from_config_with_session_cwd_and_mcp_approval_mode(
1092            config,
1093            agent_alias,
1094            session_cwd,
1095            initialize_mcp,
1096            true,
1097            exclude_memory,
1098            tui_env,
1099        )
1100        .await
1101    }
1102
1103    async fn from_config_with_session_cwd_and_mcp_approval_mode(
1104        config: &Config,
1105        agent_alias: &str,
1106        session_cwd: Option<&Path>,
1107        initialize_mcp: bool,
1108        approval_backchannel: bool,
1109        exclude_memory: bool,
1110        tui_env: Option<std::collections::HashMap<String, String>>,
1111    ) -> Result<Self> {
1112        let agent_cfg = config
1113            .agent(agent_alias)
1114            .with_context(|| format!("agents.{agent_alias} is not configured"))?;
1115        let risk_profile = config
1116            .risk_profile_for_agent(agent_alias)
1117            .with_context(|| {
1118                format!(
1119                    "agents.{agent_alias}.risk_profile does not name a configured risk_profiles entry"
1120                )
1121            })?;
1122
1123        let observer: Arc<dyn Observer> =
1124            Arc::from(observability::create_observer(&config.observability));
1125        let runtime: Arc<dyn platform::RuntimeAdapter> =
1126            Arc::from(platform::create_runtime(&config.runtime)?);
1127        // Per-agent workspace becomes the SecurityPolicy boundary
1128        // (file_read/write/edit + shell tool jail to the agent's own
1129        // dir). The session-cwd override still wins so ACP sessions
1130        // can pin tool path resolution to an IDE-provided cwd.
1131        let agent_workspace = config.agent_workspace_dir(agent_alias);
1132        // Create the per-agent workspace dir on demand so bootstrap
1133        // file writes (and downstream markdown-memory backends) don't
1134        // hit ENOENT on a fresh install.
1135        if let Err(e) = tokio::fs::create_dir_all(&agent_workspace).await {
1136            ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"agent": agent_alias, "workspace": agent_workspace.display().to_string(), "e": e.to_string()})), "Failed to create per-agent workspace dir (continuing): ");
1137        }
1138        // Seed the agent's bootstrap files (AGENTS.md / SOUL.md /
1139        // IDENTITY.md / USER.md / TOOLS.md / BOOTSTRAP.md) on first
1140        // run. Idempotent — never overwrites existing files; only
1141        // fills in the gaps so a freshly-created agent has a basic
1142        // identity to load.
1143        if let Err(e) = zeroclaw_config::schema::ensure_bootstrap_files(&agent_workspace).await {
1144            ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"agent": agent_alias, "workspace": agent_workspace.display().to_string(), "e": e.to_string()})), "Failed to ensure per-agent bootstrap files (continuing with whatever exists): ");
1145        }
1146        let security = Arc::new({
1147            // Use for_agent so the runtime profile (max_actions_per_hour,
1148            // shell_timeout_secs, etc.) is applied — from_risk_profile passes
1149            // None for the runtime profile and silently falls back to the
1150            // schema default of 20 actions/hour regardless of config.
1151            let mut policy = SecurityPolicy::for_agent(config, agent_alias).with_context(|| {
1152                format!("agents.{agent_alias}: failed to build security policy")
1153            })?;
1154            // When a per-session cwd overrides the sandbox root, ensure
1155            // the per-agent workspace (where skills, identity, and config
1156            // data live) remains readable. Without this, file_read and
1157            // search tools are locked out of the agent's workspace the
1158            // moment the session cwd differs.
1159            if let Some(cwd) = session_cwd {
1160                policy.workspace_dir = cwd.to_path_buf();
1161                policy.allowed_roots.push(agent_workspace.clone());
1162            }
1163            policy
1164        });
1165
1166        let (provider_name, provider_alias, agent_model_provider) =
1167            match config.resolved_model_provider_for_agent(agent_alias) {
1168                Some(resolved) => (resolved.0, resolved.1, Some(resolved.2)),
1169                None => {
1170                    let agent_ref = agent_cfg.model_provider.as_str();
1171                    if !agent_ref.is_empty() {
1172                        anyhow::bail!(
1173                            "agents.{agent_alias}.model_provider = \"{agent_ref}\" does not \
1174                             resolve to a configured [providers.models.<type>.<alias>] entry"
1175                        );
1176                    }
1177                    // V3 schema requires every agent to set model_provider.
1178                    // Empty is a config error rather than a silent fallback.
1179                    anyhow::bail!(
1180                        "agents.{agent_alias}.model_provider is empty — set it to a \
1181                         configured \"<type>.<alias>\" (e.g. \"anthropic.{agent_alias}\")"
1182                    );
1183                }
1184            };
1185        let memory: Arc<dyn Memory> = zeroclaw_memory::create_memory_for_agent(
1186            config,
1187            agent_alias,
1188            agent_model_provider.and_then(|e| e.api_key.as_deref()),
1189        )
1190        .await?;
1191
1192        let composio_key = if config.composio.enabled {
1193            config.composio.api_key.as_deref()
1194        } else {
1195            None
1196        };
1197        let composio_entity_id = if config.composio.enabled {
1198            Some(config.composio.entity_id.as_str())
1199        } else {
1200            None
1201        };
1202
1203        let all_tools_result = tools::all_tools_with_runtime(
1204            Arc::new(config.clone()),
1205            &security,
1206            risk_profile,
1207            agent_alias,
1208            runtime,
1209            memory.clone(),
1210            composio_key,
1211            composio_entity_id,
1212            &config.browser,
1213            &config.http_request,
1214            &config.web_fetch,
1215            &security.workspace_dir,
1216            &config.agents,
1217            agent_model_provider.and_then(|e| e.api_key.as_deref()),
1218            config,
1219            None,
1220            false,
1221            tui_env,
1222        );
1223        let mut tools = all_tools_result.tools;
1224        let delegate_handle = all_tools_result.delegate_handle;
1225        let ask_user_handle = all_tools_result.ask_user_handle;
1226        let reaction_handle = all_tools_result.reaction_handle;
1227        let poll_handle = all_tools_result.poll_handle;
1228        let escalate_handle = all_tools_result.escalate_handle;
1229
1230        // ── Built-in SecurityPolicy tool gate (parity with agent::run) ──
1231        // Apply the agent's allowlist (`allowed_tools`) AND denylist
1232        // (`excluded_tools`) to the built-in registry *before* MCP tools and
1233        // skill tools are added. `from_config` (ws.rs / daemon) bypasses the
1234        // channel orchestrator and previously enforced only the risk-profile
1235        // denylist (further below) on this path — never the allowlist — so an
1236        // agent allowlisted to e.g. `file_read` still kept raw `shell` /
1237        // `file_write`. Filtering here, before skill registration, is also
1238        // what lets a scoped elevation wrapper survive: the raw target is
1239        // removed while the distinct prefixed `{skill}__{tool}` wrapper is
1240        // appended later. MCP tools are initialized after this built-in
1241        // filter, then MCP registration and deferred discovery apply the same
1242        // SecurityPolicy explicitly so denied MCP tools do not surface.
1243        let before_policy_filter = tools.len();
1244        crate::agent::loop_::apply_policy_tool_filter(&mut tools, Some(security.as_ref()), None);
1245        if tools.len() != before_policy_filter {
1246            ::zeroclaw_log::record!(
1247                INFO,
1248                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1249                    .with_attrs(::serde_json::json!({
1250                        "before": before_policy_filter,
1251                        "retained": tools.len(),
1252                        "policy_allowed": security.allowed_tools.as_ref().map(|v| v.len()),
1253                        "policy_excluded": security.excluded_tools.as_ref().map(|v| v.len()),
1254                    })),
1255                "Applied SecurityPolicy built-in tool filter (from_config path)"
1256            );
1257        }
1258
1259        // ── Wire MCP tools (non-fatal) ─────────────────────────────
1260        // Replicates the same MCP initialization logic used in the CLI
1261        // and webhook paths (loop_.rs) so that the WebSocket/daemon UI
1262        // path also has access to MCP tools.
1263        let mut activated_tools: Option<Arc<std::sync::Mutex<tools::ActivatedToolSet>>> = None;
1264        // Resolution-only MCP wrappers for skill MCP elevation (kind = "mcp").
1265        let mut mcp_elevation_arcs: Vec<Arc<dyn tools::Tool>> = Vec::new();
1266        if initialize_mcp && config.mcp.enabled && !config.mcp.servers.is_empty() {
1267            ::zeroclaw_log::record!(
1268                INFO,
1269                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1270                &format!(
1271                    "Initializing MCP client — {} server(s) configured",
1272                    config.mcp.servers.len()
1273                )
1274            );
1275            match tools::McpRegistry::connect_all(&config.mcp.servers).await {
1276                Ok(registry) => {
1277                    let registry = std::sync::Arc::new(registry);
1278                    mcp_elevation_arcs = tools::collect_mcp_elevation_arcs(&registry).await;
1279                    let mcp_policy =
1280                        crate::agent::loop_::mcp_tool_access_policy(security.as_ref(), None);
1281                    if config.mcp.deferred_loading {
1282                        let deferred_set = tools::DeferredMcpToolSet::from_registry(
1283                            std::sync::Arc::clone(&registry),
1284                        )
1285                        .await;
1286                        ::zeroclaw_log::record!(
1287                            INFO,
1288                            ::zeroclaw_log::Event::new(
1289                                module_path!(),
1290                                ::zeroclaw_log::Action::Note
1291                            ),
1292                            &format!(
1293                                "MCP deferred: {} tool stub(s) from {} server(s)",
1294                                deferred_set.len(),
1295                                registry.server_count()
1296                            )
1297                        );
1298                        let allowed_stub_count = crate::agent::loop_::mcp_allowed_tool_count(
1299                            deferred_set
1300                                .stubs
1301                                .iter()
1302                                .map(|stub| stub.prefixed_name.as_str()),
1303                            mcp_policy.as_ref(),
1304                        );
1305                        if allowed_stub_count > 0 {
1306                            let activated =
1307                                Arc::new(std::sync::Mutex::new(tools::ActivatedToolSet::new()));
1308                            activated_tools = Some(Arc::clone(&activated));
1309                            let mut tool_search =
1310                                tools::ToolSearchTool::new(deferred_set, activated);
1311                            if let Some(policy) = mcp_policy {
1312                                tool_search = tool_search.with_access_policy(policy);
1313                            }
1314                            tools.push(Box::new(tool_search));
1315                        }
1316                    } else {
1317                        let names = registry.tool_names();
1318                        let mut registered = 0usize;
1319                        let mut skipped = 0usize;
1320                        for name in names {
1321                            if !crate::agent::loop_::eager_mcp_tool_allowed(
1322                                &name,
1323                                mcp_policy.as_ref(),
1324                            ) {
1325                                skipped += 1;
1326                                continue;
1327                            }
1328                            if let Some(def) = registry.get_tool_def(&name).await {
1329                                let wrapper: std::sync::Arc<dyn tools::Tool> =
1330                                    std::sync::Arc::new(tools::McpToolWrapper::new(
1331                                        name,
1332                                        def,
1333                                        std::sync::Arc::clone(&registry),
1334                                    ));
1335                                if crate::agent::loop_::register_eager_mcp_tool_if_allowed(
1336                                    wrapper,
1337                                    &mut tools,
1338                                    delegate_handle.as_ref(),
1339                                    mcp_policy.as_ref(),
1340                                ) {
1341                                    registered += 1;
1342                                }
1343                            }
1344                        }
1345                        ::zeroclaw_log::record!(
1346                            INFO,
1347                            ::zeroclaw_log::Event::new(
1348                                module_path!(),
1349                                ::zeroclaw_log::Action::Note
1350                            ),
1351                            &format!(
1352                                "MCP: {} tool(s) registered from {} server(s), {} skipped by policy",
1353                                registered,
1354                                registry.server_count(),
1355                                skipped
1356                            )
1357                        );
1358                    }
1359                }
1360                Err(e) => {
1361                    ::zeroclaw_log::record!(
1362                        ERROR,
1363                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1364                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1365                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1366                        "MCP registry failed to initialize"
1367                    );
1368                }
1369            }
1370        }
1371
1372        let model_name = match agent_model_provider
1373            .and_then(|e| e.model.as_deref())
1374            .map(str::trim)
1375            .filter(|m| !m.is_empty())
1376        {
1377            Some(m) => m.to_string(),
1378            None => anyhow::bail!(
1379                "agents.{agent_alias}.model_provider resolves to a model_provider entry \
1380                 with no `model` set. Configure [providers.models.{provider_name}.<alias>] \
1381                 model = \"...\".",
1382            ),
1383        };
1384
1385        let provider_ref = format!("{provider_name}.{provider_alias}");
1386        let provider_runtime_options = zeroclaw_providers::provider_runtime_options_for_alias(
1387            config,
1388            provider_name,
1389            provider_alias,
1390        );
1391
1392        let model_provider: Box<dyn ModelProvider> =
1393            zeroclaw_providers::create_routed_model_provider_with_options(
1394                config,
1395                &provider_ref,
1396                agent_model_provider.and_then(|e| e.api_key.as_deref()),
1397                agent_model_provider.and_then(|e| e.uri.as_deref()),
1398                &config.reliability,
1399                &config.model_routes,
1400                &model_name,
1401                &provider_runtime_options,
1402            )?;
1403
1404        let dispatcher_choice = agent_cfg.resolved.tool_dispatcher.as_str();
1405        let tool_dispatcher: Box<dyn ToolDispatcher> = match dispatcher_choice {
1406            "native" => Box::new(NativeToolDispatcher),
1407            "xml" => Box::new(XmlToolDispatcher),
1408            _ if model_provider.supports_native_tools() => Box::new(NativeToolDispatcher),
1409            _ => Box::new(XmlToolDispatcher),
1410        };
1411
1412        let route_model_by_hint: HashMap<String, String> = config
1413            .model_routes
1414            .iter()
1415            .map(|route| (route.hint.clone(), route.model.clone()))
1416            .collect();
1417        let available_hints: Vec<String> = route_model_by_hint.keys().cloned().collect();
1418
1419        let response_cache = if config.memory.response_cache_enabled {
1420            zeroclaw_memory::response_cache::ResponseCache::with_hot_cache(
1421                &config.data_dir,
1422                config.memory.response_cache_ttl_minutes,
1423                config.memory.response_cache_max_entries,
1424                config.memory.response_cache_hot_entries,
1425            )
1426            .ok()
1427            .map(Arc::new)
1428        } else {
1429            None
1430        };
1431
1432        // Filter out tools excluded by this agent's risk profile. The
1433        // channel orchestrator applies this for channel-driven runs, but
1434        // Agent::from_config (used by ws.rs) doesn't go through that path.
1435        let excluded = &risk_profile.excluded_tools;
1436        if !excluded.is_empty() {
1437            tools.retain(|t| !excluded.iter().any(|ex| ex == t.name()));
1438        }
1439
1440        // Load skills and register them as callable tools so WebSocket/daemon
1441        // sessions can execute them (not just describe them in the prompt).
1442        // Bundle-aware so `[agents.<alias>].skill_bundles` aliases resolve
1443        // through to `[skill_bundles.<alias>].directory` (defaulting to
1444        // `<install>/shared/skills/<alias>/`).
1445        let skills = crate::skills::load_skills_for_agent_from_config(config, agent_alias);
1446        // Resolution registry = built-in arcs + resolution-only MCP wrappers, so
1447        // skill elevation (kind = "builtin" / "mcp") can resolve either target.
1448        let skill_resolution_registry: Vec<Arc<dyn tools::Tool>> = all_tools_result
1449            .unfiltered_tool_arcs
1450            .iter()
1451            .cloned()
1452            .chain(mcp_elevation_arcs.iter().cloned())
1453            .collect();
1454        tools::register_skill_tools_with_context(
1455            &mut tools,
1456            &skills,
1457            security.clone(),
1458            &skill_resolution_registry,
1459        );
1460
1461        let approval_manager = if approval_backchannel {
1462            ApprovalManager::for_non_interactive_backchannel(risk_profile)
1463        } else {
1464            ApprovalManager::for_non_interactive(risk_profile)
1465        };
1466
1467        let mut agent = Agent::builder()
1468            .model_provider(model_provider)
1469            .tools(tools)
1470            .memory(memory.clone())
1471            .observer(observer)
1472            .response_cache(response_cache)
1473            .tool_dispatcher(tool_dispatcher)
1474            .memory_strategy(Arc::new(
1475                crate::agent::memory_strategy::DefaultMemoryStrategy::with_config_and_limit(
1476                    memory.clone(),
1477                    config.memory.clone(),
1478                    security.workspace_dir.clone(),
1479                    config.effective_memory_recall_limit(agent_alias),
1480                ),
1481            ))
1482            .prompt_builder(SystemPromptBuilder::with_defaults())
1483            .config(
1484                config
1485                    .resolved_agent_config(agent_alias)
1486                    .unwrap_or_else(|| agent_cfg.clone()),
1487            )
1488            .multimodal_config(config.multimodal.clone())
1489            .agent_alias(agent_alias.to_string())
1490            .model_name(model_name)
1491            .model_provider_name(provider_name.to_string())
1492            .temperature(agent_model_provider.and_then(|e| e.temperature))
1493            .workspace_dir(security.workspace_dir.clone())
1494            .agent_workspace_dir(agent_workspace.clone())
1495            .classification_config(config.query_classification.clone())
1496            .available_hints(available_hints)
1497            .route_model_by_hint(route_model_by_hint)
1498            .identity_config(agent_cfg.identity.clone())
1499            .skills(skills)
1500            .skills_prompt_mode(config.skills.prompt_injection_mode)
1501            .auto_save(config.memory.auto_save)
1502            .exclude_memory(exclude_memory)
1503            .security_summary(Some(security.prompt_summary()))
1504            .autonomy_level(risk_profile.level)
1505            .activated_tools(activated_tools)
1506            .hook_runner(if config.hooks.enabled {
1507                let mut runner = crate::hooks::HookRunner::new();
1508                if config.hooks.builtin.command_logger {
1509                    runner.register(Box::new(crate::hooks::builtin::CommandLoggerHook::new()));
1510                }
1511                if config.hooks.builtin.webhook_audit.enabled {
1512                    runner.register(Box::new(crate::hooks::builtin::WebhookAuditHook::new(
1513                        config.hooks.builtin.webhook_audit.clone(),
1514                    )));
1515                }
1516                Some(Arc::new(runner))
1517            } else {
1518                None
1519            })
1520            .approval_manager(Some(Arc::new(approval_manager)))
1521            .build()?;
1522
1523        // Wire per-tool channel-map handles into the agent so callers (e.g.
1524        // the ACP server) can register back-channels after construction.
1525        agent.channel_handles = AgentChannelHandles {
1526            ask_user: ask_user_handle,
1527            reaction: reaction_handle,
1528            poll: poll_handle,
1529            escalate: escalate_handle,
1530        };
1531
1532        Ok(agent)
1533    }
1534
1535    fn trim_history(&mut self) {
1536        let max = self.config.resolved.max_history_messages;
1537        if self.history.len() <= max {
1538            return;
1539        }
1540
1541        let mut system_messages = Vec::new();
1542        let mut other_messages = Vec::new();
1543
1544        for msg in self.history.drain(..) {
1545            match &msg {
1546                ConversationMessage::Chat(chat) if chat.role == "system" => {
1547                    system_messages.push(msg);
1548                }
1549                _ => other_messages.push(msg),
1550            }
1551        }
1552
1553        if other_messages.len() > max {
1554            let initial_drop_count = other_messages.len() - max;
1555            let mut drop_count = initial_drop_count;
1556
1557            ::zeroclaw_log::record!(
1558                DEBUG,
1559                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1560                    .with_category(::zeroclaw_log::EventCategory::Agent)
1561                    .with_attrs(::serde_json::json!({
1562                        "total_messages": other_messages.len(),
1563                        "max_history": max,
1564                        "initial_drop_count": initial_drop_count,
1565                    })),
1566                "trim_history: dropping oldest messages"
1567            );
1568
1569            // Avoid creating orphan ToolResults: if the first message remaining
1570            // after the drop is a ToolResults, its paired AssistantToolCalls was
1571            // dropped, so the ToolResults must be dropped too. Otherwise the
1572            // history would start with a tool_result block whose tool_use_id
1573            // has no matching tool_use, causing model_providers (e.g. Anthropic) to
1574            // reject the request with "messages.0.content.0: unexpected
1575            // tool_use_id found in tool_result blocks".
1576            let before_orphan_tr = drop_count;
1577            while drop_count < other_messages.len()
1578                && matches!(
1579                    &other_messages[drop_count],
1580                    ConversationMessage::ToolResults(_)
1581                )
1582            {
1583                drop_count += 1;
1584            }
1585            if drop_count > before_orphan_tr {
1586                ::zeroclaw_log::record!(
1587                    DEBUG,
1588                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1589                        .with_category(::zeroclaw_log::EventCategory::Agent)
1590                        .with_attrs(::serde_json::json!({
1591                            "extra_dropped": drop_count - before_orphan_tr,
1592                        })),
1593                    "trim_history: dropped orphan ToolResults at head"
1594                );
1595            }
1596
1597            // Symmetric guard: avoid orphan AssistantToolCalls at the new head.
1598            // If the first kept message is an AssistantToolCalls, the model sees
1599            // tool calls it made but never received results for (the paired
1600            // ToolResults was already dropped above or fell outside the window).
1601            // This corrupts the conversation and causes unpredictable behaviour
1602            // — the model may retry tools, hallucinate results, or go off-rails.
1603            let before_orphan_ac = drop_count;
1604            while drop_count < other_messages.len()
1605                && matches!(
1606                    &other_messages[drop_count],
1607                    ConversationMessage::AssistantToolCalls { .. }
1608                )
1609            {
1610                // Also drop the ToolResults that follows this AC (if present)
1611                drop_count += 1;
1612                if drop_count < other_messages.len()
1613                    && matches!(
1614                        &other_messages[drop_count],
1615                        ConversationMessage::ToolResults(_)
1616                    )
1617                {
1618                    drop_count += 1;
1619                }
1620            }
1621            if drop_count > before_orphan_ac {
1622                ::zeroclaw_log::record!(
1623                    DEBUG,
1624                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1625                        .with_category(::zeroclaw_log::EventCategory::Agent)
1626                        .with_attrs(::serde_json::json!({
1627                            "extra_dropped": drop_count - before_orphan_ac,
1628                        })),
1629                    "trim_history: dropped orphan AssistantToolCalls at head"
1630                );
1631            }
1632
1633            // Safety: the orphan-removal cascades above can advance
1634            // drop_count all the way to other_messages.len() when the only
1635            // non-tool-call entry is the user message at position[0] and
1636            // initial_drop_count drops it (e.g. max=50, history=[user,
1637            // AC1, TR1, …, AC25, TR25]).  Sending zero messages to the
1638            // provider causes a hard 400 "messages: at least one message
1639            // is required".  When the cascade would wipe everything, skip
1640            // this trim pass so the conversation stays functional even
1641            // though it is temporarily over the message limit.
1642            if drop_count >= other_messages.len() {
1643                ::zeroclaw_log::record!(
1644                    WARN,
1645                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1646                        .with_category(::zeroclaw_log::EventCategory::Agent)
1647                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1648                        .with_attrs(::serde_json::json!({
1649                            "history_len": other_messages.len(),
1650                            "max_history_messages": max,
1651                        })),
1652                    "trim_history: orphan-cascade would empty all non-system messages; skipping trim to preserve conversation"
1653                );
1654                self.history = system_messages;
1655                self.history.extend(other_messages);
1656                return;
1657            }
1658
1659            ::zeroclaw_log::record!(
1660                DEBUG,
1661                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete)
1662                    .with_category(::zeroclaw_log::EventCategory::Agent)
1663                    .with_outcome(::zeroclaw_log::EventOutcome::Success)
1664                    .with_attrs(::serde_json::json!({
1665                        "total_dropped": drop_count,
1666                        "remaining": other_messages.len() - drop_count,
1667                    })),
1668                "trim_history: complete"
1669            );
1670
1671            other_messages.drain(0..drop_count);
1672        }
1673
1674        self.history = system_messages;
1675        self.history.extend(other_messages);
1676    }
1677
1678    fn build_system_prompt(&self) -> Result<String> {
1679        let expose_text_tool_protocol = !self.config.resolved.strict_tool_parsing
1680            || self.tool_dispatcher.should_send_tool_specs();
1681        let no_tools: Vec<Box<dyn Tool>> = Vec::new();
1682        let prompt_tools = if expose_text_tool_protocol {
1683            &self.tools
1684        } else {
1685            &no_tools
1686        };
1687        let instructions = self.tool_dispatcher.prompt_instructions(prompt_tools);
1688        let ctx = PromptContext {
1689            workspace_dir: &self.workspace_dir,
1690            agent_workspace_dir: &self.agent_workspace_dir,
1691            model_name: &self.model_name,
1692            tools: prompt_tools,
1693            skills: &self.skills,
1694            skills_prompt_mode: self.skills_prompt_mode,
1695            identity_config: Some(&self.identity_config),
1696            dispatcher_instructions: &instructions,
1697            sends_native_tool_specs: self.tool_dispatcher.should_send_tool_specs()
1698                && !prompt_tools.is_empty(),
1699            security_summary: self.security_summary.clone(),
1700            autonomy_level: self.autonomy_level,
1701        };
1702        self.prompt_builder.build(&ctx)
1703    }
1704
1705    async fn prepare_provider_messages(
1706        &mut self,
1707        messages: &[ChatMessage],
1708    ) -> Result<Vec<ChatMessage>> {
1709        let prepared = zeroclaw_providers::multimodal::prepare_messages_for_provider_cached(
1710            messages,
1711            &self.multimodal_config,
1712            &mut self.image_cache,
1713        )
1714        .await?;
1715        Ok(prepared.messages)
1716    }
1717
1718    async fn execute_tool_call(&self, call: &ParsedToolCall, turn_id: &str) -> ToolExecutionResult {
1719        let start = Instant::now();
1720
1721        // ── Hook: before_tool_call (modifying) ──────────────────
1722        // Mirrors the hook pipeline in run_tool_call_loop (loop_.rs) so that
1723        // library-integrated runs honour the same hook chain.
1724        let mut tool_name = call.name.clone();
1725        let mut tool_args = call.arguments.clone();
1726        if let Some(ref hooks) = self.hook_runner {
1727            match hooks
1728                .run_before_tool_call(tool_name.clone(), tool_args.clone())
1729                .await
1730            {
1731                crate::hooks::HookResult::Continue((n, a)) => {
1732                    tool_name = n;
1733                    tool_args = a;
1734                }
1735                crate::hooks::HookResult::Cancel(reason) => {
1736                    ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"tool": call.name, "reason": reason.to_string()})), "tool call cancelled by hook");
1737                    return ToolExecutionResult {
1738                        name: call.name.clone(),
1739                        output: format!("Cancelled by hook: {reason}"),
1740                        success: false,
1741                        tool_call_id: call.tool_call_id.clone(),
1742                    };
1743                }
1744            }
1745        }
1746
1747        super::set_runtime_approved_arg(&tool_name, &mut tool_args, false);
1748
1749        // ── Approval hook ──────────────────────────────────────
1750        // The ACP/WebSocket Agent path executes tools directly instead of
1751        // going through run_tool_call_loop. Keep its policy behavior aligned
1752        // with the shared loop by honoring auto_approve / always_ask here too.
1753        let mut approval_requirement = self
1754            .approval_manager
1755            .as_deref()
1756            .map(|mgr| mgr.approval_requirement(&tool_name))
1757            .unwrap_or(ApprovalRequirement::NotRequired);
1758        if let Some(mgr) = self.approval_manager.as_deref()
1759            && approval_requirement == ApprovalRequirement::Prompt
1760        {
1761            let request = ApprovalRequest {
1762                tool_name: tool_name.clone(),
1763                arguments: tool_args.clone(),
1764            };
1765
1766            let (decision, decision_channel) = if mgr.is_non_interactive() {
1767                // Iterate every registered channel looking for one that can
1768                // handle the approval request. The first Ok(Some(_)) wins.
1769                // This avoids hard-coding a channel name (e.g. "acp") and
1770                // naturally supports WS sessions or any future back-channel.
1771                let ch_request = zeroclaw_api::channel::ChannelApprovalRequest {
1772                    tool_name: request.tool_name.clone(),
1773                    arguments_summary: crate::approval::summarize_args(&request.arguments),
1774                    raw_arguments: Some(request.arguments.clone()),
1775                };
1776                let mut channel_decision: Option<zeroclaw_api::channel::ChannelApprovalResponse> =
1777                    None;
1778                let mut decision_channel_name = String::new();
1779                // Collect channels while holding the lock briefly, then drop
1780                // the lock before any await points so the guard is not Send.
1781                let channels: Vec<(String, Arc<dyn zeroclaw_api::channel::Channel>)> = self
1782                    .channel_handles
1783                    .ask_user
1784                    .as_ref()
1785                    .map(|h| {
1786                        h.read()
1787                            .iter()
1788                            .map(|(k, v)| (k.clone(), Arc::clone(v)))
1789                            .collect()
1790                    })
1791                    .unwrap_or_default();
1792                for (ch_name, ch) in &channels {
1793                    match ch.request_approval("", &ch_request).await {
1794                        Ok(Some(r)) => {
1795                            decision_channel_name = ch_name.clone();
1796                            channel_decision = Some(r);
1797                            break;
1798                        }
1799                        Ok(None) => continue,
1800                        Err(e) => {
1801                            ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"tool": tool_name, "channel": ch_name, "error": format!("{}", e)})), "channel approval request failed");
1802                        }
1803                    }
1804                }
1805                let approval = match channel_decision {
1806                    Some(zeroclaw_api::channel::ChannelApprovalResponse::Approve) => {
1807                        ApprovalResponse::Yes
1808                    }
1809                    Some(zeroclaw_api::channel::ChannelApprovalResponse::AlwaysApprove) => {
1810                        ApprovalResponse::Always
1811                    }
1812                    Some(zeroclaw_api::channel::ChannelApprovalResponse::Deny) => {
1813                        ApprovalResponse::No
1814                    }
1815                    Some(zeroclaw_api::channel::ChannelApprovalResponse::DenyWithEdit {
1816                        replacement,
1817                    }) => ApprovalResponse::ReplaceWith(replacement),
1818                    None => {
1819                        ::zeroclaw_log::record!(
1820                            WARN,
1821                            ::zeroclaw_log::Event::new(
1822                                module_path!(),
1823                                ::zeroclaw_log::Action::Note
1824                            )
1825                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1826                            .with_attrs(::serde_json::json!({"tool": tool_name})),
1827                            "no approval channel handled this request — denying. \
1828                             Configure a back-channel (ACP or WS) that implements \
1829                             request_approval to enable interactive approval."
1830                        );
1831                        ApprovalResponse::No
1832                    }
1833                };
1834                (approval, decision_channel_name)
1835            } else {
1836                (mgr.prompt_cli(&request), String::new())
1837            };
1838
1839            mgr.record_decision(&tool_name, &tool_args, &decision, &decision_channel);
1840
1841            if decision == ApprovalResponse::No {
1842                return ToolExecutionResult {
1843                    name: tool_name,
1844                    output: "Denied by user.".to_string(),
1845                    success: false,
1846                    tool_call_id: call.tool_call_id.clone(),
1847                };
1848            }
1849
1850            if let ApprovalResponse::ReplaceWith(replacement) = &decision {
1851                return ToolExecutionResult {
1852                    name: tool_name,
1853                    output: crate::approval::sanitize_tool_replacement(replacement),
1854                    success: true,
1855                    tool_call_id: call.tool_call_id.clone(),
1856                };
1857            }
1858
1859            if matches!(decision, ApprovalResponse::Yes | ApprovalResponse::Always) {
1860                approval_requirement = ApprovalRequirement::Approved;
1861            }
1862        }
1863        super::set_runtime_approved_arg(
1864            &tool_name,
1865            &mut tool_args,
1866            approval_requirement == ApprovalRequirement::Approved,
1867        );
1868
1869        // Serialize arguments once (after hooks may have mutated them) and
1870        // use the same string on the observer event so OTel exporters can
1871        // attach the actual JSON payload to the tool span.
1872        let args_json = tool_args.to_string();
1873        let tool_call_id = call.tool_call_id.clone();
1874
1875        // Emit invoke log — visible in the TUI Logs pane.
1876        ::zeroclaw_log::record!(
1877            DEBUG,
1878            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Invoke)
1879                .with_category(::zeroclaw_log::EventCategory::Tool)
1880                .with_attrs(::serde_json::json!({
1881                    "tool": tool_name,
1882                    "tool_call_id": tool_call_id,
1883                    "input": args_json,
1884                })),
1885            format!("tool call: {tool_name}")
1886        );
1887
1888        // First try to find tool in static registry, then in activated MCP tools.
1889        let (result, success) =
1890            if let Some(tool) = self.tools.iter().find(|t| t.name() == tool_name) {
1891                match tool.execute(tool_args.clone()).await {
1892                    Ok(r) => {
1893                        let (outcome_text, ok) = if r.success {
1894                            (r.output, true)
1895                        } else {
1896                            (format!("Error: {}", r.error.unwrap_or(r.output)), false)
1897                        };
1898                        self.observer.record_event(&ObserverEvent::ToolCall {
1899                            tool: tool_name.clone(),
1900                            tool_call_id: tool_call_id.clone(),
1901                            duration: start.elapsed(),
1902                            success: ok,
1903                            arguments: Some(args_json.clone()),
1904                            result: Some(super::loop_::scrub_credentials(&outcome_text)),
1905                            channel: None,
1906                            agent_alias: self.observer_agent_alias(),
1907                            turn_id: Some(turn_id.to_string()),
1908                        });
1909                        (outcome_text, ok)
1910                    }
1911                    Err(e) => {
1912                        let err_text = format!("Error executing {}: {e}", tool_name);
1913                        self.observer.record_event(&ObserverEvent::ToolCall {
1914                            tool: tool_name.clone(),
1915                            tool_call_id: tool_call_id.clone(),
1916                            duration: start.elapsed(),
1917                            success: false,
1918                            arguments: Some(args_json.clone()),
1919                            result: Some(super::loop_::scrub_credentials(&err_text)),
1920                            channel: None,
1921                            agent_alias: self.observer_agent_alias(),
1922                            turn_id: Some(turn_id.to_string()),
1923                        });
1924                        (err_text, false)
1925                    }
1926                }
1927            } else if let Some(activated_arc) = self.activated_tools.as_ref() {
1928                let activated_opt = activated_arc.lock().unwrap().get_resolved(&tool_name);
1929                if let Some(tool) = activated_opt {
1930                    match tool.execute(tool_args.clone()).await {
1931                        Ok(r) => {
1932                            let (outcome_text, ok) = if r.success {
1933                                (r.output, true)
1934                            } else {
1935                                (format!("Error: {}", r.error.unwrap_or(r.output)), false)
1936                            };
1937                            self.observer.record_event(&ObserverEvent::ToolCall {
1938                                tool: tool_name.clone(),
1939                                tool_call_id: tool_call_id.clone(),
1940                                duration: start.elapsed(),
1941                                success: ok,
1942                                arguments: Some(args_json.clone()),
1943                                result: Some(super::loop_::scrub_credentials(&outcome_text)),
1944                                channel: None,
1945                                agent_alias: self.observer_agent_alias(),
1946                                turn_id: Some(turn_id.to_string()),
1947                            });
1948                            (outcome_text, ok)
1949                        }
1950                        Err(e) => {
1951                            let err_text = format!("Error executing {}: {e}", tool_name);
1952                            self.observer.record_event(&ObserverEvent::ToolCall {
1953                                tool: tool_name.clone(),
1954                                tool_call_id: tool_call_id.clone(),
1955                                duration: start.elapsed(),
1956                                success: false,
1957                                arguments: Some(args_json.clone()),
1958                                result: Some(super::loop_::scrub_credentials(&err_text)),
1959                                channel: None,
1960                                agent_alias: self.observer_agent_alias(),
1961                                turn_id: Some(turn_id.to_string()),
1962                            });
1963                            (err_text, false)
1964                        }
1965                    }
1966                } else {
1967                    (format!("Unknown tool: {}", tool_name), false)
1968                }
1969            } else {
1970                (format!("Unknown tool: {}", tool_name), false)
1971            };
1972
1973        let duration = start.elapsed();
1974
1975        // Emit result log — visible in the TUI Logs pane.
1976        if success {
1977            ::zeroclaw_log::record!(
1978                DEBUG,
1979                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete)
1980                    .with_category(::zeroclaw_log::EventCategory::Tool)
1981                    .with_outcome(::zeroclaw_log::EventOutcome::Success)
1982                    .with_duration(duration.as_millis() as u64)
1983                    .with_attrs(::serde_json::json!({
1984                        "tool": tool_name,
1985                        "tool_call_id": tool_call_id,
1986                        "input": args_json,
1987                        "output": result,
1988                    })),
1989                format!("tool result: {tool_name}")
1990            );
1991        } else {
1992            ::zeroclaw_log::record!(
1993                WARN,
1994                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1995                    .with_category(::zeroclaw_log::EventCategory::Tool)
1996                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1997                    .with_duration(duration.as_millis() as u64)
1998                    .with_attrs(::serde_json::json!({
1999                        "tool": tool_name,
2000                        "tool_call_id": tool_call_id,
2001                        "input": args_json,
2002                        "output": result,
2003                    })),
2004                format!("tool failed: {tool_name}")
2005            );
2006        }
2007
2008        // ── Hook: after_tool_call (void) ─────────────────────────
2009        if let Some(ref hooks) = self.hook_runner {
2010            let tool_result_obj = crate::tools::ToolResult {
2011                success,
2012                output: result.clone(),
2013                error: None,
2014            };
2015            hooks
2016                .fire_after_tool_call(&tool_name, &tool_result_obj, duration)
2017                .await;
2018        }
2019
2020        ToolExecutionResult {
2021            name: tool_name,
2022            output: result,
2023            success,
2024            tool_call_id: call.tool_call_id.clone(),
2025        }
2026    }
2027
2028    async fn execute_tools(
2029        &self,
2030        calls: &[ParsedToolCall],
2031        turn_id: &str,
2032    ) -> Vec<ToolExecutionResult> {
2033        let approval_required = self.approval_manager.as_deref().is_some_and(|mgr| {
2034            calls
2035                .iter()
2036                .any(|call| mgr.needs_approval(call.name.as_str()))
2037        });
2038        if !self.config.resolved.parallel_tools || approval_required {
2039            let mut results = Vec::with_capacity(calls.len());
2040            for call in calls {
2041                results.push(self.execute_tool_call(call, turn_id).await);
2042            }
2043            return results;
2044        }
2045
2046        let futs: Vec<_> = calls
2047            .iter()
2048            .map(|call| self.execute_tool_call(call, turn_id))
2049            .collect();
2050        futures_util::future::join_all(futs).await
2051    }
2052
2053    fn classify_model(&self, user_message: &str) -> String {
2054        if let Some(decision) =
2055            super::classifier::classify_with_decision(&self.classification_config, user_message)
2056            && self.available_hints.contains(&decision.hint)
2057        {
2058            let resolved_model = self
2059                .route_model_by_hint
2060                .get(&decision.hint)
2061                .map(String::as_str)
2062                .unwrap_or("unknown");
2063            ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"hint": decision.hint.as_str(), "model": resolved_model, "rule_priority": decision.priority, "message_length": user_message.len()})), "Classified message route");
2064            return format!("hint:{}", decision.hint);
2065        }
2066
2067        // Fallback: auto-classify by complexity when no rule matched.
2068        if let Some(ref ac) = self.config.resolved.auto_classify {
2069            let tier = super::eval::estimate_complexity(user_message);
2070            if let Some(hint) = ac.hint_for(tier)
2071                && self.available_hints.contains(&hint.to_string())
2072            {
2073                ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"hint": hint, "complexity": format!("{:?}", tier), "message_length": user_message.len()})), "Auto-classified by complexity");
2074                return format!("hint:{hint}");
2075            }
2076        }
2077
2078        self.model_name.clone()
2079    }
2080
2081    pub async fn turn(&mut self, user_message: &str) -> Result<String> {
2082        // Refuse empty/whitespace-only turns. An empty `user_message` would
2083        // append a user-role message containing only the timestamp prefix
2084        // (`[<now>] `) to history, leaving the model to face a system prompt
2085        // immediately followed by a blank user turn. On Claude this surfaces
2086        // as the underlying `<<HUMAN_CONVERSATION_START>>` template sentinel
2087        // bleeding into the visible response ("there's no human turn yet…"),
2088        // because the model has nothing to respond to and narrates the
2089        // structural marker instead. Stopping it here keeps history clean
2090        // and prevents wasted model spend on garbage turns.
2091        if user_message.trim().is_empty() {
2092            ::zeroclaw_log::record!(
2093                WARN,
2094                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
2095                    .with_category(::zeroclaw_log::EventCategory::Agent)
2096                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2097                    .with_attrs(::serde_json::json!({
2098                        "reason": "empty_user_message",
2099                        "entry_point": "Agent::turn",
2100                        "raw_len": user_message.len(),
2101                    })),
2102                "Refusing blank user turn (would emit timestamp-only message and risk prompt-template bleed-through)"
2103            );
2104            return Err(anyhow::Error::msg(
2105                "empty user message: refusing to dispatch a blank turn",
2106            ));
2107        }
2108
2109        if self.history.is_empty() {
2110            let system_prompt = self.build_system_prompt()?;
2111            self.history
2112                .push(ConversationMessage::Chat(ChatMessage::system(
2113                    system_prompt,
2114                )));
2115        }
2116
2117        let context = self
2118            .memory_strategy
2119            .load_context(user_message, self.memory_session_id.as_deref())
2120            .await
2121            .unwrap_or_default();
2122
2123        if self.auto_save {
2124            let _ = self
2125                .memory
2126                .store(
2127                    "user_msg",
2128                    user_message,
2129                    MemoryCategory::Conversation,
2130                    self.memory_session_id.as_deref(),
2131                )
2132                .await;
2133        }
2134
2135        let now = chrono::Local::now();
2136        let (year, month, day) = (now.year(), now.month(), now.day());
2137        let (hour, minute, second) = (now.hour(), now.minute(), now.second());
2138        let tz = now.format("%Z");
2139        let date_str =
2140            format!("{year:04}-{month:02}-{day:02} {hour:02}:{minute:02}:{second:02} {tz}");
2141
2142        let enriched = if context.is_empty() {
2143            format!("[CURRENT DATE & TIME: {date_str}]\n\n{user_message}")
2144        } else {
2145            format!("[CURRENT DATE & TIME: {date_str}]\n\n{context}\n\n{user_message}")
2146        };
2147
2148        self.history
2149            .push(ConversationMessage::Chat(ChatMessage::user(enriched)));
2150
2151        let effective_model = self.classify_model(user_message);
2152
2153        let turn_id = Self::new_turn_id();
2154        let turn_started_at = Instant::now();
2155
2156        self.observer.record_event(&ObserverEvent::AgentStart {
2157            model_provider: self.model_provider_name.clone(),
2158            model: effective_model.clone(),
2159            channel: None,
2160            agent_alias: self.observer_agent_alias(),
2161            turn_id: Some(turn_id.clone()),
2162        });
2163
2164        let mut guard = TurnGuard {
2165            observer: Arc::clone(&self.observer),
2166            model_provider: self.model_provider_name.clone(),
2167            model: effective_model.clone(),
2168            turn_id: Some(turn_id.clone()),
2169            turn_started_at,
2170            agent_alias: self.observer_agent_alias(),
2171            total_input_tokens: 0,
2172            total_output_tokens: 0,
2173            saw_usage: false,
2174            done: false,
2175        };
2176
2177        for _ in 0..self.config.resolved.max_tool_iterations {
2178            let messages = self.tool_dispatcher.to_provider_messages(&self.history);
2179            let prepared_messages = self.prepare_provider_messages(&messages).await?;
2180
2181            // Response cache: check before LLM call (only for deterministic, text-only prompts).
2182            // The key must include the whole provider-visible transcript, not just the last user
2183            // message, otherwise distinct conversations can collide when their final prompt matches.
2184            let cache_key =
2185                self.response_cache_key_for_messages(&prepared_messages, &effective_model);
2186
2187            if let (Some(cache), Some(key)) = (&self.response_cache, &cache_key) {
2188                if let Ok(Some(cached)) = cache.get(key) {
2189                    self.observer.record_event(&ObserverEvent::CacheHit {
2190                        cache_type: "response".into(),
2191                        tokens_saved: 0,
2192                    });
2193                    self.history
2194                        .push(ConversationMessage::Chat(ChatMessage::assistant(
2195                            cached.clone(),
2196                        )));
2197                    self.trim_history();
2198                    return Ok(cached);
2199                }
2200                self.observer.record_event(&ObserverEvent::CacheMiss {
2201                    cache_type: "response".into(),
2202                });
2203            }
2204
2205            // Outbound prompt size diagnostic — see streaming site for notes.
2206            {
2207                let msg_count = prepared_messages.len();
2208                let content_chars: usize = prepared_messages.iter().map(|m| m.content.len()).sum();
2209                ::zeroclaw_log::record!(
2210                    DEBUG,
2211                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note,)
2212                        .with_attrs(::serde_json::json!({
2213                            "msg_count": msg_count,
2214                            "content_chars": content_chars,
2215                            "approx_tokens": content_chars / 4,
2216                            "model": effective_model,
2217                        })),
2218                    "agent: outbound prompt size (non-streaming)"
2219                );
2220            }
2221
2222            let llm_started_at = Instant::now();
2223            self.observer.record_event(&ObserverEvent::LlmRequest {
2224                model_provider: self.model_provider_name.clone(),
2225                model: effective_model.clone(),
2226                messages_count: messages.len(),
2227                channel: None,
2228                agent_alias: self.observer_agent_alias(),
2229                turn_id: Some(turn_id.clone()),
2230            });
2231
2232            let response = match self
2233                .model_provider
2234                .chat(
2235                    ChatRequest {
2236                        messages: &prepared_messages,
2237                        tools: if self.should_send_tool_specs() {
2238                            Some(&self.tool_specs)
2239                        } else {
2240                            None
2241                        },
2242                        thinking: None,
2243                    },
2244                    &effective_model,
2245                    self.temperature,
2246                )
2247                .await
2248            {
2249                Ok(resp) => {
2250                    let (resp_input_tokens, resp_output_tokens) = resp
2251                        .usage
2252                        .as_ref()
2253                        .map(|u| (u.input_tokens, u.output_tokens))
2254                        .unwrap_or((None, None));
2255                    if let Some(input) = resp_input_tokens {
2256                        guard.total_input_tokens = guard.total_input_tokens.saturating_add(input);
2257                        guard.saw_usage = true;
2258                    }
2259                    if let Some(output) = resp_output_tokens {
2260                        guard.total_output_tokens =
2261                            guard.total_output_tokens.saturating_add(output);
2262                        guard.saw_usage = true;
2263                    }
2264                    self.observer.record_event(&ObserverEvent::LlmResponse {
2265                        model_provider: self.model_provider_name.clone(),
2266                        model: effective_model.clone(),
2267                        duration: llm_started_at.elapsed(),
2268                        success: true,
2269                        error_message: None,
2270                        input_tokens: resp_input_tokens,
2271                        output_tokens: resp_output_tokens,
2272                        channel: None,
2273                        agent_alias: self.observer_agent_alias(),
2274                        turn_id: Some(turn_id.clone()),
2275                    });
2276                    resp
2277                }
2278                Err(err) => {
2279                    let safe_error = zeroclaw_providers::sanitize_api_error(&err.to_string());
2280                    self.observer.record_event(&ObserverEvent::LlmResponse {
2281                        model_provider: self.model_provider_name.clone(),
2282                        model: effective_model.clone(),
2283                        duration: llm_started_at.elapsed(),
2284                        success: false,
2285                        error_message: Some(safe_error),
2286                        input_tokens: None,
2287                        output_tokens: None,
2288                        channel: None,
2289                        agent_alias: self.observer_agent_alias(),
2290                        turn_id: Some(turn_id.clone()),
2291                    });
2292                    return Err(err);
2293                }
2294            };
2295
2296            let (text, calls) = self.parse_response_for_effective_tools(&response);
2297            if calls.is_empty() {
2298                let final_text = if text.is_empty() && !self.tool_specs.is_empty() {
2299                    response.text.unwrap_or_default()
2300                } else {
2301                    text
2302                };
2303
2304                // Store in response cache (text-only, no tool calls)
2305                if let (Some(cache), Some(key)) = (&self.response_cache, &cache_key) {
2306                    let token_count = response
2307                        .usage
2308                        .as_ref()
2309                        .and_then(|u| u.output_tokens)
2310                        .unwrap_or(0);
2311                    #[allow(clippy::cast_possible_truncation)]
2312                    let _ = cache.put(key, &effective_model, &final_text, token_count as u32);
2313                }
2314
2315                self.history
2316                    .push(ConversationMessage::Chat(ChatMessage::assistant(
2317                        final_text.clone(),
2318                    )));
2319                self.trim_history();
2320
2321                return Ok(final_text);
2322            }
2323
2324            if !text.is_empty() {
2325                print!("{text}");
2326                let _ = std::io::stdout().flush();
2327            }
2328
2329            self.history.push(ConversationMessage::AssistantToolCalls {
2330                text: response.text.clone(),
2331                tool_calls: response.tool_calls.clone(),
2332                reasoning_content: response.reasoning_content.clone(),
2333            });
2334
2335            let results = self.execute_tools(&calls, &turn_id).await;
2336            let formatted = self.tool_dispatcher.format_results(&results);
2337            self.history.push(formatted);
2338            self.trim_history();
2339        }
2340
2341        anyhow::bail!(
2342            "Agent exceeded maximum tool iterations ({})",
2343            self.config.resolved.max_tool_iterations
2344        )
2345    }
2346
2347    /// Execute a single agent turn while streaming intermediate events.
2348    ///
2349    /// Behaves identically to [`turn`](Self::turn) but forwards [`TurnEvent`]s
2350    /// through the provided channel so callers (e.g. the WebSocket gateway)
2351    /// can relay incremental updates to clients.
2352    ///
2353    /// The returned tuple contains the final assistant response string and all
2354    /// new [`ConversationMessage`]s added during this turn (captured before
2355    /// any `trim_history` call so callers can persist them correctly even when
2356    /// the history is already at its configured limit).
2357    pub async fn turn_streamed(
2358        &mut self,
2359        user_message: &str,
2360        event_tx: tokio::sync::mpsc::Sender<TurnEvent>,
2361        cancel_token: Option<tokio_util::sync::CancellationToken>,
2362    ) -> Result<(String, Vec<ConversationMessage>)> {
2363        // See `Agent::turn` for the rationale. Same guard: blank input would
2364        // push a timestamp-only user message into history and the model would
2365        // narrate the trailing prompt-template sentinel instead of replying.
2366        if user_message.trim().is_empty() {
2367            ::zeroclaw_log::record!(
2368                WARN,
2369                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
2370                    .with_category(::zeroclaw_log::EventCategory::Agent)
2371                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2372                    .with_attrs(::serde_json::json!({
2373                        "reason": "empty_user_message",
2374                        "entry_point": "Agent::turn_streamed",
2375                        "raw_len": user_message.len(),
2376                    })),
2377                "Refusing blank user turn (would emit timestamp-only message and risk prompt-template bleed-through)"
2378            );
2379            return Err(anyhow::Error::msg(
2380                "empty user message: refusing to dispatch a blank turn",
2381            ));
2382        }
2383
2384        self.turn_streamed_with_steering_state(user_message, event_tx, cancel_token, None)
2385            .await
2386            .map(|outcome| (outcome.response, outcome.new_messages))
2387            .map_err(|err| err.error)
2388    }
2389
2390    pub async fn turn_streamed_with_steering_state(
2391        &mut self,
2392        user_message: &str,
2393        event_tx: tokio::sync::mpsc::Sender<TurnEvent>,
2394        cancel_token: Option<tokio_util::sync::CancellationToken>,
2395        mut steering_rx: Option<&mut tokio::sync::mpsc::Receiver<String>>,
2396    ) -> std::result::Result<StreamedTurnSuccess, StreamedTurnError> {
2397        // See `Agent::turn` for the rationale. Same guard: blank input would
2398        // push a timestamp-only user message into history and the model would
2399        // narrate the trailing prompt-template sentinel instead of replying.
2400        if user_message.trim().is_empty() {
2401            ::zeroclaw_log::record!(
2402                WARN,
2403                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
2404                    .with_category(::zeroclaw_log::EventCategory::Agent)
2405                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2406                    .with_attrs(::serde_json::json!({
2407                        "reason": "empty_user_message",
2408                        "entry_point": "Agent::turn_streamed_with_steering_state",
2409                        "raw_len": user_message.len(),
2410                    })),
2411                "Refusing blank user turn (would emit timestamp-only message and risk prompt-template bleed-through)"
2412            );
2413            return Err(StreamedTurnError {
2414                error: anyhow::Error::msg("empty user message: refusing to dispatch a blank turn"),
2415                committed_response: String::new(),
2416                new_messages: Vec::new(),
2417            });
2418        }
2419
2420        // ── Preamble (identical to turn) ───────────────────────────────
2421        if self.history.is_empty() {
2422            let system_prompt = self
2423                .build_system_prompt()
2424                .map_err(|error| StreamedTurnError {
2425                    error,
2426                    committed_response: String::new(),
2427                    new_messages: Vec::new(),
2428                })?;
2429            self.history
2430                .push(ConversationMessage::Chat(ChatMessage::system(
2431                    system_prompt,
2432                )));
2433        }
2434
2435        let mut new_msgs: Vec<ConversationMessage> = Vec::new();
2436        self.append_streamed_user_message_to_history(user_message, &mut new_msgs)
2437            .await;
2438
2439        let effective_model = self.classify_model(user_message);
2440        let turn_started_at = Instant::now();
2441        let turn_id = Self::new_turn_id();
2442        let mut committed_response = String::new();
2443
2444        self.observer.record_event(&ObserverEvent::AgentStart {
2445            model_provider: self.model_provider_name.clone(),
2446            model: effective_model.clone(),
2447            channel: None,
2448            agent_alias: self.observer_agent_alias(),
2449            turn_id: Some(turn_id.clone()),
2450        });
2451
2452        let mut guard = TurnGuard {
2453            observer: Arc::clone(&self.observer),
2454            model_provider: self.model_provider_name.clone(),
2455            model: effective_model.clone(),
2456            turn_id: Some(turn_id.clone()),
2457            turn_started_at,
2458            agent_alias: self.observer_agent_alias(),
2459            total_input_tokens: 0,
2460            total_output_tokens: 0,
2461            saw_usage: false,
2462            done: false,
2463        };
2464
2465        // ── Turn loop ──────────────────────────────────────────────────
2466        for _ in 0..self.config.resolved.max_tool_iterations {
2467            // Early exit if the caller cancelled this turn (e.g. user abort)
2468            if cancel_token
2469                .as_ref()
2470                .is_some_and(tokio_util::sync::CancellationToken::is_cancelled)
2471            {
2472                self.append_streamed_assistant_message_to_history(
2473                    "[interrupted by user]".to_string(),
2474                    &mut new_msgs,
2475                    &mut committed_response,
2476                );
2477                return Err(StreamedTurnError {
2478                    error: crate::agent::loop_::ToolLoopCancelled.into(),
2479                    committed_response,
2480                    new_messages: new_msgs,
2481                });
2482            }
2483
2484            for steering_message in Self::drain_steering_messages(&mut steering_rx) {
2485                self.append_streamed_user_message_to_history(&steering_message, &mut new_msgs)
2486                    .await;
2487            }
2488
2489            let messages = self.tool_dispatcher.to_provider_messages(&self.history);
2490            let prepared_messages = match self.prepare_provider_messages(&messages).await {
2491                Ok(messages) => messages,
2492                Err(error) => {
2493                    return Err(StreamedTurnError {
2494                        error,
2495                        committed_response,
2496                        new_messages: new_msgs,
2497                    });
2498                }
2499            };
2500
2501            // Response cache check (same as turn): include the full provider-visible transcript.
2502            let cache_key =
2503                self.response_cache_key_for_messages(&prepared_messages, &effective_model);
2504
2505            if let (Some(cache), Some(key)) = (&self.response_cache, &cache_key) {
2506                if let Ok(Some(cached)) = cache.get(key) {
2507                    self.observer.record_event(&ObserverEvent::CacheHit {
2508                        cache_type: "response".into(),
2509                        tokens_saved: 0,
2510                    });
2511                    let cached_msg =
2512                        ConversationMessage::Chat(ChatMessage::assistant(cached.clone()));
2513                    new_msgs.push(cached_msg.clone());
2514                    self.history.push(cached_msg);
2515                    self.trim_history();
2516                    self.observer.record_event(&ObserverEvent::TurnComplete);
2517                    committed_response.push_str(&cached);
2518                    return Ok(StreamedTurnSuccess {
2519                        response: committed_response,
2520                        new_messages: new_msgs,
2521                    });
2522                }
2523                self.observer.record_event(&ObserverEvent::CacheMiss {
2524                    cache_type: "response".into(),
2525                });
2526            }
2527
2528            // Log outbound prompt size so it can be diffed against the
2529            // provider's reported usage. `content_chars` is the raw character
2530            // count of message bodies (tool defs/system prompt are added by
2531            // the provider adapter and not counted here, but messages are by
2532            // far the largest contributor in long sessions). Rough rule of
2533            // thumb: 1 token ≈ 4 chars; treat with care.
2534            {
2535                let msg_count = prepared_messages.len();
2536                let content_chars: usize = prepared_messages.iter().map(|m| m.content.len()).sum();
2537                ::zeroclaw_log::record!(
2538                    DEBUG,
2539                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note,)
2540                        .with_attrs(::serde_json::json!({
2541                            "msg_count": msg_count,
2542                            "content_chars": content_chars,
2543                            "approx_tokens": content_chars / 4,
2544                            "model": effective_model,
2545                        })),
2546                    "agent: outbound prompt size (streaming)"
2547                );
2548            }
2549
2550            // ── Streaming LLM call ────────────────────────────────────
2551            // Try streaming first; if the model_provider returns content we
2552            // forward deltas.  Otherwise fall back to non-streaming chat.
2553            use futures_util::StreamExt;
2554
2555            let llm_started_at = Instant::now();
2556            self.observer.record_event(&ObserverEvent::LlmRequest {
2557                model_provider: self.model_provider_name.clone(),
2558                model: effective_model.clone(),
2559                messages_count: messages.len(),
2560                channel: None,
2561                agent_alias: self.observer_agent_alias(),
2562                turn_id: Some(turn_id.clone()),
2563            });
2564
2565            let stream_opts = zeroclaw_providers::traits::StreamOptions::new(
2566                self.model_provider.supports_streaming(),
2567            );
2568            let mut stream = self.model_provider.stream_chat(
2569                zeroclaw_providers::ChatRequest {
2570                    messages: &prepared_messages,
2571                    tools: if self.should_send_tool_specs() {
2572                        Some(&self.tool_specs)
2573                    } else {
2574                        None
2575                    },
2576                    thinking: None,
2577                },
2578                &effective_model,
2579                self.temperature,
2580                stream_opts,
2581            );
2582
2583            let mut streamed_text = String::new();
2584            let mut streamed_reasoning = String::new();
2585            let mut streamed_tool_calls: Vec<zeroclaw_providers::traits::ToolCall> = Vec::new();
2586            let mut streamed_usage: Option<zeroclaw_providers::traits::TokenUsage> = None;
2587            let mut got_stream = false;
2588            let mut visible_streamed_output = false;
2589            let mut stream_error: Option<String> = None;
2590            let mut pre_executed_call_ids: HashMap<String, VecDeque<String>> = HashMap::new();
2591            let mut was_cancelled = false;
2592
2593            // Consume the stream, checking for cancellation between chunks.
2594            // We use a manual loop with `tokio::select!` so that a cancel
2595            // signal interrupts even while waiting for the next SSE event
2596            // from the model_provider.
2597            loop {
2598                let next_item = stream.next();
2599                let item = if let Some(ref token) = cancel_token {
2600                    tokio::select! {
2601                        biased;
2602                        () = token.cancelled() => {
2603                            was_cancelled = true;
2604                            ::zeroclaw_log::record!(
2605                                INFO,
2606                                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Cancel)
2607                                    .with_category(::zeroclaw_log::EventCategory::Agent)
2608                                    .with_outcome(::zeroclaw_log::EventOutcome::Success)
2609                                    .with_attrs(::serde_json::json!({
2610                                        "streamed_text_len": streamed_text.len(),
2611                                        "got_stream": got_stream,
2612                                    })),
2613                                "turn: cancel token fired mid-stream — breaking consume loop, dropping stream to abort parser"
2614                            );
2615                            break;
2616                        }
2617                        item = next_item => item,
2618                    }
2619                } else {
2620                    next_item.await
2621                };
2622
2623                let Some(item) = item else { break };
2624                match item {
2625                    Ok(event) => match event {
2626                        zeroclaw_providers::traits::StreamEvent::TextDelta(chunk) => {
2627                            if let Some(reasoning) = chunk.reasoning
2628                                && !reasoning.is_empty()
2629                            {
2630                                // Accumulate for signed-block round-trip on
2631                                // providers that carry signatures in this
2632                                // field (Anthropic native-thinking fallback).
2633                                streamed_reasoning.push_str(&reasoning);
2634                                Self::send_turn_event(
2635                                    &event_tx,
2636                                    cancel_token.as_ref(),
2637                                    TurnEvent::Thinking { delta: reasoning },
2638                                )
2639                                .await;
2640                                visible_streamed_output = true;
2641                            }
2642                            if !chunk.delta.is_empty() {
2643                                got_stream = true;
2644                                streamed_text.push_str(&chunk.delta);
2645                                Self::send_turn_event(
2646                                    &event_tx,
2647                                    cancel_token.as_ref(),
2648                                    TurnEvent::Chunk { delta: chunk.delta },
2649                                )
2650                                .await;
2651                                visible_streamed_output = true;
2652                            }
2653                        }
2654                        zeroclaw_providers::traits::StreamEvent::ToolCall(tc) => {
2655                            got_stream = true;
2656                            // ToolCall event is sent later (after parse_response) to
2657                            // avoid duplicates; just collect here.
2658                            streamed_tool_calls.push(tc);
2659                        }
2660                        zeroclaw_providers::traits::StreamEvent::PreExecutedToolCall {
2661                            name,
2662                            args,
2663                        } => {
2664                            let call_id = uuid::Uuid::new_v4().to_string();
2665                            pre_executed_call_ids
2666                                .entry(name.clone())
2667                                .or_default()
2668                                .push_back(call_id.clone());
2669                            Self::send_turn_event(
2670                                &event_tx,
2671                                cancel_token.as_ref(),
2672                                TurnEvent::ToolCall {
2673                                    id: call_id,
2674                                    name,
2675                                    args: serde_json::from_str(&args).unwrap_or_default(),
2676                                },
2677                            )
2678                            .await;
2679                            visible_streamed_output = true;
2680                            // NOT pushed to streamed_tool_calls — already executed by proxy
2681                        }
2682                        zeroclaw_providers::traits::StreamEvent::PreExecutedToolResult {
2683                            name,
2684                            output,
2685                        } => {
2686                            let result_id = pre_executed_call_ids
2687                                .get_mut(&name)
2688                                .and_then(|ids| ids.pop_front())
2689                                .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
2690                            Self::send_turn_event(
2691                                &event_tx,
2692                                cancel_token.as_ref(),
2693                                TurnEvent::ToolResult {
2694                                    id: result_id,
2695                                    name,
2696                                    output,
2697                                },
2698                            )
2699                            .await;
2700                            visible_streamed_output = true;
2701                        }
2702                        zeroclaw_providers::traits::StreamEvent::Usage(usage) => {
2703                            streamed_usage = Some(usage);
2704                        }
2705                        zeroclaw_providers::traits::StreamEvent::Final => break,
2706                    },
2707                    Err(error) => {
2708                        stream_error = Some(error.to_string());
2709                        break;
2710                    }
2711                }
2712            }
2713            // Drop the stream so we release the borrow on model_provider.
2714            drop(stream);
2715
2716            // If cancelled during streaming, return partial content with
2717            // the interruption marker appended. The caller (ws.rs) will
2718            // persist this truncated message and send an abort frame.
2719            if was_cancelled {
2720                let partial =
2721                    Self::marked_partial_response(&streamed_text, "[interrupted by user]");
2722                self.append_streamed_assistant_message_to_history(
2723                    partial,
2724                    &mut new_msgs,
2725                    &mut committed_response,
2726                );
2727                self.observer.record_event(&ObserverEvent::LlmResponse {
2728                    model_provider: self.model_provider_name.clone(),
2729                    model: effective_model.clone(),
2730                    duration: llm_started_at.elapsed(),
2731                    success: false,
2732                    error_message: Some("request cancelled by user".into()),
2733                    input_tokens: None,
2734                    output_tokens: None,
2735                    channel: None,
2736                    agent_alias: self.observer_agent_alias(),
2737                    turn_id: Some(turn_id.clone()),
2738                });
2739                return Err(StreamedTurnError {
2740                    error: crate::agent::loop_::ToolLoopCancelled.into(),
2741                    committed_response,
2742                    new_messages: new_msgs,
2743                });
2744            }
2745
2746            if stream_error.is_some() && visible_streamed_output {
2747                if !streamed_text.is_empty() {
2748                    let partial =
2749                        Self::marked_partial_response(&streamed_text, "[stream interrupted]");
2750                    self.append_streamed_assistant_message_to_history(
2751                        partial,
2752                        &mut new_msgs,
2753                        &mut committed_response,
2754                    );
2755                }
2756                let safe_error = zeroclaw_providers::sanitize_api_error(
2757                    stream_error.as_deref().unwrap_or_default(),
2758                );
2759                self.observer.record_event(&ObserverEvent::LlmResponse {
2760                    model_provider: self.model_provider_name.clone(),
2761                    model: effective_model.clone(),
2762                    duration: llm_started_at.elapsed(),
2763                    success: false,
2764                    error_message: Some(safe_error),
2765                    input_tokens: None,
2766                    output_tokens: None,
2767                    channel: None,
2768                    agent_alias: self.observer_agent_alias(),
2769                    turn_id: Some(turn_id.clone()),
2770                });
2771                return Err(StreamedTurnError {
2772                    error: anyhow::Error::msg(stream_error.unwrap_or_default()),
2773                    committed_response,
2774                    new_messages: new_msgs,
2775                });
2776            }
2777
2778            // If streaming completed cleanly and produced text, use it as the
2779            // response and check for tool calls via the dispatcher. If the
2780            // stream errored before emitting visible output, fall back to
2781            // non-streaming chat so provider error text does not become the
2782            // final answer.
2783            let response = if got_stream && stream_error.is_none() {
2784                // Build a synthetic ChatResponse from streamed text.
2785                // `streamed_reasoning` carries signed thinking blocks from
2786                // providers that emit them via `StreamChunk.reasoning`
2787                // (Anthropic's native-thinking non-streaming fallback), so
2788                // the signature round-trip survives into conversation history.
2789                zeroclaw_providers::ChatResponse {
2790                    text: Some(streamed_text),
2791                    tool_calls: streamed_tool_calls,
2792                    usage: streamed_usage.clone(),
2793                    reasoning_content: if streamed_reasoning.is_empty() {
2794                        None
2795                    } else {
2796                        Some(streamed_reasoning)
2797                    },
2798                }
2799            } else {
2800                if let Some(error) = stream_error.as_ref() {
2801                    ::zeroclaw_log::record!(
2802                        WARN,
2803                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2804                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2805                            .with_attrs(::serde_json::json!({
2806                                "model": effective_model.as_str(),
2807                                "error": zeroclaw_providers::sanitize_api_error(error),
2808                            })),
2809                        "turn_streamed provider stream failed; falling back to non-streaming chat"
2810                    );
2811                }
2812                // Fall back to non-streaming chat, with cancellation guard
2813                let chat_fut = self.model_provider.chat(
2814                    ChatRequest {
2815                        messages: &prepared_messages,
2816                        tools: if self.should_send_tool_specs() {
2817                            Some(&self.tool_specs)
2818                        } else {
2819                            None
2820                        },
2821                        thinking: None,
2822                    },
2823                    &effective_model,
2824                    self.temperature,
2825                );
2826                let chat_result = if let Some(ref token) = cancel_token {
2827                    tokio::select! {
2828                        biased;
2829                        () = token.cancelled() => {
2830                            let partial = if streamed_text.is_empty() {
2831                                "[interrupted by user]".to_string()
2832                            } else {
2833                                Self::marked_partial_response(
2834                                    &streamed_text,
2835                                    "[interrupted by user]",
2836                                )
2837                            };
2838                            self.append_streamed_assistant_message_to_history(
2839                                partial,
2840                                &mut new_msgs,
2841                                &mut committed_response,
2842                            );
2843                            self.observer.record_event(&ObserverEvent::LlmResponse {
2844                                model_provider: self.model_provider_name.clone(),
2845                                model: effective_model.clone(),
2846                                duration: llm_started_at.elapsed(),
2847                                success: false,
2848                                error_message: Some("request cancelled by user".into()),
2849                                input_tokens: None,
2850                                output_tokens: None,
2851                                channel: None,
2852                                agent_alias: self.observer_agent_alias(),
2853                                turn_id: Some(turn_id.clone()),
2854                            });
2855                                return Err(StreamedTurnError {
2856                                error: crate::agent::loop_::ToolLoopCancelled.into(),
2857                                committed_response,
2858                                new_messages: new_msgs,
2859                            });
2860                        }
2861                        result = chat_fut => result,
2862                    }
2863                } else {
2864                    chat_fut.await
2865                };
2866                match chat_result {
2867                    Ok(resp) => resp,
2868                    Err(error) => {
2869                        let safe_error = zeroclaw_providers::sanitize_api_error(&error.to_string());
2870                        self.observer.record_event(&ObserverEvent::LlmResponse {
2871                            model_provider: self.model_provider_name.clone(),
2872                            model: effective_model.clone(),
2873                            duration: llm_started_at.elapsed(),
2874                            success: false,
2875                            error_message: Some(safe_error),
2876                            input_tokens: None,
2877                            output_tokens: None,
2878                            channel: None,
2879                            agent_alias: self.observer_agent_alias(),
2880                            turn_id: Some(turn_id.clone()),
2881                        });
2882                        if got_stream && !streamed_text.is_empty() {
2883                            let partial = Self::marked_partial_response(
2884                                &streamed_text,
2885                                "[stream interrupted]",
2886                            );
2887                            self.append_streamed_assistant_message_to_history(
2888                                partial,
2889                                &mut new_msgs,
2890                                &mut committed_response,
2891                            );
2892                        }
2893                        return Err(StreamedTurnError {
2894                            error,
2895                            committed_response,
2896                            new_messages: new_msgs,
2897                        });
2898                    }
2899                }
2900            };
2901
2902            let (resp_input_tokens, resp_output_tokens) = response
2903                .usage
2904                .as_ref()
2905                .map(|u| (u.input_tokens, u.output_tokens))
2906                .unwrap_or((None, None));
2907            if let Some(input) = resp_input_tokens {
2908                guard.total_input_tokens = guard.total_input_tokens.saturating_add(input);
2909                guard.saw_usage = true;
2910            }
2911            if let Some(output) = resp_output_tokens {
2912                guard.total_output_tokens = guard.total_output_tokens.saturating_add(output);
2913                guard.saw_usage = true;
2914            }
2915            self.observer.record_event(&ObserverEvent::LlmResponse {
2916                model_provider: self.model_provider_name.clone(),
2917                model: effective_model.clone(),
2918                duration: llm_started_at.elapsed(),
2919                success: true,
2920                error_message: None,
2921                input_tokens: resp_input_tokens,
2922                output_tokens: resp_output_tokens,
2923                channel: None,
2924                agent_alias: self.observer_agent_alias(),
2925                turn_id: Some(turn_id.clone()),
2926            });
2927
2928            // Forward per-call token usage so the WS gateway (and any other
2929            // consumer) can include aggregated usage in the final done frame
2930            // and write costs.jsonl. Absent when the provider does not surface
2931            // usage in streaming responses.
2932            if let Some(ref usage) = response.usage {
2933                Self::send_turn_event(
2934                    &event_tx,
2935                    cancel_token.as_ref(),
2936                    TurnEvent::Usage {
2937                        input_tokens: usage.input_tokens,
2938                        cached_input_tokens: usage.cached_input_tokens,
2939                        output_tokens: usage.output_tokens,
2940                        cost_usd: None,
2941                    },
2942                )
2943                .await;
2944            }
2945
2946            let (text, mut calls) = self.parse_response_for_effective_tools(&response);
2947            if calls.is_empty() {
2948                let final_text = if text.is_empty() && !self.tool_specs.is_empty() {
2949                    response.text.unwrap_or_default()
2950                } else {
2951                    text
2952                };
2953
2954                let steering_messages = Self::drain_steering_messages(&mut steering_rx);
2955                if !steering_messages.is_empty() {
2956                    if !final_text.is_empty() {
2957                        let assistant_msg =
2958                            ConversationMessage::Chat(ChatMessage::assistant(final_text.clone()));
2959                        new_msgs.push(assistant_msg.clone());
2960                        self.history.push(assistant_msg);
2961                        committed_response.push_str(&final_text);
2962                        self.trim_history();
2963                    }
2964
2965                    for steering_message in steering_messages {
2966                        self.append_streamed_user_message_to_history(
2967                            &steering_message,
2968                            &mut new_msgs,
2969                        )
2970                        .await;
2971                    }
2972                    continue;
2973                }
2974
2975                // Store in response cache
2976                if let (Some(cache), Some(key)) = (&self.response_cache, &cache_key) {
2977                    let token_count = response
2978                        .usage
2979                        .as_ref()
2980                        .and_then(|u| u.output_tokens)
2981                        .unwrap_or(0);
2982                    #[allow(clippy::cast_possible_truncation)]
2983                    let _ = cache.put(key, &effective_model, &final_text, token_count as u32);
2984                }
2985
2986                // If we didn't stream, send the full response as a single chunk
2987                if !got_stream && !final_text.is_empty() {
2988                    Self::send_turn_event(
2989                        &event_tx,
2990                        cancel_token.as_ref(),
2991                        TurnEvent::Chunk {
2992                            delta: final_text.clone(),
2993                        },
2994                    )
2995                    .await;
2996                }
2997
2998                new_msgs.push(ConversationMessage::Chat(ChatMessage::assistant(
2999                    final_text.clone(),
3000                )));
3001                self.history
3002                    .push(ConversationMessage::Chat(ChatMessage::assistant(
3003                        final_text.clone(),
3004                    )));
3005                committed_response.push_str(&final_text);
3006                self.trim_history();
3007                self.observer.record_event(&ObserverEvent::TurnComplete);
3008                return Ok(StreamedTurnSuccess {
3009                    response: committed_response,
3010                    new_messages: new_msgs,
3011                });
3012            }
3013
3014            // Pre-assign stable IDs to tool calls that don't have one
3015            for call in &mut calls {
3016                if call.tool_call_id.is_none() {
3017                    call.tool_call_id = Some(uuid::Uuid::new_v4().to_string());
3018                }
3019            }
3020
3021            // ── Tool calls ─────────────────────────────────────────────
3022            let tool_call_msg = ConversationMessage::AssistantToolCalls {
3023                text: response.text.clone(),
3024                tool_calls: response.tool_calls.clone(),
3025                reasoning_content: response.reasoning_content.clone(),
3026            };
3027            new_msgs.push(tool_call_msg.clone());
3028            // History push is deferred: tool_call_msg is pushed atomically with
3029            // ToolResults in every exit path below to prevent an orphaned
3030            // AssistantToolCalls entry that would cause a 400 from the Responses API.
3031
3032            // When parallel execution is disabled, the turn must look and behave
3033            // serially end to end: emit a tool-call start event, run that one
3034            // call, emit its result, then move to the next. Emitting every
3035            // start event up front and only then executing makes a multi-call
3036            // turn appear as a simultaneous batch in the front end and floods
3037            // the ACP/RPC channel — large batches have overrun the IPC and
3038            // crashed the TUI. Parallel mode keeps the batched dispatch so
3039            // concurrent execution can overlap.
3040            let serial_dispatch =
3041                !self.config.resolved.parallel_tools || self.approval_manager.is_some();
3042
3043            let results = if serial_dispatch {
3044                let mut serial_results: Vec<ToolExecutionResult> = Vec::with_capacity(calls.len());
3045                for (idx, call) in calls.iter().enumerate() {
3046                    if let Some(ref token) = cancel_token
3047                        && token.is_cancelled()
3048                    {
3049                        self.history.push(tool_call_msg.clone());
3050                        self.synthesize_cancelled_tool_results(
3051                            vec![],
3052                            &response.tool_calls,
3053                            &mut new_msgs,
3054                        );
3055                        self.append_streamed_assistant_message_to_history(
3056                            "[interrupted by user]".to_string(),
3057                            &mut new_msgs,
3058                            &mut committed_response,
3059                        );
3060                        return Err(StreamedTurnError {
3061                            error: crate::agent::loop_::ToolLoopCancelled.into(),
3062                            committed_response,
3063                            new_messages: new_msgs,
3064                        });
3065                    }
3066
3067                    let call_id = call.tool_call_id.as_ref().unwrap().clone();
3068                    Self::send_turn_event(
3069                        &event_tx,
3070                        cancel_token.as_ref(),
3071                        TurnEvent::ToolCall {
3072                            id: call_id,
3073                            name: call.name.clone(),
3074                            args: call.arguments.clone(),
3075                        },
3076                    )
3077                    .await;
3078
3079                    let single = std::slice::from_ref(call);
3080                    let result = if let Some(ref token) = cancel_token {
3081                        tokio::select! {
3082                            biased;
3083                            () = token.cancelled() => {
3084                                let completed: Vec<ToolResultMessage> = serial_results
3085                                    .iter()
3086                                    .map(|r| ToolResultMessage {
3087                                        tool_call_id: r.tool_call_id.clone().unwrap_or_default(),
3088                                        content: r.output.clone(),
3089                                    })
3090                                    .collect();
3091                                self.history.push(tool_call_msg.clone());
3092                                self.synthesize_cancelled_tool_results(
3093                                    completed,
3094                                    &response.tool_calls[idx..],
3095                                    &mut new_msgs,
3096                                );
3097                                self.append_streamed_assistant_message_to_history(
3098                                    "[interrupted by user]".to_string(),
3099                                    &mut new_msgs,
3100                                    &mut committed_response,
3101                                );
3102                                return Err(StreamedTurnError {
3103                                    error: crate::agent::loop_::ToolLoopCancelled.into(),
3104                                    committed_response,
3105                                    new_messages: new_msgs,
3106                                });
3107                            }
3108                            mut r = self.execute_tools(single, &turn_id) => r.pop().expect("one call yields one result"),
3109                        }
3110                    } else {
3111                        self.execute_tools(single, &turn_id)
3112                            .await
3113                            .pop()
3114                            .expect("one call yields one result")
3115                    };
3116
3117                    let result_id = result.tool_call_id.as_ref().unwrap().clone();
3118                    Self::send_turn_event(
3119                        &event_tx,
3120                        cancel_token.as_ref(),
3121                        TurnEvent::ToolResult {
3122                            id: result_id,
3123                            name: result.name.clone(),
3124                            output: result.output.clone(),
3125                        },
3126                    )
3127                    .await;
3128
3129                    serial_results.push(result);
3130                }
3131                serial_results
3132            } else {
3133                for call in &calls {
3134                    let call_id = call.tool_call_id.as_ref().unwrap().clone();
3135                    Self::send_turn_event(
3136                        &event_tx,
3137                        cancel_token.as_ref(),
3138                        TurnEvent::ToolCall {
3139                            id: call_id,
3140                            name: call.name.clone(),
3141                            args: call.arguments.clone(),
3142                        },
3143                    )
3144                    .await;
3145                }
3146
3147                let results = if let Some(ref token) = cancel_token {
3148                    tokio::select! {
3149                        biased;
3150                        () = token.cancelled() => {
3151                            self.history.push(tool_call_msg.clone());
3152                            self.synthesize_cancelled_tool_results(
3153                                vec![],
3154                                &response.tool_calls,
3155                                &mut new_msgs,
3156                            );
3157                            self.append_streamed_assistant_message_to_history(
3158                                "[interrupted by user]".to_string(),
3159                                &mut new_msgs,
3160                                &mut committed_response,
3161                            );
3162                            return Err(StreamedTurnError {
3163                                error: crate::agent::loop_::ToolLoopCancelled.into(),
3164                                committed_response,
3165                                new_messages: new_msgs,
3166                            });
3167                        }
3168                        results = self.execute_tools(&calls, &turn_id) => results,
3169                    }
3170                } else {
3171                    self.execute_tools(&calls, &turn_id).await
3172                };
3173
3174                for result in &results {
3175                    let result_id = result.tool_call_id.as_ref().unwrap().clone();
3176                    Self::send_turn_event(
3177                        &event_tx,
3178                        cancel_token.as_ref(),
3179                        TurnEvent::ToolResult {
3180                            id: result_id,
3181                            name: result.name.clone(),
3182                            output: result.output.clone(),
3183                        },
3184                    )
3185                    .await;
3186                }
3187
3188                results
3189            };
3190
3191            let formatted = self.tool_dispatcher.format_results(&results);
3192            new_msgs.push(formatted.clone());
3193            self.history.push(tool_call_msg);
3194            self.history.push(formatted);
3195            self.trim_history();
3196        }
3197
3198        Err(StreamedTurnError {
3199            error: anyhow::Error::msg(format!(
3200                "Agent exceeded maximum tool iterations ({})",
3201                self.config.resolved.max_tool_iterations
3202            )),
3203            committed_response,
3204            new_messages: new_msgs,
3205        })
3206    }
3207
3208    pub async fn run_single(&mut self, message: &str) -> Result<String> {
3209        self.turn(message).await
3210    }
3211
3212    pub async fn run_interactive(&mut self) -> Result<()> {
3213        println!("🦀 ZeroClaw Interactive Mode");
3214        println!("Type /quit to exit.\n");
3215
3216        let (tx, mut rx) = tokio::sync::mpsc::channel(32);
3217        let cli = crate::agent::loop_::CLI_CHANNEL_FN
3218            .get()
3219            .expect("CLI channel factory not registered — call register_cli_channel_fn at startup")(
3220        );
3221
3222        let listen_handle = zeroclaw_spawn::spawn!(async move {
3223            let _ = zeroclaw_api::channel::Channel::listen(&*cli, tx).await;
3224        });
3225
3226        while let Some(msg) = rx.recv().await {
3227            let response = match self.turn(&msg.content).await {
3228                Ok(resp) => resp,
3229                Err(e) => {
3230                    eprintln!("\nError: {e}\n");
3231                    continue;
3232                }
3233            };
3234            println!("\n{response}\n");
3235        }
3236
3237        listen_handle.abort();
3238        Ok(())
3239    }
3240}
3241
3242pub async fn run(
3243    config: Config,
3244    agent_alias: &str,
3245    message: Option<String>,
3246    provider_override: Option<String>,
3247    model_override: Option<String>,
3248    temperature: Option<f64>,
3249) -> Result<()> {
3250    let start = Instant::now();
3251
3252    let mut effective_config = config;
3253    if let Some(ref p) = provider_override {
3254        // When a model_provider override is specified, ensure that model_provider type exists
3255        // in models and update the agent's model_provider to reference it.
3256        let (type_key, alias_key) = p.split_once('.').unwrap_or((p.as_str(), agent_alias));
3257        effective_config
3258            .providers
3259            .models
3260            .ensure(type_key, alias_key);
3261        if let Some(agent_cfg) = effective_config.agents.get_mut(agent_alias) {
3262            agent_cfg.model_provider = format!("{type_key}.{alias_key}").into();
3263        }
3264    }
3265    // Apply model/temperature overrides to the agent's resolved provider entry.
3266    if let Some(agent_cfg) = effective_config.agents.get(agent_alias)
3267        && let Some((fam, ali)) = agent_cfg.model_provider.split_once('.')
3268        && let Some(entry) = effective_config.providers.models.ensure(fam, ali)
3269    {
3270        if let Some(m) = model_override {
3271            entry.model = Some(m);
3272        }
3273        entry.temperature = temperature;
3274    }
3275
3276    let mut agent = Agent::from_config(&effective_config, agent_alias).await?;
3277
3278    let (provider_name, model_name) =
3279        match effective_config.resolved_model_provider_for_agent(agent_alias) {
3280            Some((ty, _alias, entry)) => {
3281                let model = entry
3282                    .model
3283                    .as_deref()
3284                    .map(str::trim)
3285                    .filter(|m| !m.is_empty())
3286                    .map(ToString::to_string)
3287                    .or_else(|| effective_config.resolve_default_model())
3288                    .unwrap_or_else(|| "<unresolved>".to_string());
3289                (ty.to_string(), model)
3290            }
3291            None => (
3292                provider_override.unwrap_or_else(|| "unknown".to_string()),
3293                effective_config
3294                    .resolve_default_model()
3295                    .unwrap_or_else(|| "<unresolved>".to_string()),
3296            ),
3297        };
3298
3299    agent.observer.record_event(&ObserverEvent::AgentStart {
3300        model_provider: provider_name.clone(),
3301        model: model_name.clone(),
3302        channel: None,
3303        agent_alias: None,
3304        turn_id: None,
3305    });
3306
3307    let _run_guard = TurnGuard {
3308        observer: Arc::clone(&agent.observer),
3309        model_provider: provider_name,
3310        model: model_name,
3311        turn_id: None,
3312        turn_started_at: start,
3313        agent_alias: None,
3314        total_input_tokens: 0,
3315        total_output_tokens: 0,
3316        saw_usage: false,
3317        done: false,
3318    };
3319
3320    if let Some(msg) = message {
3321        let response = agent.run_single(&msg).await?;
3322        println!("{response}");
3323    } else {
3324        agent.run_interactive().await?;
3325    }
3326
3327    Ok(())
3328}
3329
3330#[cfg(test)]
3331mod tests {
3332    use super::*;
3333    use async_trait::async_trait;
3334    use parking_lot::Mutex;
3335    use std::collections::HashMap;
3336    use std::sync::atomic::{AtomicUsize, Ordering};
3337    use zeroclaw_api::observability_traits::ObserverMetric;
3338
3339    #[test]
3340    fn build_session_model_provider_rejects_undotted_ref() {
3341        let config = Config::default();
3342        let err = match build_session_model_provider(&config, "anthropic", Some("m")) {
3343            Ok(_) => panic!("undotted ref must error"),
3344            Err(e) => e,
3345        };
3346        assert!(err.to_string().contains("<type>.<alias>"), "got: {err}");
3347    }
3348
3349    #[test]
3350    fn build_session_model_provider_requires_a_model() {
3351        // No configured entry and no override → cannot resolve a model name.
3352        let config = Config::default();
3353        let err = match build_session_model_provider(&config, "anthropic.default", None) {
3354            Ok(_) => panic!("missing model must error"),
3355            Err(e) => e,
3356        };
3357        assert!(
3358            err.to_string().contains("no `model` configured"),
3359            "got: {err}"
3360        );
3361    }
3362
    // NOTE(review): presumably generates the attribution boilerplate for each
    // mock tool type listed here — confirm against the macro definition in
    // `zeroclaw_api`.
    zeroclaw_api::mock_tool_attribution!(
        CountingTool,
        NamedMockTool,
        MockTool,
        SlowTool,
        CapturingApprovalArgTool,
    );
3370
3371    struct MockModelProvider {
3372        responses: Mutex<Vec<zeroclaw_providers::ChatResponse>>,
3373    }
3374
3375    #[async_trait]
3376    impl ModelProvider for MockModelProvider {
3377        async fn chat_with_system(
3378            &self,
3379            _system_prompt: Option<&str>,
3380            _message: &str,
3381            _model: &str,
3382            _temperature: Option<f64>,
3383        ) -> Result<String> {
3384            Ok("ok".into())
3385        }
3386
3387        async fn chat(
3388            &self,
3389            _request: ChatRequest<'_>,
3390            _model: &str,
3391            _temperature: Option<f64>,
3392        ) -> Result<zeroclaw_providers::ChatResponse> {
3393            let mut guard = self.responses.lock();
3394            if guard.is_empty() {
3395                return Ok(zeroclaw_providers::ChatResponse {
3396                    text: Some("done".into()),
3397                    tool_calls: vec![],
3398                    usage: None,
3399                    reasoning_content: None,
3400                });
3401            }
3402            Ok(guard.remove(0))
3403        }
3404    }
3405    impl ::zeroclaw_api::attribution::Attributable for MockModelProvider {
3406        fn role(&self) -> ::zeroclaw_api::attribution::Role {
3407            ::zeroclaw_api::attribution::Role::Provider(
3408                ::zeroclaw_api::attribution::ProviderKind::Model(
3409                    ::zeroclaw_api::attribution::ModelProviderKind::Custom,
3410                ),
3411            )
3412        }
3413        fn alias(&self) -> &str {
3414            "MockModelProvider"
3415        }
3416    }
3417
3418    struct ModelCaptureModelProvider {
3419        responses: Mutex<Vec<zeroclaw_providers::ChatResponse>>,
3420        seen_models: Arc<Mutex<Vec<String>>>,
3421    }
3422
3423    #[async_trait]
3424    impl ModelProvider for ModelCaptureModelProvider {
3425        async fn chat_with_system(
3426            &self,
3427            _system_prompt: Option<&str>,
3428            _message: &str,
3429            _model: &str,
3430            _temperature: Option<f64>,
3431        ) -> Result<String> {
3432            Ok("ok".into())
3433        }
3434
3435        async fn chat(
3436            &self,
3437            _request: ChatRequest<'_>,
3438            model: &str,
3439            _temperature: Option<f64>,
3440        ) -> Result<zeroclaw_providers::ChatResponse> {
3441            self.seen_models.lock().push(model.to_string());
3442            let mut guard = self.responses.lock();
3443            if guard.is_empty() {
3444                return Ok(zeroclaw_providers::ChatResponse {
3445                    text: Some("done".into()),
3446                    tool_calls: vec![],
3447                    usage: None,
3448                    reasoning_content: None,
3449                });
3450            }
3451            Ok(guard.remove(0))
3452        }
3453    }
3454    impl ::zeroclaw_api::attribution::Attributable for ModelCaptureModelProvider {
3455        fn role(&self) -> ::zeroclaw_api::attribution::Role {
3456            ::zeroclaw_api::attribution::Role::Provider(
3457                ::zeroclaw_api::attribution::ProviderKind::Model(
3458                    ::zeroclaw_api::attribution::ModelProviderKind::Custom,
3459                ),
3460            )
3461        }
3462        fn alias(&self) -> &str {
3463            "ModelCaptureModelProvider"
3464        }
3465    }
3466
3467    struct TranscriptCaptureModelProvider {
3468        responses: Mutex<Vec<zeroclaw_providers::ChatResponse>>,
3469        seen_messages: Arc<Mutex<Vec<Vec<ChatMessage>>>>,
3470    }
3471
3472    #[async_trait]
3473    impl ModelProvider for TranscriptCaptureModelProvider {
3474        async fn chat_with_system(
3475            &self,
3476            _system_prompt: Option<&str>,
3477            _message: &str,
3478            _model: &str,
3479            _temperature: Option<f64>,
3480        ) -> Result<String> {
3481            Ok("ok".into())
3482        }
3483
3484        async fn chat(
3485            &self,
3486            request: ChatRequest<'_>,
3487            _model: &str,
3488            _temperature: Option<f64>,
3489        ) -> Result<zeroclaw_providers::ChatResponse> {
3490            self.seen_messages.lock().push(request.messages.to_vec());
3491            let mut responses = self.responses.lock();
3492            if responses.is_empty() {
3493                return Ok(zeroclaw_providers::ChatResponse {
3494                    text: Some("done".into()),
3495                    tool_calls: vec![],
3496                    usage: None,
3497                    reasoning_content: None,
3498                });
3499            }
3500            Ok(responses.remove(0))
3501        }
3502    }
3503
3504    impl ::zeroclaw_api::attribution::Attributable for TranscriptCaptureModelProvider {
3505        fn role(&self) -> ::zeroclaw_api::attribution::Role {
3506            ::zeroclaw_api::attribution::Role::Provider(
3507                ::zeroclaw_api::attribution::ProviderKind::Model(
3508                    ::zeroclaw_api::attribution::ModelProviderKind::Custom,
3509                ),
3510            )
3511        }
3512        fn alias(&self) -> &str {
3513            "TranscriptCaptureModelProvider"
3514        }
3515    }
3516
3517    struct StreamingSteeringModelProvider {
3518        seen_messages: Arc<Mutex<Vec<Vec<ChatMessage>>>>,
3519        call_count: AtomicUsize,
3520        fail_on_call: Option<usize>,
3521        fail_chat_on_call: Option<usize>,
3522        fail_after_delta_on_call: Option<usize>,
3523        delay_chat_on_call: Option<usize>,
3524    }
3525
3526    #[async_trait]
3527    impl ModelProvider for StreamingSteeringModelProvider {
3528        async fn chat_with_system(
3529            &self,
3530            _system_prompt: Option<&str>,
3531            _message: &str,
3532            _model: &str,
3533            _temperature: Option<f64>,
3534        ) -> Result<String> {
3535            Ok("ok".into())
3536        }
3537
3538        async fn chat(
3539            &self,
3540            request: ChatRequest<'_>,
3541            _model: &str,
3542            _temperature: Option<f64>,
3543        ) -> Result<zeroclaw_providers::ChatResponse> {
3544            let call = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
3545            self.seen_messages.lock().push(request.messages.to_vec());
3546            if self.delay_chat_on_call == Some(call) {
3547                tokio::time::sleep(std::time::Duration::from_secs(60)).await;
3548            }
3549            if self.fail_on_call == Some(call) {
3550                anyhow::bail!("synthetic provider failure on call {call}");
3551            }
3552            if self.fail_chat_on_call == Some(call) {
3553                anyhow::bail!("synthetic chat failure on call {call}");
3554            }
3555            if self.fail_after_delta_on_call == Some(call) {
3556                anyhow::bail!("synthetic provider failure after delta on call {call}");
3557            }
3558            Ok(zeroclaw_providers::ChatResponse {
3559                text: Some(if call == 1 { "draft" } else { "final" }.into()),
3560                tool_calls: vec![],
3561                usage: None,
3562                reasoning_content: None,
3563            })
3564        }
3565
3566        fn supports_streaming(&self) -> bool {
3567            true
3568        }
3569
3570        fn stream_chat(
3571            &self,
3572            request: ChatRequest<'_>,
3573            _model: &str,
3574            _temperature: Option<f64>,
3575            _options: zeroclaw_providers::traits::StreamOptions,
3576        ) -> futures_util::stream::BoxStream<
3577            'static,
3578            zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
3579        > {
3580            use futures_util::StreamExt as _;
3581
3582            let call = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
3583            self.seen_messages.lock().push(request.messages.to_vec());
3584            let should_fail = self.fail_on_call == Some(call);
3585            let should_fail_after_delta = self.fail_after_delta_on_call == Some(call);
3586            let delta = if call == 1 { "draft" } else { "final" }.to_string();
3587            futures_util::stream::unfold(0, move |step| {
3588                let delta = delta.clone();
3589                async move {
3590                    match step {
3591                        0 if should_fail => Some((
3592                            Err(zeroclaw_providers::traits::StreamError::ModelProvider(
3593                                "synthetic provider failure".into(),
3594                            )),
3595                            1,
3596                        )),
3597                        0 => Some((
3598                            Ok(zeroclaw_providers::traits::StreamEvent::TextDelta(
3599                                zeroclaw_providers::traits::StreamChunk {
3600                                    delta,
3601                                    is_final: false,
3602                                    reasoning: None,
3603                                    token_count: 0,
3604                                },
3605                            )),
3606                            1,
3607                        )),
3608                        1 if should_fail_after_delta => Some((
3609                            Err(zeroclaw_providers::traits::StreamError::ModelProvider(
3610                                "synthetic provider failure after delta".into(),
3611                            )),
3612                            2,
3613                        )),
3614                        1 => {
3615                            tokio::time::sleep(std::time::Duration::from_millis(150)).await;
3616                            Some((Ok(zeroclaw_providers::traits::StreamEvent::Final), 2))
3617                        }
3618                        _ => None,
3619                    }
3620                }
3621            })
3622            .boxed()
3623        }
3624    }
3625
3626    impl ::zeroclaw_api::attribution::Attributable for StreamingSteeringModelProvider {
3627        fn role(&self) -> ::zeroclaw_api::attribution::Role {
3628            ::zeroclaw_api::attribution::Role::Provider(
3629                ::zeroclaw_api::attribution::ProviderKind::Model(
3630                    ::zeroclaw_api::attribution::ModelProviderKind::Custom,
3631                ),
3632            )
3633        }
3634        fn alias(&self) -> &str {
3635            "StreamingSteeringModelProvider"
3636        }
3637    }
3638
3639    #[derive(Default)]
3640    struct CapturingObserver {
3641        events: parking_lot::Mutex<Vec<ObserverEvent>>,
3642    }
3643
3644    impl Observer for CapturingObserver {
3645        fn record_event(&self, event: &ObserverEvent) {
3646            self.events.lock().push(event.clone());
3647        }
3648        fn record_metric(&self, _metric: &ObserverMetric) {}
3649        fn name(&self) -> &str {
3650            "capturing"
3651        }
3652        fn as_any(&self) -> &dyn std::any::Any {
3653            self
3654        }
3655        fn flush(&self) {}
3656    }
3657
3658    struct MultimodalCaptureProvider {
3659        seen_user_messages: Arc<Mutex<Vec<String>>>,
3660        streamed: bool,
3661    }
3662
3663    #[async_trait]
3664    impl ModelProvider for MultimodalCaptureProvider {
3665        async fn chat_with_system(
3666            &self,
3667            _system_prompt: Option<&str>,
3668            _message: &str,
3669            _model: &str,
3670            _temperature: Option<f64>,
3671        ) -> Result<String> {
3672            Ok("ok".into())
3673        }
3674
3675        async fn chat(
3676            &self,
3677            request: ChatRequest<'_>,
3678            _model: &str,
3679            _temperature: Option<f64>,
3680        ) -> Result<zeroclaw_providers::ChatResponse> {
3681            if let Some(message) = request.messages.iter().rfind(|msg| msg.role == "user") {
3682                self.seen_user_messages.lock().push(message.content.clone());
3683            }
3684            Ok(zeroclaw_providers::ChatResponse {
3685                text: Some("done".into()),
3686                tool_calls: vec![],
3687                usage: None,
3688                reasoning_content: None,
3689            })
3690        }
3691
3692        fn stream_chat(
3693            &self,
3694            request: ChatRequest<'_>,
3695            _model: &str,
3696            _temperature: Option<f64>,
3697            _options: zeroclaw_providers::traits::StreamOptions,
3698        ) -> futures_util::stream::BoxStream<
3699            'static,
3700            zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
3701        > {
3702            use futures_util::stream::{self, StreamExt};
3703
3704            if let Some(message) = request.messages.iter().rfind(|msg| msg.role == "user") {
3705                self.seen_user_messages.lock().push(message.content.clone());
3706            }
3707
3708            if self.streamed {
3709                let chunk = zeroclaw_providers::traits::StreamEvent::TextDelta(
3710                    zeroclaw_providers::traits::StreamChunk {
3711                        delta: "stream-done".into(),
3712                        is_final: false,
3713                        reasoning: None,
3714                        token_count: 0,
3715                    },
3716                );
3717                stream::iter(vec![
3718                    Ok(chunk),
3719                    Ok(zeroclaw_providers::traits::StreamEvent::Final),
3720                ])
3721                .boxed()
3722            } else {
3723                stream::iter(vec![Ok(zeroclaw_providers::traits::StreamEvent::Final)]).boxed()
3724            }
3725        }
3726
3727        fn supports_vision(&self) -> bool {
3728            true
3729        }
3730    }
3731    impl ::zeroclaw_api::attribution::Attributable for MultimodalCaptureProvider {
3732        fn role(&self) -> ::zeroclaw_api::attribution::Role {
3733            ::zeroclaw_api::attribution::Role::Provider(
3734                ::zeroclaw_api::attribution::ProviderKind::Model(
3735                    ::zeroclaw_api::attribution::ModelProviderKind::Custom,
3736                ),
3737            )
3738        }
3739        fn alias(&self) -> &str {
3740            "MultimodalCaptureProvider"
3741        }
3742    }
3743
3744    struct MockTool;
3745
3746    #[async_trait]
3747    impl Tool for MockTool {
3748        fn name(&self) -> &str {
3749            "echo"
3750        }
3751
3752        fn description(&self) -> &str {
3753            "echo"
3754        }
3755
3756        fn parameters_schema(&self) -> serde_json::Value {
3757            serde_json::json!({"type": "object"})
3758        }
3759
3760        async fn execute(&self, _args: serde_json::Value) -> Result<crate::tools::ToolResult> {
3761            Ok(crate::tools::ToolResult {
3762                success: true,
3763                output: "tool-out".into(),
3764                error: None,
3765            })
3766        }
3767    }
3768
3769    struct FailingModelProvider;
3770
3771    #[async_trait]
3772    impl ModelProvider for FailingModelProvider {
3773        async fn chat_with_system(
3774            &self,
3775            _system_prompt: Option<&str>,
3776            _message: &str,
3777            _model: &str,
3778            _temperature: Option<f64>,
3779        ) -> Result<String> {
3780            Err(anyhow::Error::msg("provider unavailable"))
3781        }
3782
3783        async fn chat(
3784            &self,
3785            _request: ChatRequest<'_>,
3786            _model: &str,
3787            _temperature: Option<f64>,
3788        ) -> Result<zeroclaw_providers::ChatResponse> {
3789            Err(anyhow::Error::msg("provider unavailable"))
3790        }
3791    }
3792
3793    impl ::zeroclaw_api::attribution::Attributable for FailingModelProvider {
3794        fn role(&self) -> ::zeroclaw_api::attribution::Role {
3795            ::zeroclaw_api::attribution::Role::Provider(
3796                ::zeroclaw_api::attribution::ProviderKind::Model(
3797                    ::zeroclaw_api::attribution::ModelProviderKind::Custom,
3798                ),
3799            )
3800        }
3801        fn alias(&self) -> &str {
3802            "FailingModelProvider"
3803        }
3804    }
3805
3806    struct SlowTool;
3807
3808    #[async_trait]
3809    impl Tool for SlowTool {
3810        fn name(&self) -> &str {
3811            "echo"
3812        }
3813
3814        fn description(&self) -> &str {
3815            "echo"
3816        }
3817
3818        fn parameters_schema(&self) -> serde_json::Value {
3819            serde_json::json!({"type": "object"})
3820        }
3821
3822        async fn execute(&self, _args: serde_json::Value) -> Result<crate::tools::ToolResult> {
3823            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
3824            Ok(crate::tools::ToolResult {
3825                success: true,
3826                output: "tool-out".into(),
3827                error: None,
3828            })
3829        }
3830    }
3831
3832    struct CountingTool {
3833        calls: Arc<AtomicUsize>,
3834    }
3835
3836    #[async_trait]
3837    impl Tool for CountingTool {
3838        fn name(&self) -> &str {
3839            "echo"
3840        }
3841
3842        fn description(&self) -> &str {
3843            "echo"
3844        }
3845
3846        fn parameters_schema(&self) -> serde_json::Value {
3847            serde_json::json!({"type": "object"})
3848        }
3849
3850        async fn execute(&self, _args: serde_json::Value) -> Result<crate::tools::ToolResult> {
3851            self.calls.fetch_add(1, Ordering::SeqCst);
3852            Ok(crate::tools::ToolResult {
3853                success: true,
3854                output: "tool-out".into(),
3855                error: None,
3856            })
3857        }
3858    }
3859
3860    struct CapturingApprovalArgTool {
3861        name: &'static str,
3862        output: &'static str,
3863        calls: Arc<AtomicUsize>,
3864        last_args: Arc<std::sync::Mutex<Option<serde_json::Value>>>,
3865    }
3866
3867    #[async_trait]
3868    impl Tool for CapturingApprovalArgTool {
3869        fn name(&self) -> &str {
3870            self.name
3871        }
3872
3873        fn description(&self) -> &str {
3874            self.name
3875        }
3876
3877        fn parameters_schema(&self) -> serde_json::Value {
3878            serde_json::json!({"type": "object"})
3879        }
3880
3881        async fn execute(&self, args: serde_json::Value) -> Result<crate::tools::ToolResult> {
3882            self.calls.fetch_add(1, Ordering::SeqCst);
3883            *self.last_args.lock().unwrap() = Some(args);
3884            Ok(crate::tools::ToolResult {
3885                success: true,
3886                output: self.output.into(),
3887                error: None,
3888            })
3889        }
3890    }
3891
3892    struct ApprovalChannel {
3893        response: zeroclaw_api::channel::ChannelApprovalResponse,
3894        requests: Arc<AtomicUsize>,
3895    }
3896
3897    impl ::zeroclaw_api::attribution::Attributable for ApprovalChannel {
3898        fn role(&self) -> ::zeroclaw_api::attribution::Role {
3899            ::zeroclaw_api::attribution::Role::Channel(
3900                ::zeroclaw_api::attribution::ChannelKind::AcpChannel,
3901            )
3902        }
3903        fn alias(&self) -> &str {
3904            "test"
3905        }
3906    }
3907
3908    #[async_trait]
3909    impl zeroclaw_api::channel::Channel for ApprovalChannel {
3910        fn name(&self) -> &str {
3911            "acp"
3912        }
3913
3914        async fn send(&self, _message: &zeroclaw_api::channel::SendMessage) -> anyhow::Result<()> {
3915            Ok(())
3916        }
3917
3918        async fn listen(
3919            &self,
3920            _tx: tokio::sync::mpsc::Sender<zeroclaw_api::channel::ChannelMessage>,
3921        ) -> anyhow::Result<()> {
3922            Ok(())
3923        }
3924
3925        async fn request_approval(
3926            &self,
3927            _recipient: &str,
3928            _request: &zeroclaw_api::channel::ChannelApprovalRequest,
3929        ) -> anyhow::Result<Option<zeroclaw_api::channel::ChannelApprovalResponse>> {
3930            self.requests.fetch_add(1, Ordering::SeqCst);
3931            Ok(Some(self.response.clone()))
3932        }
3933    }
3934
3935    #[tokio::test]
3936    async fn turn_without_tools_returns_text() {
3937        let model_provider = Box::new(MockModelProvider {
3938            responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
3939                text: Some("hello".into()),
3940                tool_calls: vec![],
3941                usage: None,
3942                reasoning_content: None,
3943            }]),
3944        });
3945
3946        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3947            backend: "none".into(),
3948            ..zeroclaw_config::schema::MemoryConfig::default()
3949        };
3950        let mem: Arc<dyn Memory> = Arc::from(
3951            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3952                .expect("memory creation should succeed with valid config"),
3953        );
3954
3955        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3956        let mut agent = Agent::builder()
3957            .model_provider(model_provider)
3958            .tools(vec![Box::new(MockTool)])
3959            .memory(mem)
3960            .observer(observer)
3961            .tool_dispatcher(Box::new(XmlToolDispatcher))
3962            .workspace_dir(std::path::PathBuf::from("/tmp"))
3963            .build()
3964            .expect("agent builder should succeed with valid config");
3965
3966        let response = agent.turn("hi").await.unwrap();
3967        assert_eq!(response, "hello");
3968    }
3969
3970    #[tokio::test]
3971    async fn direct_agent_strict_tool_parsing_ignores_xml_dispatcher_calls() {
3972        let provider = Box::new(MockModelProvider {
3973            responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
3974                text: Some(
3975                    r#"<tool_call>{"name":"echo","arguments":{"value":"ignored"}}</tool_call>"#
3976                        .into(),
3977                ),
3978                tool_calls: vec![],
3979                usage: None,
3980                reasoning_content: None,
3981            }]),
3982        });
3983
3984        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
3985            backend: "none".into(),
3986            ..zeroclaw_config::schema::MemoryConfig::default()
3987        };
3988        let mem: Arc<dyn Memory> = Arc::from(
3989            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
3990                .expect("memory creation should succeed with valid config"),
3991        );
3992        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
3993        let calls = Arc::new(AtomicUsize::new(0));
3994        let agent_config = zeroclaw_config::schema::AliasedAgentConfig {
3995            resolved: zeroclaw_config::schema::ResolvedRuntime {
3996                strict_tool_parsing: true,
3997                ..Default::default()
3998            },
3999            ..zeroclaw_config::schema::AliasedAgentConfig::default()
4000        };
4001        let mut agent = Agent::builder()
4002            .model_provider(provider)
4003            .tools(vec![Box::new(CountingTool {
4004                calls: Arc::clone(&calls),
4005            })])
4006            .memory(mem)
4007            .observer(observer)
4008            .tool_dispatcher(Box::new(XmlToolDispatcher))
4009            .config(agent_config)
4010            .workspace_dir(std::path::PathBuf::from("/tmp"))
4011            .build()
4012            .expect("agent builder should succeed with valid config");
4013
4014        let system_prompt = agent
4015            .build_system_prompt()
4016            .expect("system prompt should render");
4017        assert!(
4018            !system_prompt.contains("## Tools"),
4019            "strict parsing should not advertise text tool instructions"
4020        );
4021        assert!(
4022            !system_prompt.contains("<tool_call"),
4023            "strict parsing should not advertise XML tool calls"
4024        );
4025
4026        let response = agent.turn("hi").await.unwrap();
4027
4028        assert_eq!(calls.load(Ordering::SeqCst), 0);
4029        assert!(response.contains("<tool_call>"));
4030    }
4031
4032    #[test]
4033    fn native_agent_prompt_omits_duplicate_tools_section() {
4034        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4035            backend: "none".into(),
4036            ..zeroclaw_config::schema::MemoryConfig::default()
4037        };
4038        let workspace = tempfile::TempDir::new().expect("temp dir");
4039        let mem: Arc<dyn Memory> = Arc::from(
4040            zeroclaw_memory::create_memory(&memory_cfg, workspace.path(), None)
4041                .expect("memory creation should succeed with valid config"),
4042        );
4043        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4044
4045        let native_agent = Agent::builder()
4046            .model_provider(Box::new(MockModelProvider {
4047                responses: Mutex::new(vec![]),
4048            }))
4049            .tools(vec![Box::new(MockTool)])
4050            .memory(Arc::clone(&mem))
4051            .observer(Arc::clone(&observer))
4052            .tool_dispatcher(Box::new(NativeToolDispatcher))
4053            .workspace_dir(workspace.path().to_path_buf())
4054            .build()
4055            .expect("agent builder should succeed with valid config");
4056        let native_prompt = native_agent.build_system_prompt().unwrap();
4057        assert!(!native_prompt.contains("## Tools"));
4058        assert!(!native_prompt.contains("echo"));
4059
4060        let xml_agent = Agent::builder()
4061            .model_provider(Box::new(MockModelProvider {
4062                responses: Mutex::new(vec![]),
4063            }))
4064            .tools(vec![Box::new(MockTool)])
4065            .memory(mem)
4066            .observer(observer)
4067            .tool_dispatcher(Box::new(XmlToolDispatcher))
4068            .workspace_dir(workspace.path().to_path_buf())
4069            .build()
4070            .expect("agent builder should succeed with valid config");
4071        let xml_prompt = xml_agent.build_system_prompt().unwrap();
4072        assert!(xml_prompt.contains("## Tools"));
4073        assert!(xml_prompt.contains("echo"));
4074        assert!(xml_prompt.contains("## Tool Use Protocol"));
4075    }
4076
4077    #[tokio::test]
4078    async fn direct_agent_tool_execution_requests_acp_approval() {
4079        let model_provider = Box::new(MockModelProvider {
4080            responses: Mutex::new(vec![]),
4081        });
4082        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4083            backend: "none".into(),
4084            ..zeroclaw_config::schema::MemoryConfig::default()
4085        };
4086        let mem: Arc<dyn Memory> = Arc::from(
4087            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4088                .expect("memory creation should succeed with valid config"),
4089        );
4090        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4091        let tool_calls = Arc::new(AtomicUsize::new(0));
4092        let approval_requests = Arc::new(AtomicUsize::new(0));
4093        let approval_cfg = zeroclaw_config::schema::RiskProfileConfig {
4094            always_ask: vec!["echo".into()],
4095            ..zeroclaw_config::schema::RiskProfileConfig::default()
4096        };
4097        let mut agent = Agent::builder()
4098            .model_provider(model_provider)
4099            .tools(vec![Box::new(CountingTool {
4100                calls: Arc::clone(&tool_calls),
4101            })])
4102            .memory(mem)
4103            .observer(observer)
4104            .tool_dispatcher(Box::new(NativeToolDispatcher))
4105            .workspace_dir(std::path::PathBuf::from("/tmp"))
4106            .approval_manager(Some(Arc::new(ApprovalManager::for_non_interactive(
4107                &approval_cfg,
4108            ))))
4109            .build()
4110            .expect("agent builder should succeed with valid config");
4111
4112        let handle: tools::PerToolChannelHandle =
4113            Arc::new(parking_lot::RwLock::new(HashMap::new()));
4114        agent.channel_handles.ask_user = Some(Arc::clone(&handle));
4115        let channel: Arc<dyn zeroclaw_api::channel::Channel> = Arc::new(ApprovalChannel {
4116            response: zeroclaw_api::channel::ChannelApprovalResponse::Approve,
4117            requests: Arc::clone(&approval_requests),
4118        });
4119        agent.channel_handles().register_channel("acp", channel);
4120
4121        let result = agent
4122            .execute_tool_call(
4123                &ParsedToolCall {
4124                    name: "echo".into(),
4125                    arguments: serde_json::json!({"message": "hi"}),
4126                    tool_call_id: Some("tc1".into()),
4127                },
4128                "test-turn",
4129            )
4130            .await;
4131
4132        assert!(result.success);
4133        assert_eq!(result.output, "tool-out");
4134        assert_eq!(approval_requests.load(Ordering::SeqCst), 1);
4135        assert_eq!(tool_calls.load(Ordering::SeqCst), 1);
4136    }
4137
4138    #[tokio::test]
4139    async fn direct_agent_tool_execution_denies_when_acp_rejects() {
4140        let model_provider = Box::new(MockModelProvider {
4141            responses: Mutex::new(vec![]),
4142        });
4143        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4144            backend: "none".into(),
4145            ..zeroclaw_config::schema::MemoryConfig::default()
4146        };
4147        let mem: Arc<dyn Memory> = Arc::from(
4148            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4149                .expect("memory creation should succeed with valid config"),
4150        );
4151        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4152        let tool_calls = Arc::new(AtomicUsize::new(0));
4153        let approval_requests = Arc::new(AtomicUsize::new(0));
4154        let approval_cfg = zeroclaw_config::schema::RiskProfileConfig {
4155            always_ask: vec!["echo".into()],
4156            ..zeroclaw_config::schema::RiskProfileConfig::default()
4157        };
4158        let mut agent = Agent::builder()
4159            .model_provider(model_provider)
4160            .tools(vec![Box::new(CountingTool {
4161                calls: Arc::clone(&tool_calls),
4162            })])
4163            .memory(mem)
4164            .observer(observer)
4165            .tool_dispatcher(Box::new(NativeToolDispatcher))
4166            .workspace_dir(std::path::PathBuf::from("/tmp"))
4167            .approval_manager(Some(Arc::new(ApprovalManager::for_non_interactive(
4168                &approval_cfg,
4169            ))))
4170            .build()
4171            .expect("agent builder should succeed with valid config");
4172
4173        let handle: tools::PerToolChannelHandle =
4174            Arc::new(parking_lot::RwLock::new(HashMap::new()));
4175        agent.channel_handles.ask_user = Some(Arc::clone(&handle));
4176        let channel: Arc<dyn zeroclaw_api::channel::Channel> = Arc::new(ApprovalChannel {
4177            response: zeroclaw_api::channel::ChannelApprovalResponse::Deny,
4178            requests: Arc::clone(&approval_requests),
4179        });
4180        agent.channel_handles().register_channel("acp", channel);
4181
4182        let result = agent
4183            .execute_tool_call(
4184                &ParsedToolCall {
4185                    name: "echo".into(),
4186                    arguments: serde_json::json!({"message": "hi"}),
4187                    tool_call_id: Some("tc1".into()),
4188                },
4189                "test-turn",
4190            )
4191            .await;
4192
4193        assert!(!result.success);
4194        assert_eq!(result.output, "Denied by user.");
4195        assert_eq!(approval_requests.load(Ordering::SeqCst), 1);
4196        assert_eq!(tool_calls.load(Ordering::SeqCst), 0);
4197    }
4198
4199    #[tokio::test]
4200    async fn direct_agent_shell_does_not_trust_model_supplied_approved_arg() {
4201        let provider = Box::new(MockModelProvider {
4202            responses: Mutex::new(vec![]),
4203        });
4204        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4205            backend: "none".into(),
4206            ..zeroclaw_config::schema::MemoryConfig::default()
4207        };
4208        let mem: Arc<dyn Memory> = Arc::from(
4209            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4210                .expect("memory creation should succeed with valid config"),
4211        );
4212        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4213        let tool_calls = Arc::new(AtomicUsize::new(0));
4214        let approval_requests = Arc::new(AtomicUsize::new(0));
4215        let captured_args = Arc::new(std::sync::Mutex::new(None));
4216        let approval_cfg = zeroclaw_config::schema::RiskProfileConfig::default();
4217        let mut agent = Agent::builder()
4218            .model_provider(provider)
4219            .tools(vec![Box::new(CapturingApprovalArgTool {
4220                name: "shell",
4221                output: "shell-out",
4222                calls: Arc::clone(&tool_calls),
4223                last_args: Arc::clone(&captured_args),
4224            })])
4225            .memory(mem)
4226            .observer(observer)
4227            .tool_dispatcher(Box::new(NativeToolDispatcher))
4228            .workspace_dir(std::path::PathBuf::from("/tmp"))
4229            .approval_manager(Some(Arc::new(
4230                ApprovalManager::for_non_interactive_backchannel(&approval_cfg),
4231            )))
4232            .build()
4233            .expect("agent builder should succeed with valid config");
4234
4235        let handle: tools::PerToolChannelHandle =
4236            Arc::new(parking_lot::RwLock::new(HashMap::new()));
4237        agent.channel_handles.ask_user = Some(Arc::clone(&handle));
4238        let channel: Arc<dyn zeroclaw_api::channel::Channel> = Arc::new(ApprovalChannel {
4239            response: zeroclaw_api::channel::ChannelApprovalResponse::Deny,
4240            requests: Arc::clone(&approval_requests),
4241        });
4242        agent.channel_handles().register_channel("acp", channel);
4243
4244        let result = agent
4245            .execute_tool_call(
4246                &ParsedToolCall {
4247                    name: "shell".into(),
4248                    arguments: serde_json::json!({
4249                        "command": "touch should-not-run",
4250                        "approved": true
4251                    }),
4252                    tool_call_id: Some("tc1".into()),
4253                },
4254                "test-turn",
4255            )
4256            .await;
4257
4258        assert!(!result.success);
4259        assert_eq!(result.output, "Denied by user.");
4260        assert_eq!(approval_requests.load(Ordering::SeqCst), 1);
4261        assert_eq!(tool_calls.load(Ordering::SeqCst), 0);
4262        assert!(captured_args.lock().unwrap().is_none());
4263    }
4264
4265    #[tokio::test]
4266    async fn direct_agent_shell_marks_args_approved_after_backchannel_approval() {
4267        let provider = Box::new(MockModelProvider {
4268            responses: Mutex::new(vec![]),
4269        });
4270        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4271            backend: "none".into(),
4272            ..zeroclaw_config::schema::MemoryConfig::default()
4273        };
4274        let mem: Arc<dyn Memory> = Arc::from(
4275            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4276                .expect("memory creation should succeed with valid config"),
4277        );
4278        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4279        let tool_calls = Arc::new(AtomicUsize::new(0));
4280        let approval_requests = Arc::new(AtomicUsize::new(0));
4281        let captured_args = Arc::new(std::sync::Mutex::new(None));
4282        let approval_cfg = zeroclaw_config::schema::RiskProfileConfig::default();
4283        let mut agent = Agent::builder()
4284            .model_provider(provider)
4285            .tools(vec![Box::new(CapturingApprovalArgTool {
4286                name: "shell",
4287                output: "shell-out",
4288                calls: Arc::clone(&tool_calls),
4289                last_args: Arc::clone(&captured_args),
4290            })])
4291            .memory(mem)
4292            .observer(observer)
4293            .tool_dispatcher(Box::new(NativeToolDispatcher))
4294            .workspace_dir(std::path::PathBuf::from("/tmp"))
4295            .approval_manager(Some(Arc::new(
4296                ApprovalManager::for_non_interactive_backchannel(&approval_cfg),
4297            )))
4298            .build()
4299            .expect("agent builder should succeed with valid config");
4300
4301        let handle: tools::PerToolChannelHandle =
4302            Arc::new(parking_lot::RwLock::new(HashMap::new()));
4303        agent.channel_handles.ask_user = Some(Arc::clone(&handle));
4304        let channel: Arc<dyn zeroclaw_api::channel::Channel> = Arc::new(ApprovalChannel {
4305            response: zeroclaw_api::channel::ChannelApprovalResponse::Approve,
4306            requests: Arc::clone(&approval_requests),
4307        });
4308        agent.channel_handles().register_channel("acp", channel);
4309
4310        let result = agent
4311            .execute_tool_call(
4312                &ParsedToolCall {
4313                    name: "shell".into(),
4314                    arguments: serde_json::json!({
4315                        "command": "touch should-run-after-human-approval",
4316                        "approved": false
4317                    }),
4318                    tool_call_id: Some("tc1".into()),
4319                },
4320                "test-turn",
4321            )
4322            .await;
4323
4324        assert!(result.success);
4325        assert_eq!(result.output, "shell-out");
4326        assert_eq!(approval_requests.load(Ordering::SeqCst), 1);
4327        assert_eq!(tool_calls.load(Ordering::SeqCst), 1);
4328        let args = captured_args
4329            .lock()
4330            .unwrap()
4331            .clone()
4332            .expect("shell tool should capture executed args");
4333        assert_eq!(args["approved"], true);
4334    }
4335
4336    #[tokio::test]
4337    async fn direct_agent_shell_keeps_runtime_approval_from_always_allowlist() {
4338        let provider = Box::new(MockModelProvider {
4339            responses: Mutex::new(vec![]),
4340        });
4341        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4342            backend: "none".into(),
4343            ..zeroclaw_config::schema::MemoryConfig::default()
4344        };
4345        let mem: Arc<dyn Memory> = Arc::from(
4346            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4347                .expect("memory creation should succeed with valid config"),
4348        );
4349        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4350        let tool_calls = Arc::new(AtomicUsize::new(0));
4351        let approval_requests = Arc::new(AtomicUsize::new(0));
4352        let captured_args = Arc::new(std::sync::Mutex::new(None));
4353        let approval_cfg = zeroclaw_config::schema::RiskProfileConfig::default();
4354        let mut agent = Agent::builder()
4355            .model_provider(provider)
4356            .tools(vec![Box::new(CapturingApprovalArgTool {
4357                name: "shell",
4358                output: "shell-out",
4359                calls: Arc::clone(&tool_calls),
4360                last_args: Arc::clone(&captured_args),
4361            })])
4362            .memory(mem)
4363            .observer(observer)
4364            .tool_dispatcher(Box::new(NativeToolDispatcher))
4365            .workspace_dir(std::path::PathBuf::from("/tmp"))
4366            .approval_manager(Some(Arc::new(
4367                ApprovalManager::for_non_interactive_backchannel(&approval_cfg),
4368            )))
4369            .build()
4370            .expect("agent builder should succeed with valid config");
4371
4372        let handle: tools::PerToolChannelHandle =
4373            Arc::new(parking_lot::RwLock::new(HashMap::new()));
4374        agent.channel_handles.ask_user = Some(Arc::clone(&handle));
4375        let channel: Arc<dyn zeroclaw_api::channel::Channel> = Arc::new(ApprovalChannel {
4376            response: zeroclaw_api::channel::ChannelApprovalResponse::AlwaysApprove,
4377            requests: Arc::clone(&approval_requests),
4378        });
4379        agent.channel_handles().register_channel("acp", channel);
4380
4381        let first_result = agent
4382            .execute_tool_call(
4383                &ParsedToolCall {
4384                    name: "shell".into(),
4385                    arguments: serde_json::json!({
4386                        "command": "touch should-run-after-always-approval",
4387                        "approved": false
4388                    }),
4389                    tool_call_id: Some("tc1".into()),
4390                },
4391                "test-turn",
4392            )
4393            .await;
4394        let second_result = agent
4395            .execute_tool_call(
4396                &ParsedToolCall {
4397                    name: "shell".into(),
4398                    arguments: serde_json::json!({
4399                        "command": "touch should-run-from-allowlist",
4400                        "approved": false
4401                    }),
4402                    tool_call_id: Some("tc2".into()),
4403                },
4404                "test-turn",
4405            )
4406            .await;
4407
4408        assert!(first_result.success);
4409        assert!(second_result.success);
4410        assert_eq!(approval_requests.load(Ordering::SeqCst), 1);
4411        assert_eq!(tool_calls.load(Ordering::SeqCst), 2);
4412        let args = captured_args
4413            .lock()
4414            .unwrap()
4415            .clone()
4416            .expect("shell tool should capture executed args");
4417        assert_eq!(args["approved"], true);
4418    }
4419
4420    #[tokio::test]
4421    async fn direct_agent_cron_add_does_not_trust_model_supplied_approved_arg() {
4422        let provider = Box::new(MockModelProvider {
4423            responses: Mutex::new(vec![]),
4424        });
4425        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4426            backend: "none".into(),
4427            ..zeroclaw_config::schema::MemoryConfig::default()
4428        };
4429        let mem: Arc<dyn Memory> = Arc::from(
4430            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4431                .expect("memory creation should succeed with valid config"),
4432        );
4433        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4434        let tool_calls = Arc::new(AtomicUsize::new(0));
4435        let captured_args = Arc::new(std::sync::Mutex::new(None));
4436        let agent = Agent::builder()
4437            .model_provider(provider)
4438            .tools(vec![Box::new(CapturingApprovalArgTool {
4439                name: "cron_add",
4440                output: "cron-out",
4441                calls: Arc::clone(&tool_calls),
4442                last_args: Arc::clone(&captured_args),
4443            })])
4444            .memory(mem)
4445            .observer(observer)
4446            .tool_dispatcher(Box::new(NativeToolDispatcher))
4447            .workspace_dir(std::path::PathBuf::from("/tmp"))
4448            .build()
4449            .expect("agent builder should succeed with valid config");
4450
4451        let result = agent
4452            .execute_tool_call(
4453                &ParsedToolCall {
4454                    name: "cron_add".into(),
4455                    arguments: serde_json::json!({
4456                        "command": "echo should-not-be-model-approved",
4457                        "approved": true
4458                    }),
4459                    tool_call_id: Some("tc1".into()),
4460                },
4461                "test-turn",
4462            )
4463            .await;
4464
4465        assert!(result.success);
4466        assert_eq!(result.output, "cron-out");
4467        assert_eq!(tool_calls.load(Ordering::SeqCst), 1);
4468        let args = captured_args
4469            .lock()
4470            .unwrap()
4471            .clone()
4472            .expect("cron_add tool should capture executed args");
4473        assert_eq!(args["approved"], false);
4474    }
4475
4476    #[tokio::test]
4477    async fn turn_with_native_dispatcher_handles_tool_results_variant() {
4478        let model_provider = Box::new(MockModelProvider {
4479            responses: Mutex::new(vec![
4480                zeroclaw_providers::ChatResponse {
4481                    text: Some(String::new()),
4482                    tool_calls: vec![zeroclaw_providers::ToolCall {
4483                        id: "tc1".into(),
4484                        name: "echo".into(),
4485                        arguments: "{}".into(),
4486                        extra_content: None,
4487                    }],
4488                    usage: None,
4489                    reasoning_content: None,
4490                },
4491                zeroclaw_providers::ChatResponse {
4492                    text: Some("done".into()),
4493                    tool_calls: vec![],
4494                    usage: None,
4495                    reasoning_content: None,
4496                },
4497            ]),
4498        });
4499
4500        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4501            backend: "none".into(),
4502            ..zeroclaw_config::schema::MemoryConfig::default()
4503        };
4504        let mem: Arc<dyn Memory> = Arc::from(
4505            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4506                .expect("memory creation should succeed with valid config"),
4507        );
4508
4509        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4510        let mut agent = Agent::builder()
4511            .model_provider(model_provider)
4512            .tools(vec![Box::new(MockTool)])
4513            .memory(mem)
4514            .observer(observer)
4515            .tool_dispatcher(Box::new(NativeToolDispatcher))
4516            .workspace_dir(std::path::PathBuf::from("/tmp"))
4517            .build()
4518            .expect("agent builder should succeed with valid config");
4519
4520        let response = agent.turn("hi").await.unwrap();
4521        assert_eq!(response, "done");
4522        assert!(
4523            agent
4524                .history()
4525                .iter()
4526                .any(|msg| matches!(msg, ConversationMessage::ToolResults(_)))
4527        );
4528    }
4529
4530    #[tokio::test]
4531    async fn turn_routes_with_hint_when_query_classification_matches() {
4532        let seen_models = Arc::new(Mutex::new(Vec::new()));
4533        let model_provider = Box::new(ModelCaptureModelProvider {
4534            responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
4535                text: Some("classified".into()),
4536                tool_calls: vec![],
4537                usage: None,
4538                reasoning_content: None,
4539            }]),
4540            seen_models: seen_models.clone(),
4541        });
4542
4543        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4544            backend: "none".into(),
4545            ..zeroclaw_config::schema::MemoryConfig::default()
4546        };
4547        let mem: Arc<dyn Memory> = Arc::from(
4548            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4549                .expect("memory creation should succeed with valid config"),
4550        );
4551
4552        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4553        let mut route_model_by_hint = HashMap::new();
4554        route_model_by_hint.insert("fast".to_string(), "anthropic/claude-haiku-4-5".to_string());
4555        let mut agent = Agent::builder()
4556            .model_provider(model_provider)
4557            .tools(vec![Box::new(MockTool)])
4558            .memory(mem)
4559            .observer(observer)
4560            .tool_dispatcher(Box::new(NativeToolDispatcher))
4561            .workspace_dir(std::path::PathBuf::from("/tmp"))
4562            .classification_config(zeroclaw_config::schema::QueryClassificationConfig {
4563                enabled: true,
4564                rules: vec![zeroclaw_config::schema::ClassificationRule {
4565                    hint: "fast".to_string(),
4566                    keywords: vec!["quick".to_string()],
4567                    patterns: vec![],
4568                    min_length: None,
4569                    max_length: None,
4570                    priority: 10,
4571                }],
4572            })
4573            .available_hints(vec!["fast".to_string()])
4574            .route_model_by_hint(route_model_by_hint)
4575            .build()
4576            .expect("agent builder should succeed with valid config");
4577
4578        let response = agent.turn("quick summary please").await.unwrap();
4579        assert_eq!(response, "classified");
4580        let seen = seen_models.lock();
4581        assert_eq!(seen.as_slice(), &["hint:fast".to_string()]);
4582    }
4583
4584    #[tokio::test]
4585    async fn from_config_passes_extra_headers_to_custom_provider() {
4586        use axum::{Json, Router, http::HeaderMap, routing::post};
4587        use tempfile::TempDir;
4588        use tokio::net::TcpListener;
4589
4590        let captured_headers: Arc<std::sync::Mutex<Option<HashMap<String, String>>>> =
4591            Arc::new(std::sync::Mutex::new(None));
4592        let captured_headers_clone = captured_headers.clone();
4593
4594        let app = Router::new().route(
4595            "/chat/completions",
4596            post(
4597                move |headers: HeaderMap, Json(_body): Json<serde_json::Value>| {
4598                    let captured_headers = captured_headers_clone.clone();
4599                    async move {
4600                        let collected = headers
4601                            .iter()
4602                            .filter_map(|(name, value)| {
4603                                value
4604                                    .to_str()
4605                                    .ok()
4606                                    .map(|value| (name.as_str().to_string(), value.to_string()))
4607                            })
4608                            .collect();
4609                        *captured_headers.lock().unwrap() = Some(collected);
4610                        Json(serde_json::json!({
4611                            "choices": [{
4612                                "message": {
4613                                    "content": "hello from mock"
4614                                }
4615                            }]
4616                        }))
4617                    }
4618                },
4619            ),
4620        );
4621
4622        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
4623        let mock_addr = listener.local_addr().unwrap();
4624        let server_handle = zeroclaw_spawn::spawn!(async move {
4625            axum::serve(listener, app).await.unwrap();
4626        });
4627
4628        let tmp = TempDir::new().expect("temp dir");
4629        let workspace_dir = tmp.path().join("workspace");
4630        std::fs::create_dir_all(&workspace_dir).unwrap();
4631
4632        let mut config = zeroclaw_config::schema::Config {
4633            data_dir: workspace_dir,
4634            config_path: tmp.path().join("config.toml"),
4635            ..Default::default()
4636        };
4637        {
4638            // Use the `custom:<url>` model_provider — it builds an
4639            // OpenAiCompatibleModelProvider routed through the `compat`
4640            // closure, which is the only path that actually wires
4641            // `extra_headers` onto outgoing requests. (The native
4642            // `openai` factory ignores extra_headers; OpenRouter
4643            // hardcodes the upstream URL.)
4644            // Custom-URL model_provider: type is the canonical `custom` slot,
4645            // operator URL goes in the `uri` field (post-Phase 6
4646            // operators no longer put URLs in the outer type key).
4647            let entry = config
4648                .providers
4649                .models
4650                .ensure("custom", "default")
4651                .expect("custom model_provider type slot");
4652            entry.api_key = Some("test-key".to_string());
4653            entry.model = Some("test-model".to_string());
4654            entry.uri = Some(format!("http://{mock_addr}"));
4655            entry.extra_headers.insert(
4656                "User-Agent".to_string(),
4657                "zeroclaw-web-test/1.0".to_string(),
4658            );
4659            entry
4660                .extra_headers
4661                .insert("X-Title".to_string(), "zeroclaw-web".to_string());
4662        }
4663        config.memory.backend = "none".to_string();
4664        config.memory.auto_save = false;
4665
4666        // An explicit agent is required. Wire up a minimal agent that
4667        // points at the synthesized model_provider entry, then construct
4668        // Agent::from_config against it.
4669        config.risk_profiles.insert(
4670            "test-profile".to_string(),
4671            zeroclaw_config::schema::RiskProfileConfig::default(),
4672        );
4673        let agent_cfg = zeroclaw_config::schema::AliasedAgentConfig {
4674            model_provider: "custom.default".into(),
4675            risk_profile: "test-profile".to_string(),
4676            ..zeroclaw_config::schema::AliasedAgentConfig::default()
4677        };
4678        config.agents.insert("test-agent".to_string(), agent_cfg);
4679
4680        let mut agent = Agent::from_config(&config, "test-agent")
4681            .await
4682            .expect("agent from config");
4683        let response = agent.turn("hello").await.expect("agent turn");
4684
4685        assert_eq!(response, "hello from mock");
4686
4687        let headers = captured_headers
4688            .lock()
4689            .unwrap()
4690            .clone()
4691            .expect("captured headers");
4692        assert_eq!(
4693            headers.get("user-agent").map(String::as_str),
4694            Some("zeroclaw-web-test/1.0")
4695        );
4696        assert_eq!(
4697            headers.get("x-title").map(String::as_str),
4698            Some("zeroclaw-web")
4699        );
4700
4701        server_handle.abort();
4702    }
4703
4704    #[tokio::test]
4705    async fn from_config_accepts_openai_alias_with_requires_openai_auth() {
4706        use tempfile::TempDir;
4707        use zeroclaw_config::schema::{
4708            AliasedAgentConfig, Config, ModelProviderConfig, OpenAIModelProviderConfig,
4709            RiskProfileConfig, WireApi,
4710        };
4711
4712        let tmp = TempDir::new().expect("temp dir");
4713        let workspace_dir = tmp.path().join("workspace");
4714        std::fs::create_dir_all(&workspace_dir).expect("workspace dir");
4715
4716        let mut config = Config {
4717            data_dir: workspace_dir,
4718            config_path: tmp.path().join("config.toml"),
4719            ..Default::default()
4720        };
4721        config.memory.backend = "none".to_string();
4722        config.memory.auto_save = false;
4723        config
4724            .risk_profiles
4725            .insert("test-profile".to_string(), RiskProfileConfig::default());
4726        config.providers.models.openai.insert(
4727            "codex".to_string(),
4728            OpenAIModelProviderConfig {
4729                base: ModelProviderConfig {
4730                    model: Some("gpt-5.4".to_string()),
4731                    requires_openai_auth: true,
4732                    wire_api: Some(WireApi::Responses),
4733                    ..ModelProviderConfig::default()
4734                },
4735            },
4736        );
4737        config.agents.insert(
4738            "test-agent".to_string(),
4739            AliasedAgentConfig {
4740                model_provider: "openai.codex".into(),
4741                risk_profile: "test-profile".to_string(),
4742                ..AliasedAgentConfig::default()
4743            },
4744        );
4745
4746        let result = Agent::from_config(&config, "test-agent").await;
4747
4748        assert!(
4749            result.is_ok(),
4750            "openai alias with requires_openai_auth should construct via Codex OAuth path: {}",
4751            result.err().unwrap()
4752        );
4753    }
4754
4755    #[test]
4756    fn builder_allowed_tools_none_keeps_all_tools() {
4757        let model_provider = Box::new(MockModelProvider {
4758            responses: Mutex::new(vec![]),
4759        });
4760
4761        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4762            backend: "none".into(),
4763            ..zeroclaw_config::schema::MemoryConfig::default()
4764        };
4765        let mem: Arc<dyn Memory> = Arc::from(
4766            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4767                .expect("memory creation should succeed with valid config"),
4768        );
4769
4770        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4771        let agent = Agent::builder()
4772            .model_provider(model_provider)
4773            .tools(vec![Box::new(MockTool)])
4774            .memory(mem)
4775            .observer(observer)
4776            .tool_dispatcher(Box::new(NativeToolDispatcher))
4777            .workspace_dir(std::path::PathBuf::from("/tmp"))
4778            .allowed_tools(None)
4779            .build()
4780            .expect("agent builder should succeed with valid config");
4781
4782        assert_eq!(agent.tool_specs.len(), 1);
4783        assert_eq!(agent.tool_specs[0].name, "echo");
4784    }
4785
4786    #[test]
4787    fn builder_allowed_tools_some_filters_tools() {
4788        let model_provider = Box::new(MockModelProvider {
4789            responses: Mutex::new(vec![]),
4790        });
4791
4792        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4793            backend: "none".into(),
4794            ..zeroclaw_config::schema::MemoryConfig::default()
4795        };
4796        let mem: Arc<dyn Memory> = Arc::from(
4797            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4798                .expect("memory creation should succeed with valid config"),
4799        );
4800
4801        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4802        let agent = Agent::builder()
4803            .model_provider(model_provider)
4804            .tools(vec![Box::new(MockTool)])
4805            .memory(mem)
4806            .observer(observer)
4807            .tool_dispatcher(Box::new(NativeToolDispatcher))
4808            .workspace_dir(std::path::PathBuf::from("/tmp"))
4809            .allowed_tools(Some(vec!["nonexistent".to_string()]))
4810            .build()
4811            .expect("agent builder should succeed with valid config");
4812
4813        assert!(
4814            agent.tool_specs.is_empty(),
4815            "No tools should match a non-existent allowlist entry"
4816        );
4817    }
4818
4819    /// When a per-session cwd overrides the sandbox root, workspace files must
4820    /// stay readable. Regression guard for the `allowed_roots.push` fix in
4821    /// `from_config_with_session_cwd` (issue #6516).
4822    #[test]
4823    fn session_cwd_keeps_workspace_in_allowed_roots() {
4824        let workspace = std::env::temp_dir().join("zeroclaw_test_session_cwd_workspace");
4825        let session = std::env::temp_dir().join("zeroclaw_test_session_cwd_session");
4826        let _ = std::fs::create_dir_all(&workspace);
4827        let _ = std::fs::create_dir_all(&session);
4828
4829        let skill_file = workspace.join("SKILL.md");
4830        let _ = std::fs::write(&skill_file, "body");
4831        // is_resolved_path_allowed expects a canonicalized path (symlinks resolved).
4832        let skill_resolved = std::fs::canonicalize(&skill_file).unwrap_or(skill_file);
4833
4834        let risk_profile = zeroclaw_config::schema::RiskProfileConfig::default();
4835
4836        // Policy WITH the fix: workspace pushed into allowed_roots.
4837        let mut policy = SecurityPolicy::from_risk_profile(&risk_profile, &session);
4838        policy.allowed_roots.push(workspace.clone());
4839        assert!(
4840            policy.is_resolved_path_allowed(&skill_resolved),
4841            "workspace skills must remain readable when session_cwd differs"
4842        );
4843
4844        // Without the push the same path must be denied, confirming the push
4845        // is the load-bearing fix rather than an incidental side-effect.
4846        let policy_no_push = SecurityPolicy::from_risk_profile(&risk_profile, &session);
4847        assert!(
4848            !policy_no_push.is_resolved_path_allowed(&skill_resolved),
4849            "without allowed_roots.push, workspace files must be outside the sandbox"
4850        );
4851    }
4852
4853    #[test]
4854    fn seed_history_prepends_system_and_skips_system_from_seed() {
4855        let model_provider = Box::new(MockModelProvider {
4856            responses: Mutex::new(vec![]),
4857        });
4858
4859        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4860            backend: "none".into(),
4861            ..zeroclaw_config::schema::MemoryConfig::default()
4862        };
4863        let mem: Arc<dyn Memory> = Arc::from(
4864            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4865                .expect("memory creation should succeed with valid config"),
4866        );
4867
4868        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4869        let mut agent = Agent::builder()
4870            .model_provider(model_provider)
4871            .tools(vec![Box::new(MockTool)])
4872            .memory(mem)
4873            .observer(observer)
4874            .tool_dispatcher(Box::new(NativeToolDispatcher))
4875            .workspace_dir(std::path::PathBuf::from("/tmp"))
4876            .build()
4877            .expect("agent builder should succeed with valid config");
4878
4879        let seed = vec![
4880            ChatMessage::system("old system prompt"),
4881            ChatMessage::user("hello"),
4882            ChatMessage::assistant("hi there"),
4883        ];
4884        agent.seed_history(&seed);
4885
4886        let history = agent.history();
4887        // First message should be a freshly built system prompt (not the seed one)
4888        assert!(matches!(&history[0], ConversationMessage::Chat(m) if m.role == "system"));
4889        // System message from seed should be skipped, so next is user
4890        assert!(
4891            matches!(&history[1], ConversationMessage::Chat(m) if m.role == "user" && m.content == "hello")
4892        );
4893        assert!(
4894            matches!(&history[2], ConversationMessage::Chat(m) if m.role == "assistant" && m.content == "hi there")
4895        );
4896        assert_eq!(history.len(), 3);
4897    }
4898
4899    #[test]
4900    fn seed_conversation_history_preserves_tool_call_variants() {
4901        use zeroclaw_api::model_provider::{
4902            ChatMessage, ConversationMessage, ToolCall, ToolResultMessage,
4903        };
4904
4905        let provider = Box::new(MockModelProvider {
4906            responses: Mutex::new(vec![]),
4907        });
4908
4909        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
4910            backend: "none".into(),
4911            ..zeroclaw_config::schema::MemoryConfig::default()
4912        };
4913        let mem: Arc<dyn Memory> = Arc::from(
4914            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
4915                .expect("memory creation should succeed with valid config"),
4916        );
4917
4918        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
4919        let mut agent = Agent::builder()
4920            .model_provider(provider)
4921            .tools(vec![Box::new(MockTool)])
4922            .memory(mem)
4923            .observer(observer)
4924            .tool_dispatcher(Box::new(NativeToolDispatcher))
4925            .workspace_dir(std::path::PathBuf::from("/tmp"))
4926            .build()
4927            .expect("agent builder should succeed with valid config");
4928
4929        let messages = vec![
4930            ConversationMessage::Chat(ChatMessage::user("run it")),
4931            ConversationMessage::AssistantToolCalls {
4932                text: None,
4933                tool_calls: vec![ToolCall {
4934                    id: "tc-1".into(),
4935                    name: "shell".into(),
4936                    arguments: r#"{"command":"ls"}"#.into(),
4937                    extra_content: None,
4938                }],
4939                reasoning_content: None,
4940            },
4941            ConversationMessage::ToolResults(vec![ToolResultMessage {
4942                tool_call_id: "tc-1".into(),
4943                content: "ok".into(),
4944            }]),
4945            ConversationMessage::Chat(ChatMessage::assistant("done")),
4946        ];
4947
4948        agent.seed_conversation_history(messages);
4949
4950        // System prompt may have been prepended; find non-system messages
4951        let non_system: Vec<_> = agent
4952            .history()
4953            .iter()
4954            .filter(|m| !matches!(m, ConversationMessage::Chat(c) if c.role == "system"))
4955            .collect();
4956
4957        assert_eq!(non_system.len(), 4);
4958        assert!(
4959            matches!(non_system[1], ConversationMessage::AssistantToolCalls { tool_calls, .. } if tool_calls[0].id == "tc-1")
4960        );
4961        assert!(
4962            matches!(non_system[2], ConversationMessage::ToolResults(r) if r[0].tool_call_id == "tc-1")
4963        );
4964    }
4965
4966    /// Mock provider that captures whether tool specs were passed to `stream_chat`
4967    /// and returns a tool call followed by a text response through the stream.
4968    struct StreamToolCaptureModelProvider {
4969        tools_received: Arc<Mutex<Vec<bool>>>,
4970        call_count: Arc<Mutex<usize>>,
4971    }
4972
4973    #[async_trait]
4974    impl ModelProvider for StreamToolCaptureModelProvider {
4975        async fn chat_with_system(
4976            &self,
4977            _system_prompt: Option<&str>,
4978            _message: &str,
4979            _model: &str,
4980            _temperature: Option<f64>,
4981        ) -> Result<String> {
4982            Ok("ok".into())
4983        }
4984
4985        async fn chat(
4986            &self,
4987            request: ChatRequest<'_>,
4988            _model: &str,
4989            _temperature: Option<f64>,
4990        ) -> Result<zeroclaw_providers::ChatResponse> {
4991            self.tools_received.lock().push(request.tools.is_some());
4992            let mut count = self.call_count.lock();
4993            *count += 1;
4994            if *count == 1 {
4995                Ok(zeroclaw_providers::ChatResponse {
4996                    text: Some(String::new()),
4997                    tool_calls: vec![zeroclaw_providers::ToolCall {
4998                        id: "00000000-0000-0000-0000-000000000001".into(),
4999                        name: "echo".into(),
5000                        arguments: "{}".into(),
5001                        extra_content: None,
5002                    }],
5003                    usage: None,
5004                    reasoning_content: None,
5005                })
5006            } else {
5007                Ok(zeroclaw_providers::ChatResponse {
5008                    text: Some("stream-done".into()),
5009                    tool_calls: vec![],
5010                    usage: None,
5011                    reasoning_content: None,
5012                })
5013            }
5014        }
5015
5016        fn supports_native_tools(&self) -> bool {
5017            true
5018        }
5019
5020        fn stream_chat(
5021            &self,
5022            request: ChatRequest<'_>,
5023            _model: &str,
5024            _temperature: Option<f64>,
5025            _options: zeroclaw_providers::traits::StreamOptions,
5026        ) -> futures_util::stream::BoxStream<
5027            'static,
5028            zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
5029        > {
5030            use futures_util::stream::{self, StreamExt};
5031            self.tools_received.lock().push(request.tools.is_some());
5032            let mut count = self.call_count.lock();
5033            *count += 1;
5034            if *count == 1 {
5035                let tc = zeroclaw_providers::traits::StreamEvent::ToolCall(
5036                    zeroclaw_providers::ToolCall {
5037                        id: "00000000-0000-0000-0000-000000000001".into(),
5038                        name: "echo".into(),
5039                        arguments: "{}".into(),
5040                        extra_content: None,
5041                    },
5042                );
5043                stream::iter(vec![
5044                    Ok(tc),
5045                    Ok(zeroclaw_providers::traits::StreamEvent::Final),
5046                ])
5047                .boxed()
5048            } else {
5049                let chunk = zeroclaw_providers::traits::StreamEvent::TextDelta(
5050                    zeroclaw_providers::traits::StreamChunk {
5051                        delta: "stream-done".into(),
5052                        is_final: false,
5053                        reasoning: None,
5054                        token_count: 0,
5055                    },
5056                );
5057                stream::iter(vec![
5058                    Ok(chunk),
5059                    Ok(zeroclaw_providers::traits::StreamEvent::Final),
5060                ])
5061                .boxed()
5062            }
5063        }
5064    }
5065    impl ::zeroclaw_api::attribution::Attributable for StreamToolCaptureModelProvider {
5066        fn role(&self) -> ::zeroclaw_api::attribution::Role {
5067            ::zeroclaw_api::attribution::Role::Provider(
5068                ::zeroclaw_api::attribution::ProviderKind::Model(
5069                    ::zeroclaw_api::attribution::ModelProviderKind::Custom,
5070                ),
5071            )
5072        }
5073        fn alias(&self) -> &str {
5074            "StreamToolCaptureModelProvider"
5075        }
5076    }
5077
5078    #[tokio::test]
5079    async fn turn_streamed_passes_tool_specs_to_provider() {
5080        let tools_received = Arc::new(Mutex::new(Vec::new()));
5081        let model_provider = Box::new(StreamToolCaptureModelProvider {
5082            tools_received: tools_received.clone(),
5083            call_count: Arc::new(Mutex::new(0)),
5084        });
5085
5086        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5087            backend: "none".into(),
5088            ..zeroclaw_config::schema::MemoryConfig::default()
5089        };
5090        let mem: Arc<dyn Memory> = Arc::from(
5091            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5092                .expect("memory creation should succeed with valid config"),
5093        );
5094
5095        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5096        let mut agent = Agent::builder()
5097            .model_provider(model_provider)
5098            .tools(vec![Box::new(MockTool)])
5099            .memory(mem)
5100            .observer(observer)
5101            .tool_dispatcher(Box::new(NativeToolDispatcher))
5102            .workspace_dir(std::path::PathBuf::from("/tmp"))
5103            .build()
5104            .expect("agent builder should succeed with valid config");
5105
5106        let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
5107        let (response, _) = agent
5108            .turn_streamed("use the echo tool", event_tx, None)
5109            .await
5110            .unwrap();
5111        assert_eq!(response, "stream-done");
5112
5113        // Verify tools were passed in both stream_chat calls
5114        let received = tools_received.lock();
5115        assert!(
5116            received.len() >= 2,
5117            "Expected at least 2 stream_chat calls, got {}",
5118            received.len()
5119        );
5120        assert!(
5121            received[0],
5122            "First stream_chat call should have received tool specs"
5123        );
5124        assert!(
5125            received[1],
5126            "Second stream_chat call should have received tool specs"
5127        );
5128
5129        // Collect events and verify tool call + tool result were emitted
5130        let mut events = Vec::new();
5131        while let Ok(ev) = event_rx.try_recv() {
5132            events.push(ev);
5133        }
5134        let has_tool_call = events
5135            .iter()
5136            .any(|e| matches!(e, TurnEvent::ToolCall { name, .. } if name == "echo"));
5137        let has_tool_result = events
5138            .iter()
5139            .any(|e| matches!(e, TurnEvent::ToolResult { name, .. } if name == "echo"));
5140        assert!(
5141            has_tool_call,
5142            "Should have emitted a ToolCall event for 'echo'"
5143        );
5144        assert!(
5145            has_tool_result,
5146            "Should have emitted a ToolResult event for 'echo'"
5147        );
5148
5149        // Verify ID correlation
5150        let call_id = events
5151            .iter()
5152            .find_map(|e| {
5153                if let TurnEvent::ToolCall { id, .. } = e {
5154                    Some(id.clone())
5155                } else {
5156                    None
5157                }
5158            })
5159            .expect("ToolCall should have an ID");
5160
5161        let result_id = events
5162            .iter()
5163            .find_map(|e| {
5164                if let TurnEvent::ToolResult { id, .. } = e {
5165                    Some(id.clone())
5166                } else {
5167                    None
5168                }
5169            })
5170            .expect("ToolResult should have an ID");
5171
5172        assert_eq!(
5173            call_id, result_id,
5174            "ToolCall and ToolResult should share the same ID for correlation"
5175        );
5176
5177        // Verify it's a valid UUID
5178        assert!(
5179            uuid::Uuid::parse_str(&call_id).is_ok(),
5180            "Generated ID should be a valid UUID: got '{}'",
5181            call_id
5182        );
5183    }
5184
5185    /// Provider that emits TWO native tool calls in a single assistant turn,
5186    /// then finishes. Used to verify serial dispatch ordering.
5187    struct TwoToolCallStreamModelProvider {
5188        call_count: Arc<Mutex<usize>>,
5189    }
5190
5191    #[async_trait]
5192    impl ModelProvider for TwoToolCallStreamModelProvider {
5193        async fn chat_with_system(
5194            &self,
5195            _system_prompt: Option<&str>,
5196            _message: &str,
5197            _model: &str,
5198            _temperature: Option<f64>,
5199        ) -> Result<String> {
5200            Ok("ok".into())
5201        }
5202
5203        async fn chat(
5204            &self,
5205            _request: ChatRequest<'_>,
5206            _model: &str,
5207            _temperature: Option<f64>,
5208        ) -> Result<zeroclaw_providers::ChatResponse> {
5209            Ok(zeroclaw_providers::ChatResponse {
5210                text: Some("done".into()),
5211                tool_calls: vec![],
5212                usage: None,
5213                reasoning_content: None,
5214            })
5215        }
5216
5217        fn supports_native_tools(&self) -> bool {
5218            true
5219        }
5220
5221        fn stream_chat(
5222            &self,
5223            _request: ChatRequest<'_>,
5224            _model: &str,
5225            _temperature: Option<f64>,
5226            _options: zeroclaw_providers::traits::StreamOptions,
5227        ) -> futures_util::stream::BoxStream<
5228            'static,
5229            zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
5230        > {
5231            use futures_util::stream::{self, StreamExt};
5232            let mut count = self.call_count.lock();
5233            *count += 1;
5234            if *count == 1 {
5235                stream::iter(vec![
5236                    Ok(zeroclaw_providers::traits::StreamEvent::ToolCall(
5237                        zeroclaw_providers::ToolCall {
5238                            id: "00000000-0000-0000-0000-000000000001".into(),
5239                            name: "echo".into(),
5240                            arguments: "{}".into(),
5241                            extra_content: None,
5242                        },
5243                    )),
5244                    Ok(zeroclaw_providers::traits::StreamEvent::ToolCall(
5245                        zeroclaw_providers::ToolCall {
5246                            id: "00000000-0000-0000-0000-000000000002".into(),
5247                            name: "echo".into(),
5248                            arguments: "{}".into(),
5249                            extra_content: None,
5250                        },
5251                    )),
5252                    Ok(zeroclaw_providers::traits::StreamEvent::Final),
5253                ])
5254                .boxed()
5255            } else {
5256                stream::iter(vec![
5257                    Ok(zeroclaw_providers::traits::StreamEvent::TextDelta(
5258                        zeroclaw_providers::traits::StreamChunk {
5259                            delta: "stream-done".into(),
5260                            is_final: false,
5261                            reasoning: None,
5262                            token_count: 0,
5263                        },
5264                    )),
5265                    Ok(zeroclaw_providers::traits::StreamEvent::Final),
5266                ])
5267                .boxed()
5268            }
5269        }
5270    }
5271    impl ::zeroclaw_api::attribution::Attributable for TwoToolCallStreamModelProvider {
5272        fn role(&self) -> ::zeroclaw_api::attribution::Role {
5273            ::zeroclaw_api::attribution::Role::Provider(
5274                ::zeroclaw_api::attribution::ProviderKind::Model(
5275                    ::zeroclaw_api::attribution::ModelProviderKind::Custom,
5276                ),
5277            )
5278        }
5279        fn alias(&self) -> &str {
5280            "TwoToolCallStreamModelProvider"
5281        }
5282    }
5283
5284    /// With parallel_tools disabled (the default) and no approval manager, a
5285    /// turn carrying multiple tool calls must dispatch them strictly serially:
5286    /// each ToolCall start event is immediately followed by its own ToolResult
5287    /// before the next call's start event. The pre-fix code emitted all start
5288    /// events up front, then all results — which floods the front-end IPC on a
5289    /// large multi-call turn. Order is the contract this test pins.
5290    #[tokio::test]
5291    async fn turn_streamed_dispatches_multiple_tools_serially_when_parallel_disabled() {
5292        let model_provider = Box::new(TwoToolCallStreamModelProvider {
5293            call_count: Arc::new(Mutex::new(0)),
5294        });
5295
5296        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5297            backend: "none".into(),
5298            ..zeroclaw_config::schema::MemoryConfig::default()
5299        };
5300        let mem: Arc<dyn Memory> = Arc::from(
5301            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5302                .expect("memory creation should succeed with valid config"),
5303        );
5304
5305        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5306        let mut agent = Agent::builder()
5307            .model_provider(model_provider)
5308            .tools(vec![Box::new(MockTool)])
5309            .memory(mem)
5310            .observer(observer)
5311            .tool_dispatcher(Box::new(NativeToolDispatcher))
5312            .workspace_dir(std::path::PathBuf::from("/tmp"))
5313            .build()
5314            .expect("agent builder should succeed with valid config");
5315
5316        // Default resolved config has parallel_tools = false; this is the
5317        // serial path under test.
5318        assert!(
5319            !agent.config.resolved.parallel_tools,
5320            "test precondition: parallel_tools must be disabled"
5321        );
5322
5323        let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
5324        let (response, _) = agent
5325            .turn_streamed("use echo twice", event_tx, None)
5326            .await
5327            .unwrap();
5328        assert_eq!(response, "stream-done");
5329
5330        // Reduce events to the call/result sequence, tagged by id.
5331        let mut seq: Vec<(&'static str, String)> = Vec::new();
5332        while let Ok(ev) = event_rx.try_recv() {
5333            match ev {
5334                TurnEvent::ToolCall { id, .. } => seq.push(("call", id)),
5335                TurnEvent::ToolResult { id, .. } => seq.push(("result", id)),
5336                _ => {}
5337            }
5338        }
5339
5340        let id1 = "00000000-0000-0000-0000-000000000001";
5341        let id2 = "00000000-0000-0000-0000-000000000002";
5342        assert_eq!(
5343            seq,
5344            vec![
5345                ("call", id1.to_string()),
5346                ("result", id1.to_string()),
5347                ("call", id2.to_string()),
5348                ("result", id2.to_string()),
5349            ],
5350            "serial dispatch must interleave call->result per tool, not batch all \
5351             starts then all results; got {seq:?}"
5352        );
5353    }
5354
5355    struct PreExecutedToolModelProvider;
5356
5357    #[async_trait]
5358    impl ModelProvider for PreExecutedToolModelProvider {
5359        async fn chat_with_system(
5360            &self,
5361            _system_prompt: Option<&str>,
5362            _message: &str,
5363            _model: &str,
5364            _temperature: Option<f64>,
5365        ) -> Result<String> {
5366            Ok(String::new())
5367        }
5368
5369        async fn chat(
5370            &self,
5371            _request: ChatRequest<'_>,
5372            _model: &str,
5373            _temperature: Option<f64>,
5374        ) -> Result<zeroclaw_providers::ChatResponse> {
5375            Ok(zeroclaw_providers::ChatResponse {
5376                text: Some(String::new()),
5377                tool_calls: vec![],
5378                usage: None,
5379                reasoning_content: None,
5380            })
5381        }
5382
5383        fn stream_chat(
5384            &self,
5385            _request: ChatRequest<'_>,
5386            _model: &str,
5387            _temperature: Option<f64>,
5388            _options: zeroclaw_providers::traits::StreamOptions,
5389        ) -> futures_util::stream::BoxStream<
5390            'static,
5391            zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
5392        > {
5393            use futures_util::stream::{self, StreamExt};
5394
5395            stream::iter(vec![
5396                Ok(
5397                    zeroclaw_providers::traits::StreamEvent::PreExecutedToolCall {
5398                        name: "file_read".into(),
5399                        args: "{\"path\":\"a.txt\"}".into(),
5400                    },
5401                ),
5402                Ok(
5403                    zeroclaw_providers::traits::StreamEvent::PreExecutedToolCall {
5404                        name: "shell".into(),
5405                        args: "{\"command\":\"pwd\"}".into(),
5406                    },
5407                ),
5408                Ok(
5409                    zeroclaw_providers::traits::StreamEvent::PreExecutedToolResult {
5410                        name: "file_read".into(),
5411                        output: "a".into(),
5412                    },
5413                ),
5414                Ok(
5415                    zeroclaw_providers::traits::StreamEvent::PreExecutedToolResult {
5416                        name: "shell".into(),
5417                        output: "b".into(),
5418                    },
5419                ),
5420                Ok(zeroclaw_providers::traits::StreamEvent::Final),
5421            ])
5422            .boxed()
5423        }
5424    }
5425    impl ::zeroclaw_api::attribution::Attributable for PreExecutedToolModelProvider {
5426        fn role(&self) -> ::zeroclaw_api::attribution::Role {
5427            ::zeroclaw_api::attribution::Role::Provider(
5428                ::zeroclaw_api::attribution::ProviderKind::Model(
5429                    ::zeroclaw_api::attribution::ModelProviderKind::Custom,
5430                ),
5431            )
5432        }
5433        fn alias(&self) -> &str {
5434            "PreExecutedToolModelProvider"
5435        }
5436    }
5437
5438    #[tokio::test]
5439    async fn pre_executed_tool_results_keep_ids_when_calls_overlap() {
5440        let model_provider = Box::new(PreExecutedToolModelProvider);
5441
5442        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5443            backend: "none".into(),
5444            ..zeroclaw_config::schema::MemoryConfig::default()
5445        };
5446        let mem: Arc<dyn Memory> = Arc::from(
5447            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5448                .expect("memory creation should succeed with valid config"),
5449        );
5450
5451        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5452        let mut agent = Agent::builder()
5453            .model_provider(model_provider)
5454            .tools(vec![Box::new(MockTool)])
5455            .memory(mem)
5456            .observer(observer)
5457            .tool_dispatcher(Box::new(NativeToolDispatcher))
5458            .workspace_dir(std::path::PathBuf::from("/tmp"))
5459            .build()
5460            .expect("agent builder should succeed with valid config");
5461
5462        let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
5463        let _ = agent
5464            .turn_streamed("use pre-executed tools", event_tx, None)
5465            .await
5466            .unwrap();
5467
5468        let mut call_ids = HashMap::new();
5469        let mut result_ids = HashMap::new();
5470        while let Ok(event) = event_rx.try_recv() {
5471            match event {
5472                TurnEvent::ToolCall { id, name, .. } => {
5473                    call_ids.insert(name, id);
5474                }
5475                TurnEvent::ToolResult { id, name, .. } => {
5476                    result_ids.insert(name, id);
5477                }
5478                _ => {}
5479            }
5480        }
5481
5482        assert_eq!(call_ids.len(), 2, "expected two pre-executed tool calls");
5483        assert_eq!(
5484            result_ids.len(),
5485            2,
5486            "expected two pre-executed tool results"
5487        );
5488        assert_eq!(call_ids.get("file_read"), result_ids.get("file_read"));
5489        assert_eq!(call_ids.get("shell"), result_ids.get("shell"));
5490    }
5491
5492    #[tokio::test]
5493    async fn turn_normalizes_user_image_markers_before_provider_call() {
5494        let seen_user_messages = Arc::new(Mutex::new(Vec::new()));
5495        let provider = Box::new(MultimodalCaptureProvider {
5496            seen_user_messages: seen_user_messages.clone(),
5497            streamed: false,
5498        });
5499
5500        let temp = tempfile::tempdir().expect("tempdir");
5501        let image_path = temp.path().join("agent-turn.png");
5502        std::fs::write(
5503            &image_path,
5504            [0x89, b'P', b'N', b'G', b'\r', b'\n', 0x1a, b'\n'],
5505        )
5506        .expect("write fixture");
5507
5508        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5509            backend: "none".into(),
5510            ..zeroclaw_config::schema::MemoryConfig::default()
5511        };
5512        let mem: Arc<dyn Memory> = Arc::from(
5513            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5514                .expect("memory creation should succeed with valid config"),
5515        );
5516
5517        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5518        let mut agent = Agent::builder()
5519            .model_provider(provider)
5520            .tools(vec![Box::new(MockTool)])
5521            .memory(mem)
5522            .observer(observer)
5523            .tool_dispatcher(Box::new(NativeToolDispatcher))
5524            .workspace_dir(std::path::PathBuf::from("/tmp"))
5525            .multimodal_config(zeroclaw_config::schema::MultimodalConfig::default())
5526            .build()
5527            .expect("agent builder should succeed with valid config");
5528
5529        agent
5530            .turn(&format!(
5531                "inspect [IMAGE:{}]",
5532                image_path.display().to_string()
5533            ))
5534            .await
5535            .expect("turn should succeed");
5536
5537        let seen = seen_user_messages.lock();
5538        let last = seen.last().expect("provider should receive a user message");
5539        assert!(
5540            last.contains("data:image/png;base64,"),
5541            "expected normalized data URI in provider request, got: {last}"
5542        );
5543    }
5544
5545    #[tokio::test]
5546    async fn turn_streamed_normalizes_user_image_markers_before_provider_call() {
5547        let seen_user_messages = Arc::new(Mutex::new(Vec::new()));
5548        let provider = Box::new(MultimodalCaptureProvider {
5549            seen_user_messages: seen_user_messages.clone(),
5550            streamed: true,
5551        });
5552
5553        let temp = tempfile::tempdir().expect("tempdir");
5554        let image_path = temp.path().join("agent-stream.png");
5555        std::fs::write(
5556            &image_path,
5557            [0x89, b'P', b'N', b'G', b'\r', b'\n', 0x1a, b'\n'],
5558        )
5559        .expect("write fixture");
5560
5561        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5562            backend: "none".into(),
5563            ..zeroclaw_config::schema::MemoryConfig::default()
5564        };
5565        let mem: Arc<dyn Memory> = Arc::from(
5566            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5567                .expect("memory creation should succeed with valid config"),
5568        );
5569
5570        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5571        let mut agent = Agent::builder()
5572            .model_provider(provider)
5573            .tools(vec![Box::new(MockTool)])
5574            .memory(mem)
5575            .observer(observer)
5576            .tool_dispatcher(Box::new(NativeToolDispatcher))
5577            .workspace_dir(std::path::PathBuf::from("/tmp"))
5578            .multimodal_config(zeroclaw_config::schema::MultimodalConfig::default())
5579            .build()
5580            .expect("agent builder should succeed with valid config");
5581
5582        let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(8);
5583        agent
5584            .turn_streamed(
5585                &format!("inspect [IMAGE:{}]", image_path.display().to_string()),
5586                event_tx,
5587                None,
5588            )
5589            .await
5590            .expect("turn_streamed should succeed");
5591
5592        let seen = seen_user_messages.lock();
5593        let last = seen.last().expect("provider should receive a user message");
5594        assert!(
5595            last.contains("data:image/png;base64,"),
5596            "expected normalized data URI in provider request, got: {last}"
5597        );
5598    }
5599
5600    /// Reproduction test for the orphan-tool_results trim bug.
5601    ///
5602    /// `trim_history` previously dropped the oldest N entries blindly. When
5603    /// the boundary fell in the middle of an `AssistantToolCalls` /
5604    /// `ToolResults` pair, the call side was dropped while the result side
5605    /// remained — leaving an orphan `ToolResults` at the head of the
5606    /// history. The next model_provider request then started with a `tool_result`
5607    /// block that had no matching `tool_use`, which Anthropic rejects with:
5608    ///
5609    ///   `messages.0.content.0: unexpected tool_use_id found in tool_result blocks`
5610    ///
5611    /// To reliably reproduce the bug we need the drop boundary to fall in
5612    /// the middle of a pair. Five entries (`AC1, TR1, AC2, TR2, AC3`) with
5613    /// `max = 4` makes `drop_count = 1`, which removes `AC1` and leaves
5614    /// `TR1` as an orphan at the head.
5615    #[test]
5616    fn trim_history_does_not_leave_orphan_tool_results() {
5617        use zeroclaw_providers::{ToolCall, ToolResultMessage};
5618
5619        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5620            backend: "none".into(),
5621            ..zeroclaw_config::schema::MemoryConfig::default()
5622        };
5623        let mem: Arc<dyn Memory> = Arc::from(
5624            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5625                .expect("memory creation should succeed with valid config"),
5626        );
5627
5628        // Force trimming with the boundary landing inside a pair:
5629        // 5 entries (AC, TR, AC, TR, AC) > 4 → drop_count = 1 → AC1 dropped,
5630        // TR1 left as an orphan unless the trim guards against it.
5631        let agent_config = zeroclaw_config::schema::AliasedAgentConfig {
5632            resolved: zeroclaw_config::schema::ResolvedRuntime {
5633                max_history_messages: 4,
5634                ..Default::default()
5635            },
5636            ..zeroclaw_config::schema::AliasedAgentConfig::default()
5637        };
5638
5639        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5640        let mut agent = Agent::builder()
5641            .model_provider(Box::new(MockModelProvider {
5642                responses: Mutex::new(vec![]),
5643            }))
5644            .tools(vec![Box::new(MockTool)])
5645            .memory(mem)
5646            .observer(observer)
5647            .tool_dispatcher(Box::new(NativeToolDispatcher))
5648            .workspace_dir(std::path::PathBuf::from("/tmp"))
5649            .config(agent_config)
5650            .build()
5651            .expect("agent builder should succeed with valid config");
5652
5653        // Build the history: AC1, TR1, AC2, TR2, AC3 (no trailing TR3).
5654        for i in 1..=3 {
5655            agent.history.push(ConversationMessage::AssistantToolCalls {
5656                text: Some(format!("Calling tool {i}")),
5657                tool_calls: vec![ToolCall {
5658                    id: format!("tc{i}"),
5659                    name: format!("tool{i}"),
5660                    arguments: "{}".into(),
5661                    extra_content: None,
5662                }],
5663                reasoning_content: None,
5664            });
5665            // Skip the trailing ToolResults for the last AssistantToolCalls
5666            // so the entry count is 5, not 6, and the drop boundary lands
5667            // mid-pair.
5668            if i < 3 {
5669                agent
5670                    .history
5671                    .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
5672                        tool_call_id: format!("tc{i}"),
5673                        content: format!("result{i}"),
5674                    }]));
5675            }
5676        }
5677
5678        assert_eq!(agent.history.len(), 5);
5679        agent.trim_history();
5680
5681        // After trimming, the surviving history must not start with a
5682        // ToolResults entry (that would be an orphan whose AssistantToolCalls
5683        // partner was dropped).
5684        if let Some(first) = agent.history.first() {
5685            assert!(
5686                !matches!(first, ConversationMessage::ToolResults(_)),
5687                "trim_history left an orphan ToolResults at the head of the \
5688                 history; this would cause Anthropic to reject the next \
5689                 request with 'unexpected tool_use_id found in tool_result \
5690                 blocks'"
5691            );
5692        }
5693
5694        // Every ToolResults entry must be immediately preceded by an
5695        // AssistantToolCalls entry.
5696        for window in agent.history.windows(2) {
5697            if matches!(&window[1], ConversationMessage::ToolResults(_)) {
5698                assert!(
5699                    matches!(&window[0], ConversationMessage::AssistantToolCalls { .. }),
5700                    "ToolResults entry is not preceded by an AssistantToolCalls \
5701                     entry — pair was split during trim"
5702                );
5703            }
5704        }
5705    }
5706
5707    #[test]
5708    fn trim_history_does_not_leave_orphan_assistant_tool_calls() {
5709        use zeroclaw_providers::{ToolCall, ToolResultMessage};
5710
5711        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5712            backend: "none".into(),
5713            ..zeroclaw_config::schema::MemoryConfig::default()
5714        };
5715        let mem: Arc<dyn Memory> = Arc::from(
5716            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5717                .expect("memory creation should succeed with valid config"),
5718        );
5719
5720        // Set up so the trim boundary lands between a TR and its following
5721        // AC, leaving the AC at the head without a preceding user message
5722        // context. More importantly, test the case where after the orphan-TR
5723        // guard fires, an AC ends up at position 0.
5724        //
5725        // History: [user1, AC1, TR1, AC2, TR2, user2, AC3, TR3]
5726        // len=8, max=4, drop_count=4 → drops user1, AC1, TR1, AC2
5727        // Position 4 = TR2 → orphan-TR guard bumps to 5
5728        // Position 5 = user2 → stops (user2 is fine)
5729        //
5730        // But we want to test the AC-at-head case. So:
5731        // History: [user1, AC1, TR1, AC2, TR2, AC3, TR3]
5732        // len=7, max=3, drop_count=4 → drops user1, AC1, TR1, AC2
5733        // Position 4 = TR2 → orphan-TR guard bumps to 5
5734        // Position 5 = AC3 → NEW guard should bump to 6 (drop AC3)
5735        // Position 6 = TR3 → NEW guard should bump to 7 (drop TR3)
5736        let agent_config = zeroclaw_config::schema::AliasedAgentConfig {
5737            resolved: zeroclaw_config::schema::ResolvedRuntime {
5738                max_history_messages: 3,
5739                ..Default::default()
5740            },
5741            ..zeroclaw_config::schema::AliasedAgentConfig::default()
5742        };
5743
5744        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5745        let mut agent = Agent::builder()
5746            .model_provider(Box::new(MockModelProvider {
5747                responses: Mutex::new(vec![]),
5748            }))
5749            .tools(vec![Box::new(MockTool)])
5750            .memory(mem)
5751            .observer(observer)
5752            .tool_dispatcher(Box::new(NativeToolDispatcher))
5753            .workspace_dir(std::path::PathBuf::from("/tmp"))
5754            .config(agent_config)
5755            .build()
5756            .expect("agent builder should succeed with valid config");
5757
5758        // user1
5759        agent.history.push(ConversationMessage::Chat(ChatMessage {
5760            role: "user".into(),
5761            content: "hello".into(),
5762        }));
5763        // AC1, TR1
5764        agent.history.push(ConversationMessage::AssistantToolCalls {
5765            text: Some("Calling tool 1".into()),
5766            tool_calls: vec![ToolCall {
5767                id: "tc1".into(),
5768                name: "tool1".into(),
5769                arguments: "{}".into(),
5770                extra_content: None,
5771            }],
5772            reasoning_content: None,
5773        });
5774        agent
5775            .history
5776            .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
5777                tool_call_id: "tc1".into(),
5778                content: "result1".into(),
5779            }]));
5780        // AC2, TR2
5781        agent.history.push(ConversationMessage::AssistantToolCalls {
5782            text: Some("Calling tool 2".into()),
5783            tool_calls: vec![ToolCall {
5784                id: "tc2".into(),
5785                name: "tool2".into(),
5786                arguments: "{}".into(),
5787                extra_content: None,
5788            }],
5789            reasoning_content: None,
5790        });
5791        agent
5792            .history
5793            .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
5794                tool_call_id: "tc2".into(),
5795                content: "result2".into(),
5796            }]));
5797        // AC3, TR3
5798        agent.history.push(ConversationMessage::AssistantToolCalls {
5799            text: Some("Calling tool 3".into()),
5800            tool_calls: vec![ToolCall {
5801                id: "tc3".into(),
5802                name: "tool3".into(),
5803                arguments: "{}".into(),
5804                extra_content: None,
5805            }],
5806            reasoning_content: None,
5807        });
5808        agent
5809            .history
5810            .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
5811                tool_call_id: "tc3".into(),
5812                content: "result3".into(),
5813            }]));
5814
5815        assert_eq!(agent.history.len(), 7);
5816        agent.trim_history();
5817
5818        // The head must not be an AssistantToolCalls (orphaned from context)
5819        if let Some(first) = agent.history.first() {
5820            assert!(
5821                !matches!(first, ConversationMessage::AssistantToolCalls { .. }),
5822                "trim_history left an orphan AssistantToolCalls at the head of \
5823                 the history; the model would see tool calls with no results"
5824            );
5825        }
5826
5827        // Every ToolResults entry must be immediately preceded by an
5828        // AssistantToolCalls entry (no split pairs).
5829        for window in agent.history.windows(2) {
5830            if matches!(&window[1], ConversationMessage::ToolResults(_)) {
5831                assert!(
5832                    matches!(&window[0], ConversationMessage::AssistantToolCalls { .. }),
5833                    "ToolResults entry is not preceded by an AssistantToolCalls \
5834                     entry — pair was split during trim"
5835                );
5836            }
5837        }
5838
5839        // Every AssistantToolCalls must be immediately followed by ToolResults
5840        // (no orphan ACs).
5841        for window in agent.history.windows(2) {
5842            if matches!(&window[0], ConversationMessage::AssistantToolCalls { .. }) {
5843                assert!(
5844                    matches!(&window[1], ConversationMessage::ToolResults(_)),
5845                    "AssistantToolCalls entry is not followed by a ToolResults \
5846                     entry — orphan tool call would confuse the model"
5847                );
5848            }
5849        }
5850    }
5851
5852    /// Regression test for the orphan-cascade-empties-everything 400.
5853    ///
5854    /// When `max_history_messages` is small relative to a long single-turn
5855    /// tool loop, the orphan-removal cascades in `trim_history` can advance
5856    /// `drop_count` all the way to `other_messages.len()`. Before the guard,
5857    /// `other_messages.drain(0..drop_count)` then emptied the entire
5858    /// non-system history, `convert_messages` lifted the system entry into
5859    /// `system_prompt`, and the provider received `messages: []` — a hard
5860    /// 400 `"messages: at least one message is required"` from Anthropic.
5861    ///
5862    /// Minimal repro of the cascade:
5863    ///   history = [user, AC1, TR1, AC2, TR2]   (no system message)
5864    ///   max_history_messages = 4
5865    ///   other_messages.len() = 5, initial_drop_count = 1 → drops `user`
5866    ///   orphan-TR cascade: pos 1 = AC1, not TR → no-op
5867    ///   orphan-AC cascade: pos 1 = AC1, pos 2 = TR1, pos 3 = AC2,
5868    ///                      pos 4 = TR2 → drop_count = 5
5869    ///   drop_count (5) >= other_messages.len() (5) → guard MUST fire.
5870    ///
5871    /// Without the guard this test fails: `agent.history` ends up empty,
5872    /// which is exactly the shape that crashes the provider call.
5873    /// With the guard, the original history is preserved unchanged so the
5874    /// session stays over-limit-but-functional.
5875    #[test]
5876    fn trim_history_does_not_empty_all_messages_on_full_cascade() {
5877        use zeroclaw_providers::{ToolCall, ToolResultMessage};
5878
5879        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
5880            backend: "none".into(),
5881            ..zeroclaw_config::schema::MemoryConfig::default()
5882        };
5883        let mem: Arc<dyn Memory> = Arc::from(
5884            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
5885                .expect("memory creation should succeed with valid config"),
5886        );
5887
5888        let agent_config = zeroclaw_config::schema::AliasedAgentConfig {
5889            resolved: zeroclaw_config::schema::ResolvedRuntime {
5890                max_history_messages: 4,
5891                ..Default::default()
5892            },
5893            ..zeroclaw_config::schema::AliasedAgentConfig::default()
5894        };
5895
5896        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
5897        let mut agent = Agent::builder()
5898            .model_provider(Box::new(MockModelProvider {
5899                responses: Mutex::new(vec![]),
5900            }))
5901            .tools(vec![Box::new(MockTool)])
5902            .memory(mem)
5903            .observer(observer)
5904            .tool_dispatcher(Box::new(NativeToolDispatcher))
5905            .workspace_dir(std::path::PathBuf::from("/tmp"))
5906            .config(agent_config)
5907            .build()
5908            .expect("agent builder should succeed with valid config");
5909
5910        // user
5911        agent.history.push(ConversationMessage::Chat(ChatMessage {
5912            role: "user".into(),
5913            content: "kick off a long tool loop".into(),
5914        }));
5915        // AC1, TR1
5916        agent.history.push(ConversationMessage::AssistantToolCalls {
5917            text: Some("Calling tool 1".into()),
5918            tool_calls: vec![ToolCall {
5919                id: "tc1".into(),
5920                name: "tool1".into(),
5921                arguments: "{}".into(),
5922                extra_content: None,
5923            }],
5924            reasoning_content: None,
5925        });
5926        agent
5927            .history
5928            .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
5929                tool_call_id: "tc1".into(),
5930                content: "result1".into(),
5931            }]));
5932        // AC2, TR2
5933        agent.history.push(ConversationMessage::AssistantToolCalls {
5934            text: Some("Calling tool 2".into()),
5935            tool_calls: vec![ToolCall {
5936                id: "tc2".into(),
5937                name: "tool2".into(),
5938                arguments: "{}".into(),
5939                extra_content: None,
5940            }],
5941            reasoning_content: None,
5942        });
5943        agent
5944            .history
5945            .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
5946                tool_call_id: "tc2".into(),
5947                content: "result2".into(),
5948            }]));
5949
5950        assert_eq!(agent.history.len(), 5);
5951        let before = agent.history.clone();
5952
5953        agent.trim_history();
5954
5955        // Load-bearing assertion: trim_history must NOT produce an empty
5956        // provider-visible conversation. Without the guard this is an empty
5957        // Vec and the next provider call returns 400.
5958        assert!(
5959            !agent.history.is_empty(),
5960            "trim_history drained every non-system message; the next \
5961             provider call would fail with 'messages: at least one message \
5962             is required'"
5963        );
5964
5965        // When the full cascade triggers the guard the contract is "skip the
5966        // trim, leave history exactly as it was, accept being temporarily
5967        // over the limit." Lock that exact behavior — anything weaker (e.g.
5968        // partial trim) would silently regress the orphan-pair invariants
5969        // the other trim tests cover.
5970        assert_eq!(
5971            agent.history.len(),
5972            before.len(),
5973            "trim_history dropped messages despite the orphan cascade \
5974             reaching other_messages.len(); the guard's contract is to \
5975             preserve the conversation untouched in this case"
5976        );
5977
5978        // Session is temporarily over the configured limit by design. Codify
5979        // that so a future "tighten trim_history" refactor cannot silently
5980        // turn the guard back into the empty-messages crash.
5981        assert!(
5982            agent.history.len() > agent.config.resolved.max_history_messages,
5983            "expected history to remain over max_history_messages after the \
5984             guard fires (that is the documented trade-off); got len={} max={}",
5985            agent.history.len(),
5986            agent.config.resolved.max_history_messages,
5987        );
5988    }
5989
5990    /// Same cascade-to-empty path, but with a system message present.
5991    ///
5992    /// Verifies the guard's restore path puts the conversation back together
5993    /// in the right order (`system_messages` first, then the original
5994    /// non-system entries) rather than dropping the system message on the
5995    /// floor or returning the slices reversed. Without the guard the system
5996    /// message survives (it is held aside in `system_messages`) but the
5997    /// non-system half is still drained — so `agent.history` would end up as
5998    /// just `[system]` and `convert_messages` would lift it into
5999    /// `system_prompt`, again producing `messages: []`.
6000    #[test]
6001    fn trim_history_full_cascade_with_system_message_preserves_full_history() {
6002        use zeroclaw_providers::{ToolCall, ToolResultMessage};
6003
6004        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6005            backend: "none".into(),
6006            ..zeroclaw_config::schema::MemoryConfig::default()
6007        };
6008        let mem: Arc<dyn Memory> = Arc::from(
6009            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6010                .expect("memory creation should succeed with valid config"),
6011        );
6012
6013        // Same arithmetic as the previous test: 5 non-system entries with
6014        // max=4 → initial_drop_count=1, orphan-AC cascade reaches the end.
6015        let agent_config = zeroclaw_config::schema::AliasedAgentConfig {
6016            resolved: zeroclaw_config::schema::ResolvedRuntime {
6017                max_history_messages: 4,
6018                ..Default::default()
6019            },
6020            ..zeroclaw_config::schema::AliasedAgentConfig::default()
6021        };
6022
6023        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6024        let mut agent = Agent::builder()
6025            .model_provider(Box::new(MockModelProvider {
6026                responses: Mutex::new(vec![]),
6027            }))
6028            .tools(vec![Box::new(MockTool)])
6029            .memory(mem)
6030            .observer(observer)
6031            .tool_dispatcher(Box::new(NativeToolDispatcher))
6032            .workspace_dir(std::path::PathBuf::from("/tmp"))
6033            .config(agent_config)
6034            .build()
6035            .expect("agent builder should succeed with valid config");
6036
6037        // system (gets partitioned into system_messages by trim_history)
6038        agent.history.push(ConversationMessage::Chat(ChatMessage {
6039            role: "system".into(),
6040            content: "you are a helpful agent".into(),
6041        }));
6042        // user
6043        agent.history.push(ConversationMessage::Chat(ChatMessage {
6044            role: "user".into(),
6045            content: "kick off a long tool loop".into(),
6046        }));
6047        // AC1, TR1
6048        agent.history.push(ConversationMessage::AssistantToolCalls {
6049            text: Some("Calling tool 1".into()),
6050            tool_calls: vec![ToolCall {
6051                id: "tc1".into(),
6052                name: "tool1".into(),
6053                arguments: "{}".into(),
6054                extra_content: None,
6055            }],
6056            reasoning_content: None,
6057        });
6058        agent
6059            .history
6060            .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
6061                tool_call_id: "tc1".into(),
6062                content: "result1".into(),
6063            }]));
6064        // AC2, TR2
6065        agent.history.push(ConversationMessage::AssistantToolCalls {
6066            text: Some("Calling tool 2".into()),
6067            tool_calls: vec![ToolCall {
6068                id: "tc2".into(),
6069                name: "tool2".into(),
6070                arguments: "{}".into(),
6071                extra_content: None,
6072            }],
6073            reasoning_content: None,
6074        });
6075        agent
6076            .history
6077            .push(ConversationMessage::ToolResults(vec![ToolResultMessage {
6078                tool_call_id: "tc2".into(),
6079                content: "result2".into(),
6080            }]));
6081
6082        assert_eq!(agent.history.len(), 6);
6083        let before_len = agent.history.len();
6084
6085        agent.trim_history();
6086
6087        // System message must still be present and at the head — that is
6088        // where trim_history's partition+restore lands it.
6089        match agent.history.first() {
6090            Some(ConversationMessage::Chat(chat)) => assert_eq!(
6091                chat.role, "system",
6092                "expected system message at head after restore; got role={:?}",
6093                chat.role
6094            ),
6095            other => panic!(
6096                "expected Chat(system) at head of restored history, got {:?}",
6097                other
6098            ),
6099        }
6100
6101        // The non-system half must not have been drained. Total length must
6102        // equal the pre-trim length: guard's contract is "leave history
6103        // unchanged" once the system + non-system halves are reassembled.
6104        assert_eq!(
6105            agent.history.len(),
6106            before_len,
6107            "trim_history dropped messages from the non-system half despite \
6108             the orphan cascade reaching other_messages.len(); guard must \
6109             preserve every entry when it fires"
6110        );
6111
6112        // At least one non-system message must remain — without this the
6113        // provider still sees `messages: []` after `convert_messages` lifts
6114        // the system entry into `system_prompt`.
6115        let non_system_remaining = agent
6116            .history
6117            .iter()
6118            .filter(|m| !matches!(m, ConversationMessage::Chat(c) if c.role == "system"))
6119            .count();
6120        assert!(
6121            non_system_remaining > 0,
6122            "trim_history left only the system message; convert_messages \
6123             would produce messages: [] and the provider call would 400"
6124        );
6125    }
6126
6127    #[test]
6128    fn cancel_synthesizes_paired_tool_results_for_orphaned_calls() {
6129        use zeroclaw_providers::ToolCall;
6130
6131        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6132            backend: "none".into(),
6133            ..zeroclaw_config::schema::MemoryConfig::default()
6134        };
6135        let mem: Arc<dyn Memory> = Arc::from(
6136            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6137                .expect("memory creation should succeed with valid config"),
6138        );
6139        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6140        let mut agent = Agent::builder()
6141            .model_provider(Box::new(MockModelProvider {
6142                responses: Mutex::new(vec![]),
6143            }))
6144            .tools(vec![Box::new(MockTool)])
6145            .memory(mem)
6146            .observer(observer)
6147            .tool_dispatcher(Box::new(NativeToolDispatcher))
6148            .workspace_dir(std::path::PathBuf::from("/tmp"))
6149            .config(zeroclaw_config::schema::AliasedAgentConfig::default())
6150            .build()
6151            .expect("agent builder should succeed with valid config");
6152
6153        // Mirror the cancellation path: an AssistantToolCalls is committed,
6154        // then the turn is interrupted before results land. The synthesized
6155        // results must key off the same tool_calls stored in the
6156        // AssistantToolCalls — two calls, to catch any count/order drift.
6157        let tool_calls = vec![
6158            ToolCall {
6159                id: "tc-cancel-1".into(),
6160                name: "shell".into(),
6161                arguments: "{}".into(),
6162                extra_content: None,
6163            },
6164            ToolCall {
6165                id: "tc-cancel-2".into(),
6166                name: "shell".into(),
6167                arguments: "{}".into(),
6168                extra_content: None,
6169            },
6170        ];
6171        agent.history.push(ConversationMessage::AssistantToolCalls {
6172            text: None,
6173            tool_calls: tool_calls.clone(),
6174            reasoning_content: None,
6175        });
6176
6177        let mut new_msgs = Vec::new();
6178        agent.synthesize_cancelled_tool_results(vec![], &tool_calls, &mut new_msgs);
6179
6180        // The synthesized ToolResults must answer every pending call by id, in
6181        // both the canonical history and the new_messages persistence vec.
6182        let last = agent.history.last().expect("history not empty");
6183        match last {
6184            ConversationMessage::ToolResults(results) => {
6185                assert_eq!(results.len(), 2);
6186                assert_eq!(results[0].tool_call_id, "tc-cancel-1");
6187                assert_eq!(results[1].tool_call_id, "tc-cancel-2");
6188            }
6189            other => panic!("expected ToolResults, got {other:?}"),
6190        }
6191        assert!(matches!(
6192            new_msgs.last(),
6193            Some(ConversationMessage::ToolResults(r)) if r.len() == 2
6194        ));
6195
6196        // Invariant: every AssistantToolCalls is immediately followed by
6197        // ToolResults — no orphan that would 400 on replay.
6198        for window in agent.history.windows(2) {
6199            if matches!(&window[0], ConversationMessage::AssistantToolCalls { .. }) {
6200                assert!(
6201                    matches!(&window[1], ConversationMessage::ToolResults(_)),
6202                    "orphaned AssistantToolCalls after cancel synthesis"
6203                );
6204            }
6205        }
6206    }
6207
6208    // ── Duplicate narration guard ────────────────────────────────────
6209
6210    /// When the model returns narration text alongside tool calls, the agent
6211    /// must store exactly ONE assistant history entry (AssistantToolCalls) —
6212    /// not a plain Chat(assistant) followed by AssistantToolCalls. The latter
6213    /// pattern causes model_providers that enforce role-alternation to reject the
6214    /// next request with a consecutive-assistant-role error.
6215    #[tokio::test]
6216    async fn narration_with_tool_calls_produces_no_consecutive_assistant_entries() {
6217        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6218            backend: "none".into(),
6219            ..zeroclaw_config::schema::MemoryConfig::default()
6220        };
6221        let mem: Arc<dyn Memory> = Arc::from(
6222            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6223                .expect("memory creation should succeed with valid config"),
6224        );
6225
6226        let model_provider = Box::new(MockModelProvider {
6227            responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
6228                text: Some("I will echo the message.".into()),
6229                tool_calls: vec![zeroclaw_providers::ToolCall {
6230                    id: "tc1".into(),
6231                    name: "echo".into(),
6232                    arguments: "{}".into(),
6233                    extra_content: None,
6234                }],
6235                usage: None,
6236                reasoning_content: None,
6237            }]),
6238        });
6239
6240        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6241        let mut agent = Agent::builder()
6242            .model_provider(model_provider)
6243            .tools(vec![Box::new(MockTool)])
6244            .memory(mem)
6245            .observer(observer)
6246            .tool_dispatcher(Box::new(NativeToolDispatcher))
6247            .workspace_dir(std::path::PathBuf::from("/tmp"))
6248            .build()
6249            .expect("agent builder should succeed with valid config");
6250
6251        agent.turn("hi").await.unwrap();
6252
6253        let history = agent.history();
6254        for window in history.windows(2) {
6255            let prev_is_assistant_chat = matches!(
6256                &window[0],
6257                ConversationMessage::Chat(m) if m.role == "assistant"
6258            );
6259            let next_is_tool_calls =
6260                matches!(&window[1], ConversationMessage::AssistantToolCalls { .. });
6261            assert!(
6262                !(prev_is_assistant_chat && next_is_tool_calls),
6263                "history contains Chat(assistant) immediately before AssistantToolCalls — \
6264                 duplicate narration push was not removed"
6265            );
6266        }
6267    }
6268
6269    /// Streaming mock that emits narration text + tool call on the first turn,
6270    /// then a plain text response on the second. Used to verify the streaming
6271    /// path has the same duplicate-narration guard as the blocking path.
6272    struct NarrationStreamModelProvider {
6273        call_count: Arc<Mutex<usize>>,
6274    }
6275
6276    #[async_trait]
6277    impl ModelProvider for NarrationStreamModelProvider {
6278        async fn chat_with_system(
6279            &self,
6280            _system_prompt: Option<&str>,
6281            _message: &str,
6282            _model: &str,
6283            _temperature: Option<f64>,
6284        ) -> Result<String> {
6285            Ok("ok".into())
6286        }
6287
6288        async fn chat(
6289            &self,
6290            _request: ChatRequest<'_>,
6291            _model: &str,
6292            _temperature: Option<f64>,
6293        ) -> Result<zeroclaw_providers::ChatResponse> {
6294            Ok(zeroclaw_providers::ChatResponse {
6295                text: Some("done".into()),
6296                tool_calls: vec![],
6297                usage: None,
6298                reasoning_content: None,
6299            })
6300        }
6301
6302        fn supports_native_tools(&self) -> bool {
6303            true
6304        }
6305
6306        fn stream_chat(
6307            &self,
6308            _request: ChatRequest<'_>,
6309            _model: &str,
6310            _temperature: Option<f64>,
6311            _options: zeroclaw_providers::traits::StreamOptions,
6312        ) -> futures_util::stream::BoxStream<
6313            'static,
6314            zeroclaw_providers::traits::StreamResult<zeroclaw_providers::traits::StreamEvent>,
6315        > {
6316            use futures_util::stream::{self, StreamExt};
6317            let mut count = self.call_count.lock();
6318            *count += 1;
6319            if *count == 1 {
6320                stream::iter(vec![
6321                    Ok(zeroclaw_providers::traits::StreamEvent::TextDelta(
6322                        zeroclaw_providers::traits::StreamChunk {
6323                            delta: "I will echo the message.".into(),
6324                            is_final: false,
6325                            reasoning: None,
6326                            token_count: 0,
6327                        },
6328                    )),
6329                    Ok(zeroclaw_providers::traits::StreamEvent::ToolCall(
6330                        zeroclaw_providers::ToolCall {
6331                            id: "tc1".into(),
6332                            name: "echo".into(),
6333                            arguments: "{}".into(),
6334                            extra_content: None,
6335                        },
6336                    )),
6337                    Ok(zeroclaw_providers::traits::StreamEvent::Final),
6338                ])
6339                .boxed()
6340            } else {
6341                stream::iter(vec![
6342                    Ok(zeroclaw_providers::traits::StreamEvent::TextDelta(
6343                        zeroclaw_providers::traits::StreamChunk {
6344                            delta: "done".into(),
6345                            is_final: false,
6346                            reasoning: None,
6347                            token_count: 0,
6348                        },
6349                    )),
6350                    Ok(zeroclaw_providers::traits::StreamEvent::Final),
6351                ])
6352                .boxed()
6353            }
6354        }
6355    }
6356    impl ::zeroclaw_api::attribution::Attributable for NarrationStreamModelProvider {
6357        fn role(&self) -> ::zeroclaw_api::attribution::Role {
6358            ::zeroclaw_api::attribution::Role::Provider(
6359                ::zeroclaw_api::attribution::ProviderKind::Model(
6360                    ::zeroclaw_api::attribution::ModelProviderKind::Custom,
6361                ),
6362            )
6363        }
6364        fn alias(&self) -> &str {
6365            "NarrationStreamModelProvider"
6366        }
6367    }
6368
6369    #[tokio::test]
6370    async fn streaming_narration_with_tool_calls_produces_no_consecutive_assistant_entries() {
6371        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6372            backend: "none".into(),
6373            ..zeroclaw_config::schema::MemoryConfig::default()
6374        };
6375        let mem: Arc<dyn Memory> = Arc::from(
6376            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6377                .expect("memory creation should succeed with valid config"),
6378        );
6379
6380        let model_provider = Box::new(NarrationStreamModelProvider {
6381            call_count: Arc::new(Mutex::new(0)),
6382        });
6383
6384        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6385        let mut agent = Agent::builder()
6386            .model_provider(model_provider)
6387            .tools(vec![Box::new(MockTool)])
6388            .memory(mem)
6389            .observer(observer)
6390            .tool_dispatcher(Box::new(NativeToolDispatcher))
6391            .workspace_dir(std::path::PathBuf::from("/tmp"))
6392            .build()
6393            .expect("agent builder should succeed with valid config");
6394
6395        let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
6396        agent.turn_streamed("hi", event_tx, None).await.unwrap();
6397
6398        let history = agent.history();
6399        for window in history.windows(2) {
6400            let prev_is_assistant_chat = matches!(
6401                &window[0],
6402                ConversationMessage::Chat(m) if m.role == "assistant"
6403            );
6404            let next_is_tool_calls =
6405                matches!(&window[1], ConversationMessage::AssistantToolCalls { .. });
6406            assert!(
6407                !(prev_is_assistant_chat && next_is_tool_calls),
6408                "streaming path: history contains Chat(assistant) immediately before \
6409                 AssistantToolCalls — duplicate narration push was not removed"
6410            );
6411        }
6412    }
6413
6414    #[tokio::test]
6415    async fn response_cache_key_uses_full_provider_visible_transcript() {
6416        let tmp = tempfile::tempdir().expect("temp response cache dir");
6417        let cache = Arc::new(
6418            zeroclaw_memory::response_cache::ResponseCache::new(tmp.path(), 60, 100)
6419                .expect("response cache should initialize"),
6420        );
6421
6422        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6423            backend: "none".into(),
6424            ..zeroclaw_config::schema::MemoryConfig::default()
6425        };
6426        let mem_a: Arc<dyn Memory> = Arc::from(
6427            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6428                .expect("memory creation should succeed with valid config"),
6429        );
6430        let mem_b: Arc<dyn Memory> = Arc::from(
6431            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6432                .expect("memory creation should succeed with valid config"),
6433        );
6434
6435        let seen_a = Arc::new(Mutex::new(Vec::new()));
6436        let seen_b = Arc::new(Mutex::new(Vec::new()));
6437        let provider_a = Box::new(TranscriptCaptureModelProvider {
6438            responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
6439                text: Some("from prior transcript".into()),
6440                tool_calls: vec![],
6441                usage: None,
6442                reasoning_content: None,
6443            }]),
6444            seen_messages: seen_a.clone(),
6445        });
6446        let provider_b = Box::new(TranscriptCaptureModelProvider {
6447            responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
6448                text: Some("from fresh transcript".into()),
6449                tool_calls: vec![],
6450                usage: None,
6451                reasoning_content: None,
6452            }]),
6453            seen_messages: seen_b.clone(),
6454        });
6455
6456        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6457        let mut agent_a = Agent::builder()
6458            .model_provider(provider_a)
6459            .tools(vec![Box::new(MockTool)])
6460            .memory(mem_a)
6461            .observer(observer.clone())
6462            .response_cache(Some(cache.clone()))
6463            .tool_dispatcher(Box::new(NativeToolDispatcher))
6464            .workspace_dir(std::path::PathBuf::from("/tmp"))
6465            .model_name("test-model".into())
6466            .temperature(Some(0.0))
6467            .build()
6468            .expect("agent builder should succeed with valid config");
6469        agent_a.seed_history(&[
6470            ChatMessage::user("earlier turn"),
6471            ChatMessage::assistant("earlier answer"),
6472        ]);
6473
6474        let mut agent_b = Agent::builder()
6475            .model_provider(provider_b)
6476            .tools(vec![Box::new(MockTool)])
6477            .memory(mem_b)
6478            .observer(observer)
6479            .response_cache(Some(cache))
6480            .tool_dispatcher(Box::new(NativeToolDispatcher))
6481            .workspace_dir(std::path::PathBuf::from("/tmp"))
6482            .model_name("test-model".into())
6483            .temperature(Some(0.0))
6484            .build()
6485            .expect("agent builder should succeed with valid config");
6486
6487        assert_eq!(
6488            agent_a.turn("same final prompt").await.unwrap(),
6489            "from prior transcript"
6490        );
6491        assert_eq!(
6492            agent_b.turn("same final prompt").await.unwrap(),
6493            "from fresh transcript"
6494        );
6495        assert_eq!(seen_a.lock().len(), 1);
6496        assert_eq!(
6497            seen_b.lock().len(),
6498            1,
6499            "fresh transcript must not reuse a cache entry written for a different prior transcript"
6500        );
6501    }
6502
6503    #[tokio::test]
6504    async fn turn_streamed_with_steering_commits_streamed_output_before_continuing() {
6505        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6506            backend: "none".into(),
6507            ..zeroclaw_config::schema::MemoryConfig::default()
6508        };
6509        let mem: Arc<dyn Memory> = Arc::from(
6510            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6511                .expect("memory creation should succeed with valid config"),
6512        );
6513
6514        let seen_messages = Arc::new(Mutex::new(Vec::new()));
6515        let model_provider = Box::new(StreamingSteeringModelProvider {
6516            seen_messages: seen_messages.clone(),
6517            call_count: AtomicUsize::new(0),
6518            fail_on_call: None,
6519            fail_chat_on_call: None,
6520            fail_after_delta_on_call: None,
6521            delay_chat_on_call: None,
6522        });
6523        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6524        let mut agent = Agent::builder()
6525            .model_provider(model_provider)
6526            .tools(vec![Box::new(MockTool)])
6527            .memory(mem)
6528            .observer(observer)
6529            .tool_dispatcher(Box::new(NativeToolDispatcher))
6530            .workspace_dir(std::path::PathBuf::from("/tmp"))
6531            .build()
6532            .expect("agent builder should succeed with valid config");
6533
6534        let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
6535        let (steering_tx, mut steering_rx) = tokio::sync::mpsc::channel::<String>(4);
6536        let handle = zeroclaw_spawn::spawn!(async move {
6537            agent
6538                .turn_streamed_with_steering_state("first", event_tx, None, Some(&mut steering_rx))
6539                .await
6540        });
6541
6542        loop {
6543            match event_rx.recv().await.expect("turn event should arrive") {
6544                TurnEvent::Chunk { delta } if delta == "draft" => {
6545                    steering_tx
6546                        .send("second".into())
6547                        .await
6548                        .expect("steering message should enqueue");
6549                    break;
6550                }
6551                _ => {}
6552            }
6553        }
6554
6555        let outcome = handle
6556            .await
6557            .expect("turn task should finish")
6558            .expect("steered turn should succeed");
6559        assert_eq!(outcome.response, "draftfinal");
6560
6561        let new_chat_messages: Vec<_> = outcome
6562            .new_messages
6563            .iter()
6564            .filter_map(|msg| match msg {
6565                ConversationMessage::Chat(message) => {
6566                    Some((message.role.as_str(), message.content.as_str()))
6567                }
6568                _ => None,
6569            })
6570            .collect();
6571        assert!(
6572            new_chat_messages
6573                .iter()
6574                .any(|(role, content)| { *role == "assistant" && *content == "draft" }),
6575            "already streamed output must be committed before the steering continuation"
6576        );
6577        assert!(
6578            new_chat_messages
6579                .iter()
6580                .any(|(role, content)| { *role == "user" && content.contains("second") }),
6581            "accepted steering must be retained as its own user turn"
6582        );
6583
6584        let seen = seen_messages.lock();
6585        assert_eq!(seen.len(), 2);
6586        let second_call = &seen[1];
6587        assert!(
6588            second_call
6589                .iter()
6590                .any(|msg| msg.role == "assistant" && msg.content == "draft"),
6591            "second provider call must see the committed streamed assistant text"
6592        );
6593        assert!(
6594            second_call
6595                .iter()
6596                .filter(|msg| msg.role == "user")
6597                .any(|msg| msg.content.contains("second")),
6598            "second provider call must include the accepted steering user message"
6599        );
6600    }
6601
6602    #[tokio::test]
6603    async fn turn_streamed_with_steering_error_returns_committed_partial_output() {
6604        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6605            backend: "none".into(),
6606            ..zeroclaw_config::schema::MemoryConfig::default()
6607        };
6608        let mem: Arc<dyn Memory> = Arc::from(
6609            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6610                .expect("memory creation should succeed with valid config"),
6611        );
6612
6613        let model_provider = Box::new(StreamingSteeringModelProvider {
6614            seen_messages: Arc::new(Mutex::new(Vec::new())),
6615            call_count: AtomicUsize::new(0),
6616            fail_on_call: Some(2),
6617            fail_chat_on_call: Some(3),
6618            fail_after_delta_on_call: None,
6619            delay_chat_on_call: None,
6620        });
6621        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6622        let mut agent = Agent::builder()
6623            .model_provider(model_provider)
6624            .tools(vec![Box::new(MockTool)])
6625            .memory(mem)
6626            .observer(observer)
6627            .tool_dispatcher(Box::new(NativeToolDispatcher))
6628            .workspace_dir(std::path::PathBuf::from("/tmp"))
6629            .build()
6630            .expect("agent builder should succeed with valid config");
6631
6632        let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
6633        let (steering_tx, mut steering_rx) = tokio::sync::mpsc::channel::<String>(4);
6634        let handle = zeroclaw_spawn::spawn!(async move {
6635            agent
6636                .turn_streamed_with_steering_state("first", event_tx, None, Some(&mut steering_rx))
6637                .await
6638        });
6639
6640        loop {
6641            match event_rx.recv().await.expect("turn event should arrive") {
6642                TurnEvent::Chunk { delta } if delta == "draft" => {
6643                    steering_tx
6644                        .send("second".into())
6645                        .await
6646                        .expect("steering message should enqueue");
6647                    break;
6648                }
6649                _ => {}
6650            }
6651        }
6652
6653        let err = handle
6654            .await
6655            .expect("turn task should finish")
6656            .expect_err("second provider call should fail");
6657        assert_eq!(err.committed_response, "draft");
6658        assert!(
6659            err.new_messages.iter().any(|msg| {
6660                matches!(msg, ConversationMessage::Chat(message) if message.role == "assistant" && message.content == "draft")
6661            }),
6662            "committed partial assistant output should be returned for persistence after continuation failure"
6663        );
6664        assert!(
6665            err.new_messages.iter().any(|msg| {
6666                matches!(msg, ConversationMessage::Chat(message) if message.role == "user" && message.content.contains("second"))
6667            }),
6668            "accepted steering user message should still be returned after continuation failure"
6669        );
6670    }
6671
6672    #[tokio::test]
6673    async fn turn_streamed_error_before_visible_output_falls_back_to_chat() {
6674        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6675            backend: "none".into(),
6676            ..zeroclaw_config::schema::MemoryConfig::default()
6677        };
6678        let mem: Arc<dyn Memory> = Arc::from(
6679            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6680                .expect("memory creation should succeed with valid config"),
6681        );
6682
6683        let seen_messages = Arc::new(Mutex::new(Vec::new()));
6684        let model_provider = Box::new(StreamingSteeringModelProvider {
6685            seen_messages: seen_messages.clone(),
6686            call_count: AtomicUsize::new(0),
6687            fail_on_call: Some(1),
6688            fail_chat_on_call: None,
6689            fail_after_delta_on_call: None,
6690            delay_chat_on_call: None,
6691        });
6692        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6693        let mut agent = Agent::builder()
6694            .model_provider(model_provider)
6695            .tools(vec![Box::new(MockTool)])
6696            .memory(mem)
6697            .observer(observer)
6698            .tool_dispatcher(Box::new(NativeToolDispatcher))
6699            .workspace_dir(std::path::PathBuf::from("/tmp"))
6700            .build()
6701            .expect("agent builder should succeed with valid config");
6702
6703        let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
6704        let handle = zeroclaw_spawn::spawn!(async move {
6705            agent
6706                .turn_streamed_with_steering_state("first", event_tx, None, None)
6707                .await
6708        });
6709
6710        let outcome = handle
6711            .await
6712            .expect("turn task should finish")
6713            .expect("pre-output stream failure should fall back to non-streaming chat");
6714        assert_eq!(outcome.response, "final");
6715        assert!(
6716            outcome.new_messages.iter().any(|msg| {
6717                matches!(msg, ConversationMessage::Chat(message) if message.role == "assistant" && message.content == "final")
6718            }),
6719            "new messages should carry the fallback assistant answer"
6720        );
6721        assert!(
6722            !outcome.new_messages.iter().any(|msg| {
6723                matches!(msg, ConversationMessage::Chat(message) if message.role == "assistant" && message.content.contains("[stream interrupted]"))
6724            }),
6725            "successful fallback should not persist interrupted stream text"
6726        );
6727
6728        let seen = seen_messages.lock();
6729        assert_eq!(seen.len(), 2);
6730        assert!(
6731            !seen[1]
6732                .iter()
6733                .any(|msg| { msg.role == "assistant" && msg.content.contains("draft") }),
6734            "fallback chat must not receive the abandoned stream attempt as prior assistant text"
6735        );
6736    }
6737
6738    #[tokio::test]
6739    async fn turn_streamed_error_after_delta_preserves_visible_partial() {
6740        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6741            backend: "none".into(),
6742            ..zeroclaw_config::schema::MemoryConfig::default()
6743        };
6744        let mem: Arc<dyn Memory> = Arc::from(
6745            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6746                .expect("memory creation should succeed with valid config"),
6747        );
6748
6749        let model_provider = Box::new(StreamingSteeringModelProvider {
6750            seen_messages: Arc::new(Mutex::new(Vec::new())),
6751            call_count: AtomicUsize::new(0),
6752            fail_on_call: None,
6753            fail_chat_on_call: None,
6754            fail_after_delta_on_call: Some(1),
6755            delay_chat_on_call: None,
6756        });
6757        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6758        let mut agent = Agent::builder()
6759            .model_provider(model_provider)
6760            .tools(vec![Box::new(MockTool)])
6761            .memory(mem)
6762            .observer(observer)
6763            .tool_dispatcher(Box::new(NativeToolDispatcher))
6764            .workspace_dir(std::path::PathBuf::from("/tmp"))
6765            .build()
6766            .expect("agent builder should succeed with valid config");
6767
6768        let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
6769        let handle = zeroclaw_spawn::spawn!(async move {
6770            agent
6771                .turn_streamed_with_steering_state("first", event_tx, None, None)
6772                .await
6773        });
6774
6775        assert!(
6776            matches!(
6777                event_rx.recv().await,
6778                Some(TurnEvent::Chunk { delta }) if delta == "draft"
6779            ),
6780            "the client should see the streamed text before the provider error"
6781        );
6782
6783        let err = handle
6784            .await
6785            .expect("turn task should finish")
6786            .expect_err("post-output stream failure should return an error with partial output");
6787        assert!(
6788            err.error
6789                .to_string()
6790                .contains("synthetic provider failure after delta"),
6791            "unexpected error: {}",
6792            err.error
6793        );
6794        assert!(
6795            err.committed_response.contains("[stream interrupted]"),
6796            "persisted partial text should mark that the visible stream was interrupted"
6797        );
6798        assert!(
6799            err.new_messages.iter().any(|msg| {
6800                matches!(msg, ConversationMessage::Chat(message) if message.role == "assistant" && message.content.contains("draft"))
6801            }),
6802            "new messages should carry the visible assistant partial for gateway persistence"
6803        );
6804    }
6805
6806    #[tokio::test]
6807    async fn turn_streamed_error_before_visible_output_fallback_can_be_cancelled() {
6808        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6809            backend: "none".into(),
6810            ..zeroclaw_config::schema::MemoryConfig::default()
6811        };
6812        let mem: Arc<dyn Memory> = Arc::from(
6813            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6814                .expect("memory creation should succeed with valid config"),
6815        );
6816
6817        let model_provider = Box::new(StreamingSteeringModelProvider {
6818            seen_messages: Arc::new(Mutex::new(Vec::new())),
6819            call_count: AtomicUsize::new(0),
6820            fail_on_call: Some(1),
6821            fail_chat_on_call: None,
6822            fail_after_delta_on_call: None,
6823            delay_chat_on_call: Some(2),
6824        });
6825        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6826        let mut agent = Agent::builder()
6827            .model_provider(model_provider)
6828            .tools(vec![Box::new(MockTool)])
6829            .memory(mem)
6830            .observer(observer)
6831            .tool_dispatcher(Box::new(NativeToolDispatcher))
6832            .workspace_dir(std::path::PathBuf::from("/tmp"))
6833            .build()
6834            .expect("agent builder should succeed with valid config");
6835
6836        let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
6837        let cancel_token = tokio_util::sync::CancellationToken::new();
6838        let cancel_for_task = cancel_token.clone();
6839        let handle = zeroclaw_spawn::spawn!(async move {
6840            agent
6841                .turn_streamed_with_steering_state("first", event_tx, Some(cancel_for_task), None)
6842                .await
6843        });
6844
6845        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
6846        cancel_token.cancel();
6847
6848        let err = handle
6849            .await
6850            .expect("turn task should finish")
6851            .expect_err("cancelled fallback should return cancellation");
6852        assert!(
6853            crate::agent::loop_::is_tool_loop_cancelled(&err.error),
6854            "unexpected error: {}",
6855            err.error
6856        );
6857        assert_eq!(err.committed_response, "[interrupted by user]");
6858        assert!(
6859            err.new_messages.iter().any(|msg| {
6860                matches!(msg, ConversationMessage::Chat(message) if message.role == "assistant" && message.content == "[interrupted by user]")
6861            }),
6862            "pre-output fallback cancellation should include an interruption marker"
6863        );
6864    }
6865
6866    #[tokio::test]
6867    async fn turn_streamed_cancel_before_output_returns_interruption_message() {
6868        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6869            backend: "none".into(),
6870            ..zeroclaw_config::schema::MemoryConfig::default()
6871        };
6872        let mem: Arc<dyn Memory> = Arc::from(
6873            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6874                .expect("memory creation should succeed with valid config"),
6875        );
6876
6877        let model_provider = Box::new(StreamingSteeringModelProvider {
6878            seen_messages: Arc::new(Mutex::new(Vec::new())),
6879            call_count: AtomicUsize::new(0),
6880            fail_on_call: None,
6881            fail_chat_on_call: None,
6882            fail_after_delta_on_call: None,
6883            delay_chat_on_call: None,
6884        });
6885        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
6886        let mut agent = Agent::builder()
6887            .model_provider(model_provider)
6888            .tools(vec![Box::new(MockTool)])
6889            .memory(mem)
6890            .observer(observer)
6891            .tool_dispatcher(Box::new(NativeToolDispatcher))
6892            .workspace_dir(std::path::PathBuf::from("/tmp"))
6893            .build()
6894            .expect("agent builder should succeed with valid config");
6895
6896        let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
6897        let cancel_token = tokio_util::sync::CancellationToken::new();
6898        cancel_token.cancel();
6899
6900        let err = agent
6901            .turn_streamed_with_steering_state("first", event_tx, Some(cancel_token), None)
6902            .await
6903            .expect_err("pre-cancelled turn should return cancellation");
6904
6905        assert!(
6906            crate::agent::loop_::is_tool_loop_cancelled(&err.error),
6907            "unexpected error: {}",
6908            err.error
6909        );
6910        assert_eq!(err.committed_response, "[interrupted by user]");
6911        assert!(
6912            err.new_messages.iter().any(|msg| {
6913                matches!(msg, ConversationMessage::Chat(message) if message.role == "assistant" && message.content == "[interrupted by user]")
6914            }),
6915            "cancelled turn should include an assistant interruption marker for persistence"
6916        );
6917    }
6918
6919    #[tokio::test]
6920    async fn turn_streamed_stream_error_after_delta_emits_llm_response_failure() {
6921        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
6922            backend: "none".into(),
6923            ..zeroclaw_config::schema::MemoryConfig::default()
6924        };
6925        let mem: Arc<dyn Memory> = Arc::from(
6926            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
6927                .expect("memory creation should succeed with valid config"),
6928        );
6929
6930        let model_provider = Box::new(StreamingSteeringModelProvider {
6931            seen_messages: Arc::new(Mutex::new(Vec::new())),
6932            call_count: AtomicUsize::new(0),
6933            fail_on_call: None,
6934            fail_chat_on_call: None,
6935            fail_after_delta_on_call: Some(1),
6936            delay_chat_on_call: None,
6937        });
6938        let capturing = Arc::new(CapturingObserver::default());
6939        let observer: Arc<dyn Observer> = capturing.clone();
6940        let mut agent = Agent::builder()
6941            .model_provider(model_provider)
6942            .tools(vec![Box::new(MockTool)])
6943            .memory(mem)
6944            .observer(observer)
6945            .tool_dispatcher(Box::new(NativeToolDispatcher))
6946            .workspace_dir(std::path::PathBuf::from("/tmp"))
6947            .build()
6948            .expect("agent builder should succeed with valid config");
6949
6950        let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
6951        let err = agent
6952            .turn_streamed_with_steering_state("test", event_tx, None, None)
6953            .await
6954            .expect_err("provider stream failure should be returned");
6955
6956        assert!(
6957            err.committed_response.contains("draft")
6958                && err.committed_response.contains("[stream interrupted]"),
6959            "unexpected committed_response: {}",
6960            err.committed_response
6961        );
6962
6963        let events = capturing.events.lock();
6964        let request = events
6965            .iter()
6966            .find(|e| matches!(e, ObserverEvent::LlmRequest { .. }))
6967            .expect("LlmRequest should have been recorded");
6968        let response = events
6969            .iter()
6970            .find(|e| matches!(e, ObserverEvent::LlmResponse { .. }))
6971            .expect("LlmResponse should have been recorded");
6972
6973        assert_eq!(
6974            events
6975                .iter()
6976                .filter(|e| matches!(e, ObserverEvent::LlmRequest { .. }))
6977                .count(),
6978            1,
6979            "exactly one LlmRequest expected"
6980        );
6981        assert_eq!(
6982            events
6983                .iter()
6984                .filter(|e| matches!(e, ObserverEvent::LlmResponse { .. }))
6985                .count(),
6986            1,
6987            "exactly one LlmResponse expected"
6988        );
6989
6990        let (
6991            ObserverEvent::LlmRequest {
6992                model_provider: req_provider,
6993                model: req_model,
6994                ..
6995            },
6996            ObserverEvent::LlmResponse {
6997                model_provider: resp_provider,
6998                model: resp_model,
6999                success,
7000                error_message,
7001                ..
7002            },
7003        ) = (request, response)
7004        else {
7005            panic!("matched event variants should be LlmRequest and LlmResponse");
7006        };
7007
7008        assert!(!success, "LlmResponse on stream error must be a failure");
7009        assert!(
7010            error_message.as_deref().is_some_and(|m| !m.is_empty()),
7011            "failure LlmResponse must carry a non-empty error_message"
7012        );
7013        assert_eq!(req_provider, resp_provider, "provider should match");
7014        assert_eq!(req_model, resp_model, "model should match");
7015    }
7016
7017    #[tokio::test]
7018    async fn turn_streamed_cancel_during_stream_emits_llm_response_failure() {
7019        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
7020            backend: "none".into(),
7021            ..zeroclaw_config::schema::MemoryConfig::default()
7022        };
7023        let mem: Arc<dyn Memory> = Arc::from(
7024            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
7025                .expect("memory creation should succeed with valid config"),
7026        );
7027
7028        let model_provider = Box::new(StreamingSteeringModelProvider {
7029            seen_messages: Arc::new(Mutex::new(Vec::new())),
7030            call_count: AtomicUsize::new(0),
7031            fail_on_call: None,
7032            fail_chat_on_call: None,
7033            fail_after_delta_on_call: None,
7034            delay_chat_on_call: None,
7035        });
7036        let capturing = Arc::new(CapturingObserver::default());
7037        let observer: Arc<dyn Observer> = capturing.clone();
7038        let mut agent = Agent::builder()
7039            .model_provider(model_provider)
7040            .tools(vec![Box::new(MockTool)])
7041            .memory(mem)
7042            .observer(observer)
7043            .tool_dispatcher(Box::new(NativeToolDispatcher))
7044            .workspace_dir(std::path::PathBuf::from("/tmp"))
7045            .build()
7046            .expect("agent builder should succeed with valid config");
7047
7048        let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
7049        let cancel_token = tokio_util::sync::CancellationToken::new();
7050        let cancel_for_task = cancel_token.clone();
7051
7052        let canceller = zeroclaw_spawn::spawn!(async move {
7053            while let Some(event) = event_rx.recv().await {
7054                if matches!(event, TurnEvent::Chunk { ref delta } if delta == "draft") {
7055                    cancel_for_task.cancel();
7056                    break;
7057                }
7058            }
7059            while event_rx.recv().await.is_some() {}
7060        });
7061
7062        let err = agent
7063            .turn_streamed_with_steering_state("test", event_tx, Some(cancel_token), None)
7064            .await
7065            .expect_err("cancelled turn should return cancellation");
7066
7067        canceller.await.expect("canceller task should finish");
7068
7069        assert!(
7070            crate::agent::loop_::is_tool_loop_cancelled(&err.error),
7071            "cancelled turn should carry the cancellation error: {}",
7072            err.error
7073        );
7074
7075        let events = capturing.events.lock();
7076        assert_eq!(
7077            events
7078                .iter()
7079                .filter(|e| matches!(e, ObserverEvent::LlmRequest { .. }))
7080                .count(),
7081            1,
7082            "exactly one LlmRequest expected"
7083        );
7084        assert_eq!(
7085            events
7086                .iter()
7087                .filter(|e| matches!(e, ObserverEvent::LlmResponse { .. }))
7088                .count(),
7089            1,
7090            "exactly one LlmResponse expected"
7091        );
7092
7093        let request = events
7094            .iter()
7095            .find(|e| matches!(e, ObserverEvent::LlmRequest { .. }))
7096            .expect("LlmRequest should have been recorded");
7097        let response = events
7098            .iter()
7099            .find(|e| matches!(e, ObserverEvent::LlmResponse { .. }))
7100            .expect("LlmResponse should have been recorded");
7101
7102        let (
7103            ObserverEvent::LlmRequest {
7104                model_provider: req_provider,
7105                model: req_model,
7106                ..
7107            },
7108            ObserverEvent::LlmResponse {
7109                model_provider: resp_provider,
7110                model: resp_model,
7111                success,
7112                error_message,
7113                ..
7114            },
7115        ) = (request, response)
7116        else {
7117            panic!("matched event variants should be LlmRequest and LlmResponse");
7118        };
7119
7120        assert!(!success, "cancellation LlmResponse must be a failure");
7121        assert_eq!(
7122            error_message.as_deref(),
7123            Some("request cancelled by user"),
7124            "cancellation LlmResponse must carry the fixed cancel message"
7125        );
7126        assert_eq!(req_provider, resp_provider, "provider should match");
7127        assert_eq!(req_model, resp_model, "model should match");
7128    }
7129
7130    // ── Skill tool registration & excluded_tools filtering ──────────
7131
7132    /// A mock tool whose name is configurable (unlike `MockTool` which is
7133    /// always "echo").
7134    struct NamedMockTool {
7135        tool_name: String,
7136    }
7137
7138    impl NamedMockTool {
7139        fn new(name: &str) -> Self {
7140            Self {
7141                tool_name: name.to_string(),
7142            }
7143        }
7144    }
7145
7146    #[async_trait]
7147    impl Tool for NamedMockTool {
7148        fn name(&self) -> &str {
7149            &self.tool_name
7150        }
7151
7152        fn description(&self) -> &str {
7153            "mock"
7154        }
7155
7156        fn parameters_schema(&self) -> serde_json::Value {
7157            serde_json::json!({"type": "object"})
7158        }
7159
7160        async fn execute(&self, _args: serde_json::Value) -> Result<crate::tools::ToolResult> {
7161            Ok(crate::tools::ToolResult {
7162                success: true,
7163                output: "ok".into(),
7164                error: None,
7165            })
7166        }
7167    }
7168
7169    fn make_skill(name: &str, tool_names: &[&str]) -> crate::skills::Skill {
7170        crate::skills::Skill {
7171            name: name.to_string(),
7172            description: format!("{name} skill"),
7173            version: "0.1.0".to_string(),
7174            author: None,
7175            tags: vec![],
7176            tools: tool_names
7177                .iter()
7178                .map(|t| crate::skills::SkillTool {
7179                    name: t.to_string(),
7180                    description: format!("{t} tool"),
7181                    kind: "shell".to_string(),
7182                    command: format!("echo {t}"),
7183                    args: std::collections::HashMap::new(),
7184                    target: None,
7185                    locked_args: std::collections::HashMap::new(),
7186                })
7187                .collect(),
7188            prompts: vec![],
7189            location: None,
7190        }
7191    }
7192
7193    #[test]
7194    fn register_skill_tools_adds_skill_tools_to_registry() {
7195        let security = Arc::new(crate::security::SecurityPolicy::default());
7196        let mut tools: Vec<Box<dyn Tool>> = vec![Box::new(NamedMockTool::new("builtin_a"))];
7197
7198        let skills = vec![make_skill("deploy", &["run", "status"])];
7199        tools::register_skill_tools(&mut tools, &skills, security);
7200
7201        let names: Vec<&str> = tools.iter().map(|t| t.name()).collect();
7202        assert_eq!(names, &["builtin_a", "deploy__run", "deploy__status"]);
7203    }
7204
7205    #[test]
7206    fn register_skill_tools_skips_shadowed_builtins() {
7207        let security = Arc::new(crate::security::SecurityPolicy::default());
7208        // Pre-populate with a tool whose name matches what the skill would produce.
7209        let mut tools: Vec<Box<dyn Tool>> = vec![Box::new(NamedMockTool::new("my_skill__run"))];
7210
7211        let skills = vec![make_skill("my_skill", &["run"])];
7212        tools::register_skill_tools(&mut tools, &skills, security);
7213
7214        // Should still be just 1 tool — the duplicate was skipped.
7215        assert_eq!(tools.len(), 1);
7216        assert_eq!(tools[0].name(), "my_skill__run");
7217    }
7218
7219    #[test]
7220    fn from_config_policy_filter_blocks_raw_target_but_keeps_scoped_wrapper() {
7221        // Path-level boundary for the from_config (ws.rs / daemon) path. The
7222        // SecurityPolicy allow/deny gate now runs over the built-in registry
7223        // before skill tools are registered (parity with agent::run), so an
7224        // agent allowlisted to `file_read` does NOT keep raw `shell`, while the
7225        // skill's scoped wrapper — distinct prefixed name — remains the only
7226        // callable path to that capability. This exercises the exact
7227        // apply_policy_tool_filter + register_skill_tools_with_context sequence
7228        // from_config performs.
7229        use crate::skills::{Skill, SkillTool};
7230
7231        let shell: Arc<dyn Tool> = Arc::new(NamedMockTool::new("shell"));
7232        let file_read: Arc<dyn Tool> = Arc::new(NamedMockTool::new("file_read"));
7233        // The resolution registry retains the raw tool so the wrapper can
7234        // delegate to it even after the policy filter removes it below.
7235        let resolution: Vec<Arc<dyn Tool>> = vec![Arc::clone(&shell), Arc::clone(&file_read)];
7236
7237        let mut tools: Vec<Box<dyn Tool>> = vec![
7238            Box::new(crate::tools::ArcToolRef(Arc::clone(&shell))),
7239            Box::new(crate::tools::ArcToolRef(Arc::clone(&file_read))),
7240        ];
7241
7242        // Allowlist the agent to `file_read` only — the gate from_config now
7243        // applies to built-ins before skills register. (Pre-fix, from_config
7244        // honored only the denylist, so raw `shell` leaked through.)
7245        let policy = crate::security::SecurityPolicy {
7246            allowed_tools: Some(vec!["file_read".to_string()]),
7247            workspace_dir: std::env::temp_dir(),
7248            ..crate::security::SecurityPolicy::default()
7249        };
7250        crate::agent::loop_::apply_policy_tool_filter(&mut tools, Some(&policy), None);
7251        assert!(
7252            !tools.iter().any(|t| t.name() == "shell"),
7253            "raw shell must be removed by the allowlist on the from_config path"
7254        );
7255        assert!(
7256            tools.iter().any(|t| t.name() == "file_read"),
7257            "allowlisted file_read must survive the filter"
7258        );
7259
7260        let skill = Skill {
7261            name: "ops".to_string(),
7262            description: "d".to_string(),
7263            version: "1".to_string(),
7264            author: None,
7265            tags: vec![],
7266            tools: vec![SkillTool {
7267                name: "use_shell".to_string(),
7268                description: "scoped shell".to_string(),
7269                kind: "builtin".to_string(),
7270                command: String::new(),
7271                args: std::collections::HashMap::new(),
7272                target: Some("shell".to_string()),
7273                locked_args: std::collections::HashMap::new(),
7274            }],
7275            prompts: vec![],
7276            location: None,
7277        };
7278        tools::register_skill_tools_with_context(
7279            &mut tools,
7280            &[skill],
7281            Arc::new(crate::security::SecurityPolicy::default()),
7282            &resolution,
7283        );
7284
7285        assert!(
7286            !tools.iter().any(|t| t.name() == "shell"),
7287            "raw shell must STILL be unavailable after skill registration"
7288        );
7289        assert!(
7290            tools.iter().any(|t| t.name() == "ops__use_shell"),
7291            "the scoped elevation wrapper must remain the only callable path to shell"
7292        );
7293    }
7294
7295    #[test]
7296    fn excluded_tools_filters_matching_tools() {
7297        let mut tools: Vec<Box<dyn Tool>> = vec![
7298            Box::new(NamedMockTool::new("shell")),
7299            Box::new(NamedMockTool::new("file_write")),
7300            Box::new(NamedMockTool::new("web_search")),
7301        ];
7302
7303        let excluded = ["shell".to_string(), "file_write".to_string()];
7304        tools.retain(|t| !excluded.iter().any(|ex| ex == t.name()));
7305
7306        let names: Vec<&str> = tools.iter().map(|t| t.name()).collect();
7307        assert_eq!(names, &["web_search"]);
7308    }
7309
7310    #[test]
7311    fn excluded_tools_preserves_non_excluded() {
7312        let mut tools: Vec<Box<dyn Tool>> = vec![
7313            Box::new(NamedMockTool::new("shell")),
7314            Box::new(NamedMockTool::new("file_read")),
7315            Box::new(NamedMockTool::new("web_fetch")),
7316        ];
7317
7318        // Exclude only "shell" — the other two should survive.
7319        let excluded = ["shell".to_string()];
7320        tools.retain(|t| !excluded.iter().any(|ex| ex == t.name()));
7321
7322        let names: Vec<&str> = tools.iter().map(|t| t.name()).collect();
7323        assert_eq!(names, &["file_read", "web_fetch"]);
7324    }
7325
7326    #[test]
7327    fn empty_excluded_tools_preserves_all() {
7328        let mut tools: Vec<Box<dyn Tool>> = vec![
7329            Box::new(NamedMockTool::new("shell")),
7330            Box::new(NamedMockTool::new("file_read")),
7331        ];
7332
7333        let excluded: Vec<String> = vec![];
7334        if !excluded.is_empty() {
7335            tools.retain(|t| !excluded.iter().any(|ex| ex == t.name()));
7336        }
7337
7338        assert_eq!(tools.len(), 2);
7339    }
7340
7341    /// Regression test: `turn_streamed` must return the new messages in its
7342    /// second tuple element even when `trim_history` fires and removes old
7343    /// entries from the front of the history.  Before the fix, callers that
7344    /// sliced `history[pre_len..]` after the turn would get an empty slice
7345    /// because trim had shifted the tail back to `pre_len`.
7346    #[tokio::test]
7347    async fn turn_streamed_returns_new_messages_at_history_limit() {
7348        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
7349            backend: "none".into(),
7350            ..zeroclaw_config::schema::MemoryConfig::default()
7351        };
7352        let mem: Arc<dyn Memory> = Arc::from(
7353            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
7354                .expect("memory creation should succeed with valid config"),
7355        );
7356
7357        // Use a small limit so that pre-filling to the limit forces a trim on
7358        // the very first new turn.
7359        let agent_config = zeroclaw_config::schema::AliasedAgentConfig {
7360            resolved: zeroclaw_config::schema::ResolvedRuntime {
7361                max_history_messages: 4,
7362                ..Default::default()
7363            },
7364            ..zeroclaw_config::schema::AliasedAgentConfig::default()
7365        };
7366
7367        // Simple streaming provider that returns plain text (no tool calls).
7368        let provider = Box::new(NarrationStreamModelProvider {
7369            call_count: Arc::new(Mutex::new(0)),
7370        });
7371
7372        let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
7373        let mut agent = Agent::builder()
7374            .model_provider(provider)
7375            .tools(vec![Box::new(MockTool)])
7376            .memory(mem)
7377            .observer(observer)
7378            .tool_dispatcher(Box::new(NativeToolDispatcher))
7379            .workspace_dir(std::path::PathBuf::from("/tmp"))
7380            .config(agent_config)
7381            .build()
7382            .expect("agent builder should succeed with valid config");
7383
7384        // Pre-fill the history to exactly max_history_messages non-system
7385        // messages so that adding a new user+assistant pair triggers trim.
7386        // (system message is added by turn_streamed on first call, so we
7387        // push user+assistant pairs to simulate a history-at-limit state.)
7388        agent
7389            .history
7390            .push(ConversationMessage::Chat(ChatMessage::system("sys")));
7391        for i in 0..2 {
7392            agent
7393                .history
7394                .push(ConversationMessage::Chat(ChatMessage::user(format!(
7395                    "old {i}"
7396                ))));
7397            agent
7398                .history
7399                .push(ConversationMessage::Chat(ChatMessage::assistant(format!(
7400                    "old reply {i}"
7401                ))));
7402        }
7403        // History is now: [system, user0, assistant0, user1, assistant1] = 5
7404        // entries.  max_history_messages=4 means trim fires after adding the
7405        // new turn.
7406
7407        let (event_tx, _rx) = tokio::sync::mpsc::channel::<TurnEvent>(8);
7408        let (_, new_msgs) = agent
7409            .turn_streamed("new question", event_tx, None)
7410            .await
7411            .expect("turn_streamed should succeed");
7412
7413        // The returned Vec must contain the new user message.
7414        let has_user = new_msgs
7415            .iter()
7416            .any(|m| matches!(m, ConversationMessage::Chat(c) if c.role == "user"));
7417        assert!(
7418            has_user,
7419            "new_msgs must include the user message even after trim; got: {new_msgs:?}"
7420        );
7421
7422        // The returned Vec must contain the new assistant reply.
7423        let has_assistant = new_msgs
7424            .iter()
7425            .any(|m| matches!(m, ConversationMessage::Chat(c) if c.role == "assistant"));
7426        assert!(
7427            has_assistant,
7428            "new_msgs must include the assistant reply even after trim; got: {new_msgs:?}"
7429        );
7430    }
7431
7432    #[test]
7433    fn excluded_tools_then_skill_registration_end_to_end() {
7434        let security = Arc::new(crate::security::SecurityPolicy::default());
7435        let mut tools: Vec<Box<dyn Tool>> = vec![
7436            Box::new(NamedMockTool::new("shell")),
7437            Box::new(NamedMockTool::new("file_read")),
7438            Box::new(NamedMockTool::new("web_fetch")),
7439        ];
7440
7441        // Step 1: filter excluded tools (mirrors from_config logic)
7442        let excluded = ["shell".to_string()];
7443        tools.retain(|t| !excluded.iter().any(|ex| ex == t.name()));
7444
7445        // Step 2: register skill tools (mirrors from_config logic)
7446        let skills = vec![make_skill("ops", &["deploy", "rollback"])];
7447        tools::register_skill_tools(&mut tools, &skills, security);
7448
7449        let names: Vec<&str> = tools.iter().map(|t| t.name()).collect();
7450        assert_eq!(
7451            names,
7452            &["file_read", "web_fetch", "ops__deploy", "ops__rollback"]
7453        );
7454    }
7455
7456    fn observer_event_turn_id(event: &ObserverEvent) -> Option<&str> {
7457        match event {
7458            ObserverEvent::AgentStart { turn_id, .. }
7459            | ObserverEvent::LlmRequest { turn_id, .. }
7460            | ObserverEvent::LlmResponse { turn_id, .. }
7461            | ObserverEvent::AgentEnd { turn_id, .. }
7462            | ObserverEvent::ToolCall { turn_id, .. } => turn_id.as_deref(),
7463            _ => None,
7464        }
7465    }
7466
7467    fn assert_single_agent_lifecycle(events: &[ObserverEvent]) -> (usize, usize) {
7468        let starts: Vec<_> = events
7469            .iter()
7470            .enumerate()
7471            .filter(|(_, event)| matches!(event, ObserverEvent::AgentStart { .. }))
7472            .collect();
7473        let ends: Vec<_> = events
7474            .iter()
7475            .enumerate()
7476            .filter(|(_, event)| matches!(event, ObserverEvent::AgentEnd { .. }))
7477            .collect();
7478
7479        assert_eq!(starts.len(), 1, "expected exactly one AgentStart");
7480        assert_eq!(ends.len(), 1, "expected exactly one AgentEnd");
7481        assert!(starts[0].0 < ends[0].0, "AgentEnd must follow AgentStart");
7482        assert_eq!(
7483            observer_event_turn_id(starts[0].1),
7484            observer_event_turn_id(ends[0].1),
7485            "AgentEnd turn_id must match AgentStart turn_id"
7486        );
7487
7488        (starts[0].0, ends[0].0)
7489    }
7490
7491    fn agent_end_tokens(
7492        event: &ObserverEvent,
7493    ) -> Option<zeroclaw_api::observability_traits::TurnTokenUsage> {
7494        match event {
7495            ObserverEvent::AgentEnd { tokens_used, .. } => tokens_used.clone(),
7496            _ => None,
7497        }
7498    }
7499
7500    #[tokio::test]
7501    async fn turn_cache_hit_emits_agent_end_with_none_tokens() {
7502        let tmp = tempfile::tempdir().expect("temp response cache dir");
7503        let cache = Arc::new(
7504            zeroclaw_memory::response_cache::ResponseCache::new(tmp.path(), 60, 100)
7505                .expect("response cache should initialize"),
7506        );
7507        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
7508            backend: "none".into(),
7509            ..zeroclaw_config::schema::MemoryConfig::default()
7510        };
7511        let mem_a: Arc<dyn Memory> = Arc::from(
7512            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
7513                .expect("memory creation should succeed with valid config"),
7514        );
7515        let mem_b: Arc<dyn Memory> = Arc::from(
7516            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
7517                .expect("memory creation should succeed with valid config"),
7518        );
7519
7520        let mut agent_a = Agent::builder()
7521            .model_provider(Box::new(MockModelProvider {
7522                responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
7523                    text: Some("cached answer".into()),
7524                    tool_calls: vec![],
7525                    usage: Some(zeroclaw_providers::traits::TokenUsage {
7526                        input_tokens: Some(10),
7527                        cached_input_tokens: None,
7528                        output_tokens: Some(5),
7529                    }),
7530                    reasoning_content: None,
7531                }]),
7532            }))
7533            .tools(vec![Box::new(MockTool)])
7534            .memory(mem_a)
7535            .observer(Arc::from(crate::observability::NoopObserver {}) as Arc<dyn Observer>)
7536            .response_cache(Some(cache.clone()))
7537            .tool_dispatcher(Box::new(NativeToolDispatcher))
7538            .workspace_dir(std::path::PathBuf::from("/tmp"))
7539            .model_name("test-model".into())
7540            .temperature(Some(0.0))
7541            .build()
7542            .expect("agent builder should succeed with valid config");
7543
7544        assert_eq!(agent_a.turn("seed").await.unwrap(), "cached answer");
7545
7546        let capturing = Arc::new(CapturingObserver::default());
7547        let observer: Arc<dyn Observer> = capturing.clone();
7548        let mut agent_b = Agent::builder()
7549            .model_provider(Box::new(MockModelProvider {
7550                responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
7551                    text: Some("uncached answer".into()),
7552                    tool_calls: vec![],
7553                    usage: None,
7554                    reasoning_content: None,
7555                }]),
7556            }))
7557            .tools(vec![Box::new(MockTool)])
7558            .memory(mem_b)
7559            .observer(observer)
7560            .response_cache(Some(cache))
7561            .tool_dispatcher(Box::new(NativeToolDispatcher))
7562            .workspace_dir(std::path::PathBuf::from("/tmp"))
7563            .model_name("test-model".into())
7564            .temperature(Some(0.0))
7565            .build()
7566            .expect("agent builder should succeed with valid config");
7567
7568        assert_eq!(agent_b.turn("seed").await.unwrap(), "cached answer");
7569
7570        let events = capturing.events.lock();
7571        let (_, end_idx) = assert_single_agent_lifecycle(&events);
7572        assert!(agent_end_tokens(&events[end_idx]).is_none());
7573        assert!(
7574            !events
7575                .iter()
7576                .any(|event| matches!(event, ObserverEvent::LlmRequest { .. })),
7577            "cache hit should not call the LLM"
7578        );
7579    }
7580
7581    #[tokio::test]
7582    async fn turn_streamed_cancel_during_tool_execution_emits_agent_end_with_tokens() {
7583        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
7584            backend: "none".into(),
7585            ..zeroclaw_config::schema::MemoryConfig::default()
7586        };
7587        let mem: Arc<dyn Memory> = Arc::from(
7588            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
7589                .expect("memory creation should succeed with valid config"),
7590        );
7591        let capturing = Arc::new(CapturingObserver::default());
7592        let observer: Arc<dyn Observer> = capturing.clone();
7593        let mut agent = Agent::builder()
7594            .model_provider(Box::new(MockModelProvider {
7595                responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
7596                    text: Some("I will echo.".into()),
7597                    tool_calls: vec![zeroclaw_providers::ToolCall {
7598                        id: "tc1".into(),
7599                        name: "echo".into(),
7600                        arguments: "{}".into(),
7601                        extra_content: None,
7602                    }],
7603                    usage: Some(zeroclaw_providers::traits::TokenUsage {
7604                        input_tokens: Some(10),
7605                        cached_input_tokens: None,
7606                        output_tokens: Some(5),
7607                    }),
7608                    reasoning_content: None,
7609                }]),
7610            }))
7611            .tools(vec![Box::new(SlowTool)])
7612            .memory(mem)
7613            .observer(observer)
7614            .tool_dispatcher(Box::new(NativeToolDispatcher))
7615            .workspace_dir(std::path::PathBuf::from("/tmp"))
7616            .build()
7617            .expect("agent builder should succeed with valid config");
7618
7619        let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(64);
7620        let cancel_token = tokio_util::sync::CancellationToken::new();
7621        let cancel_for_task = cancel_token.clone();
7622        let handle = zeroclaw_spawn::spawn!(async move {
7623            agent
7624                .turn_streamed_with_steering_state(
7625                    "use echo",
7626                    event_tx,
7627                    Some(cancel_for_task),
7628                    None,
7629                )
7630                .await
7631        });
7632
7633        while let Some(event) = event_rx.recv().await {
7634            if matches!(event, TurnEvent::Usage { .. }) {
7635                cancel_token.cancel();
7636                break;
7637            }
7638        }
7639
7640        handle
7641            .await
7642            .expect("turn task should finish")
7643            .expect_err("turn should be cancelled before tool execution completes");
7644
7645        let events = capturing.events.lock();
7646        let (_, end_idx) = assert_single_agent_lifecycle(&events);
7647        let tokens = agent_end_tokens(&events[end_idx]).expect("AgentEnd should include tokens");
7648        assert_eq!(tokens.input_tokens, 10);
7649        assert_eq!(tokens.output_tokens, 5);
7650        let llm_response_idx = events
7651            .iter()
7652            .position(|event| matches!(event, ObserverEvent::LlmResponse { success: true, .. }))
7653            .expect("successful LlmResponse should be recorded");
7654        assert!(
7655            llm_response_idx < end_idx,
7656            "AgentEnd must follow LlmResponse"
7657        );
7658    }
7659
7660    #[tokio::test]
7661    async fn turn_llm_error_emits_agent_end() {
7662        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
7663            backend: "none".into(),
7664            ..zeroclaw_config::schema::MemoryConfig::default()
7665        };
7666        let mem: Arc<dyn Memory> = Arc::from(
7667            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
7668                .expect("memory creation should succeed with valid config"),
7669        );
7670        let capturing = Arc::new(CapturingObserver::default());
7671        let observer: Arc<dyn Observer> = capturing.clone();
7672        let mut agent = Agent::builder()
7673            .model_provider(Box::new(FailingModelProvider))
7674            .tools(vec![Box::new(MockTool)])
7675            .memory(mem)
7676            .observer(observer)
7677            .tool_dispatcher(Box::new(NativeToolDispatcher))
7678            .workspace_dir(std::path::PathBuf::from("/tmp"))
7679            .model_name("test-model".into())
7680            .temperature(Some(0.0))
7681            .build()
7682            .expect("agent builder should succeed with valid config");
7683
7684        let result = agent.turn("hello").await;
7685        assert!(
7686            result.is_err(),
7687            "turn should fail when provider is unavailable"
7688        );
7689
7690        let events = capturing.events.lock();
7691        let (_, end_idx) = assert_single_agent_lifecycle(&events);
7692        assert!(
7693            agent_end_tokens(&events[end_idx]).is_none(),
7694            "AgentEnd should have tokens_used: None on LLM error"
7695        );
7696    }
7697
7698    #[tokio::test]
7699    async fn turn_events_share_consistent_turn_id() {
7700        let memory_cfg = zeroclaw_config::schema::MemoryConfig {
7701            backend: "none".into(),
7702            ..zeroclaw_config::schema::MemoryConfig::default()
7703        };
7704        let mem: Arc<dyn Memory> = Arc::from(
7705            zeroclaw_memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
7706                .expect("memory creation should succeed with valid config"),
7707        );
7708
7709        let model_provider = Box::new(MockModelProvider {
7710            responses: Mutex::new(vec![zeroclaw_providers::ChatResponse {
7711                text: Some("done".into()),
7712                tool_calls: vec![],
7713                usage: None,
7714                reasoning_content: None,
7715            }]),
7716        });
7717        let capturing = Arc::new(CapturingObserver::default());
7718        let observer: Arc<dyn Observer> = capturing.clone();
7719        let mut agent = Agent::builder()
7720            .model_provider(model_provider)
7721            .tools(vec![Box::new(MockTool)])
7722            .memory(mem)
7723            .observer(observer)
7724            .tool_dispatcher(Box::new(NativeToolDispatcher))
7725            .workspace_dir(std::path::PathBuf::from("/tmp"))
7726            .build()
7727            .expect("agent builder should succeed with valid config");
7728
7729        let _ = agent.turn("test").await.expect("turn should succeed");
7730
7731        let events = capturing.events.lock();
7732        let turn_ids: Vec<&str> = events.iter().filter_map(observer_event_turn_id).collect();
7733        assert!(!turn_ids.is_empty(), "turn events should carry turn_id");
7734        let first = turn_ids[0];
7735        assert!(
7736            turn_ids.iter().all(|turn_id| *turn_id == first),
7737            "all turn_ids should be consistent"
7738        );
7739    }
7740}