Skip to main content

zeroclaw_runtime/rpc/
dispatch.rs

1//! JSON-RPC 2.0 method dispatch. Transport-agnostic.
2//!
3//! **No string-literal matching.** Every wire method name is registered
4//! exactly once in [`Method::ALL`]. The compiler enforces that every
5//! variant has a handler via exhaustive `match`.
6
7use super::context::RpcContext;
8use super::transport::RpcTransport;
9use super::turn::{TurnAttribution, TurnOutcome, execute_turn};
10use super::types::*;
11use crate::agent::agent::TurnEvent;
12use serde::Serialize;
13use serde::de::DeserializeOwned;
14use serde_json::Value;
15use std::sync::Arc;
16use tokio::sync::mpsc;
17
18use zeroclaw_api::jsonrpc::error_codes::*;
19use zeroclaw_api::jsonrpc::{
20    JSONRPC_VERSION, JsonRpcError, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse,
21    RpcOutbound,
22};
23use zeroclaw_api::model_provider::ChatMessage;
24
/// Wire protocol version. Bump on breaking changes.
///
/// `initialize` rejects any client whose `protocol_version` differs from this
/// value, and the value is echoed back in the `initialize` and `status`
/// results.
pub const RPC_PROTOCOL_VERSION: u64 = 1;
27
/// Wire names of server-to-client notifications.
///
/// These are not request methods, so they are not part of [`Method::ALL`].
// NOTE(review): the emit sites are outside this part of the file — presumably
// `session/update` streams turn progress and `logs/event` follows
// `logs/subscribe`; confirm against the senders.
mod notification {
    /// Notification name for session updates.
    pub const SESSION_UPDATE: &str = "session/update";
    /// Notification name for log events.
    pub const LOGS_EVENT: &str = "logs/event";
}
32
33// ── Method registry ──────────────────────────────────────────────
34//
35// Single source of truth. Every variant maps to exactly one wire
36// string. `from_wire` is a table scan — no hand-written string
37// matching anywhere in this file.
38
/// Every JSON-RPC method the dispatcher can route.
///
/// Each variant is paired with exactly one wire string in [`Method::ALL`];
/// when adding a variant, add its table entry as well, otherwise
/// [`Method::wire_name`] panics for it and [`Method::from_wire`] can never
/// produce it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Method {
    // Core
    Initialize,
    Status,
    Health,

    // Sessions (agent chat lives here — session/prompt + session/update
    // notifications is the RPC equivalent of the gateway's ws/chat)
    SessionNew,
    SessionClose,
    SessionPrompt,
    SessionConfigure,
    SessionCancel,
    SessionGitBranch,
    SessionList,
    SessionListAcp,
    SessionMessages,
    SessionState,
    SessionDelete,
    SessionApprove,
    SessionKill,

    // Memory
    MemoryList,
    MemorySearch,
    MemoryGet,
    MemoryStore,
    MemoryDelete,

    // Cron
    CronList,
    CronGet,
    CronAdd,
    CronPatch,
    CronDelete,
    CronRuns,
    CronTrigger,
    CronSettings,

    // Config
    ConfigGet,
    ConfigSet,
    ConfigValidate,
    ConfigReload,
    ConfigList,
    ConfigDelete,
    ConfigMapKeys,
    ConfigMapKeyCreate,
    ConfigMapKeyDelete,
    ConfigMapKeyRename,
    ConfigTemplates,

    // Agents
    AgentsList,
    AgentsStatus,

    // Cost
    CostQuery,

    // Skills
    SkillsBundles,
    SkillsList,
    SkillsRead,
    SkillsWrite,
    SkillsDelete,

    // Personality
    PersonalityList,
    PersonalityGet,
    PersonalityPut,
    PersonalityTemplates,

    // Config introspection (sections, catalog, status)
    ConfigSections,
    ConfigStatus,
    ConfigCatalog,
    ConfigCatalogModels,

    // Logs / Events
    LogsSubscribe,
    LogsQuery,
    LogsGet,

    // TUI
    TuiList,

    // Files
    FileAttach,
    FsListDir,

    // Locales
    LocalesList,
    LocalesFetch,

    // Quickstart (TUI mirror of `/api/quickstart/*` HTTP routes)
    QuickstartState,
    QuickstartFields,
    QuickstartValidate,
    QuickstartApply,
    QuickstartDismiss,
}
141
142impl Method {
143    /// The single table. Wire name ↔ variant, defined once.
144    pub const ALL: &[(Method, &str)] = &[
145        (Method::Initialize, "initialize"),
146        (Method::Status, "status"),
147        (Method::Health, "health"),
148        // Sessions
149        (Method::SessionNew, "session/new"),
150        (Method::SessionClose, "session/close"),
151        (Method::SessionPrompt, "session/prompt"),
152        (Method::SessionConfigure, "session/configure"),
153        (Method::SessionCancel, "session/cancel"),
154        (Method::SessionGitBranch, "session/git_branch"),
155        (Method::SessionList, "session/list"),
156        (Method::SessionListAcp, "session/list-acp"),
157        (Method::SessionMessages, "session/messages"),
158        (Method::SessionState, "session/state"),
159        (Method::SessionDelete, "session/delete"),
160        (Method::SessionApprove, "session/approve"),
161        (Method::SessionKill, "session/kill"),
162        // Memory
163        (Method::MemoryList, "memory/list"),
164        (Method::MemorySearch, "memory/search"),
165        (Method::MemoryGet, "memory/get"),
166        (Method::MemoryStore, "memory/store"),
167        (Method::MemoryDelete, "memory/delete"),
168        // Cron
169        (Method::CronList, "cron/list"),
170        (Method::CronGet, "cron/get"),
171        (Method::CronAdd, "cron/add"),
172        (Method::CronPatch, "cron/patch"),
173        (Method::CronDelete, "cron/delete"),
174        (Method::CronRuns, "cron/runs"),
175        (Method::CronTrigger, "cron/trigger"),
176        (Method::CronSettings, "cron/settings"),
177        // Config
178        (Method::ConfigGet, "config/get"),
179        (Method::ConfigSet, "config/set"),
180        (Method::ConfigValidate, "config/validate"),
181        (Method::ConfigReload, "config/reload"),
182        (Method::ConfigList, "config/list"),
183        (Method::ConfigDelete, "config/delete"),
184        (Method::ConfigMapKeys, "config/map-keys"),
185        (Method::ConfigMapKeyCreate, "config/map-key-create"),
186        (Method::ConfigMapKeyDelete, "config/map-key-delete"),
187        (Method::ConfigMapKeyRename, "config/map-key-rename"),
188        (Method::ConfigTemplates, "config/templates"),
189        // Agents
190        (Method::AgentsList, "agents/list"),
191        (Method::AgentsStatus, "agents/status"),
192        // Cost
193        (Method::CostQuery, "cost/query"),
194        // Skills
195        (Method::SkillsBundles, "skills/bundles"),
196        (Method::SkillsList, "skills/list"),
197        (Method::SkillsRead, "skills/read"),
198        (Method::SkillsWrite, "skills/write"),
199        (Method::SkillsDelete, "skills/delete"),
200        // Personality
201        (Method::PersonalityList, "personality/list"),
202        (Method::PersonalityGet, "personality/get"),
203        (Method::PersonalityPut, "personality/put"),
204        (Method::PersonalityTemplates, "personality/templates"),
205        // Config introspection
206        (Method::ConfigSections, "config/sections"),
207        (Method::ConfigStatus, "config/status"),
208        (Method::ConfigCatalog, "config/catalog"),
209        (Method::ConfigCatalogModels, "config/catalog-models"),
210        // Logs
211        (Method::LogsSubscribe, "logs/subscribe"),
212        (Method::LogsQuery, "logs/query"),
213        (Method::LogsGet, "logs/get"),
214        // TUI
215        (Method::TuiList, "tui/list"),
216        // Files
217        (Method::FileAttach, "file/attach"),
218        (Method::FsListDir, "fs/list_dir"),
219        // Locales
220        (Method::LocalesList, "locales/list"),
221        (Method::LocalesFetch, "locales/fetch"),
222        // Quickstart
223        (Method::QuickstartState, "quickstart/state"),
224        (Method::QuickstartFields, "quickstart/fields"),
225        (Method::QuickstartValidate, "quickstart/validate"),
226        (Method::QuickstartApply, "quickstart/apply"),
227        (Method::QuickstartDismiss, "quickstart/dismiss"),
228    ];
229
230    /// Resolve a wire method name to a variant. Table scan, no hand-written
231    /// string matching.
232    pub fn from_wire(s: &str) -> Option<Self> {
233        Self::ALL
234            .iter()
235            .find(|(_, wire)| *wire == s)
236            .map(|(m, _)| *m)
237    }
238
239    /// Wire name for this variant.
240    pub fn wire_name(self) -> &'static str {
241        Self::ALL
242            .iter()
243            .find(|(m, _)| *m == self)
244            .map(|(_, wire)| *wire)
245            .expect("every variant is in ALL")
246    }
247}
248
/// Outcome of a handler: the JSON `result` payload on success, or the
/// JSON-RPC error to report back to the caller.
type RpcResult = Result<Value, JsonRpcError>;
250
251fn rpc_err(code: i32, msg: impl Into<String>) -> JsonRpcError {
252    JsonRpcError {
253        code,
254        message: msg.into(),
255        data: None,
256    }
257}
258
259fn not_yet_implemented(method: Method) -> RpcResult {
260    Err(rpc_err(
261        INTERNAL_ERROR,
262        format!("{}: not yet implemented", method.wire_name()),
263    ))
264}
265
266fn personality_template_context(
267    config: &zeroclaw_config::schema::Config,
268    req: &PersonalityTemplatesParams,
269) -> crate::agent::personality_templates::TemplateContext {
270    let agent_requested = req.agent.is_some();
271    let requested_agent = req
272        .agent
273        .as_deref()
274        .map(str::trim)
275        .filter(|s| !s.is_empty());
276    let agent_alias = requested_agent.unwrap_or("default");
277    let configured_agent_exists = config.agent(agent_alias).is_some();
278
279    crate::agent::personality_templates::TemplateContext {
280        agent: requested_agent
281            .map(str::to_string)
282            .or_else(|| configured_agent_exists.then(|| agent_alias.to_string()))
283            .unwrap_or_else(|| "ZeroClaw".to_string()),
284        // Existing config editors pass an agent alias, but Quickstart
285        // also asks for templates before the new agent exists. Treat an
286        // explicit agent request as a full per-agent template render so
287        // MEMORY.md is available during first-run setup; keep the no-agent
288        // fallback memoryless for generic/default callers.
289        include_memory: configured_agent_exists || agent_requested,
290        ..Default::default()
291    }
292}
293
/// Extract the `<provider_type>.<provider_alias>` reference from a config
/// property path of the form
/// `providers.models.<provider_type>.<provider_alias>.<field…>`.
///
/// Returns `None` when the prefix is missing, when fewer than three segments
/// follow it, or when any of the three is empty. The trailing field may
/// itself contain dots.
fn model_provider_ref_from_provider_profile_prop(prop: &str) -> Option<String> {
    let non_empty = |segment: &&str| !segment.is_empty();

    // Split into at most three pieces so the field keeps any inner dots.
    let mut segments = prop.strip_prefix("providers.models.")?.splitn(3, '.');
    let provider_type = segments.next().filter(non_empty)?;
    let provider_alias = segments.next().filter(non_empty)?;
    // The field only has to be present; its value is not part of the ref.
    let _field = segments.next().filter(non_empty)?;

    Some(format!("{provider_type}.{provider_alias}"))
}
304
/// Per-connection dispatcher. Shared state lives in [`RpcContext`].
pub struct RpcDispatcher {
    /// Shared runtime state (config, sessions, TUI registry, …) reached by
    /// every handler.
    ctx: Arc<RpcContext>,
    /// Outbound side of the connection, built from the writer channel passed
    /// to `new`; also receives responses to server-initiated requests.
    rpc: Arc<RpcOutbound>,
    /// Set once `initialize` succeeds. Until then every other method is
    /// rejected with `AUTH_REQUIRED`.
    authenticated: bool,
    /// TUI session UID assigned during `initialize`. Used for registry
    /// cleanup on disconnect.
    tui_id: Option<String>,
    /// Transport-level peer label (e.g. `unix:pid=1234,uid=1000`).
    peer_label: String,
}
316
317impl RpcDispatcher {
318    pub fn new(ctx: Arc<RpcContext>, writer_tx: mpsc::Sender<String>, peer_label: String) -> Self {
319        Self {
320            ctx,
321            rpc: Arc::new(RpcOutbound::new(writer_tx)),
322            authenticated: false,
323            tui_id: None,
324            peer_label,
325        }
326    }
327
    /// TUI ID assigned during initialize, if any.
    ///
    /// Returns `None` until `initialize` has succeeded on this connection.
    pub fn tui_id(&self) -> Option<&str> {
        self.tui_id.as_deref()
    }
332
    /// Test-only: stamp the caller's tui_id without going through the
    /// `initialize` handshake, so ownership-gated handlers can be exercised
    /// directly. Never called from prod.
    ///
    /// Only the dispatcher's own field is overwritten; the TUI registry is
    /// not touched and `authenticated` is left as it was.
    #[cfg(test)]
    pub fn set_tui_id_for_test(&mut self, tui_id: Option<String>) {
        self.tui_id = tui_id;
    }
340
341    /// Construct a pre-authenticated dispatcher sharing the same context and
342    /// RPC outbound as `self`. Used to run long-lived methods (e.g.
343    /// `session/prompt`) in a spawned task so the read loop remains live.
344    fn spawn_handle(&self) -> Self {
345        Self {
346            ctx: Arc::clone(&self.ctx),
347            rpc: Arc::clone(&self.rpc),
348            authenticated: true,
349            tui_id: self.tui_id.clone(),
350            peer_label: self.peer_label.clone(),
351        }
352    }
353
354    /// Flush dirty config paths to disk. Clone the config out of the
355    /// lock (parking_lot guards are !Send), save to disk, then write
356    /// the clone (with cleared dirty set) back.
357    async fn flush_config(&self) -> Result<(), JsonRpcError> {
358        let mut snapshot = self.ctx.config.read().clone();
359        snapshot
360            .save_dirty()
361            .await
362            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Config save failed: {e}")))?;
363        *self.ctx.config.write() = snapshot;
364        Ok(())
365    }
366
367    /// Read frames from transport, dispatch, repeat.
368    pub async fn run(&mut self, transport: &mut (dyn RpcTransport + Send)) {
369        while let Some(line) = transport.next_frame().await {
370            let trimmed = line.trim();
371            if trimmed.is_empty() {
372                continue;
373            }
374            self.process_line(trimmed).await;
375        }
376    }
377
    /// Parse one frame and route it.
    ///
    /// Steps, in order:
    /// 1. Deserialize the frame; a failure is answered with `PARSE_ERROR`
    ///    and a `null` id.
    /// 2. A frame with an empty `method` is treated as a response to one of
    ///    our own outbound requests and forwarded to the outbound side.
    /// 3. Resolve the method name; unknown names get `METHOD_NOT_FOUND`.
    /// 4. Until `initialize` has succeeded, everything else is rejected with
    ///    `AUTH_REQUIRED`.
    /// 5. Dispatch to the handler and send its result or error.
    ///
    /// Frames without an `id` are notifications: they are still executed but
    /// never answered, including on the error paths of steps 3 and 4.
    async fn process_line(&mut self, line: &str) {
        let req: JsonRpcRequest = match serde_json::from_str(line) {
            Ok(r) => r,
            Err(e) => {
                self.send_error(Value::Null, PARSE_ERROR, &format!("Parse error: {e}"))
                    .await;
                return;
            }
        };

        // Bidirectional RPC: responses to our outbound requests.
        // Only string ids are correlated; anything else is dropped silently.
        if req.method.is_empty() {
            if let Some(id) = req.id.as_ref().and_then(Value::as_str) {
                self.rpc.dispatch_response(id, Some(req.params), None);
            }
            return;
        }

        // A missing id marks a notification; `id` falls back to null only so
        // the request-form code paths below have a value to echo.
        let id = req.id.clone().unwrap_or(Value::Null);
        let is_notification = req.id.is_none();

        let method = match Method::from_wire(&req.method) {
            Some(m) => m,
            None => {
                if !is_notification {
                    self.send_error(
                        id,
                        METHOD_NOT_FOUND,
                        &format!("Unknown method: {}", req.method),
                    )
                    .await;
                }
                return;
            }
        };

        // Handshake gate: nothing but `initialize` runs before authentication.
        if !self.authenticated && method != Method::Initialize {
            if !is_notification {
                self.send_error(id, AUTH_REQUIRED, "First call must be 'initialize'")
                    .await;
            }
            return;
        }

        // Exhaustive match — compiler enforces every Method has a handler.
        let result = match method {
            // Core
            Method::Initialize => self.handle_initialize(&req.params).await,
            Method::Status => self.handle_status().await,
            Method::Health => self.handle_health(),

            // Sessions
            Method::SessionNew => self.handle_session_new(&req.params).await,
            Method::SessionClose => self.handle_session_close(&req.params).await,
            Method::SessionPrompt => {
                // Always spawn — turn completion is signaled by a
                // TurnComplete notification, not by this method's response.
                // The response (empty {} or error) is kept only so legacy
                // request-form callers don't park forever.
                let handle = self.spawn_handle();
                let id_clone = id;
                let params_clone = req.params.clone();
                let is_notif = is_notification;
                zeroclaw_spawn::spawn!(async move {
                    let result = handle.handle_session_prompt(&params_clone).await;
                    if !is_notif {
                        match result {
                            Ok(_) => handle.send_result(id_clone, serde_json::json!({})).await,
                            Err(e) => handle.send_error(id_clone, e.code, &e.message).await,
                        }
                    }
                });
                // The spawned task owns the reply, so skip the shared
                // response path at the bottom of this function.
                return;
            }
            Method::SessionConfigure => self.handle_session_configure(&req.params).await,
            Method::SessionCancel => self.handle_session_cancel(&req.params).await,
            Method::SessionGitBranch => self.handle_session_git_branch(&req.params).await,
            Method::SessionList => self.handle_session_list(&req.params).await,
            Method::SessionListAcp => self.handle_session_list_acp(&req.params).await,
            Method::SessionMessages => self.handle_session_messages(&req.params).await,
            Method::SessionState => self.handle_session_state(&req.params).await,
            Method::SessionDelete => self.handle_session_delete(&req.params).await,
            Method::SessionApprove => self.handle_session_approve(&req.params),
            Method::SessionKill => self.handle_session_kill(&req.params).await,

            // Memory
            Method::MemoryList => self.handle_memory_list(&req.params).await,
            Method::MemorySearch => self.handle_memory_search(&req.params).await,
            Method::MemoryGet => self.handle_memory_get(&req.params).await,
            Method::MemoryStore => self.handle_memory_store(&req.params).await,
            Method::MemoryDelete => self.handle_memory_delete(&req.params).await,

            // Cron
            Method::CronList => self.handle_cron_list().await,
            Method::CronGet => self.handle_cron_get(&req.params).await,
            Method::CronAdd => self.handle_cron_add(&req.params).await,
            Method::CronPatch => self.handle_cron_patch(&req.params).await,
            Method::CronDelete => self.handle_cron_delete(&req.params).await,
            Method::CronRuns => self.handle_cron_runs(&req.params).await,
            Method::CronTrigger => self.handle_cron_trigger(&req.params).await,
            Method::CronSettings => self.handle_cron_settings(&req.params).await,

            // Config
            Method::ConfigGet => self.handle_config_get(&req.params),
            Method::ConfigSet => self.handle_config_set(&req.params).await,
            Method::ConfigValidate => self.handle_config_validate(),
            Method::ConfigReload => self.handle_config_reload(),
            Method::ConfigList => self.handle_config_list(&req.params),
            Method::ConfigDelete => self.handle_config_delete(&req.params).await,
            Method::ConfigMapKeys => self.handle_config_map_keys(&req.params),
            Method::ConfigMapKeyCreate => self.handle_config_map_key_create(&req.params).await,
            Method::ConfigMapKeyDelete => self.handle_config_map_key_delete(&req.params).await,
            Method::ConfigMapKeyRename => self.handle_config_map_key_rename(&req.params).await,
            Method::ConfigTemplates => self.handle_config_templates(),

            // Agents
            Method::AgentsList => self.handle_agents_list(),
            Method::AgentsStatus => self.handle_agents_status().await,

            // Cost
            Method::CostQuery => self.handle_cost_query(&req.params),

            // Skills
            Method::SkillsBundles => self.handle_skills_bundles(),
            Method::SkillsList => self.handle_skills_list(&req.params),
            Method::SkillsRead => self.handle_skills_read(&req.params),
            Method::SkillsWrite => self.handle_skills_write(&req.params),
            Method::SkillsDelete => self.handle_skills_delete(&req.params),

            // Personality
            Method::PersonalityList => self.handle_personality_list(&req.params),
            Method::PersonalityGet => self.handle_personality_get(&req.params),
            Method::PersonalityPut => self.handle_personality_put(&req.params),
            Method::PersonalityTemplates => self.handle_personality_templates(&req.params),

            // Config introspection
            Method::ConfigSections => self.handle_config_sections(),
            Method::ConfigStatus => self.handle_config_status(),
            Method::ConfigCatalog => self.handle_config_catalog(),
            Method::ConfigCatalogModels => self.handle_config_catalog_models(&req.params).await,

            // Logs
            Method::LogsSubscribe => self.handle_logs_subscribe().await,
            Method::LogsQuery => self.handle_logs_query(&req.params).await,
            Method::LogsGet => self.handle_logs_get(&req.params).await,

            // TUI
            Method::TuiList => self.handle_tui_list(),

            // Files
            Method::FileAttach => self.handle_file_attach(&req.params).await,
            Method::FsListDir => super::fs::handle_fs_list_dir(&req.params).await,

            // Locales
            Method::LocalesList => super::locales::handle_locales_list(self.tui_id()),
            Method::LocalesFetch => {
                super::locales::handle_locales_fetch(&req.params, self.tui_id()).await
            }

            // Quickstart
            Method::QuickstartState => self.handle_quickstart_state(),
            Method::QuickstartFields => self.handle_quickstart_fields(&req.params),
            Method::QuickstartValidate => self.handle_quickstart_validate(&req.params),
            Method::QuickstartApply => self.handle_quickstart_apply(&req.params).await,
            Method::QuickstartDismiss => self.handle_quickstart_dismiss(&req.params),
        };

        // Notifications have already had their side effects; they get no reply.
        if is_notification {
            return;
        }

        match result {
            Ok(v) => self.send_result(id, v).await,
            Err(e) => self.send_error(id, e.code, &e.message).await,
        }
    }
554
555    // ── Core handlers ────────────────────────────────────────────
556
557    async fn handle_initialize(&mut self, params: &Value) -> RpcResult {
558        let req: InitializeParams = parse_params(params)?;
559
560        if req.protocol_version != RPC_PROTOCOL_VERSION {
561            return Err(rpc_err(
562                VERSION_MISMATCH,
563                format!(
564                    "Protocol version mismatch: server={RPC_PROTOCOL_VERSION}, client={}",
565                    req.protocol_version,
566                ),
567            ));
568        }
569
570        // TUI identity: reconnect with previous credentials or generate new
571        let tui_id = if let (Some(claimed_id), Some(sig)) =
572            (req.tui_id.as_deref(), req.tui_sig.as_deref())
573        {
574            // Client presents ID + signature — verify
575            if !self.ctx.tui_registry.verify(claimed_id, sig) {
576                return Err(rpc_err(AUTH_REQUIRED, "Invalid TUI signature"));
577            }
578            // Remove stale entry from previous connection before re-registering
579            self.ctx.tui_registry.unregister(claimed_id);
580            claimed_id.to_string()
581        } else if let Some(claimed_id) = req.tui_id.as_deref() {
582            // Client claims ID but no signature — accept only if signing disabled
583            if self.ctx.tui_registry.signing_is_enabled() {
584                return Err(rpc_err(AUTH_REQUIRED, "TUI signature required"));
585            }
586            self.ctx.tui_registry.unregister(claimed_id);
587            claimed_id.to_string()
588        } else {
589            // Fresh connection — generate new ID
590            self.ctx.tui_registry.generate_unique_tui_id()
591        };
592
593        let tui_sig = self.ctx.tui_registry.sign(&tui_id);
594        self.ctx
595            .tui_registry
596            .register(super::tui_identity::TuiEntry {
597                tui_id: tui_id.clone(),
598                connected_at: chrono::Utc::now(),
599                transport: self
600                    .peer_label
601                    .split_once(':')
602                    .map_or("unknown", |(proto, _)| proto)
603                    .to_string(),
604                peer_label: self.peer_label.clone(),
605                env: req.env,
606            });
607        self.tui_id = Some(tui_id.clone());
608
609        self.authenticated = true;
610
611        let capabilities: Vec<String> = Method::ALL
612            .iter()
613            .map(|(_, name)| (*name).to_string())
614            .collect();
615
616        to_result(InitializeResult {
617            protocol_version: RPC_PROTOCOL_VERSION,
618            server_version: env!("CARGO_PKG_VERSION").to_string(),
619            tui_id: Some(tui_id),
620            tui_sig,
621            capabilities,
622        })
623    }
624
625    async fn handle_status(&self) -> RpcResult {
626        let ids = self.ctx.sessions.list_ids().await;
627        // Count persisted sessions (channel-originated) that aren't already
628        // in the in-memory RPC store.
629        let persisted_count = self
630            .ctx
631            .session_backend
632            .as_ref()
633            .map(|b| b.list_sessions_with_metadata().len())
634            .unwrap_or(0);
635        let total = ids.len().max(persisted_count);
636        to_result(StatusResult {
637            server_version: env!("CARGO_PKG_VERSION").to_string(),
638            protocol_version: RPC_PROTOCOL_VERSION,
639            active_sessions: total,
640            session_ids: ids,
641        })
642    }
643
644    fn handle_health(&self) -> RpcResult {
645        let mut val = crate::health::snapshot_json();
646        if let Some(obj) = val.as_object_mut() {
647            let stats = crate::process_stats::sample();
648            obj.insert(
649                "process".to_string(),
650                serde_json::to_value(&stats).unwrap_or_default(),
651            );
652        }
653        Ok(val)
654    }
655
656    // ── TUI handlers ─────────────────────────────────────────────
657
658    fn handle_tui_list(&self) -> RpcResult {
659        let entries = self.ctx.tui_registry.list();
660        to_result(TuiListResult {
661            tuis: entries
662                .into_iter()
663                .map(|e| TuiListEntry {
664                    tui_id: e.tui_id,
665                    connected_at: e.connected_at.to_rfc3339(),
666                    connected_at_unix: e.connected_at.timestamp(),
667                    peer_label: e.peer_label,
668                    transport: e.transport,
669                })
670                .collect(),
671        })
672    }
673
674    // ── Session handlers ─────────────────────────────────────────
675
    /// Test-only: call `handle_session_new` directly, bypassing the
    /// authentication gate in the `run` loop.  This lets integration tests
    /// drive the full agent-creation path without spinning up a transport.
    ///
    /// `params` is the raw JSON `params` value of a `session/new` request;
    /// the handler's result is returned unchanged.
    #[cfg(test)]
    pub async fn handle_session_new_for_test(&self, params: &Value) -> RpcResult {
        self.handle_session_new(params).await
    }
683
684    async fn handle_session_new(&self, params: &Value) -> RpcResult {
685        let req: SessionNewParams = parse_params(params)?;
686        let resuming = req.session_id.is_some();
687        let session_id = req
688            .session_id
689            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
690
691        let config = self.ctx.config.read().clone();
692        let chat_mode = req
693            .chat_mode
694            .clone()
695            .unwrap_or(crate::rpc::types::ChatMode::Chat);
696
697        // Resuming an ACP session with no caller cwd: recover the original
698        // working directory from the persisted store so the rehydrated session
699        // keeps its own cwd instead of falling back to the agent workspace dir.
700        // The loaded data is reused below so history is not fetched twice.
701        let mut preloaded_acp: Option<zeroclaw_infra::acp_session_store::AcpSessionData> = None;
702        if resuming
703            && req.cwd.is_none()
704            && matches!(chat_mode, crate::rpc::types::ChatMode::Acp)
705            && let Some(ref store) = self.ctx.acp_session_store
706        {
707            let store_cloned = store.clone();
708            let sid = session_id.clone();
709            if let Ok(Ok(Some(data))) =
710                tokio::task::spawn_blocking(move || store_cloned.load_session(&sid)).await
711            {
712                preloaded_acp = Some(data);
713            }
714        }
715
716        // The session cwd: caller-supplied wins, then a resumed ACP session's
717        // persisted cwd, then the agent's workspace dir.
718        let cwd = req
719            .cwd
720            .clone()
721            .or_else(|| preloaded_acp.as_ref().map(|d| d.workspace_dir.clone()))
722            .unwrap_or_else(|| {
723                config
724                    .agent_workspace_dir(&req.agent_alias)
725                    .to_string_lossy()
726                    .to_string()
727            });
728
729        let cwd_path = Some(std::path::Path::new(&cwd));
730        let tui_env = req
731            .tui_id
732            .as_deref()
733            .and_then(|id| self.ctx.tui_registry.get_env(id));
734        let chat_mode = req
735            .chat_mode
736            .clone()
737            .unwrap_or(crate::rpc::types::ChatMode::Chat);
738        // ACP (Code) sessions never get the long-term-memory tools or backend.
739        // The exclusion is derived from `chat_mode` on the server rather than
740        // trusted from the wire `exclude_memory` flag, so a client that omits
741        // or falsifies the flag cannot smuggle memory access into a Code
742        // session. A non-ACP caller may still opt in explicitly.
743        let exclude_memory = matches!(chat_mode, crate::rpc::types::ChatMode::Acp)
744            || req.exclude_memory == Some(true);
745        let agent = crate::agent::agent::Agent::from_config_with_tui_env(
746            &config,
747            &req.agent_alias,
748            cwd_path,
749            false,
750            exclude_memory,
751            tui_env,
752        )
753        .await
754        .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Failed to create agent: {e}")))?;
755
756        let approval_ch = Arc::new(crate::rpc::approval_channel::RpcApprovalChannel::new(
757            "rpc",
758            session_id.clone(),
759            Arc::clone(&self.rpc),
760            Arc::clone(&self.ctx.approval_pending),
761        ));
762        agent.channel_handles().register_channel("rpc", approval_ch);
763
764        self.ctx
765            .sessions
766            .insert(
767                session_id.clone(),
768                super::session::RpcSession::new(agent, &req.agent_alias, &cwd, chat_mode.clone())
769                    .with_owner(self.tui_id.clone()),
770            )
771            .await
772            .map_err(|_| rpc_err(SESSION_LIMIT_REACHED, "Session limit reached"))?;
773
774        if let Some(ref tui_id) = self.tui_id {
775            let evicted = self
776                .ctx
777                .sessions
778                .evict_same_mode_sibling(tui_id, &chat_mode, &session_id)
779                .await;
780            if !evicted.is_empty() {
781                let span = ::zeroclaw_log::info_span!(
782                    target: "zeroclaw_log_internal_scope",
783                    "zeroclaw_scope",
784                    session_key = %session_id,
785                    agent_alias = %req.agent_alias,
786                    channel = "rpc",
787                );
788                let _guard = span.enter();
789                ::zeroclaw_log::record!(
790                    DEBUG,
791                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
792                        .with_category(::zeroclaw_log::EventCategory::Agent)
793                        .with_outcome(::zeroclaw_log::EventOutcome::Success)
794                        .with_attrs(::serde_json::json!({
795                            "tui_id": tui_id,
796                            "evicted": evicted.iter().map(|(id, _)| id).collect::<Vec<_>>(),
797                        })),
798                    "Evicted abandoned same-mode session(s) on session/new"
799                );
800                // Every evicted session was idle (no in-flight turn), so its
801                // removal above dropped the last Agent strong ref and freed the
802                // history. Trimming now actually returns those pages.
803                crate::util::release_freed_heap();
804                ::zeroclaw_log::record!(
805                    DEBUG,
806                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
807                        .with_category(::zeroclaw_log::EventCategory::Agent)
808                        .with_outcome(::zeroclaw_log::EventOutcome::Success)
809                        .with_attrs(::serde_json::json!({
810                            "evicted_count": evicted.len(),
811                        })),
812                    "Trimmed glibc arenas after same-mode session eviction"
813                );
814            }
815        }
816
817        enum AcpSessionNewLoad {
818            Restored(zeroclaw_infra::acp_session_store::AcpSessionData),
819            Created,
820            Killed,
821        }
822
823        let mut message_count = 0;
824        match chat_mode {
825            crate::rpc::types::ChatMode::Acp => {
826                // Reuse the data already loaded for cwd recovery on resume so the
827                // store isn't hit twice; otherwise fall through to the restore-
828                // aware load-or-create path below.
829                let loaded = if let Some(data) = preloaded_acp.take() {
830                    Ok(Ok(AcpSessionNewLoad::Restored(data)))
831                } else {
832                    let Some(ref store) = self.ctx.acp_session_store else {
833                        self.ctx.sessions.remove(&session_id).await;
834                        return Err(rpc_err(
835                            INTERNAL_ERROR,
836                            "ACP session store is not available",
837                        ));
838                    };
839
840                    let store_cloned = store.clone();
841                    let sid = session_id.clone();
842                    let alias = req.agent_alias.clone();
843                    let cwd_owned = cwd.clone();
844                    tokio::task::spawn_blocking(move || -> anyhow::Result<AcpSessionNewLoad> {
845                        match store_cloned.load_session_for_restore(&sid)? {
846                            zeroclaw_infra::acp_session_store::AcpSessionRestore::Restorable(
847                                data,
848                            ) => Ok(AcpSessionNewLoad::Restored(data)),
849                            zeroclaw_infra::acp_session_store::AcpSessionRestore::Missing => {
850                                store_cloned.create_session(&sid, &alias, &cwd_owned)?;
851                                Ok(AcpSessionNewLoad::Created)
852                            }
853                            zeroclaw_infra::acp_session_store::AcpSessionRestore::Killed => {
854                                Ok(AcpSessionNewLoad::Killed)
855                            }
856                        }
857                    })
858                    .await
859                };
860                match loaded {
861                    Ok(Ok(AcpSessionNewLoad::Restored(data))) => {
862                        message_count = data.messages.len();
863                        self.ctx
864                            .sessions
865                            .seed_conversation_history(&session_id, data.messages)
866                            .await;
867                    }
868                    Ok(Ok(AcpSessionNewLoad::Created)) => {}
869                    Ok(Ok(AcpSessionNewLoad::Killed)) => {
870                        self.ctx.sessions.remove(&session_id).await;
871                        return Err(rpc_err(SESSION_NOT_FOUND, "Session not found"));
872                    }
873                    Ok(Err(e)) => {
874                        self.ctx.sessions.remove(&session_id).await;
875                        ::zeroclaw_log::record!(
876                            WARN,
877                            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
878                                .with_outcome(::zeroclaw_log::EventOutcome::Failure)
879                                .with_attrs(::serde_json::json!({"session_id": session_id, "error": e.to_string()})),
880                            "Failed to load or create ACP session"
881                        );
882                        return Err(rpc_err(
883                            INTERNAL_ERROR,
884                            format!("Failed to load or create ACP session: {e}"),
885                        ));
886                    }
887                    Err(join) => {
888                        self.ctx.sessions.remove(&session_id).await;
889                        ::zeroclaw_log::record!(
890                            WARN,
891                            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
892                                .with_outcome(::zeroclaw_log::EventOutcome::Failure)
893                                .with_attrs(::serde_json::json!({"session_id": session_id, "error": join.to_string()})),
894                            "ACP session load task failed"
895                        );
896                        return Err(rpc_err(
897                            INTERNAL_ERROR,
898                            format!("ACP session load task failed: {join}"),
899                        ));
900                    }
901                }
902            }
903            crate::rpc::types::ChatMode::Chat => {
904                if let Some(ref backend) = self.ctx.session_backend {
905                    let session_key = format!("rpc_{session_id}");
906                    let _ = backend.set_session_agent_alias(&session_key, &req.agent_alias);
907                    let stored = backend.load(&session_key);
908                    if !stored.is_empty() {
909                        self.ctx.sessions.seed_history(&session_id, &stored).await;
910                        message_count = stored.len();
911                    }
912                }
913            }
914        }
915
916        to_result(SessionNewResult {
917            session_id,
918            agent_alias: req.agent_alias,
919            message_count,
920            workspace_dir: cwd,
921        })
922    }
923
924    async fn handle_session_close(&self, params: &Value) -> RpcResult {
925        let req: SessionIdParams = parse_params(params)?;
926        if let Some(agent) = self.ctx.sessions.get_agent(&req.session_id).await {
927            agent
928                .lock()
929                .await
930                .channel_handles()
931                .unregister_channel("rpc");
932            let strong = std::sync::Arc::strong_count(&agent);
933            let agent_alias = self
934                .ctx
935                .sessions
936                .get_agent_alias(&req.session_id)
937                .await
938                .unwrap_or_default();
939            let span = ::zeroclaw_log::info_span!(
940                target: "zeroclaw_log_internal_scope",
941                "zeroclaw_scope",
942                session_key = %req.session_id,
943                agent_alias = %agent_alias,
944                channel = "rpc",
945            );
946            let _guard = span.enter();
947            ::zeroclaw_log::record!(
948                INFO,
949                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
950                    .with_category(::zeroclaw_log::EventCategory::Agent)
951                    .with_attrs(::serde_json::json!({
952                        "agent_arc_strong_count_before_remove": strong,
953                    })),
954                "session close: dropping local Agent handle before remove"
955            );
956            // Drop our clone explicitly so the session map holds the last
957            // strong ref; `remove` then frees the Agent at removal time
958            // rather than at end-of-scope, letting the allocator reclaim
959            // promptly.
960            drop(agent);
961        }
962        if !self.ctx.sessions.remove(&req.session_id).await {
963            return Err(rpc_err(SESSION_NOT_FOUND, "Session not found"));
964        }
965        crate::util::release_freed_heap();
966        {
967            let span = ::zeroclaw_log::info_span!(
968                target: "zeroclaw_log_internal_scope",
969                "zeroclaw_scope",
970                session_key = %req.session_id,
971                channel = "rpc",
972            );
973            let _guard = span.enter();
974            ::zeroclaw_log::record!(
975                DEBUG,
976                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
977                    .with_category(::zeroclaw_log::EventCategory::Agent)
978                    .with_outcome(::zeroclaw_log::EventOutcome::Success),
979                "Trimmed glibc arenas after session close"
980            );
981        }
982        to_result(SessionCloseResult {
983            session_id: req.session_id,
984            closed: true,
985        })
986    }
987
988    async fn handle_session_kill(&self, params: &Value) -> RpcResult {
989        let req: SessionKillParams = parse_params(params)?;
990        let sid = &req.session_id;
991
992        let chat_mode = self
993            .ctx
994            .sessions
995            .chat_mode(sid)
996            .await
997            .ok_or_else(|| rpc_err(SESSION_NOT_FOUND, "Session not found"))?;
998
999        let agent_alias = self
1000            .ctx
1001            .sessions
1002            .get_agent_alias(sid)
1003            .await
1004            .unwrap_or_default();
1005        let span = ::zeroclaw_log::info_span!(
1006            target: "zeroclaw_log_internal_scope",
1007            "zeroclaw_scope",
1008            session_key = %sid,
1009            agent_alias = %agent_alias,
1010            channel = "rpc",
1011        );
1012        let _guard = span.enter();
1013
1014        if matches!(chat_mode, ChatMode::Acp) {
1015            let store = self
1016                .ctx
1017                .acp_session_store
1018                .clone()
1019                .ok_or_else(|| rpc_err(INTERNAL_ERROR, "ACP session store is not available"))?;
1020            let sid_owned = sid.to_string();
1021            let marked =
1022                tokio::task::spawn_blocking(move || store.mark_session_killed(&sid_owned)).await;
1023            match marked {
1024                Ok(Ok(true)) => {}
1025                Ok(Ok(false)) => {
1026                    ::zeroclaw_log::record!(
1027                        WARN,
1028                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1029                            .with_category(::zeroclaw_log::EventCategory::Agent)
1030                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1031                        "session/kill: live ACP session had no durable row to tombstone"
1032                    );
1033                }
1034                Ok(Err(e)) => {
1035                    return Err(rpc_err(
1036                        INTERNAL_ERROR,
1037                        format!("Failed to mark ACP session killed: {e}"),
1038                    ));
1039                }
1040                Err(e) => {
1041                    return Err(rpc_err(
1042                        INTERNAL_ERROR,
1043                        format!("Failed to mark ACP session killed: {e}"),
1044                    ));
1045                }
1046            }
1047        }
1048
1049        let killed = self.ctx.sessions.kill_session(sid).await;
1050        if killed {
1051            crate::util::release_freed_heap();
1052            ::zeroclaw_log::record!(
1053                INFO,
1054                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1055                    .with_category(::zeroclaw_log::EventCategory::Agent)
1056                    .with_outcome(::zeroclaw_log::EventOutcome::Success),
1057                "session/kill: session terminated by admin"
1058            );
1059        } else {
1060            ::zeroclaw_log::record!(
1061                DEBUG,
1062                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1063                    .with_category(::zeroclaw_log::EventCategory::Agent)
1064                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1065                "session/kill: session vanished between existence check and kill (concurrent close?)"
1066            );
1067        }
1068
1069        to_result(SessionKillResult {
1070            session_id: req.session_id,
1071            killed,
1072        })
1073    }
1074
1075    /// Rebuild a reaped ACP session from a restorable durable row so a fresh
1076    /// prompt recovers to a working session instead of hanging. Returns the
1077    /// live agent on success; returns `None` for missing, killed, or unreadable
1078    /// durable state.
1079    async fn rehydrate_reaped_session(
1080        &self,
1081        sid: &str,
1082    ) -> Option<Arc<tokio::sync::Mutex<crate::agent::agent::Agent>>> {
1083        let store = self.ctx.acp_session_store.clone()?;
1084        let sid_owned = sid.to_string();
1085        let loaded =
1086            tokio::task::spawn_blocking(move || store.load_session_for_restore(&sid_owned)).await;
1087        let data = match loaded {
1088            Ok(Ok(zeroclaw_infra::acp_session_store::AcpSessionRestore::Restorable(data))) => data,
1089            Ok(Ok(zeroclaw_infra::acp_session_store::AcpSessionRestore::Killed)) => {
1090                ::zeroclaw_log::record!(
1091                    INFO,
1092                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1093                        .with_category(::zeroclaw_log::EventCategory::Agent)
1094                        .with_outcome(::zeroclaw_log::EventOutcome::Success),
1095                    "session/prompt: refusing to rehydrate admin-killed ACP session"
1096                );
1097                return None;
1098            }
1099            Ok(Err(e)) => {
1100                ::zeroclaw_log::record!(
1101                    WARN,
1102                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1103                        .with_category(::zeroclaw_log::EventCategory::Agent)
1104                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1105                        .with_attrs(::serde_json::json!({
1106                            "session_id": sid,
1107                            "error": e.to_string(),
1108                        })),
1109                    "session/prompt: failed to query ACP killed marker before rehydrate"
1110                );
1111                return None;
1112            }
1113            Ok(Ok(zeroclaw_infra::acp_session_store::AcpSessionRestore::Missing)) => return None,
1114            Err(e) => {
1115                ::zeroclaw_log::record!(
1116                    WARN,
1117                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1118                        .with_category(::zeroclaw_log::EventCategory::Agent)
1119                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1120                        .with_attrs(::serde_json::json!({
1121                            "session_id": sid,
1122                            "error": e.to_string(),
1123                        })),
1124                    "session/prompt: ACP killed-marker query task failed before rehydrate"
1125                );
1126                return None;
1127            }
1128        };
1129
1130        let config = self.ctx.config.read().clone();
1131        let cwd_path = Some(std::path::Path::new(&data.workspace_dir));
1132        let tui_env = self
1133            .tui_id
1134            .as_deref()
1135            .and_then(|id| self.ctx.tui_registry.get_env(id));
1136        // Reaped ACP sessions always rehydrate as `ChatMode::Acp` (see the
1137        // insert below), so the recovered agent must enforce the same
1138        // server-side memory-tool exclusion as a fresh `session/new` ACP
1139        // session — otherwise session recovery silently restores the
1140        // long-term-memory backend and tools the ACP invariant forbids.
1141        let exclude_memory = true;
1142        let agent = crate::agent::agent::Agent::from_config_with_tui_env(
1143            &config,
1144            &data.agent_alias,
1145            cwd_path,
1146            false,
1147            exclude_memory,
1148            tui_env,
1149        )
1150        .await
1151        .ok()?;
1152
1153        let approval_ch = Arc::new(crate::rpc::approval_channel::RpcApprovalChannel::new(
1154            "rpc",
1155            sid.to_string(),
1156            Arc::clone(&self.rpc),
1157            Arc::clone(&self.ctx.approval_pending),
1158        ));
1159        agent.channel_handles().register_channel("rpc", approval_ch);
1160
1161        let message_count = data.messages.len();
1162        self.ctx
1163            .sessions
1164            .insert(
1165                sid.to_string(),
1166                super::session::RpcSession::new(
1167                    agent,
1168                    &data.agent_alias,
1169                    &data.workspace_dir,
1170                    crate::rpc::types::ChatMode::Acp,
1171                )
1172                .with_owner(self.tui_id.clone()),
1173            )
1174            .await
1175            .ok()?;
1176        self.ctx
1177            .sessions
1178            .seed_conversation_history(sid, data.messages)
1179            .await;
1180        self.ctx.sessions.touch(sid).await;
1181
1182        ::zeroclaw_log::record!(
1183            INFO,
1184            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1185                .with_category(::zeroclaw_log::EventCategory::Agent)
1186                .with_outcome(::zeroclaw_log::EventOutcome::Success)
1187                .with_attrs(::serde_json::json!({
1188                    "session_id": sid,
1189                    "agent_alias": data.agent_alias,
1190                    "messages_restored": message_count,
1191                })),
1192            "rehydrated reaped session from durable store; turn continues on a working session"
1193        );
1194
1195        self.ctx.sessions.get_agent(sid).await
1196    }
1197
1198    async fn handle_session_prompt(&self, params: &Value) -> RpcResult {
1199        let req: SessionPromptParams = parse_params(params)?;
1200        let sid = &req.session_id;
1201
1202        // Reject blank turns at the RPC boundary. A turn must carry SOMETHING
1203        // — either prose or an attachment — for the agent to act on. Letting
1204        // an empty `{prompt: "", attachments: []}` through would push a user
1205        // message that contains only the runtime's timestamp prefix into the
1206        // model context; Claude in particular then narrates the trailing
1207        // `<<HUMAN_CONVERSATION_START>>` template sentinel instead of
1208        // responding, and that bleeds into the visible transcript. The
1209        // duplicate guard inside `Agent::turn_streamed` is the load-bearing
1210        // one (any code path that reaches the agent is covered); this one
1211        // gives RPC callers a clean error code instead of a generic agent
1212        // failure surfaced after queue acquisition.
1213        if req.prompt.trim().is_empty() && req.attachments.is_empty() {
1214            return Err(rpc_err(
1215                INVALID_PARAMS,
1216                "session/prompt requires a non-empty `prompt` or at least one attachment",
1217            ));
1218        }
1219
1220        let agent = match self.ctx.sessions.get_agent(sid).await {
1221            Some(a) => a,
1222            None => {
1223                // The in-memory session was reaped (orphan grace or idle TTL)
1224                // between the TUI's last touch and this prompt landing. Recover
1225                // to a WORKING session: rehydrate the agent + history from the
1226                // durable ACP store and continue the turn. The user's prompt
1227                // just lands — no dead end, no "start a new session". Only if
1228                // the durable row is genuinely gone do we fail, and then we
1229                // emit an attributed TurnComplete so the TUI leaves `working`.
1230                match self.rehydrate_reaped_session(sid).await {
1231                    Some(a) => a,
1232                    None => {
1233                        ::zeroclaw_log::record!(
1234                            WARN,
1235                            ::zeroclaw_log::Event::new(
1236                                module_path!(),
1237                                ::zeroclaw_log::Action::Fail,
1238                            )
1239                            .with_category(::zeroclaw_log::EventCategory::Agent)
1240                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1241                            .with_attrs(::serde_json::json!({ "session_id": sid })),
1242                            "session/prompt on a session absent from memory and the durable store; emitting TurnComplete so the client exits the working state"
1243                        );
1244                        self.emit_turn_complete(
1245                            sid,
1246                            crate::rpc::types::TurnCompletionOutcome::Failed,
1247                            "turn cancelled by daemon: session_not_found".to_string(),
1248                        )
1249                        .await;
1250                        return Err(rpc_err(SESSION_NOT_FOUND, "Session not found"));
1251                    }
1252                }
1253            }
1254        };
1255
1256        // Process inline attachments: upload each, append markers to prompt.
1257        let mut prompt = req.prompt.clone();
1258        if !req.attachments.is_empty() {
1259            use super::attachments::process_file_entry;
1260
1261            // Uploads go to the AGENT's workspace dir, not the session cwd.
1262            // The session cwd is often the user's project/git working tree
1263            // (e.g. when the TUI is launched from inside a repo), and we
1264            // don't want to splatter binary blobs into their source tree.
1265            // The per-agent workspace (`<config_dir>/agents/<alias>/workspace`)
1266            // is the canonical home for agent-owned files.
1267            let agent_alias = self
1268                .ctx
1269                .sessions
1270                .get_agent_alias(sid)
1271                .await
1272                .ok_or_else(|| rpc_err(SESSION_NOT_FOUND, "Session not found"))?;
1273            let upload_root = self
1274                .ctx
1275                .config
1276                .read()
1277                .agent_workspace_dir(&agent_alias)
1278                .to_string_lossy()
1279                .to_string();
1280            let is_wss = self.peer_label.starts_with("wss:");
1281            // Only insert a newline separator if there's existing text.
1282            // An attachment-only turn must not start with a leading "\n"
1283            // because that produces a user message whose only non-marker
1284            // content is whitespace — same failure mode the top-of-fn
1285            // guard prevents, just at one layer down.
1286            if !prompt.is_empty() {
1287                prompt.push('\n');
1288            }
1289            for (idx, entry) in req.attachments.iter().enumerate() {
1290                let result =
1291                    process_file_entry(entry, sid, &upload_root, is_wss, &self.ctx.sessions)
1292                        .await?;
1293                if idx > 0 {
1294                    prompt.push('\n');
1295                }
1296                prompt.push_str(&result.marker);
1297            }
1298        }
1299
1300        let _guard = self
1301            .ctx
1302            .sessions
1303            .session_queue
1304            .acquire(sid)
1305            .await
1306            .map_err(|e| rpc_err(SESSION_BUSY, format!("Session busy: {e}")))?;
1307
1308        let cancel = tokio_util::sync::CancellationToken::new();
1309        let cancel_generation = self.ctx.sessions.register_cancel_token(sid, cancel.clone());
1310        self.ctx.sessions.touch(sid).await;
1311        ::zeroclaw_log::record!(
1312            INFO,
1313            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Invoke)
1314                .with_category(::zeroclaw_log::EventCategory::Agent)
1315                .with_attrs(::serde_json::json!({ "session_id": sid })),
1316            "turn dispatch: registered cancel token, starting turn"
1317        );
1318
1319        let chat_mode = self
1320            .ctx
1321            .sessions
1322            .chat_mode(sid)
1323            .await
1324            .unwrap_or(crate::rpc::types::ChatMode::Chat);
1325        let pre_history_len = if matches!(chat_mode, crate::rpc::types::ChatMode::Acp) {
1326            self.ctx.sessions.history_len(sid).await.unwrap_or(0)
1327        } else {
1328            0
1329        };
1330
1331        // Capture attribution fields and max_context_tokens for the turn span.
1332        let (agent_alias, model_provider, model, max_ctx) = {
1333            let alias = self
1334                .ctx
1335                .sessions
1336                .get_agent_alias(sid)
1337                .await
1338                .unwrap_or_default();
1339            let cfg = self.ctx.config.read().clone();
1340            let mp = cfg
1341                .agent(&alias)
1342                .map(|a| a.model_provider.to_string())
1343                .unwrap_or_default();
1344            let m = cfg
1345                .model_provider_for_agent(&alias)
1346                .and_then(|p| p.model.clone())
1347                .unwrap_or_default();
1348            let max_ctx = Some(cfg.effective_max_context_tokens(&alias) as u64);
1349            (alias, mp, m, max_ctx)
1350        };
1351
1352        let rpc = self.rpc.clone();
1353        let sid_owned = sid.to_string();
1354        let acp_token_store = if matches!(chat_mode, crate::rpc::types::ChatMode::Acp) {
1355            self.ctx.acp_session_store.clone()
1356        } else {
1357            None
1358        };
1359        let attribution_agent_alias = agent_alias.clone();
1360        let attribution_model_provider = model_provider.clone();
1361        let attribution_model = model.clone();
1362        let outcome = execute_turn(
1363            agent,
1364            prompt.clone(),
1365            cancel,
1366            TurnAttribution {
1367                session_key: Some(sid.to_string()),
1368                agent_alias,
1369                model_provider,
1370                model,
1371                channel: "rpc",
1372            },
1373            move |event| {
1374                let rpc = rpc.clone();
1375                let sid = sid_owned.clone();
1376                let acp_token_store = acp_token_store.clone();
1377                async move {
1378                    if let (
1379                        Some(store),
1380                        TurnEvent::Usage {
1381                            input_tokens: Some(it),
1382                            ..
1383                        },
1384                    ) = (acp_token_store.as_ref(), &event)
1385                    {
1386                        let store = store.clone();
1387                        let sid = sid.clone();
1388                        let it = *it;
1389                        let _ =
1390                            tokio::task::spawn_blocking(move || store.set_token_count(&sid, it))
1391                                .await;
1392                    }
1393                    if let Some(n) = notification_for_turn_event(&sid, &event, max_ctx) {
1394                        let _ = rpc.send_raw(n).await;
1395                    }
1396                }
1397            },
1398        )
1399        .await;
1400
1401        // Drain the cancel cause BEFORE removing the token (removal clears the
1402        // cause map). Every cancel firing site records its cause before firing;
1403        // a cancel with no recorded cause is a bug, not user attribution.
1404        let cancel_cause = self.ctx.sessions.take_cancel_cause(sid);
1405        self.ctx
1406            .sessions
1407            .remove_cancel_token(sid, cancel_generation);
1408
1409        // ── Durable turn-verdict audit row ───────────────────────────────
1410        // Every turn termination writes one attributed row to the ACP session
1411        // store's event log so a cancel verdict is diagnosable after the trace
1412        // log rotates. Fire-and-forget on a blocking task.
1413        if matches!(chat_mode, crate::rpc::types::ChatMode::Acp)
1414            && let Some(store) = self.ctx.acp_session_store.clone()
1415        {
1416            let (action, event_outcome, payload) = match &outcome {
1417                Ok(crate::rpc::turn::TurnOutcome::Completed { .. }) => (
1418                    ::zeroclaw_log::Action::Complete,
1419                    ::zeroclaw_log::EventOutcome::Success,
1420                    None,
1421                ),
1422                Ok(crate::rpc::turn::TurnOutcome::Cancelled { .. }) => (
1423                    ::zeroclaw_log::Action::Cancel,
1424                    ::zeroclaw_log::EventOutcome::Unknown,
1425                    Some(
1426                        ::serde_json::json!({
1427                            "cancel_cause": cancel_cause.map(|c| c.as_str()),
1428                        })
1429                        .to_string(),
1430                    ),
1431                ),
1432                Err(e) => (
1433                    ::zeroclaw_log::Action::Fail,
1434                    ::zeroclaw_log::EventOutcome::Failure,
1435                    Some(::serde_json::json!({ "error": e.to_string() }).to_string()),
1436                ),
1437            };
1438            let sid_owned = sid.to_string();
1439            let span_session = sid.to_string();
1440            let span_alias = attribution_agent_alias.clone();
1441            let span_provider = attribution_model_provider.clone();
1442            let span_model = attribution_model.clone();
1443            zeroclaw_spawn::spawn!(async move {
1444                use ::zeroclaw_log::Instrument as _;
1445                let span = ::zeroclaw_log::info_span!(
1446                    target: "zeroclaw_log_internal_scope",
1447                    "zeroclaw_scope",
1448                    session_key = %span_session,
1449                    agent_alias = %span_alias,
1450                    model_provider = %span_provider,
1451                    model = %span_model,
1452                    channel = "rpc",
1453                );
1454                async move {
1455                    let persisted = tokio::task::spawn_blocking(move || {
1456                        store.append_event(&sid_owned, action, event_outcome, payload.as_deref())
1457                    })
1458                    .await;
1459                    let error = match persisted {
1460                        Ok(Ok(())) => return,
1461                        Ok(Err(e)) => e.to_string(),
1462                        Err(join) => join.to_string(),
1463                    };
1464                    ::zeroclaw_log::record!(
1465                        WARN,
1466                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Write)
1467                            .with_category(::zeroclaw_log::EventCategory::Agent)
1468                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1469                            .with_attrs(::serde_json::json!({ "error": error })),
1470                        "Failed to persist ACP turn-verdict audit event"
1471                    );
1472                }
1473                .instrument(span)
1474                .await;
1475            });
1476        }
1477
1478        match chat_mode {
1479            crate::rpc::types::ChatMode::Acp => {
1480                if let Some(ref store) = self.ctx.acp_session_store
1481                    && matches!(
1482                        outcome,
1483                        Ok(TurnOutcome::Completed { .. }) | Ok(TurnOutcome::Cancelled { .. })
1484                    )
1485                    && let Some(new_msgs) = self
1486                        .ctx
1487                        .sessions
1488                        .history_slice_from(sid, pre_history_len)
1489                        .await
1490                    && !new_msgs.is_empty()
1491                {
1492                    let store = store.clone();
1493                    let sid_owned = sid.to_string();
1494                    let persisted = tokio::task::spawn_blocking(move || {
1495                        store.append_turn(&sid_owned, &new_msgs)
1496                    })
1497                    .await;
1498                    let error = match persisted {
1499                        Ok(Ok(())) => None,
1500                        Ok(Err(e)) => Some(e.to_string()),
1501                        Err(join) => Some(join.to_string()),
1502                    };
1503                    if let Some(detail) = error {
1504                        ::zeroclaw_log::record!(
1505                            WARN,
1506                            ::zeroclaw_log::Event::new(
1507                                module_path!(),
1508                                ::zeroclaw_log::Action::Note
1509                            )
1510                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1511                            .with_attrs(::serde_json::json!({"session_id": sid, "error": detail})),
1512                            "Failed to persist ACP turn"
1513                        );
1514                    }
1515                }
1516            }
1517            crate::rpc::types::ChatMode::Chat => {
1518                if let Some(ref backend) = self.ctx.session_backend {
1519                    let key = format!("rpc_{sid}");
1520                    let _ = backend.append(&key, &ChatMessage::user(&prompt));
1521                    match &outcome {
1522                        Ok(TurnOutcome::Completed { text, .. }) => {
1523                            let _ = backend.append(&key, &ChatMessage::assistant(text));
1524                        }
1525                        Ok(TurnOutcome::Cancelled { partial_text, .. })
1526                            if !partial_text.is_empty() =>
1527                        {
1528                            let _ = backend.append(&key, &ChatMessage::assistant(partial_text));
1529                        }
1530                        _ => {}
1531                    }
1532                }
1533            }
1534        }
1535
1536        match outcome {
1537            Ok(TurnOutcome::Completed { text, .. }) => {
1538                self.emit_turn_complete(
1539                    &req.session_id,
1540                    crate::rpc::types::TurnCompletionOutcome::Completed,
1541                    text.clone(),
1542                )
1543                .await;
1544                to_result(SessionPromptResult {
1545                    session_id: req.session_id,
1546                    stop_reason: "end_turn".to_string(),
1547                    content: text,
1548                })
1549            }
1550            Ok(TurnOutcome::Cancelled { partial_text, .. }) => {
1551                let cancel_message = match cancel_cause {
1552                    Some(crate::rpc::session::CancelCause::ClientRpc) => {
1553                        format!("turn cancelled by user in RPC_SESSION {}", req.session_id)
1554                    }
1555                    Some(cause) => {
1556                        format!("turn cancelled by daemon: {}", cause.as_str())
1557                    }
1558                    None => "turn cancelled by daemon: unattributed".to_string(),
1559                };
1560                ::zeroclaw_log::record!(
1561                    INFO,
1562                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Cancel)
1563                        .with_category(::zeroclaw_log::EventCategory::Agent)
1564                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1565                        .with_attrs(::serde_json::json!({
1566                            "session_id": req.session_id,
1567                            "agent_alias": attribution_agent_alias,
1568                            "model_provider": attribution_model_provider,
1569                            "model": attribution_model,
1570                            "chat_mode": format!("{chat_mode:?}"),
1571                            "cancel_cause": cancel_cause.map(|c| c.as_str()),
1572                        })),
1573                    "turn cancelled; emitting attributed TurnComplete so the client exits the working state"
1574                );
1575                self.emit_turn_complete(
1576                    &req.session_id,
1577                    crate::rpc::types::TurnCompletionOutcome::Cancelled,
1578                    cancel_message,
1579                )
1580                .await;
1581                to_result(SessionPromptResult {
1582                    session_id: req.session_id,
1583                    stop_reason: "cancelled".to_string(),
1584                    content: partial_text,
1585                })
1586            }
1587            Err(e) => {
1588                ::zeroclaw_log::record!(
1589                    ERROR,
1590                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1591                        .with_category(::zeroclaw_log::EventCategory::Agent)
1592                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1593                        .with_attrs(::serde_json::json!({
1594                            "session_id": req.session_id,
1595                            "agent_alias": attribution_agent_alias,
1596                            "model_provider": attribution_model_provider,
1597                            "model": attribution_model,
1598                            "chat_mode": format!("{chat_mode:?}"),
1599                            "error": e.to_string(),
1600                        })),
1601                    "turn failed; emitting TurnComplete so the client exits the working state"
1602                );
1603                self.emit_turn_complete(
1604                    &req.session_id,
1605                    crate::rpc::types::TurnCompletionOutcome::Failed,
1606                    format!("turn failed: {e}"),
1607                )
1608                .await;
1609                Err(rpc_err(INTERNAL_ERROR, e.to_string()))
1610            }
1611        }
1612    }
1613
1614    /// Emit the terminal `session/update` notification for a turn.
1615    /// The TUI uses this — not the JSON-RPC response — to flip
1616    /// `turn_in_flight` back to false.
1617    async fn emit_turn_complete(
1618        &self,
1619        session_id: &str,
1620        outcome: crate::rpc::types::TurnCompletionOutcome,
1621        content: String,
1622    ) {
1623        let update = SessionUpdateEvent::TurnComplete {
1624            session_id: session_id.to_string(),
1625            outcome,
1626            content,
1627        };
1628        if let Ok(params) = serde_json::to_value(update) {
1629            let n = JsonRpcNotification::new(notification::SESSION_UPDATE, params);
1630            if let Ok(s) = serde_json::to_string(&n) {
1631                let _ = self.rpc.send_raw(s).await;
1632            }
1633        }
1634    }
1635
1636    async fn handle_session_configure(&self, params: &Value) -> RpcResult {
1637        let req: SessionConfigureParams = parse_params(params)?;
1638
1639        let merged = self
1640            .ctx
1641            .sessions
1642            .set_overrides(&req.session_id, req.overrides)
1643            .await
1644            .ok_or_else(|| rpc_err(SESSION_NOT_FOUND, "Session not found"))?;
1645
1646        // A model_provider override needs a live provider-box rebuild, which
1647        // requires Config — held here, not in the session store. Resolve the
1648        // model from the (already-merged) model override or the configured
1649        // entry, build the box, and swap it onto the session's agent.
1650        if let Some(ref model_provider_ref) = merged.model_provider {
1651            let built = {
1652                let config = self.ctx.config.read();
1653                crate::agent::agent::build_session_model_provider(
1654                    &config,
1655                    model_provider_ref,
1656                    merged.model.as_deref(),
1657                )
1658                .map_err(|e| rpc_err(INVALID_PARAMS, e.to_string()))?
1659            };
1660            let (model_provider, model_provider_name, model_name) = built;
1661            self.ctx
1662                .sessions
1663                .apply_model_provider(&req.session_id, model_provider, model_provider_name)
1664                .await
1665                .then_some(())
1666                .ok_or_else(|| rpc_err(SESSION_NOT_FOUND, "Session not found"))?;
1667            // Keep the agent's model name aligned with the model_provider it now holds.
1668            if let Some(agent) = self.ctx.sessions.get_agent(&req.session_id).await {
1669                agent.lock().await.set_model_name(model_name);
1670            }
1671        }
1672
1673        to_result(SessionConfigureResult {
1674            session_id: req.session_id,
1675            overrides: merged,
1676        })
1677    }
1678
1679    async fn handle_session_cancel(&self, params: &Value) -> RpcResult {
1680        let req: SessionIdParams = parse_params(params)?;
1681        let owner = self
1682            .ctx
1683            .sessions
1684            .session_owner_tui_id(&req.session_id)
1685            .await;
1686        let allowed = match (
1687            owner.as_ref().and_then(|o| o.as_deref()),
1688            self.tui_id.as_deref(),
1689        ) {
1690            (Some(o), Some(c)) => o == c,
1691            _ => false,
1692        };
1693        if !allowed {
1694            let (agent_alias, model_provider, model) =
1695                match self.ctx.sessions.get_agent(&req.session_id).await {
1696                    Some(agent) => agent.lock().await.attribution_fields(),
1697                    None => (String::new(), String::new(), String::new()),
1698                };
1699            let span = ::zeroclaw_log::info_span!(
1700                target: "zeroclaw_log_internal_scope",
1701                "zeroclaw_scope",
1702                session_key = %req.session_id,
1703                agent_alias = %agent_alias,
1704                model_provider = %model_provider,
1705                model = %model,
1706                channel = "rpc",
1707            );
1708            let _guard = span.enter();
1709            ::zeroclaw_log::record!(
1710                WARN,
1711                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1712                    .with_category(::zeroclaw_log::EventCategory::Channel)
1713                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1714                    .with_attrs(::serde_json::json!({
1715                        "caller_tui_id": self.tui_id.as_deref().unwrap_or("<none>"),
1716                        "owner_tui_id": owner
1717                            .as_ref()
1718                            .and_then(|o| o.as_deref())
1719                            .unwrap_or("<none>"),
1720                        "peer_label": &self.peer_label,
1721                    })),
1722                "session/cancel refused: caller does not own the session"
1723            );
1724            return Err(rpc_err(
1725                SESSION_NOT_OWNED,
1726                "Caller does not own this session",
1727            ));
1728        }
1729        if self.ctx.sessions.cancel_session(&req.session_id) {
1730            to_result(SessionCancelResult {
1731                session_id: req.session_id,
1732                cancelled: true,
1733            })
1734        } else {
1735            Err(rpc_err(
1736                SESSION_NOT_FOUND,
1737                "No active turn for this session",
1738            ))
1739        }
1740    }
1741
1742    async fn handle_session_git_branch(&self, params: &Value) -> RpcResult {
1743        let req: SessionIdParams = parse_params(params)?;
1744        let cwd = self
1745            .ctx
1746            .sessions
1747            .get_workspace_dir(&req.session_id)
1748            .await
1749            .ok_or_else(|| rpc_err(SESSION_NOT_FOUND, "session not found"))?;
1750        let info = crate::rpc::git::head_info(std::path::Path::new(&cwd)).unwrap_or_default();
1751        to_result(SessionGitBranchResult {
1752            session_id: req.session_id,
1753            branch: info.branch,
1754            hash: info.hash,
1755        })
1756    }
1757
1758    async fn handle_session_list(&self, params: &Value) -> RpcResult {
1759        let backend = self
1760            .ctx
1761            .session_backend
1762            .as_ref()
1763            .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Session persistence is disabled"))?;
1764        let req: SessionListParams = parse_params(params)?;
1765        let config = self.ctx.config.read().clone();
1766
1767        // Use FTS when a query is provided, plain list otherwise.
1768        let all = if let Some(ref keyword) = req.query {
1769            if keyword.trim().is_empty() {
1770                backend.list_sessions_with_metadata()
1771            } else {
1772                use zeroclaw_infra::session_backend::SessionQuery;
1773                backend.search(&SessionQuery {
1774                    keyword: Some(keyword.clone()),
1775                    limit: req.limit,
1776                })
1777            }
1778        } else {
1779            backend.list_sessions_with_metadata()
1780        };
1781
1782        let sessions: Vec<SessionEntry> = all
1783            .into_iter()
1784            .filter(|meta| meta.agent_alias.is_some() || meta.channel_id.is_some())
1785            .map(|meta| {
1786                let agent_alias = meta.agent_alias.clone().or_else(|| {
1787                    meta.channel_id
1788                        .as_deref()
1789                        .and_then(|c| config.agent_for_channel(c))
1790                        .map(str::to_string)
1791                });
1792                let session_id = meta
1793                    .key
1794                    .strip_prefix("rpc_")
1795                    .or_else(|| meta.key.strip_prefix("gw_"))
1796                    .map(str::to_string)
1797                    .unwrap_or_else(|| meta.key.clone());
1798                SessionEntry {
1799                    session_id,
1800                    session_key: meta.key,
1801                    created_at: meta.created_at.to_rfc3339(),
1802                    last_activity: meta.last_activity.to_rfc3339(),
1803                    message_count: meta.message_count,
1804                    agent_alias,
1805                    channel_id: meta.channel_id,
1806                    name: meta.name,
1807                }
1808            })
1809            .collect();
1810        to_result(SessionListResult { sessions })
1811    }
1812
1813    /// List ACP sessions from the dedicated ACP session store. The Code (ACP)
1814    /// pane in the TUI calls this instead of `session/list` so its picker only
1815    /// shows sessions that came from `acp-sessions.db` — chat-pane sessions
1816    /// live in the unified `session_backend` and must not appear here.
1817    async fn handle_session_list_acp(&self, _params: &Value) -> RpcResult {
1818        let store = self
1819            .ctx
1820            .acp_session_store
1821            .as_ref()
1822            .ok_or_else(|| rpc_err(INTERNAL_ERROR, "ACP session store is not available"))?;
1823
1824        let summaries = store
1825            .list_sessions()
1826            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("acp session list failed: {e}")))?;
1827
1828        let sessions: Vec<SessionEntry> = summaries
1829            .into_iter()
1830            .map(|s| SessionEntry {
1831                session_id: s.session_uuid.clone(),
1832                // ACP sessions are keyed by their UUID directly — no `rpc_`/`gw_`
1833                // prefix exists in this store, so session_id == session_key.
1834                session_key: s.session_uuid,
1835                created_at: s.created_at.to_rfc3339(),
1836                last_activity: s.last_activity.to_rfc3339(),
1837                message_count: s.message_count,
1838                agent_alias: Some(s.agent_alias),
1839                channel_id: None,
1840                // ACP sessions don't carry a user-set display name today; the
1841                // picker falls back to `session_id` when this is None.
1842                name: None,
1843            })
1844            .collect();
1845
1846        to_result(SessionListResult { sessions })
1847    }
1848
1849    async fn handle_session_messages(&self, params: &Value) -> RpcResult {
1850        let req: SessionMessagesParams = parse_params(params)?;
1851        let backend = self
1852            .ctx
1853            .session_backend
1854            .as_ref()
1855            .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Session persistence is disabled"))?;
1856
1857        // Try the raw id first (channel sessions store as-is), then
1858        // prefixed variants for RPC/gateway-originated sessions.
1859        let candidates = [
1860            req.session_id.clone(),
1861            format!("rpc_{}", req.session_id),
1862            format!("gw_{}", req.session_id),
1863        ];
1864        let mut raw: Vec<zeroclaw_api::model_provider::ChatMessage> = Vec::new();
1865        for key in &candidates {
1866            let loaded = backend.load(key);
1867            if !loaded.is_empty() {
1868                raw = loaded;
1869                break;
1870            }
1871        }
1872
1873        // Page-window the load. `before_index` is a 0-based index pointing
1874        // at the first message NOT to return — the page contains the N
1875        // messages immediately preceding it. With `before_index = None`
1876        // (the default) the page contains the most recent `limit`
1877        // messages. `limit = None` returns everything for backward
1878        // compatibility with callers that pre-date this change.
1879        let total = raw.len();
1880        let limit = req.limit.unwrap_or(total);
1881        let end = req.before_index.map(|i| i.min(total)).unwrap_or(total);
1882        let start = end.saturating_sub(limit);
1883        let messages: Vec<MessageEntry> = raw[start..end]
1884            .iter()
1885            .map(|m| MessageEntry {
1886                role: m.role.clone(),
1887                content: m.content.clone(),
1888            })
1889            .collect();
1890
1891        to_result(SessionMessagesResult {
1892            session_id: req.session_id,
1893            messages,
1894            total,
1895            start,
1896        })
1897    }
1898
1899    async fn handle_session_state(&self, params: &Value) -> RpcResult {
1900        let req: SessionIdParams = parse_params(params)?;
1901        let backend = self
1902            .ctx
1903            .session_backend
1904            .as_ref()
1905            .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Session persistence is disabled"))?;
1906        let candidates = [
1907            req.session_id.clone(),
1908            format!("rpc_{}", req.session_id),
1909            format!("gw_{}", req.session_id),
1910        ];
1911        for key in &candidates {
1912            match backend.get_session_state(key) {
1913                Ok(Some(ss)) => {
1914                    return to_result(SessionStateResult {
1915                        session_id: req.session_id,
1916                        state: ss.state,
1917                        turn_id: ss.turn_id,
1918                        turn_started_at: ss.turn_started_at.map(|t| t.to_rfc3339()),
1919                    });
1920                }
1921                Ok(None) => continue,
1922                Err(e) => {
1923                    return Err(rpc_err(
1924                        INTERNAL_ERROR,
1925                        format!("Failed to get session state: {e}"),
1926                    ));
1927                }
1928            }
1929        }
1930        Err(rpc_err(SESSION_NOT_FOUND, "Session not found"))
1931    }
1932
1933    async fn handle_session_delete(&self, params: &Value) -> RpcResult {
1934        let req: SessionIdParams = parse_params(params)?;
1935        if let Some(agent) = self.ctx.sessions.get_agent(&req.session_id).await {
1936            agent
1937                .lock()
1938                .await
1939                .channel_handles()
1940                .unregister_channel("rpc");
1941        }
1942        self.ctx.sessions.remove(&req.session_id).await;
1943        // Remove from persistent backend — try raw id, then prefixed variants.
1944        if let Some(ref backend) = self.ctx.session_backend {
1945            for key in &[
1946                req.session_id.clone(),
1947                format!("rpc_{}", req.session_id),
1948                format!("gw_{}", req.session_id),
1949            ] {
1950                let _ = backend.delete_session(key);
1951            }
1952        }
1953        to_result(SessionDeleteResult {
1954            session_id: req.session_id,
1955            deleted: true,
1956        })
1957    }
1958
1959    fn handle_session_approve(&self, params: &Value) -> RpcResult {
1960        let p: SessionApproveParams = parse_params(params)?;
1961
1962        let response = match p.decision.as_str() {
1963            "allow_once" => zeroclaw_api::channel::ChannelApprovalResponse::Approve,
1964            "allow_always" => zeroclaw_api::channel::ChannelApprovalResponse::AlwaysApprove,
1965            "reject" | "reject_once" => zeroclaw_api::channel::ChannelApprovalResponse::Deny,
1966            "reject_with_edit" => {
1967                let replacement = p.replacement.unwrap_or_default();
1968                zeroclaw_api::channel::ChannelApprovalResponse::DenyWithEdit { replacement }
1969            }
1970            other => {
1971                return Err(rpc_err(
1972                    INVALID_PARAMS,
1973                    format!("unknown decision: {other}"),
1974                ));
1975            }
1976        };
1977
1978        self.ctx.approval_pending.resolve(&p.request_id, response);
1979
1980        to_result(SessionApproveResult {
1981            session_id: p.session_id,
1982            request_id: p.request_id,
1983            acknowledged: true,
1984        })
1985    }
1986
1987    // ── Memory handlers ──────────────────────────────────────────
1988
1989    async fn handle_memory_list(&self, params: &Value) -> RpcResult {
1990        let mem = self
1991            .ctx
1992            .memory
1993            .as_ref()
1994            .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Memory subsystem is not available"))?;
1995        let req: MemoryListParams = parse_params(params)?;
1996        let category = req
1997            .category
1998            .as_deref()
1999            .map(|s| MemoryCategory::Custom(s.to_string()));
2000        let entries = mem
2001            .list(category.as_ref(), req.session_id.as_deref())
2002            .await
2003            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Memory list failed: {e}")))?;
2004        let count = entries.len();
2005        let entries = truncate_memory_previews(entries);
2006        to_result(MemoryListResult { entries, count })
2007    }
2008
2009    async fn handle_memory_search(&self, params: &Value) -> RpcResult {
2010        let mem = self
2011            .ctx
2012            .memory
2013            .as_ref()
2014            .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Memory subsystem is not available"))?;
2015        let req: MemorySearchParams = parse_params(params)?;
2016        let entries = mem
2017            .recall(
2018                &req.query,
2019                req.limit,
2020                req.session_id.as_deref(),
2021                req.since.as_deref(),
2022                req.until.as_deref(),
2023            )
2024            .await
2025            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Memory search failed: {e}")))?;
2026        let count = entries.len();
2027        let entries = truncate_memory_previews(entries);
2028        to_result(MemorySearchResult { entries, count })
2029    }
2030
2031    /// `memory/get { key } → MemoryEntry`. Returns the full memory
2032    /// entry for one key so the Memory pane can keep only preview
2033    /// rows in memory and fetch the full `content` only when the
2034    /// detail pane opens. Dropped on detail close.
2035    async fn handle_memory_get(&self, params: &Value) -> RpcResult {
2036        let mem = self
2037            .ctx
2038            .memory
2039            .as_ref()
2040            .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Memory subsystem is not available"))?;
2041        let req: MemoryGetParams = parse_params(params)?;
2042        let entry = mem
2043            .get(&req.key)
2044            .await
2045            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Memory get failed: {e}")))?;
2046        match entry {
2047            Some(e) => to_result(MemoryGetResult { entry: Some(e) }),
2048            None => Err(rpc_err(
2049                INTERNAL_ERROR,
2050                format!("Memory key `{}` not found", req.key),
2051            )),
2052        }
2053    }
2054
2055    async fn handle_memory_store(&self, params: &Value) -> RpcResult {
2056        let mem = self
2057            .ctx
2058            .memory
2059            .as_ref()
2060            .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Memory subsystem is not available"))?;
2061        let req: MemoryStoreParams = parse_params(params)?;
2062        let category = req
2063            .category
2064            .as_deref()
2065            .map(|s| MemoryCategory::Custom(s.to_string()))
2066            .unwrap_or(MemoryCategory::Custom("user".into()));
2067        mem.store(&req.key, &req.content, category, req.session_id.as_deref())
2068            .await
2069            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Memory store failed: {e}")))?;
2070        to_result(MemoryStoreResult {
2071            key: req.key,
2072            stored: true,
2073        })
2074    }
2075
2076    async fn handle_memory_delete(&self, params: &Value) -> RpcResult {
2077        let mem = self
2078            .ctx
2079            .memory
2080            .as_ref()
2081            .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Memory subsystem is not available"))?;
2082        let req: MemoryDeleteParams = parse_params(params)?;
2083        mem.forget(&req.key)
2084            .await
2085            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Memory delete failed: {e}")))?;
2086        to_result(MemoryDeleteResult {
2087            key: req.key,
2088            deleted: true,
2089        })
2090    }
2091
2092    // ── Cron handlers ────────────────────────────────────────────
2093
2094    async fn handle_cron_list(&self) -> RpcResult {
2095        let config = self.ctx.config.read().clone();
2096        let jobs = crate::cron::list_jobs(&config)
2097            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Cron list failed: {e}")))?;
2098        to_result(CronListResult { jobs })
2099    }
2100
2101    async fn handle_cron_get(&self, params: &Value) -> RpcResult {
2102        let req: CronIdParams = parse_params(params)?;
2103        let config = self.ctx.config.read().clone();
2104        let job = crate::cron::get_job(&config, &req.id)
2105            .map_err(|e| rpc_err(INVALID_PARAMS, format!("Cron job not found: {e}")))?;
2106        to_result(job)
2107    }
2108
2109    async fn handle_cron_add(&self, params: &Value) -> RpcResult {
2110        let req: CronAddParams = parse_params(params)?;
2111        let config = self.ctx.config.read().clone();
2112        let schedule = Schedule::Cron {
2113            expr: req.schedule,
2114            tz: req.tz,
2115        };
2116        let job = crate::cron::add_shell_job_with_approval(
2117            &config,
2118            &req.agent,
2119            req.name,
2120            schedule,
2121            req.command.as_deref().unwrap_or(""),
2122            req.delivery,
2123            true, // RPC calls are pre-approved
2124        )
2125        .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Cron add failed: {e}")))?;
2126        to_result(job)
2127    }
2128
2129    async fn handle_cron_patch(&self, params: &Value) -> RpcResult {
2130        let req: CronPatchParams = parse_params(params)?;
2131        let config = self.ctx.config.read().clone();
2132        let patch = CronJobPatch {
2133            schedule: req.schedule.map(|s| Schedule::Cron {
2134                expr: s,
2135                tz: if req.clear_tz == Some(true) {
2136                    None
2137                } else {
2138                    req.tz
2139                },
2140            }),
2141            command: req.command,
2142            prompt: req.prompt,
2143            name: req.name,
2144            ..Default::default()
2145        };
2146        let job = crate::cron::update_job(&config, &req.id, patch)
2147            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Cron patch failed: {e}")))?;
2148        to_result(job)
2149    }
2150
2151    async fn handle_cron_delete(&self, params: &Value) -> RpcResult {
2152        let req: CronIdParams = parse_params(params)?;
2153        let config = self.ctx.config.read().clone();
2154        crate::cron::remove_job(&config, &req.id)
2155            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Cron delete failed: {e}")))?;
2156        to_result(CronDeleteResult {
2157            id: req.id,
2158            deleted: true,
2159        })
2160    }
2161
2162    async fn handle_cron_runs(&self, params: &Value) -> RpcResult {
2163        let req: CronRunsParams = parse_params(params)?;
2164        let config = self.ctx.config.read().clone();
2165        let limit = req.limit.unwrap_or(20) as usize;
2166        let runs = crate::cron::list_runs(&config, &req.id, limit)
2167            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Cron runs failed: {e}")))?;
2168        to_result(CronRunsResult { runs })
2169    }
2170
2171    async fn handle_cron_trigger(&self, params: &Value) -> RpcResult {
2172        let req: CronIdParams = parse_params(params)?;
2173        let config = self.ctx.config.read().clone();
2174        let job = crate::cron::get_job(&config, &req.id)
2175            .map_err(|e| rpc_err(INVALID_PARAMS, format!("Cron job not found: {e}")))?;
2176        let (success, output) = crate::cron::scheduler::execute_job_now(&config, &job).await;
2177        to_result(CronTriggerResult {
2178            id: req.id,
2179            success,
2180            output,
2181        })
2182    }
2183
2184    async fn handle_cron_settings(&self, params: &Value) -> RpcResult {
2185        let config = self.ctx.config.read().clone();
2186        // If a "patch" field is present, this is a write; otherwise read.
2187        if params.get("patch").is_some() {
2188            not_yet_implemented(Method::CronSettings)
2189        } else {
2190            Ok(serde_json::to_value(&config.scheduler).unwrap_or(Value::Null))
2191        }
2192    }
2193
2194    // ── Config handlers ──────────────────────────────────────────
2195
2196    fn handle_config_get(&self, params: &Value) -> RpcResult {
2197        use zeroclaw_config::traits::MaskSecrets;
2198        let req: ConfigGetParams = parse_params(params)?;
2199        let config = self.ctx.config.read().clone();
2200        if let Some(prop) = req.prop {
2201            let val = config
2202                .get_prop(&prop)
2203                .map_err(|e| rpc_err(INVALID_PARAMS, format!("Unknown prop: {e}")))?;
2204            to_result(ConfigGetPropResult { prop, value: val })
2205        } else {
2206            // Return full config, masked.
2207            let mut masked = config;
2208            masked.mask_secrets();
2209            Ok(serde_json::to_value(&masked).unwrap_or(Value::Null))
2210        }
2211    }
2212
2213    async fn handle_config_set(&self, params: &Value) -> RpcResult {
2214        let req: ConfigSetParams = parse_params(params)?;
2215        let refresh_model_provider_ref = model_provider_ref_from_provider_profile_prop(&req.prop);
2216        {
2217            let mut config = self.ctx.config.write();
2218            config.ensure_map_key_for_path(&req.prop);
2219            let info = config
2220                .prop_fields()
2221                .into_iter()
2222                .find(|f| f.name == req.prop);
2223            // Polymorphic value: strings pass through, everything else coerced.
2224            let value_str = match &req.value {
2225                Value::String(s) => s.clone(),
2226                other => zeroclaw_config::typed_value::coerce_for_set_prop(
2227                    other,
2228                    info.as_ref().map(|i| i.kind),
2229                )
2230                .map_err(|e| rpc_err(INVALID_PARAMS, e.message))?,
2231            };
2232            // Reject the masked sentinel for secrets — surfaces echo the
2233            // masked display value back when no real edit happened, and
2234            // letting that through silently clobbers the live secret with
2235            // the literal masked string.
2236            let is_secret_prop = info
2237                .as_ref()
2238                .is_some_and(|i| i.is_secret || i.derived_from_secret)
2239                || zeroclaw_config::schema::Config::prop_is_secret(&req.prop);
2240            if is_secret_prop
2241                && (value_str == zeroclaw_config::traits::MASKED_SECRET
2242                    || value_str == "****"
2243                    || value_str.is_empty())
2244            {
2245                return Err(rpc_err(
2246                    INVALID_PARAMS,
2247                    format!(
2248                        "Refusing to overwrite secret `{}` with a masked or empty value",
2249                        req.prop
2250                    ),
2251                ));
2252            }
2253            config
2254                .set_prop_persistent(&req.prop, &value_str)
2255                .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Config set failed: {e}")))?;
2256        }
2257        self.flush_config().await?;
2258        if let Some(model_provider_ref) = refresh_model_provider_ref {
2259            self.schedule_live_sessions_refresh_for_model_provider(model_provider_ref);
2260        }
2261        to_result(ConfigSetResult {
2262            prop: req.prop,
2263            set: true,
2264        })
2265    }
2266
2267    fn schedule_live_sessions_refresh_for_model_provider(&self, model_provider_ref: String) {
2268        let ctx = Arc::clone(&self.ctx);
2269        zeroclaw_spawn::spawn!(async move {
2270            Self::refresh_live_sessions_for_model_provider(ctx, &model_provider_ref).await;
2271        });
2272    }
2273
2274    async fn refresh_live_sessions_for_model_provider(
2275        ctx: Arc<RpcContext>,
2276        model_provider_ref: &str,
2277    ) {
2278        let session_ids = ctx.sessions.list_ids().await;
2279        for session_id in session_ids {
2280            let Some(agent_alias) = ctx.sessions.get_agent_alias(&session_id).await else {
2281                continue;
2282            };
2283            let Some(overrides) = ctx.sessions.get_overrides(&session_id).await else {
2284                continue;
2285            };
2286            let uses_provider = {
2287                let config = ctx.config.read();
2288                let effective_ref = overrides.model_provider.as_deref().or_else(|| {
2289                    config
2290                        .agent(&agent_alias)
2291                        .map(|agent| agent.model_provider.as_str())
2292                });
2293                effective_ref == Some(model_provider_ref)
2294            };
2295            if !uses_provider {
2296                continue;
2297            }
2298
2299            let (model_provider, model_provider_name, model_name, temperature) = {
2300                let config = ctx.config.read();
2301                let provider_temperature = model_provider_ref.split_once('.').and_then(
2302                    |(provider_type, provider_alias)| {
2303                        config
2304                            .providers
2305                            .models
2306                            .find(provider_type, provider_alias)
2307                            .and_then(|entry| entry.temperature)
2308                    },
2309                );
2310                match crate::agent::agent::build_session_model_provider(
2311                    &config,
2312                    model_provider_ref,
2313                    overrides.model.as_deref(),
2314                ) {
2315                    Ok((model_provider, model_provider_name, model_name)) => (
2316                        model_provider,
2317                        model_provider_name,
2318                        model_name,
2319                        overrides.temperature.or(provider_temperature),
2320                    ),
2321                    Err(e) => {
2322                        ::zeroclaw_log::record!(
2323                            WARN,
2324                            ::zeroclaw_log::Event::new(
2325                                module_path!(),
2326                                ::zeroclaw_log::Action::Note
2327                            )
2328                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2329                            .with_attrs(::serde_json::json!({
2330                                "session_id": session_id,
2331                                "agent_alias": agent_alias,
2332                                "model_provider": model_provider_ref,
2333                                "error": e.to_string(),
2334                            })),
2335                            "config/set saved provider profile but live session refresh failed"
2336                        );
2337                        continue;
2338                    }
2339                }
2340            };
2341            if ctx
2342                .sessions
2343                .apply_model_provider(&session_id, model_provider, model_provider_name)
2344                .await
2345                && let Some(agent) = ctx.sessions.get_agent(&session_id).await
2346            {
2347                let mut agent = agent.lock().await;
2348                agent.set_model_name(model_name);
2349                agent.set_temperature(temperature);
2350            }
2351        }
2352    }
2353
2354    fn handle_config_validate(&self) -> RpcResult {
2355        let config = self.ctx.config.read().clone();
2356        match config.validate() {
2357            Ok(()) => to_result(ConfigValidateResult {
2358                valid: true,
2359                error: None,
2360            }),
2361            Err(e) => to_result(ConfigValidateResult {
2362                valid: false,
2363                error: Some(e.to_string()),
2364            }),
2365        }
2366    }
2367
2368    fn handle_config_reload(&self) -> RpcResult {
2369        let Some(reload_tx) = self.ctx.reload_tx.clone() else {
2370            return Err(rpc_err(INTERNAL_ERROR, "Reload not available"));
2371        };
2372        // Delay so the RPC reply flushes before the daemon tears down.
2373        zeroclaw_spawn::spawn!(async move {
2374            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2375            let _ = reload_tx.send(true);
2376        });
2377        to_result(ConfigReloadResult { reloading: true })
2378    }
2379
2380    fn handle_config_list(&self, params: &Value) -> RpcResult {
2381        use zeroclaw_config::field_visibility;
2382        use zeroclaw_config::traits::ConfigFieldEntry;
2383        let req: ConfigListParams = parse_params(params)?;
2384        let config = self.ctx.config.read().clone();
2385        let prefix = req.prefix.as_deref();
2386        let excluded = field_visibility::excluded_paths(&config, prefix.unwrap_or(""));
2387        let entries: Vec<ConfigFieldEntry> = config
2388            .prop_fields()
2389            .into_iter()
2390            .filter(|info| match prefix {
2391                Some(p) => field_visibility::path_matches_prefix(&info.name, p),
2392                None => true,
2393            })
2394            .filter(|info| !field_visibility::is_excluded(&info.name, &excluded))
2395            .map(|info| {
2396                let env = config.prop_is_env_overridden(&info.name);
2397                ConfigFieldEntry::from_prop_field(info, env)
2398            })
2399            .collect();
2400        to_result(ConfigListResult { entries })
2401    }
2402
2403    async fn handle_config_delete(&self, params: &Value) -> RpcResult {
2404        let req: ConfigDeleteParams = parse_params(params)?;
2405        let refresh_model_provider_ref = model_provider_ref_from_provider_profile_prop(&req.prop);
2406        {
2407            let mut config = self.ctx.config.write();
2408            config
2409                .set_prop_persistent(&req.prop, "")
2410                .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Config delete failed: {e}")))?;
2411        }
2412        self.flush_config().await?;
2413        if let Some(model_provider_ref) = refresh_model_provider_ref {
2414            self.schedule_live_sessions_refresh_for_model_provider(model_provider_ref);
2415        }
2416        to_result(ConfigDeleteResult {
2417            prop: req.prop,
2418            deleted: true,
2419        })
2420    }
2421
2422    fn handle_config_map_keys(&self, params: &Value) -> RpcResult {
2423        let req: ConfigMapKeysParams = parse_params(params)?;
2424        let config = self.ctx.config.read().clone();
2425        let keys = config.get_map_keys(&req.path).ok_or_else(|| {
2426            rpc_err(
2427                INVALID_PARAMS,
2428                format!("No map-keyed section at `{}`", req.path),
2429            )
2430        })?;
2431        to_result(ConfigMapKeysResult {
2432            path: req.path,
2433            keys,
2434        })
2435    }
2436
2437    async fn handle_config_map_key_create(&self, params: &Value) -> RpcResult {
2438        let req: ConfigMapKeyCreateParams = parse_params(params)?;
2439        let created = {
2440            let mut config = self.ctx.config.write();
2441            let created = config
2442                .create_map_key(&req.path, &req.key)
2443                .map_err(|e| rpc_err(INVALID_PARAMS, e))?;
2444            if created {
2445                config.mark_dirty(&format!("{}.{}", req.path, req.key));
2446            }
2447            created
2448        };
2449        if created {
2450            self.flush_config().await?;
2451        }
2452        to_result(ConfigMapKeyCreateResult {
2453            path: req.path,
2454            key: req.key,
2455            created,
2456        })
2457    }
2458
2459    async fn handle_config_map_key_delete(&self, params: &Value) -> RpcResult {
2460        let req: ConfigMapKeyDeleteParams = parse_params(params)?;
2461        let deleted = {
2462            let mut config = self.ctx.config.write();
2463            let deleted = config
2464                .delete_map_key(&req.path, &req.key)
2465                .map_err(|e| rpc_err(INVALID_PARAMS, e))?;
2466            if deleted {
2467                config.mark_dirty(&format!("{}.{}", req.path, req.key));
2468            }
2469            deleted
2470        };
2471        if deleted {
2472            self.flush_config().await?;
2473        }
2474        to_result(ConfigMapKeyDeleteResult {
2475            path: req.path,
2476            key: req.key,
2477            deleted,
2478        })
2479    }
2480
2481    async fn handle_config_map_key_rename(&self, params: &Value) -> RpcResult {
2482        let req: ConfigMapKeyRenameParams = parse_params(params)?;
2483        let renamed = {
2484            let mut config = self.ctx.config.write();
2485            let renamed = config
2486                .rename_map_key(&req.path, &req.from, &req.to)
2487                .map_err(|e| rpc_err(INVALID_PARAMS, e))?;
2488            if renamed {
2489                config.mark_dirty(&format!("{}.{}", req.path, req.from));
2490                config.mark_dirty(&format!("{}.{}", req.path, req.to));
2491            }
2492            renamed
2493        };
2494        if renamed {
2495            self.flush_config().await?;
2496        }
2497        to_result(ConfigMapKeyRenameResult {
2498            path: req.path,
2499            from: req.from,
2500            to: req.to,
2501            renamed,
2502        })
2503    }
2504
2505    fn handle_config_templates(&self) -> RpcResult {
2506        use zeroclaw_config::schema::Config;
2507        let templates: Vec<ConfigTemplateEntry> = Config::map_key_sections()
2508            .into_iter()
2509            .map(Into::into)
2510            .collect();
2511        to_result(ConfigTemplatesResult { templates })
2512    }
2513
2514    // ── Agents handlers ──────────────────────────────────────────
2515
2516    fn handle_agents_list(&self) -> RpcResult {
2517        let config = self.ctx.config.read().clone();
2518        let agents: Vec<AgentEntry> = config
2519            .agents
2520            .iter()
2521            .map(|(alias, agent_cfg)| AgentEntry {
2522                alias: alias.clone(),
2523                enabled: agent_cfg.enabled,
2524                channels: agent_cfg.channels.iter().map(|c| c.to_string()).collect(),
2525            })
2526            .collect();
2527        to_result(AgentsListResult { agents })
2528    }
2529
2530    async fn handle_agents_status(&self) -> RpcResult {
2531        let config = self.ctx.config.read().clone();
2532
2533        // Count sessions from the persisted backend (covers channel-originated
2534        // sessions) + in-memory RPC sessions, deduped by taking the max.
2535        let rpc_counts = self.ctx.sessions.count_by_agent().await;
2536        let mut backend_counts = std::collections::HashMap::<String, usize>::new();
2537        if let Some(ref backend) = self.ctx.session_backend {
2538            for meta in backend.list_sessions_with_metadata() {
2539                let alias = meta.agent_alias.or_else(|| {
2540                    meta.channel_id
2541                        .as_deref()
2542                        .and_then(|c| config.agent_for_channel(c))
2543                        .map(str::to_string)
2544                });
2545                if let Some(a) = alias {
2546                    *backend_counts.entry(a).or_default() += 1;
2547                }
2548            }
2549        }
2550
2551        let agents: Vec<AgentStatusEntry> = config
2552            .agents
2553            .iter()
2554            .map(|(alias, agent_cfg)| {
2555                let rpc = *rpc_counts.get(alias).unwrap_or(&0);
2556                let persisted = *backend_counts.get(alias).unwrap_or(&0);
2557                AgentStatusEntry {
2558                    alias: alias.clone(),
2559                    enabled: agent_cfg.enabled,
2560                    live_sessions: rpc,
2561                    persisted_sessions: persisted,
2562                    channels: agent_cfg.channels.iter().map(|c| c.to_string()).collect(),
2563                }
2564            })
2565            .collect();
2566        to_result(AgentsStatusResult { agents })
2567    }
2568
2569    // ── Cost handler ─────────────────────────────────────────────
2570
2571    fn handle_cost_query(&self, params: &Value) -> RpcResult {
2572        let tracker = self
2573            .ctx
2574            .cost_tracker
2575            .as_ref()
2576            .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Cost tracking is not available"))?;
2577        let req: CostQueryParams = parse_params(params)?;
2578        let summary = if let Some(agent) = req.agent {
2579            tracker
2580                .get_summary_for_agent(&agent)
2581                .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Cost query failed: {e}")))?
2582        } else {
2583            tracker
2584                .get_summary()
2585                .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Cost query failed: {e}")))?
2586        };
2587        to_result(summary)
2588    }
2589
2590    // ── Skills handlers ──────────────────────────────────────────
2591
2592    fn handle_skills_bundles(&self) -> RpcResult {
2593        let config = self.ctx.config.read().clone();
2594        let root = config.install_root_dir();
2595        let svc = crate::skills::service::SkillsService::new(&config, &root);
2596        let bundles: Vec<SkillBundleEntry> = svc
2597            .list_bundles()
2598            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Skills bundles failed: {e}")))?
2599            .into_iter()
2600            .map(|b| SkillBundleEntry {
2601                alias: b.alias,
2602                directory: b.directory.to_string_lossy().to_string(),
2603                include: b.include,
2604                exclude: b.exclude,
2605            })
2606            .collect();
2607        to_result(SkillsBundlesResult { bundles })
2608    }
2609
2610    fn handle_skills_list(&self, params: &Value) -> RpcResult {
2611        let req: SkillsListParams = parse_params(params)?;
2612        let config = self.ctx.config.read().clone();
2613        let root = config.install_root_dir();
2614        let svc = crate::skills::service::SkillsService::new(&config, &root);
2615        let skills: Vec<SkillListEntry> = svc
2616            .list_skills(req.bundle.as_deref())
2617            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Skills list failed: {e}")))?
2618            .into_iter()
2619            .map(|s| SkillListEntry {
2620                bundle: s.r#ref.bundle().to_string(),
2621                name: s.r#ref.name().to_string(),
2622                directory: s.directory.to_string_lossy().to_string(),
2623                frontmatter: s.frontmatter,
2624            })
2625            .collect();
2626        to_result(SkillsListResult { skills })
2627    }
2628
2629    fn handle_skills_read(&self, params: &Value) -> RpcResult {
2630        let req: SkillsReadParams = parse_params(params)?;
2631        let config = self.ctx.config.read().clone();
2632        let root = config.install_root_dir();
2633        let svc = crate::skills::service::SkillsService::new(&config, &root);
2634        let skill_ref = svc
2635            .resolve_ref(&req.name, Some(&req.bundle))
2636            .map_err(|e| rpc_err(INVALID_PARAMS, format!("Invalid skill ref: {e}")))?;
2637        let doc = svc
2638            .read_skill(&skill_ref)
2639            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Skill read failed: {e}")))?;
2640        to_result(SkillsReadResult {
2641            bundle: req.bundle,
2642            name: req.name,
2643            frontmatter: doc.frontmatter,
2644            body: doc.body,
2645        })
2646    }
2647
2648    fn handle_skills_write(&self, params: &Value) -> RpcResult {
2649        let req: SkillsWriteParams = parse_params(params)?;
2650        let config = self.ctx.config.read().clone();
2651        let root = config.install_root_dir();
2652        let svc = crate::skills::service::SkillsService::new(&config, &root);
2653        let skill_ref = svc
2654            .resolve_ref(&req.name, Some(&req.bundle))
2655            .map_err(|e| rpc_err(INVALID_PARAMS, format!("Invalid skill ref: {e}")))?;
2656        let doc = crate::skills::document::SkillDocument {
2657            frontmatter: req.frontmatter,
2658            body: req.body,
2659        };
2660        svc.write_skill(&skill_ref, &doc)
2661            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Skill write failed: {e}")))?;
2662        to_result(SkillsWriteResult {
2663            bundle: req.bundle,
2664            name: req.name,
2665            written: true,
2666        })
2667    }
2668
2669    fn handle_skills_delete(&self, params: &Value) -> RpcResult {
2670        let req: SkillsDeleteParams = parse_params(params)?;
2671        let config = self.ctx.config.read().clone();
2672        let root = config.install_root_dir();
2673        let svc = crate::skills::service::SkillsService::new(&config, &root);
2674        let skill_ref = svc
2675            .resolve_ref(&req.name, Some(&req.bundle))
2676            .map_err(|e| rpc_err(INVALID_PARAMS, format!("Invalid skill ref: {e}")))?;
2677        svc.remove_skill(&skill_ref, crate::skills::service::RemoveMode::Archive)
2678            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Skill delete failed: {e}")))?;
2679        to_result(SkillsDeleteResult {
2680            bundle: req.bundle,
2681            name: req.name,
2682            deleted: true,
2683        })
2684    }
2685
2686    // ── Personality handlers ─────────────────────────────────────
2687
2688    fn handle_personality_list(&self, params: &Value) -> RpcResult {
2689        let req: PersonalityListParams = parse_params(params)?;
2690        let config = self.ctx.config.read().clone();
2691        let workspace = req.agent.as_deref().map(|a| config.agent_workspace_dir(a));
2692        let files: Vec<PersonalityFileEntry> =
2693            crate::agent::personality::EDITABLE_PERSONALITY_FILES
2694                .iter()
2695                .map(|&filename| {
2696                    let (exists, size, mtime_ms) = workspace
2697                        .as_ref()
2698                        .and_then(|dir| {
2699                            let path = dir.join(filename);
2700                            let meta = std::fs::metadata(&path).ok()?;
2701                            let mtime = meta
2702                                .modified()
2703                                .ok()
2704                                .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
2705                                .map(|d| d.as_millis() as i64);
2706                            Some((true, meta.len(), mtime))
2707                        })
2708                        .unwrap_or((false, 0, None));
2709                    PersonalityFileEntry {
2710                        filename: filename.to_string(),
2711                        exists,
2712                        size,
2713                        mtime_ms,
2714                    }
2715                })
2716                .collect();
2717        to_result(PersonalityListResult {
2718            files,
2719            max_chars: crate::agent::personality::MAX_FILE_CHARS,
2720        })
2721    }
2722
2723    fn handle_personality_get(&self, params: &Value) -> RpcResult {
2724        let req: PersonalityGetParams = parse_params(params)?;
2725        let config = self.ctx.config.read().clone();
2726
2727        // Sandbox: only allow files from the allowlist.
2728        if !crate::agent::personality::EDITABLE_PERSONALITY_FILES.contains(&req.filename.as_str()) {
2729            return Err(rpc_err(
2730                INVALID_PARAMS,
2731                format!("Not an editable file: {}", req.filename),
2732            ));
2733        }
2734        let workspace = config.agent_workspace_dir(&req.agent);
2735        let path = workspace.join(&req.filename);
2736        match std::fs::read_to_string(&path) {
2737            Ok(content) => {
2738                let mtime_ms = std::fs::metadata(&path)
2739                    .ok()
2740                    .and_then(|m| m.modified().ok())
2741                    .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
2742                    .map(|d| d.as_millis() as i64);
2743                let truncated = content.chars().count() > crate::agent::personality::MAX_FILE_CHARS;
2744                to_result(PersonalityGetResult {
2745                    filename: req.filename,
2746                    content: Some(content),
2747                    exists: true,
2748                    truncated,
2749                    mtime_ms,
2750                })
2751            }
2752            Err(e) if e.kind() == std::io::ErrorKind::NotFound => to_result(PersonalityGetResult {
2753                filename: req.filename,
2754                content: None,
2755                exists: false,
2756                truncated: false,
2757                mtime_ms: None,
2758            }),
2759            Err(e) => Err(rpc_err(INTERNAL_ERROR, format!("Read failed: {e}"))),
2760        }
2761    }
2762
2763    fn handle_personality_put(&self, params: &Value) -> RpcResult {
2764        let req: PersonalityPutParams = parse_params(params)?;
2765        let config = self.ctx.config.read().clone();
2766
2767        if !crate::agent::personality::EDITABLE_PERSONALITY_FILES.contains(&req.filename.as_str()) {
2768            return Err(rpc_err(
2769                INVALID_PARAMS,
2770                format!("Not an editable file: {}", req.filename),
2771            ));
2772        }
2773        if req.content.chars().count() > crate::agent::personality::MAX_FILE_CHARS {
2774            return Err(rpc_err(
2775                INVALID_PARAMS,
2776                format!(
2777                    "Content exceeds {} char limit",
2778                    crate::agent::personality::MAX_FILE_CHARS
2779                ),
2780            ));
2781        }
2782        let workspace = config.agent_workspace_dir(&req.agent);
2783        let path = workspace.join(&req.filename);
2784        if let Some(parent) = path.parent() {
2785            let _ = std::fs::create_dir_all(parent);
2786        }
2787        std::fs::write(&path, &req.content)
2788            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Write failed: {e}")))?;
2789        let bytes_written = req.content.len() as u64;
2790        let mtime_ms = std::fs::metadata(&path)
2791            .ok()
2792            .and_then(|m| m.modified().ok())
2793            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
2794            .map(|d| d.as_millis() as i64);
2795        to_result(PersonalityPutResult {
2796            bytes_written,
2797            mtime_ms,
2798        })
2799    }
2800
2801    fn handle_personality_templates(&self, params: &Value) -> RpcResult {
2802        let req: PersonalityTemplatesParams = parse_params(params)?;
2803        let config = self.ctx.config.read().clone();
2804        let ctx = personality_template_context(&config, &req);
2805        let templates = crate::agent::personality_templates::render_preset_default(&ctx);
2806        let files: Vec<TemplateFileEntry> = templates
2807            .into_iter()
2808            .map(|(name, content)| TemplateFileEntry {
2809                filename: name.to_string(),
2810                content,
2811            })
2812            .collect();
2813        to_result(PersonalityTemplatesResult {
2814            preset: "default".to_string(),
2815            files,
2816        })
2817    }
2818
2819    // ── Config introspection handlers ───────────────────────────
2820
2821    fn handle_config_sections(&self) -> RpcResult {
2822        use zeroclaw_config::schema::Config;
2823        use zeroclaw_config::sections::{
2824            QUICKSTART_SECTIONS, Section, SectionShape, section_help, section_index_for_key,
2825        };
2826
2827        let config = self.ctx.config.read().clone();
2828
2829        // Schema-driven: walk Config::prop_fields() to discover ALL
2830        // top-level section roots, not just QUICKSTART_SECTIONS.
2831        let mut roots: std::collections::BTreeSet<String> = config
2832            .prop_fields()
2833            .iter()
2834            .filter_map(|f| f.name.split('.').next().map(str::to_string))
2835            .collect();
2836
2837        // Hidden system fields the user never edits.
2838        const HIDDEN: &[&str] = &[
2839            "schema_version",
2840            "onboard_state",
2841            "onboard-state",
2842            "config_path",
2843            "workspace_dir",
2844            "env_overridden_paths",
2845            "pre_override_snapshots",
2846        ];
2847        for h in HIDDEN {
2848            roots.remove(*h);
2849        }
2850
2851        // Map-keyed sections surface even when empty.
2852        let all_map_paths: Vec<&'static str> =
2853            Config::map_key_sections().iter().map(|s| s.path).collect();
2854        for &prefix in &all_map_paths
2855            .iter()
2856            .filter_map(|p| p.split('.').next())
2857            .collect::<std::collections::HashSet<_>>()
2858        {
2859            roots.insert(prefix.to_string());
2860        }
2861
2862        // Inject synthetic onboarding sections (e.g. personality).
2863        for s in QUICKSTART_SECTIONS {
2864            roots.insert(s.as_str().to_string());
2865        }
2866
2867        // Drop bare parents when a dotted child exists
2868        // (`providers` vanishes once `providers.models` is present).
2869        let parents_with_children: std::collections::HashSet<String> = roots
2870            .iter()
2871            .filter_map(|k| k.split_once('.').map(|(p, _)| p.to_string()))
2872            .collect();
2873        roots.retain(|k| k.contains('.') || !parents_with_children.contains(k));
2874
2875        // Hide cost.rates subtree.
2876        roots.retain(|k| !k.starts_with("cost.rates"));
2877
2878        // Sort: onboarding sections in canonical order first, rest alpha.
2879        let mut ordered: Vec<String> = roots.into_iter().collect();
2880        ordered.sort_by(
2881            |a, b| match (section_index_for_key(a), section_index_for_key(b)) {
2882                (Some(ai), Some(bi)) => ai.cmp(&bi),
2883                (Some(_), None) => std::cmp::Ordering::Less,
2884                (None, Some(_)) => std::cmp::Ordering::Greater,
2885                (None, None) => a.cmp(b),
2886            },
2887        );
2888
2889        // Picker eligibility: map-keyed section or onboarding section
2890        // with a picker shape.
2891        let section_has_picker_for_key = |key: &str| -> bool {
2892            let key_dot = format!("{key}.");
2893            all_map_paths.iter().any(|p| {
2894                *p == key
2895                    || p.strip_prefix(&key_dot)
2896                        .is_some_and(|rest| !rest.contains('.'))
2897            })
2898        };
2899
2900        let sections: Vec<ConfigSectionEntry> = ordered
2901            .into_iter()
2902            .map(|key| {
2903                let wizard = Section::from_key(&key);
2904                let has_picker = match wizard {
2905                    Some(w) => matches!(
2906                        w.shape(),
2907                        SectionShape::TypedFamilyMap | SectionShape::OneTierAliasMap
2908                    ),
2909                    None => section_has_picker_for_key(&key),
2910                };
2911                let completed = wizard
2912                    .map(|w| zeroclaw_config::sections::section_has_signal(&config, w))
2913                    .unwrap_or(false);
2914                let label = zeroclaw_config::sections::humanize_section_key(&key);
2915                ConfigSectionEntry {
2916                    help: section_help(&key).to_string(),
2917                    has_picker,
2918                    completed,
2919                    ready: false,
2920                    group: String::new(),
2921                    is_quickstart: wizard.is_some(),
2922                    shape: wizard.map(Section::shape),
2923                    label,
2924                    key,
2925                }
2926            })
2927            .collect();
2928        to_result(ConfigSectionsResult { sections })
2929    }
2930
2931    fn handle_config_status(&self) -> RpcResult {
2932        use zeroclaw_config::sections::QUICKSTART_SECTIONS;
2933        let config = self.ctx.config.read().clone();
2934        let missing: Vec<String> = QUICKSTART_SECTIONS
2935            .iter()
2936            .filter(|&&s| !zeroclaw_config::sections::section_has_signal(&config, s))
2937            .map(|s| s.as_str().to_string())
2938            .collect();
2939        let needs_quickstart = !missing.is_empty();
2940        let reason = if needs_quickstart {
2941            format!("{} section(s) incomplete", missing.len())
2942        } else {
2943            "all sections complete".to_string()
2944        };
2945        to_result(ConfigStatusResult {
2946            needs_quickstart,
2947            reason,
2948            has_partial_state: false,
2949            missing,
2950        })
2951    }
2952
2953    fn handle_config_catalog(&self) -> RpcResult {
2954        let providers: Vec<CatalogModelProvider> = zeroclaw_providers::list_model_providers()
2955            .into_iter()
2956            .map(|p| CatalogModelProvider {
2957                name: p.name.to_string(),
2958                display_name: p.display_name.to_string(),
2959                local: p.local,
2960            })
2961            .collect();
2962        to_result(CatalogResponse {
2963            model_providers: providers,
2964        })
2965    }
2966
2967    async fn handle_config_catalog_models(&self, params: &Value) -> RpcResult {
2968        let req: CatalogModelsParams = parse_params(params)?;
2969        let local = crate::quickstart::model_provider_is_local(&req.model_provider);
2970        let (models, pricing, live) = crate::quickstart::model_catalog(&req.model_provider).await;
2971        to_result(CatalogModelsResult {
2972            model_provider: req.model_provider,
2973            models,
2974            pricing,
2975            local,
2976            live,
2977        })
2978    }
2979
2980    // ── Logs handler ─────────────────────────────────────────────
2981
2982    async fn handle_logs_subscribe(&self) -> RpcResult {
2983        let event_tx = self
2984            .ctx
2985            .event_tx
2986            .as_ref()
2987            .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Event streaming is not available"))?;
2988        let mut rx = event_tx.subscribe();
2989        let rpc = self.rpc.clone();
2990        zeroclaw_spawn::spawn!(async move {
2991            loop {
2992                tokio::select! {
2993                    biased;
2994                    _ = rpc.closed() => break,
2995                    event = rx.recv() => match event {
2996                        Ok(event) => {
2997                            let notification =
2998                                JsonRpcNotification::new(notification::LOGS_EVENT, event);
2999                            if let Ok(json) = serde_json::to_string(&notification)
3000                                && !rpc.send_raw(json).await
3001                            {
3002                                break;
3003                            }
3004                        }
3005                        Err(_) => break,
3006                    },
3007                }
3008            }
3009        });
3010        to_result(LogsSubscribeResult { subscribed: true })
3011    }
3012
3013    async fn handle_logs_query(&self, params: &Value) -> RpcResult {
3014        let p: LogsQueryParams = parse_params(params)?;
3015
3016        let path = zeroclaw_log::current_log_path()
3017            .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Log persistence is not enabled"))?;
3018
3019        let filter = zeroclaw_log::LogFilter {
3020            since_ts: p.since_ts,
3021            until_ts: p.until_ts,
3022            until_id: p.until_id,
3023            action: p.action,
3024            category: p.category,
3025            outcome: p.outcome,
3026            severity_min: p.severity_min,
3027            trace_id: p.trace_id,
3028            q: p.q,
3029            hide_internal: p.hide_internal,
3030            field_eq: std::collections::BTreeMap::new(),
3031        };
3032
3033        let limit = p.limit.unwrap_or(200);
3034
3035        let page = zeroclaw_log::load_page(&path, &filter, limit)
3036            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Log read failed: {e:#}")))?;
3037
3038        let events: Vec<serde_json::Value> = page
3039            .events
3040            .into_iter()
3041            .filter_map(|evt| serde_json::to_value(evt).ok())
3042            .collect();
3043
3044        to_result(LogsQueryResult {
3045            events,
3046            next_cursor: page.next_cursor,
3047            at_end: page.at_end,
3048        })
3049    }
3050
3051    /// `logs/get { id } → LogEvent`. Loads one full event by id from
3052    /// the persistent JSONL log so the Logs pane can keep only preview
3053    /// fields in memory and lazy-fetch the full payload only when the
3054    /// user opens the detail pane.
3055    async fn handle_logs_get(&self, params: &Value) -> RpcResult {
3056        let p: LogsGetParams = parse_params(params)?;
3057        let path = zeroclaw_log::current_log_path()
3058            .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Log persistence is not enabled"))?;
3059        let event = zeroclaw_log::find_event_by_id(&path, &p.id)
3060            .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Log read failed: {e:#}")))?;
3061        match event {
3062            Some(evt) => {
3063                let event = serde_json::to_value(evt).map_err(|e| {
3064                    rpc_err(INTERNAL_ERROR, format!("Failed to serialize event: {e}"))
3065                })?;
3066                to_result(LogsGetResult { event })
3067            }
3068            None => Err(rpc_err(
3069                INTERNAL_ERROR,
3070                format!("Log id `{}` not found", p.id),
3071            )),
3072        }
3073    }
3074
3075    // ── File attachment handler ────────────────────────────────
3076
3077    async fn handle_file_attach(&self, params: &Value) -> RpcResult {
3078        use super::attachments::{MAX_REQUEST_BYTES, process_file_entry};
3079
3080        let req: FileAttachParams = parse_params(params)?;
3081        let sid = &req.session_id;
3082
3083        // Uploads land in the per-agent workspace, not the session cwd.
3084        // See `handle_send_message` for the rationale.
3085        let agent_alias = self
3086            .ctx
3087            .sessions
3088            .get_agent_alias(sid)
3089            .await
3090            .ok_or_else(|| rpc_err(SESSION_NOT_FOUND, "Session not found"))?;
3091        let upload_root = self
3092            .ctx
3093            .config
3094            .read()
3095            .agent_workspace_dir(&agent_alias)
3096            .to_string_lossy()
3097            .to_string();
3098
3099        let is_wss = self.peer_label.starts_with("wss:");
3100
3101        let mut total_bytes: u64 = 0;
3102        let mut results = Vec::with_capacity(req.files.len());
3103
3104        for entry in &req.files {
3105            let result =
3106                process_file_entry(entry, sid, &upload_root, is_wss, &self.ctx.sessions).await?;
3107            total_bytes += result.size_bytes;
3108            if total_bytes > MAX_REQUEST_BYTES {
3109                return Err(rpc_err(
3110                    INVALID_PARAMS,
3111                    format!(
3112                        "Total attachment size exceeds {} MB limit",
3113                        MAX_REQUEST_BYTES / (1024 * 1024)
3114                    ),
3115                ));
3116            }
3117            results.push(result);
3118        }
3119
3120        to_result(FileAttachResult { files: results })
3121    }
3122
3123    // ── Wire helpers ─────────────────────────────────────────────
3124
3125    async fn send_result(&self, id: Value, result: Value) {
3126        let resp = JsonRpcResponse {
3127            jsonrpc: JSONRPC_VERSION,
3128            result: Some(result),
3129            error: None,
3130            id,
3131        };
3132        if let Ok(json) = serde_json::to_string(&resp) {
3133            let _ = self.rpc.send_raw(json).await;
3134        }
3135    }
3136
3137    async fn send_error(&self, id: Value, code: i32, message: &str) {
3138        let resp = JsonRpcResponse {
3139            jsonrpc: JSONRPC_VERSION,
3140            result: None,
3141            error: Some(JsonRpcError {
3142                code,
3143                message: message.to_string(),
3144                data: None,
3145            }),
3146            id,
3147        };
3148        if let Ok(json) = serde_json::to_string(&resp) {
3149            let _ = self.rpc.send_raw(json).await;
3150        }
3151    }
3152
3153    // ── Quickstart ───────────────────────────────────────────────
3154    //
3155    // RPC mirror of the HTTP `/api/quickstart/*` routes in
3156    // `zeroclaw-gateway`. All business logic lives in
3157    // `zeroclaw_runtime::quickstart`; these handlers are call-the-runtime
3158    // plumbing only — they MUST stay byte-equivalent to the HTTP routes
3159    // so the drift test holds.
3160
3161    fn handle_quickstart_state(&self) -> RpcResult {
3162        let cfg = self.ctx.config.read().clone();
3163        to_result(crate::quickstart::snapshot_state(&cfg))
3164    }
3165
3166    fn handle_quickstart_fields(&self, params: &Value) -> RpcResult {
3167        let req: QuickstartFieldsParams = parse_params(params)?;
3168        let descriptors = crate::quickstart::field_shape(req.section, &req.type_key);
3169        to_result(QuickstartFieldsResult {
3170            fields: descriptors,
3171        })
3172    }
3173
3174    fn handle_quickstart_validate(&self, params: &Value) -> RpcResult {
3175        let req: QuickstartValidateParams = parse_params(params)?;
3176        let cfg = self.ctx.config.read().clone();
3177        let body = match crate::quickstart::validate_only_with_surface(
3178            &req.submission,
3179            &cfg,
3180            crate::quickstart::Surface::Tui,
3181        ) {
3182            Ok(()) => QuickstartValidateResult::Ok,
3183            Err(errors) => QuickstartValidateResult::Errors { errors },
3184        };
3185        to_result(body)
3186    }
3187
3188    async fn handle_quickstart_apply(&self, params: &Value) -> RpcResult {
3189        let req: QuickstartApplyParams = parse_params(params)?;
3190        // Clone out of the lock to satisfy `&mut Config`. On success
3191        // write the mutated snapshot back, mirroring `flush_config`
3192        // and the gateway's `handle_apply`.
3193        let mut working = self.ctx.config.read().clone();
3194        let result = crate::quickstart::apply_with_surface(
3195            req.submission,
3196            &mut working,
3197            crate::quickstart::Surface::Tui,
3198        )
3199        .await;
3200        let body = match result {
3201            Ok(agent) => {
3202                *self.ctx.config.write() = working;
3203                let reload_signalled = self.signal_daemon_reload();
3204                QuickstartApplyResult::Applied {
3205                    agent,
3206                    daemon_restarted: reload_signalled,
3207                }
3208            }
3209            Err(errors) => QuickstartApplyResult::Errors { errors },
3210        };
3211        to_result(body)
3212    }
3213
3214    fn handle_quickstart_dismiss(&self, params: &Value) -> RpcResult {
3215        let req: QuickstartDismissParams = parse_params(params)?;
3216        crate::quickstart::record_dismissed(&req.run_id, req.surface, req.last_step);
3217        to_result(QuickstartDismissResult { recorded: true })
3218    }
3219
3220    /// Signal the in-place daemon reload using the same `reload_tx`
3221    /// watch channel `/admin/reload` and the gateway's quickstart route
3222    /// use. Returns `true` when the supervisor was notified, `false`
3223    /// when no supervisor is attached (e.g. test harness).
3224    fn signal_daemon_reload(&self) -> bool {
3225        let Some(reload_tx) = self.ctx.reload_tx.clone() else {
3226            ::zeroclaw_log::record!(
3227                WARN,
3228                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3229                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
3230                    .with_attrs(::serde_json::json!({
3231                        "reason": "no_supervisor",
3232                        "surface": crate::quickstart::Surface::Tui.as_str(),
3233                    })),
3234                "quickstart: daemon reload not available (standalone daemon)"
3235            );
3236            return false;
3237        };
3238        ::zeroclaw_log::record!(
3239            INFO,
3240            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Start).with_attrs(
3241                ::serde_json::json!({
3242                    "surface": crate::quickstart::Surface::Tui.as_str(),
3243                })
3244            ),
3245            "quickstart: daemon reload signalled"
3246        );
3247        let started = std::time::Instant::now();
3248        zeroclaw_spawn::spawn!(async move {
3249            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
3250            let _ = reload_tx.send(true);
3251            ::zeroclaw_log::record!(
3252                INFO,
3253                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete)
3254                    .with_outcome(::zeroclaw_log::EventOutcome::Success)
3255                    .with_attrs(::serde_json::json!({
3256                        "elapsed_ms": started.elapsed().as_millis() as u64,
3257                        "surface": crate::quickstart::Surface::Tui.as_str(),
3258                    })),
3259                "quickstart: daemon reload dispatched"
3260            );
3261        });
3262        true
3263    }
3264}
3265
3266// ── Helpers ──────────────────────────────────────────────────────
3267
3268fn parse_params<T: DeserializeOwned>(params: &Value) -> Result<T, JsonRpcError> {
3269    serde_json::from_value(params.clone()).map_err(|e| rpc_err(INVALID_PARAMS, e.to_string()))
3270}
3271
3272fn to_result<T: Serialize>(val: T) -> RpcResult {
3273    serde_json::to_value(val).map_err(|e| rpc_err(INTERNAL_ERROR, e.to_string()))
3274}
3275
/// Cap, in bytes, on the `content` field of memory entries returned via
/// `memory/list` and `memory/search`. List rows are previews; the
/// full content is only required when the user opens the detail
/// pane, which fetches it via `memory/get`. Keeping the preview cap
/// here means both wire bytes and client RAM stay bounded across
/// large memory backends.
///
/// The `…` marker that `truncate_memory_previews` appends after cutting is
/// not counted against this budget, so a truncated preview can be a few
/// bytes longer than this value.
const MEMORY_PREVIEW_CONTENT_BYTES: usize = 200;
3283
3284/// Truncate each entry's `content` to the preview budget. Operates
3285/// in place to avoid a second allocation per entry.
3286fn truncate_memory_previews(
3287    mut entries: Vec<zeroclaw_api::memory_traits::MemoryEntry>,
3288) -> Vec<zeroclaw_api::memory_traits::MemoryEntry> {
3289    for entry in &mut entries {
3290        if entry.content.len() > MEMORY_PREVIEW_CONTENT_BYTES {
3291            // Truncate on a char boundary so we never split a UTF-8 sequence.
3292            let mut end = MEMORY_PREVIEW_CONTENT_BYTES;
3293            while end > 0 && !entry.content.is_char_boundary(end) {
3294                end -= 1;
3295            }
3296            entry.content.truncate(end);
3297            entry.content.push('…');
3298        }
3299    }
3300    entries
3301}
3302
3303fn notification_for_turn_event(
3304    session_id: &str,
3305    event: &TurnEvent,
3306    max_context_tokens: Option<u64>,
3307) -> Option<String> {
3308    let update = match event {
3309        TurnEvent::Chunk { delta } => SessionUpdateEvent::AgentMessageChunk {
3310            session_id: session_id.to_string(),
3311            text: delta.clone(),
3312        },
3313        TurnEvent::Thinking { delta } => SessionUpdateEvent::AgentThoughtChunk {
3314            session_id: session_id.to_string(),
3315            text: delta.clone(),
3316        },
3317        TurnEvent::ToolCall { id, name, args } => SessionUpdateEvent::ToolCall {
3318            session_id: session_id.to_string(),
3319            tool_call_id: id.clone(),
3320            name: name.clone(),
3321            raw_input: args.clone(),
3322        },
3323        TurnEvent::ToolResult { id, name, output } => SessionUpdateEvent::ToolResult {
3324            session_id: session_id.to_string(),
3325            tool_call_id: id.clone(),
3326            name: name.clone(),
3327            raw_output: output.clone(),
3328        },
3329        TurnEvent::ApprovalRequest {
3330            request_id,
3331            tool_name,
3332            arguments_summary,
3333            timeout_secs,
3334        } => SessionUpdateEvent::ApprovalRequest {
3335            session_id: session_id.to_string(),
3336            request_id: request_id.clone(),
3337            tool_name: tool_name.clone(),
3338            arguments_summary: arguments_summary.clone(),
3339            timeout_secs: *timeout_secs,
3340        },
3341        TurnEvent::Usage {
3342            input_tokens,
3343            cached_input_tokens: _,
3344            output_tokens: _,
3345            ..
3346        } => {
3347            // `input_tokens` per TokenUsage contract is the *total* prompt
3348            // size (uncached + cached). `cached_input_tokens` is a subset
3349            // and must NOT be added — doing so double-counts cache reads
3350            // and inflates the displayed context size (was showing ~2× the
3351            // real value on Anthropic / OpenAI sessions with prompt cache).
3352            SessionUpdateEvent::ContextUsage {
3353                session_id: session_id.to_string(),
3354                input_tokens: *input_tokens,
3355                max_context_tokens,
3356            }
3357        }
3358    };
3359
3360    let params = serde_json::to_value(update).ok()?;
3361    let n = JsonRpcNotification::new(notification::SESSION_UPDATE, params);
3362    serde_json::to_string(&n).ok()
3363}
3364
3365#[cfg(test)]
3366mod tests {
3367    use super::*;
3368    use serde_json::json;
3369
3370    fn parse(s: &str) -> Value {
3371        serde_json::from_str(s).unwrap()
3372    }
3373
3374    #[test]
3375    fn method_from_wire_roundtrip() {
3376        for (method, wire) in Method::ALL {
3377            assert_eq!(
3378                Method::from_wire(wire),
3379                Some(*method),
3380                "from_wire({wire}) should resolve"
3381            );
3382            assert_eq!(method.wire_name(), *wire, "wire_name roundtrip for {wire}");
3383        }
3384    }
3385
3386    #[test]
3387    fn method_from_wire_unknown() {
3388        assert_eq!(Method::from_wire("nonexistent/method"), None);
3389    }
3390
3391    #[test]
3392    fn method_all_no_duplicates() {
3393        let mut seen = std::collections::HashSet::new();
3394        for (_, wire) in Method::ALL {
3395            assert!(seen.insert(*wire), "duplicate wire name: {wire}");
3396        }
3397    }
3398
3399    #[test]
3400    fn personality_templates_use_requested_agent_name_before_config_exists() {
3401        let req = PersonalityTemplatesParams {
3402            agent: Some(" bob ".to_string()),
3403        };
3404        let ctx = personality_template_context(&zeroclaw_config::schema::Config::default(), &req);
3405
3406        assert_eq!(ctx.agent, "bob");
3407        assert!(ctx.include_memory);
3408    }
3409
3410    #[test]
3411    fn personality_templates_without_agent_stay_generic_and_memoryless() {
3412        let req = PersonalityTemplatesParams { agent: None };
3413        let ctx = personality_template_context(&zeroclaw_config::schema::Config::default(), &req);
3414
3415        assert_eq!(ctx.agent, "ZeroClaw");
3416        assert!(!ctx.include_memory);
3417    }
3418
3419    #[test]
3420    fn chunk_notification() {
3421        let event = TurnEvent::Chunk {
3422            delta: "hello".into(),
3423        };
3424        let json = notification_for_turn_event("s1", &event, None).unwrap();
3425        let v = parse(&json);
3426        assert_eq!(v["jsonrpc"], JSONRPC_VERSION);
3427        assert_eq!(v["method"], notification::SESSION_UPDATE);
3428        assert_eq!(v["params"]["session_id"], "s1");
3429        assert_eq!(v["params"]["type"], "agent_message_chunk");
3430        assert_eq!(v["params"]["text"], "hello");
3431    }
3432
3433    #[test]
3434    fn thinking_notification() {
3435        let event = TurnEvent::Thinking {
3436            delta: "hmm".into(),
3437        };
3438        let json = notification_for_turn_event("s1", &event, None).unwrap();
3439        let v = parse(&json);
3440        assert_eq!(v["params"]["type"], "agent_thought_chunk");
3441        assert_eq!(v["params"]["text"], "hmm");
3442    }
3443
3444    #[test]
3445    fn tool_call_notification() {
3446        let event = TurnEvent::ToolCall {
3447            id: "tc_1".into(),
3448            name: "bash".into(),
3449            args: json!({"cmd": "ls"}),
3450        };
3451        let json = notification_for_turn_event("s1", &event, None).unwrap();
3452        let v = parse(&json);
3453        assert_eq!(v["params"]["type"], "tool_call");
3454        assert_eq!(v["params"]["tool_call_id"], "tc_1");
3455        assert_eq!(v["params"]["name"], "bash");
3456        assert_eq!(v["params"]["raw_input"]["cmd"], "ls");
3457    }
3458
3459    #[test]
3460    fn tool_result_notification() {
3461        let event = TurnEvent::ToolResult {
3462            id: "tc_1".into(),
3463            name: "bash".into(),
3464            output: "file.txt".into(),
3465        };
3466        let json = notification_for_turn_event("s1", &event, None).unwrap();
3467        let v = parse(&json);
3468        assert_eq!(v["params"]["type"], "tool_result");
3469        assert_eq!(v["params"]["tool_call_id"], "tc_1");
3470        assert_eq!(v["params"]["raw_output"], "file.txt");
3471    }
3472
3473    #[test]
3474    fn approval_request_notification() {
3475        let event = TurnEvent::ApprovalRequest {
3476            request_id: "ar_1".into(),
3477            tool_name: "bash".into(),
3478            arguments_summary: "rm -rf /".into(),
3479            timeout_secs: 30,
3480        };
3481        let json = notification_for_turn_event("s1", &event, None).unwrap();
3482        let v = parse(&json);
3483        assert_eq!(v["params"]["type"], "approval_request");
3484        assert_eq!(v["params"]["request_id"], "ar_1");
3485        assert_eq!(v["params"]["tool_name"], "bash");
3486        assert_eq!(v["params"]["timeout_secs"], 30);
3487    }
3488
3489    #[test]
3490    fn usage_event_emits_context_usage_notification() {
3491        let event = TurnEvent::Usage {
3492            input_tokens: Some(100),
3493            cached_input_tokens: None,
3494            output_tokens: Some(50),
3495            cost_usd: Some(0.01),
3496        };
3497        let json = notification_for_turn_event("s1", &event, Some(32_000)).unwrap();
3498        let v = parse(&json);
3499        assert_eq!(v["params"]["type"], "context_usage");
3500        assert_eq!(v["params"]["session_id"], "s1");
3501        // Context size is the prompt the model just consumed = input_tokens.
3502        // Output tokens are the model's reply, not part of the prompt size.
3503        // cached_input_tokens is a *subset* of input_tokens per the
3504        // TokenUsage contract and must NOT be added (double-counts).
3505        assert_eq!(v["params"]["input_tokens"], 100);
3506        assert_eq!(v["params"]["max_context_tokens"], 32_000);
3507    }
3508
3509    #[test]
3510    fn usage_event_without_input_tokens_emits_null() {
3511        let event = TurnEvent::Usage {
3512            input_tokens: None,
3513            cached_input_tokens: None,
3514            output_tokens: Some(50),
3515            cost_usd: None,
3516        };
3517        let json = notification_for_turn_event("s1", &event, None).unwrap();
3518        let v = parse(&json);
3519        assert_eq!(v["params"]["type"], "context_usage");
3520        // No input_tokens reported → field omitted (skip_serializing_if).
3521        assert!(
3522            v["params"].get("input_tokens").is_none(),
3523            "absent input_tokens should not be synthesized from output_tokens"
3524        );
3525    }
3526
3527    #[test]
3528    fn usage_event_does_not_double_count_cached_subset() {
3529        // Per TokenUsage contract, cached_input_tokens is a *subset* of
3530        // input_tokens. The ACP ContextUsage notification must report
3531        // input_tokens as-is — the cached subset is already included.
3532        //
3533        // Realistic OpenAI-shape: prompt_tokens = 25_000 (already total),
3534        // cached_tokens = 15_000 (subset). Context size = 25_000, NOT 40_000.
3535        let event = TurnEvent::Usage {
3536            input_tokens: Some(25_000),
3537            cached_input_tokens: Some(15_000),
3538            output_tokens: Some(200),
3539            cost_usd: None,
3540        };
3541        let json = notification_for_turn_event("s1", &event, Some(200_000)).unwrap();
3542        let v = parse(&json);
3543        assert_eq!(v["params"]["type"], "context_usage");
3544        assert_eq!(
3545            v["params"]["input_tokens"], 25_000,
3546            "input_tokens is reported as-is — cached subset must not be added"
3547        );
3548    }
3549
3550    #[test]
3551    fn usage_event_only_cached_tokens_emits_null() {
3552        // Edge case: provider reports only cached without input total.
3553        // Without a known total this is ambiguous, so we don't synthesize one.
3554        let event = TurnEvent::Usage {
3555            input_tokens: None,
3556            cached_input_tokens: Some(80_000),
3557            output_tokens: Some(100),
3558            cost_usd: None,
3559        };
3560        let json = notification_for_turn_event("s1", &event, Some(100_000)).unwrap();
3561        let v = parse(&json);
3562        assert!(
3563            v["params"].get("input_tokens").is_none(),
3564            "cached-only is ambiguous; do not fabricate a total"
3565        );
3566    }
3567
3568    #[test]
3569    fn parse_params_valid() {
3570        let v = json!({"session_id": "s1"});
3571        let p: SessionIdParams = parse_params(&v).unwrap();
3572        assert_eq!(p.session_id, "s1");
3573    }
3574
3575    #[test]
3576    fn parse_params_missing_required() {
3577        let v = json!({});
3578        let err = parse_params::<SessionIdParams>(&v).unwrap_err();
3579        assert_eq!(err.code, INVALID_PARAMS);
3580    }
3581
3582    #[test]
3583    fn to_result_roundtrip() {
3584        let r = InitializeResult {
3585            protocol_version: 1,
3586            server_version: "0.1.0".into(),
3587            tui_id: None,
3588            tui_sig: None,
3589            capabilities: vec![],
3590        };
3591        let val = to_result(r).unwrap();
3592        assert_eq!(val["protocol_version"], 1);
3593        assert_eq!(val["server_version"], "0.1.0");
3594    }
3595
3596    // -----------------------------------------------------------------------
3597    // ACP session/new — memory-tool exclusion
3598    // -----------------------------------------------------------------------
3599    //
3600    // These tests verify that `session/new` with `exclude_memory: true` strips
3601    // all five memory tools from the agent, while `exclude_memory: false` leaves
3602    // at least one memory tool present.
3603    //
3604    // They live here (not in `tests/`) because they depend on `#[cfg(test)]`
3605    // helpers: `RpcContext::minimal`, `RpcDispatcher::handle_session_new_for_test`,
3606    // and `Agent::tool_names`.
3607
3608    use zeroclaw_tools::MEMORY_TOOL_NAMES as MEMORY_TOOLS;
3609
3610    fn make_acp_test_config(tmp: &tempfile::TempDir) -> zeroclaw_config::schema::Config {
3611        use std::collections::HashMap;
3612        use zeroclaw_config::schema::{AliasedAgentConfig, RiskProfileConfig};
3613
3614        let workspace_dir = tmp.path().join("workspace");
3615        std::fs::create_dir_all(&workspace_dir).unwrap();
3616
3617        let mut providers = zeroclaw_config::providers::Providers::default();
3618        {
3619            let base = providers
3620                .models
3621                .ensure("openai", "test-provider")
3622                .expect("`openai` slot must exist");
3623            base.api_key = Some("test-key".into());
3624            base.model = Some("test-model".into());
3625            base.uri = Some("http://127.0.0.1:1".into());
3626        }
3627
3628        let mut agents = HashMap::new();
3629        agents.insert(
3630            "test-agent".to_string(),
3631            AliasedAgentConfig {
3632                enabled: true,
3633                model_provider: "openai.test-provider".into(),
3634                risk_profile: "test-profile".to_string(),
3635                ..Default::default()
3636            },
3637        );
3638
3639        let mut risk_profiles = HashMap::new();
3640        risk_profiles.insert("test-profile".to_string(), RiskProfileConfig::default());
3641
3642        zeroclaw_config::schema::Config {
3643            data_dir: workspace_dir,
3644            config_path: tmp.path().join("config.toml"),
3645            providers,
3646            agents,
3647            risk_profiles,
3648            ..zeroclaw_config::schema::Config::default()
3649        }
3650    }
3651
3652    fn make_acp_test_dispatcher(
3653        config: zeroclaw_config::schema::Config,
3654    ) -> (RpcDispatcher, Arc<crate::rpc::session::SessionStore>) {
3655        use zeroclaw_infra::session_queue::SessionActorQueue;
3656        let queue = Arc::new(SessionActorQueue::new(4, 10, 60));
3657        let sessions = Arc::new(crate::rpc::session::SessionStore::new(16, queue));
3658        let ctx = RpcContext::minimal(config, Arc::clone(&sessions));
3659        let (tx, _rx) = tokio::sync::mpsc::channel(64);
3660        let dispatcher = RpcDispatcher::new(ctx, tx, "test-peer".into());
3661        (dispatcher, sessions)
3662    }
3663
3664    #[tokio::test]
3665    async fn acp_session_new_exposes_no_memory_tools() {
3666        let tmp = tempfile::TempDir::new().unwrap();
3667        let config = make_acp_test_config(&tmp);
3668        let (dispatcher, sessions) = make_acp_test_dispatcher(config);
3669
3670        let params = json!({
3671            "agent_alias": "test-agent",
3672            "exclude_memory": true,
3673            "session_id": "acp-test-session-001"
3674        });
3675
3676        let result = dispatcher.handle_session_new_for_test(&params).await;
3677        assert!(
3678            result.is_ok(),
3679            "session/new should succeed; got: {:?}",
3680            result.err()
3681        );
3682
3683        let agent_arc = sessions
3684            .get_agent("acp-test-session-001")
3685            .await
3686            .expect("session must be registered in the store after session/new");
3687
3688        let agent = agent_arc.lock().await;
3689        let tool_names = agent.tool_names();
3690
3691        for &mem_tool in MEMORY_TOOLS {
3692            assert!(
3693                !tool_names.contains(&mem_tool),
3694                "ACP session must NOT expose `{mem_tool}` — found in tool list: {tool_names:?}"
3695            );
3696        }
3697    }
3698
3699    #[tokio::test]
3700    async fn acp_chat_mode_strips_memory_tools_without_exclude_flag() {
3701        // The server must derive memory exclusion from `chat_mode: acp`, not
3702        // trust the wire `exclude_memory` flag. A Code session that omits the
3703        // flag entirely must still come up with no memory tools.
3704        let tmp = tempfile::TempDir::new().unwrap();
3705        let config = make_acp_test_config(&tmp);
3706        let data_dir = config.data_dir.clone();
3707        let (dispatcher, sessions, _chat_backend, _acp_store) =
3708            make_persistence_test_dispatcher(config, &data_dir);
3709
3710        let params = json!({
3711            "agent_alias": "test-agent",
3712            "chat_mode": "acp",
3713            "session_id": "acp-no-flag-session-001"
3714        });
3715
3716        let result = dispatcher.handle_session_new_for_test(&params).await;
3717        assert!(
3718            result.is_ok(),
3719            "session/new should succeed; got: {:?}",
3720            result.err()
3721        );
3722
3723        let agent_arc = sessions
3724            .get_agent("acp-no-flag-session-001")
3725            .await
3726            .expect("session must be registered in the store after session/new");
3727
3728        let agent = agent_arc.lock().await;
3729        let tool_names = agent.tool_names();
3730
3731        for &mem_tool in MEMORY_TOOLS {
3732            assert!(
3733                !tool_names.contains(&mem_tool),
3734                "ACP chat_mode must strip `{mem_tool}` even without exclude_memory — \
3735                 tool list: {tool_names:?}"
3736            );
3737        }
3738    }
3739
3740    #[tokio::test]
3741    async fn non_acp_session_new_exposes_memory_tools() {
3742        let tmp = tempfile::TempDir::new().unwrap();
3743        let config = make_acp_test_config(&tmp);
3744        let (dispatcher, sessions) = make_acp_test_dispatcher(config);
3745
3746        let params = json!({
3747            "agent_alias": "test-agent",
3748            "exclude_memory": false,
3749            "session_id": "chat-test-session-001"
3750        });
3751
3752        let result = dispatcher.handle_session_new_for_test(&params).await;
3753        assert!(
3754            result.is_ok(),
3755            "session/new should succeed; got: {:?}",
3756            result.err()
3757        );
3758
3759        let agent_arc = sessions
3760            .get_agent("chat-test-session-001")
3761            .await
3762            .expect("session must be registered in the store after session/new");
3763
3764        let agent = agent_arc.lock().await;
3765        let tool_names = agent.tool_names();
3766
3767        let has_any_memory_tool = MEMORY_TOOLS.iter().any(|&t| tool_names.contains(&t));
3768        assert!(
3769            has_any_memory_tool,
3770            "non-ACP session MUST expose at least one memory tool — tool list: {tool_names:?}"
3771        );
3772    }
3773
3774    // -----------------------------------------------------------------------
3775    // chat_mode persistence routing: ACP vs Chat must not cross stores
3776    // -----------------------------------------------------------------------
3777
3778    use zeroclaw_infra::session_backend::SessionBackend;
3779
3780    fn make_persistence_test_dispatcher(
3781        config: zeroclaw_config::schema::Config,
3782        data_dir: &std::path::Path,
3783    ) -> (
3784        RpcDispatcher,
3785        Arc<crate::rpc::session::SessionStore>,
3786        Arc<zeroclaw_infra::session_sqlite::SqliteSessionBackend>,
3787        Arc<zeroclaw_infra::acp_session_store::AcpSessionStore>,
3788    ) {
3789        use zeroclaw_infra::session_queue::SessionActorQueue;
3790        let queue = Arc::new(SessionActorQueue::new(4, 10, 60));
3791        let sessions = Arc::new(crate::rpc::session::SessionStore::new(16, queue));
3792        let chat_backend =
3793            Arc::new(zeroclaw_infra::session_sqlite::SqliteSessionBackend::new(data_dir).unwrap());
3794        let acp_store =
3795            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(data_dir).unwrap());
3796        let ctx = RpcContext::for_persistence_tests(
3797            config,
3798            Arc::clone(&sessions),
3799            Some(chat_backend.clone() as Arc<dyn zeroclaw_infra::session_backend::SessionBackend>),
3800            Some(Arc::clone(&acp_store)),
3801        );
3802        let (tx, _rx) = tokio::sync::mpsc::channel(64);
3803        let dispatcher = RpcDispatcher::new(ctx, tx, "test-peer".into());
3804        (dispatcher, sessions, chat_backend, acp_store)
3805    }
3806
3807    /// chat_mode=acp creates a row in acp-sessions.db, sessions.db stays empty
3808    /// for that session_id.
3809    #[tokio::test]
3810    async fn acp_session_new_writes_to_acp_store_only() {
3811        let tmp = tempfile::TempDir::new().unwrap();
3812        let config = make_acp_test_config(&tmp);
3813        let data_dir = config.data_dir.clone();
3814        let (dispatcher, _sessions, chat_backend, acp_store) =
3815            make_persistence_test_dispatcher(config, &data_dir);
3816
3817        let sid = "acp-routing-001";
3818        let params = json!({
3819            "agent_alias": "test-agent",
3820            "exclude_memory": true,
3821            "chat_mode": "acp",
3822            "session_id": sid,
3823        });
3824
3825        dispatcher
3826            .handle_session_new_for_test(&params)
3827            .await
3828            .expect("session/new should succeed");
3829
3830        assert!(
3831            acp_store.load_session(sid).unwrap().is_some(),
3832            "ACP session must be persisted to acp_session_store"
3833        );
3834
3835        assert!(
3836            chat_backend.load(&format!("rpc_{sid}")).is_empty(),
3837            "ACP session must NOT touch chat session_backend"
3838        );
3839    }
3840
3841    /// A reaped ACP session (gone from memory, durable row intact) must
3842    /// rehydrate to a WORKING session — the agent comes back in memory and the
3843    /// next turn continues on the same conversation. This is the recovery path:
3844    /// the alternative ("start a new session") is the irrecoverable freeze.
3845    #[tokio::test]
3846    async fn reaped_acp_session_rehydrates_to_working_instead_of_failing() {
3847        let tmp = tempfile::TempDir::new().unwrap();
3848        let config = make_acp_test_config(&tmp);
3849        let data_dir = config.data_dir.clone();
3850        let (dispatcher, sessions, _chat_backend, acp_store) =
3851            make_persistence_test_dispatcher(config, &data_dir);
3852
3853        let sid = "acp-reaped-001";
3854        dispatcher
3855            .handle_session_new_for_test(&json!({
3856                "agent_alias": "test-agent",
3857                "exclude_memory": true,
3858                "chat_mode": "acp",
3859                "session_id": sid,
3860            }))
3861            .await
3862            .expect("session/new should succeed");
3863
3864        assert!(
3865            sessions.get_agent(sid).await.is_some(),
3866            "freshly created session must be live in memory"
3867        );
3868        assert!(
3869            acp_store.load_session(sid).unwrap().is_some(),
3870            "durable row must exist for the rehydrate source"
3871        );
3872
3873        // Simulate the reaper tearing the in-memory session down while the
3874        // durable row survives.
3875        assert!(
3876            sessions.remove(sid).await,
3877            "reap must remove the in-memory session"
3878        );
3879        assert!(
3880            sessions.get_agent(sid).await.is_none(),
3881            "post-reap the session must be absent from memory"
3882        );
3883
3884        let recovered = dispatcher.rehydrate_reaped_session(sid).await;
3885        assert!(
3886            recovered.is_some(),
3887            "a reaped session with a live durable row must rehydrate to a \
3888             working agent, not fail; failing here is the irrecoverable hang"
3889        );
3890        assert!(
3891            sessions.get_agent(sid).await.is_some(),
3892            "after rehydrate the session must be live in memory again so the \
3893             next prompt lands on a working session"
3894        );
3895    }
3896
3897    /// Resuming an ACP session with no caller cwd must recover the original
3898    /// working directory from the persisted store, not fall back to the agent
3899    /// workspace dir. Regression: a reconnect showed the wrong cwd because the
3900    /// resume path defaulted the cwd instead of reading the retained session's.
3901    #[tokio::test]
3902    async fn acp_resume_recovers_persisted_cwd() {
3903        let tmp = tempfile::TempDir::new().unwrap();
3904        let config = make_acp_test_config(&tmp);
3905        let data_dir = config.data_dir.clone();
3906        let (dispatcher, _sessions, _chat_backend, acp_store) =
3907            make_persistence_test_dispatcher(config, &data_dir);
3908
3909        let sid = "acp-cwd-resume-001";
3910        let original_cwd = tmp.path().join("project-dir").to_string_lossy().to_string();
3911
3912        // First create the session with an explicit cwd.
3913        let created = dispatcher
3914            .handle_session_new_for_test(&json!({
3915                "agent_alias": "test-agent",
3916                "exclude_memory": true,
3917                "chat_mode": "acp",
3918                "session_id": sid,
3919                "cwd": original_cwd,
3920            }))
3921            .await
3922            .expect("initial session/new should succeed");
3923        assert_eq!(created["workspace_dir"], original_cwd);
3924        assert_eq!(
3925            acp_store.load_session(sid).unwrap().unwrap().workspace_dir,
3926            original_cwd
3927        );
3928
3929        // Resume with NO cwd: the daemon must report the persisted cwd, not the
3930        // agent workspace dir.
3931        let resumed = dispatcher
3932            .handle_session_new_for_test(&json!({
3933                "agent_alias": "test-agent",
3934                "exclude_memory": true,
3935                "chat_mode": "acp",
3936                "session_id": sid,
3937            }))
3938            .await
3939            .expect("resume session/new should succeed");
3940        assert_eq!(
3941            resumed["workspace_dir"], original_cwd,
3942            "resume must keep the retained session's cwd, not default it"
3943        );
3944    }
3945
3946    /// The ACP memory-tool invariant must survive session recovery: a reaped
3947    /// ACP session that rehydrates must come back with NONE of the long-term
3948    /// memory tools, exactly like a fresh `session/new` ACP session. Without
3949    /// the server-side exclusion on the rehydrate path, recovery would silently
3950    /// restore the memory backend and tools the ACP boundary forbids.
3951    #[tokio::test]
3952    async fn reaped_acp_session_rehydrates_without_memory_tools() {
3953        let tmp = tempfile::TempDir::new().unwrap();
3954        let config = make_acp_test_config(&tmp);
3955        let data_dir = config.data_dir.clone();
3956        let (dispatcher, sessions, _chat_backend, _acp_store) =
3957            make_persistence_test_dispatcher(config, &data_dir);
3958
3959        let sid = "acp-reaped-mem-001";
3960        dispatcher
3961            .handle_session_new_for_test(&json!({
3962                "agent_alias": "test-agent",
3963                "chat_mode": "acp",
3964                "session_id": sid,
3965            }))
3966            .await
3967            .expect("session/new should succeed");
3968
3969        // Reap the in-memory session, leaving the durable row to rehydrate from.
3970        assert!(sessions.remove(sid).await, "reap must remove the session");
3971
3972        let recovered = dispatcher
3973            .rehydrate_reaped_session(sid)
3974            .await
3975            .expect("a reaped ACP session must rehydrate to a working agent");
3976
3977        let agent = recovered.lock().await;
3978        let tool_names = agent.tool_names();
3979        for &mem_tool in MEMORY_TOOLS {
3980            assert!(
3981                !tool_names.contains(&mem_tool),
3982                "rehydrated ACP session must NOT expose `{mem_tool}` — found in tool list: {tool_names:?}"
3983            );
3984        }
3985    }
3986
3987    /// A deliberately killed ACP session must not be treated like a merely
3988    /// reaped session. The durable transcript remains available, but the next
3989    /// prompt must not silently resurrect the killed live session.
3990    #[tokio::test]
3991    async fn killed_acp_session_does_not_rehydrate_from_durable_store() {
3992        let tmp = tempfile::TempDir::new().unwrap();
3993        let config = make_acp_test_config(&tmp);
3994        let data_dir = config.data_dir.clone();
3995        let (dispatcher, sessions, _chat_backend, acp_store) =
3996            make_persistence_test_dispatcher(config, &data_dir);
3997
3998        let sid = "acp-killed-001";
3999        dispatcher
4000            .handle_session_new_for_test(&json!({
4001                "agent_alias": "test-agent",
4002                "exclude_memory": true,
4003                "chat_mode": "acp",
4004                "session_id": sid,
4005            }))
4006            .await
4007            .expect("session/new should succeed");
4008
4009        assert!(
4010            sessions.get_agent(sid).await.is_some(),
4011            "freshly created session must be live in memory"
4012        );
4013        assert!(
4014            acp_store.load_session(sid).unwrap().is_some(),
4015            "durable row must exist before kill"
4016        );
4017
4018        dispatcher
4019            .handle_session_kill(&json!({ "session_id": sid }))
4020            .await
4021            .expect("session/kill should succeed");
4022
4023        assert!(
4024            sessions.get_agent(sid).await.is_none(),
4025            "session/kill must remove the live in-memory agent"
4026        );
4027        assert!(
4028            acp_store.load_session(sid).unwrap().is_some(),
4029            "session/kill must preserve durable history"
4030        );
4031
4032        let recovered = dispatcher.rehydrate_reaped_session(sid).await;
4033        assert!(
4034            recovered.is_none(),
4035            "admin-killed ACP sessions must stay killed instead of rehydrating \
4036             from durable history on the next prompt"
4037        );
4038        assert!(
4039            sessions.get_agent(sid).await.is_none(),
4040            "failed rehydrate must leave the session absent from memory"
4041        );
4042    }
4043
4044    /// chat_mode omitted (or =chat) creates rows via session_backend,
4045    /// acp-sessions.db stays empty for that session_id.
4046    #[tokio::test]
4047    async fn chat_session_new_writes_to_chat_backend_only() {
4048        let tmp = tempfile::TempDir::new().unwrap();
4049        let config = make_acp_test_config(&tmp);
4050        let data_dir = config.data_dir.clone();
4051        let (dispatcher, _sessions, chat_backend, acp_store) =
4052            make_persistence_test_dispatcher(config, &data_dir);
4053
4054        let sid = "chat-routing-001";
4055        let params = json!({
4056            "agent_alias": "test-agent",
4057            "session_id": sid,
4058        });
4059
4060        dispatcher
4061            .handle_session_new_for_test(&params)
4062            .await
4063            .expect("session/new should succeed");
4064
4065        assert!(
4066            acp_store.load_session(sid).unwrap().is_none(),
4067            "Chat session must NOT touch acp_session_store"
4068        );
4069
4070        let key = format!("rpc_{sid}");
4071        let metadata = chat_backend.list_sessions_with_metadata();
4072        let entry = metadata
4073            .iter()
4074            .find(|m| m.key == key)
4075            .expect("Chat session must be registered in session_backend metadata");
4076        assert_eq!(
4077            entry.agent_alias.as_deref(),
4078            Some("test-agent"),
4079            "Chat session must stamp its agent_alias in session_backend (got: {:?})",
4080            entry.agent_alias
4081        );
4082    }
4083
4084    // ── config/set secret-routing ────────────────────────────────
4085
4086    fn make_config_set_test_dispatcher(config: zeroclaw_config::schema::Config) -> RpcDispatcher {
4087        use zeroclaw_infra::session_queue::SessionActorQueue;
4088        let queue = Arc::new(SessionActorQueue::new(4, 10, 60));
4089        let sessions = Arc::new(crate::rpc::session::SessionStore::new(16, queue));
4090        let ctx = RpcContext::minimal(config, Arc::clone(&sessions));
4091        let (tx, _rx) = tokio::sync::mpsc::channel(64);
4092        let mut dispatcher = RpcDispatcher::new(ctx, tx, "test-peer".into());
4093        dispatcher.authenticated = true;
4094        dispatcher
4095    }
4096
4097    /// Mint a config with `providers.models.anthropic.default` so we can
4098    /// poke its `#[secret]` `api-key` field through `config/set`.
4099    ///
4100    /// IMPORTANT: pins `config_path` and `data_dir` into the supplied tempdir
4101    /// so that `flush_config()` → `save_dirty()` never falls through to
4102    /// `default_config_and_data_dirs()` and clobbers `~/.zeroclaw/config.toml`.
4103    fn make_secret_test_config(tmp: &tempfile::TempDir) -> zeroclaw_config::schema::Config {
4104        let mut cfg = zeroclaw_config::schema::Config {
4105            config_path: tmp.path().join("config.toml"),
4106            data_dir: tmp.path().join("data"),
4107            ..Default::default()
4108        };
4109        cfg.create_map_key("providers.models.anthropic", "default")
4110            .expect("create anthropic.default");
4111        cfg
4112    }
4113
4114    #[tokio::test]
4115    async fn config_set_writes_real_secret_through_set_prop() {
4116        let tmp = tempfile::TempDir::new().unwrap();
4117        let dispatcher = make_config_set_test_dispatcher(make_secret_test_config(&tmp));
4118        let params = json!({
4119            "prop": "providers.models.anthropic.default.api_key",
4120            "value": "sk-real-test-key"
4121        });
4122        let res = dispatcher.handle_config_set(&params).await;
4123        assert!(res.is_ok(), "config/set must accept a real secret: {res:?}");
4124        let cfg = dispatcher.ctx.config.read().clone();
4125        let stored = cfg
4126            .providers
4127            .models
4128            .anthropic
4129            .get("default")
4130            .and_then(|e| e.base.api_key.clone());
4131        assert_eq!(
4132            stored.as_deref(),
4133            Some("sk-real-test-key"),
4134            "real secret must land in memory as plaintext"
4135        );
4136    }
4137
4138    #[tokio::test]
4139    async fn config_set_rejects_masked_secret_value() {
4140        let tmp = tempfile::TempDir::new().unwrap();
4141        let mut cfg = make_secret_test_config(&tmp);
4142        cfg.providers
4143            .models
4144            .anthropic
4145            .get_mut("default")
4146            .unwrap()
4147            .base
4148            .api_key = Some("sk-live-secret".into());
4149        let dispatcher = make_config_set_test_dispatcher(cfg);
4150
4151        for masked in [zeroclaw_config::traits::MASKED_SECRET, "****", ""] {
4152            let params = json!({
4153                "prop": "providers.models.anthropic.default.api_key",
4154                "value": masked
4155            });
4156            let res = dispatcher.handle_config_set(&params).await;
4157            assert!(
4158                res.is_err(),
4159                "config/set must refuse masked/empty secret (`{masked}`), got: {res:?}"
4160            );
4161        }
4162
4163        let cfg_after = dispatcher.ctx.config.read().clone();
4164        let stored = cfg_after
4165            .providers
4166            .models
4167            .anthropic
4168            .get("default")
4169            .and_then(|e| e.base.api_key.clone());
4170        assert_eq!(
4171            stored.as_deref(),
4172            Some("sk-live-secret"),
4173            "live secret must NOT be clobbered by a masked write"
4174        );
4175    }
4176
4177    #[tokio::test]
4178    async fn config_set_handles_dynamic_http_request_secret_paths() {
4179        let tmp = tempfile::TempDir::new().unwrap();
4180        let dispatcher = make_config_set_test_dispatcher(zeroclaw_config::schema::Config {
4181            config_path: tmp.path().join("config.toml"),
4182            data_dir: tmp.path().join("data"),
4183            ..Default::default()
4184        });
4185
4186        let params = json!({
4187            "prop": "http_request.secrets.api_token",
4188            "value": "Bearer runtime-secret"
4189        });
4190        let res = dispatcher.handle_config_set(&params).await;
4191        assert!(
4192            res.is_ok(),
4193            "config/set must accept a real dynamic http_request secret: {res:?}"
4194        );
4195        let cfg = dispatcher.ctx.config.read().clone();
4196        assert_eq!(
4197            cfg.http_request
4198                .secrets
4199                .get("api_token")
4200                .map(String::as_str),
4201            Some("Bearer runtime-secret")
4202        );
4203
4204        for masked in [zeroclaw_config::traits::MASKED_SECRET, "****", ""] {
4205            let params = json!({
4206                "prop": "http_request.secrets.next_token",
4207                "value": masked
4208            });
4209            let res = dispatcher.handle_config_set(&params).await;
4210            assert!(
4211                res.is_err(),
4212                "config/set must reject masked/empty dynamic secret (`{masked}`), got: {res:?}"
4213            );
4214        }
4215        let cfg_after = dispatcher.ctx.config.read().clone();
4216        assert!(
4217            !cfg_after.http_request.secrets.contains_key("next_token"),
4218            "masked dynamic writes must not materialize a secret key"
4219        );
4220    }
4221
4222    #[tokio::test]
4223    async fn config_set_non_secret_field_still_uses_set_prop() {
4224        let tmp = tempfile::TempDir::new().unwrap();
4225        let dispatcher = make_config_set_test_dispatcher(make_secret_test_config(&tmp));
4226        let params = json!({
4227            "prop": "providers.models.anthropic.default.model",
4228            "value": "claude-sonnet-4-5"
4229        });
4230        let res = dispatcher.handle_config_set(&params).await;
4231        assert!(res.is_ok(), "non-secret set must succeed: {res:?}");
4232        let cfg = dispatcher.ctx.config.read().clone();
4233        let stored = cfg
4234            .providers
4235            .models
4236            .anthropic
4237            .get("default")
4238            .and_then(|e| e.base.model.clone());
4239        assert_eq!(stored.as_deref(), Some("claude-sonnet-4-5"));
4240    }
4241
4242    fn make_model_refresh_test_config(tmp: &tempfile::TempDir) -> zeroclaw_config::schema::Config {
4243        use std::collections::HashMap;
4244        use zeroclaw_config::schema::{AliasedAgentConfig, Config, RiskProfileConfig};
4245
4246        let workspace_dir = tmp.path().join("workspace");
4247        std::fs::create_dir_all(&workspace_dir).unwrap();
4248
4249        let mut config = Config {
4250            config_path: tmp.path().join("config.toml"),
4251            data_dir: tmp.path().join("data"),
4252            ..Default::default()
4253        };
4254        let provider = config
4255            .providers
4256            .models
4257            .ensure("openai", "test-provider")
4258            .expect("openai provider slot exists");
4259        provider.api_key = Some("test-key".into());
4260        provider.uri = Some("http://127.0.0.1:1".into());
4261        provider.model = Some("old-model".into());
4262        provider.temperature = Some(0.2);
4263
4264        config.agents = HashMap::from([(
4265            "test-agent".to_string(),
4266            AliasedAgentConfig {
4267                enabled: true,
4268                model_provider: "openai.test-provider".into(),
4269                risk_profile: "test-profile".into(),
4270                ..Default::default()
4271            },
4272        )]);
4273        config
4274            .risk_profiles
4275            .insert("test-profile".into(), RiskProfileConfig::default());
4276        config
4277            .runtime_profiles
4278            .insert("default".into(), Default::default());
4279        config
4280    }
4281
4282    async fn create_model_refresh_test_session(
4283        dispatcher: &RpcDispatcher,
4284        tmp: &tempfile::TempDir,
4285    ) -> String {
4286        let session_res = dispatcher
4287            .handle_session_new_for_test(&json!({
4288                "agent_alias": "test-agent",
4289                "cwd": tmp.path().join("workspace"),
4290            }))
4291            .await
4292            .expect("session/new should create the agent");
4293        session_res
4294            .get("session_id")
4295            .and_then(|v| v.as_str())
4296            .expect("session/new result includes session_id")
4297            .to_string()
4298    }
4299
4300    async fn model_name_for_session(dispatcher: &RpcDispatcher, session_id: &str) -> String {
4301        let agent = dispatcher
4302            .ctx
4303            .sessions
4304            .get_agent(session_id)
4305            .await
4306            .expect("session agent exists");
4307        agent.lock().await.attribution_fields().2
4308    }
4309
4310    async fn temperature_for_session(dispatcher: &RpcDispatcher, session_id: &str) -> Option<f64> {
4311        let agent = dispatcher
4312            .ctx
4313            .sessions
4314            .get_agent(session_id)
4315            .await
4316            .expect("session agent exists");
4317        agent.lock().await.temperature_for_test()
4318    }
4319
4320    async fn wait_for_model_name(dispatcher: &RpcDispatcher, session_id: &str, expected: &str) {
4321        for _ in 0..50 {
4322            if model_name_for_session(dispatcher, session_id).await == expected {
4323                return;
4324            }
4325            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
4326        }
4327        assert_eq!(
4328            model_name_for_session(dispatcher, session_id).await,
4329            expected
4330        );
4331    }
4332
4333    async fn wait_for_temperature(
4334        dispatcher: &RpcDispatcher,
4335        session_id: &str,
4336        expected: Option<f64>,
4337    ) {
4338        for _ in 0..50 {
4339            if temperature_for_session(dispatcher, session_id).await == expected {
4340                return;
4341            }
4342            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
4343        }
4344        assert_eq!(
4345            temperature_for_session(dispatcher, session_id).await,
4346            expected
4347        );
4348    }
4349
4350    #[tokio::test]
4351    async fn config_set_provider_model_refreshes_matching_live_session() {
4352        let tmp = tempfile::TempDir::new().unwrap();
4353        let dispatcher = make_config_set_test_dispatcher(make_model_refresh_test_config(&tmp));
4354        let session_id = create_model_refresh_test_session(&dispatcher, &tmp).await;
4355        assert_eq!(
4356            model_name_for_session(&dispatcher, &session_id).await,
4357            "old-model"
4358        );
4359
4360        let res = dispatcher
4361            .handle_config_set(&json!({
4362                "prop": "providers.models.openai.test-provider.model",
4363                "value": "new-model"
4364            }))
4365            .await;
4366        assert!(res.is_ok(), "config/set must succeed: {res:?}");
4367
4368        wait_for_model_name(&dispatcher, &session_id, "new-model").await;
4369    }
4370
4371    #[tokio::test]
4372    async fn config_set_provider_refresh_failure_does_not_fail_saved_write() {
4373        let tmp = tempfile::TempDir::new().unwrap();
4374        let dispatcher = make_config_set_test_dispatcher(make_model_refresh_test_config(&tmp));
4375        let session_id = create_model_refresh_test_session(&dispatcher, &tmp).await;
4376        assert_eq!(
4377            model_name_for_session(&dispatcher, &session_id).await,
4378            "old-model"
4379        );
4380
4381        let res = dispatcher
4382            .handle_config_set(&json!({
4383                "prop": "providers.models.openai.test-provider.model",
4384                "value": ""
4385            }))
4386            .await;
4387        assert!(
4388            res.is_ok(),
4389            "config/set must report the saved write even if live refresh cannot rebuild: {res:?}"
4390        );
4391        let cfg = dispatcher.ctx.config.read().clone();
4392        let stored = cfg
4393            .providers
4394            .models
4395            .openai
4396            .get("test-provider")
4397            .and_then(|e| e.base.model.clone());
4398        assert_eq!(
4399            stored, None,
4400            "config/set must still persist the requested provider-profile clear"
4401        );
4402        assert_eq!(
4403            model_name_for_session(&dispatcher, &session_id).await,
4404            "old-model",
4405            "failed live refresh must leave the existing session provider intact"
4406        );
4407    }
4408
4409    #[tokio::test]
4410    async fn config_set_provider_temperature_refreshes_matching_live_session() {
4411        let tmp = tempfile::TempDir::new().unwrap();
4412        let dispatcher = make_config_set_test_dispatcher(make_model_refresh_test_config(&tmp));
4413        let session_id = create_model_refresh_test_session(&dispatcher, &tmp).await;
4414        assert_eq!(
4415            temperature_for_session(&dispatcher, &session_id).await,
4416            Some(0.2)
4417        );
4418
4419        let res = dispatcher
4420            .handle_config_set(&json!({
4421                "prop": "providers.models.openai.test-provider.temperature",
4422                "value": 0.4
4423            }))
4424            .await;
4425        assert!(res.is_ok(), "config/set must succeed: {res:?}");
4426
4427        wait_for_temperature(&dispatcher, &session_id, Some(0.4)).await;
4428    }
4429
4430    #[tokio::test]
4431    async fn config_set_provider_refresh_preserves_session_temperature_override() {
4432        let tmp = tempfile::TempDir::new().unwrap();
4433        let dispatcher = make_config_set_test_dispatcher(make_model_refresh_test_config(&tmp));
4434        let session_id = create_model_refresh_test_session(&dispatcher, &tmp).await;
4435        let merged = dispatcher
4436            .ctx
4437            .sessions
4438            .set_overrides(
4439                &session_id,
4440                crate::rpc::session::SessionOverrides {
4441                    temperature: Some(0.6),
4442                    ..Default::default()
4443                },
4444            )
4445            .await
4446            .expect("session override applies");
4447        assert_eq!(merged.temperature, Some(0.6));
4448
4449        let res = dispatcher
4450            .handle_config_set(&json!({
4451                "prop": "providers.models.openai.test-provider.model",
4452                "value": "new-model"
4453            }))
4454            .await;
4455        assert!(res.is_ok(), "config/set must succeed: {res:?}");
4456
4457        wait_for_model_name(&dispatcher, &session_id, "new-model").await;
4458        assert_eq!(
4459            temperature_for_session(&dispatcher, &session_id).await,
4460            Some(0.6),
4461            "session temperature override must win over provider profile temperature"
4462        );
4463    }
4464
4465    #[tokio::test]
4466    async fn config_delete_provider_temperature_refreshes_matching_live_session() {
4467        let tmp = tempfile::TempDir::new().unwrap();
4468        let dispatcher = make_config_set_test_dispatcher(make_model_refresh_test_config(&tmp));
4469        let session_id = create_model_refresh_test_session(&dispatcher, &tmp).await;
4470        assert_eq!(
4471            temperature_for_session(&dispatcher, &session_id).await,
4472            Some(0.2)
4473        );
4474
4475        let res = dispatcher
4476            .handle_config_delete(&json!({
4477                "prop": "providers.models.openai.test-provider.temperature",
4478            }))
4479            .await;
4480        assert!(res.is_ok(), "config/delete must succeed: {res:?}");
4481
4482        wait_for_temperature(&dispatcher, &session_id, None).await;
4483    }
4484
4485    // -----------------------------------------------------------------------
4486    // session/cancel ownership enforcement — the spurious-cancel bug
4487    // -----------------------------------------------------------------------
4488
4489    /// Build two dispatchers sharing one `RpcContext`/`SessionStore`. Mirrors
4490    /// production where each TUI connection gets its own dispatcher with its
4491    /// own `tui_id`, all routing to the same shared session map.
4492    fn make_two_dispatchers_sharing_context(
4493        config: zeroclaw_config::schema::Config,
4494    ) -> (
4495        RpcDispatcher,
4496        RpcDispatcher,
4497        Arc<crate::rpc::session::SessionStore>,
4498    ) {
4499        use zeroclaw_infra::session_queue::SessionActorQueue;
4500        let queue = Arc::new(SessionActorQueue::new(4, 10, 60));
4501        let sessions = Arc::new(crate::rpc::session::SessionStore::new(16, queue));
4502        let ctx = RpcContext::minimal(config, Arc::clone(&sessions));
4503        let (tx_a, _rx_a) = tokio::sync::mpsc::channel(64);
4504        let (tx_b, _rx_b) = tokio::sync::mpsc::channel(64);
4505        let dispatcher_a = RpcDispatcher::new(Arc::clone(&ctx), tx_a, "test-peer-a:pid=1".into());
4506        let dispatcher_b = RpcDispatcher::new(ctx, tx_b, "test-peer-b:pid=2".into());
4507        (dispatcher_a, dispatcher_b, sessions)
4508    }
4509
4510    async fn create_session_with_owner(
4511        dispatcher: &mut RpcDispatcher,
4512        sessions: &Arc<crate::rpc::session::SessionStore>,
4513        session_id: &str,
4514        owner_tui_id: &str,
4515    ) -> tokio_util::sync::CancellationToken {
4516        dispatcher.set_tui_id_for_test(Some(owner_tui_id.to_string()));
4517        let params = json!({
4518            "agent_alias": "test-agent",
4519            "session_id": session_id,
4520        });
4521        dispatcher
4522            .handle_session_new_for_test(&params)
4523            .await
4524            .expect("session/new must succeed");
4525
4526        let stamped_owner = sessions
4527            .session_owner_tui_id(session_id)
4528            .await
4529            .expect("session must exist after session/new");
4530        assert_eq!(
4531            stamped_owner.as_deref(),
4532            Some(owner_tui_id),
4533            "harness invariant: session/new must stamp owner_tui_id from the \
4534             caller's tui_id; if this fails, the ownership tests below are \
4535             measuring nothing"
4536        );
4537
4538        let token = tokio_util::sync::CancellationToken::new();
4539        sessions.register_cancel_token(session_id, token.clone());
4540        token
4541    }
4542
4543    /// Variant of `make_two_dispatchers_sharing_context` that returns the
4544    /// writer-channel receivers so a test can assert which notifications
4545    /// the dispatcher emitted. The notifications carry the load-bearing
4546    /// `session/update TurnComplete` events that flip the TUI out of its
4547    /// `working` state — silently dropping one is the production freeze.
4548    fn make_dispatcher_with_capture(
4549        config: zeroclaw_config::schema::Config,
4550    ) -> (
4551        RpcDispatcher,
4552        tokio::sync::mpsc::Receiver<String>,
4553        Arc<crate::rpc::session::SessionStore>,
4554    ) {
4555        use zeroclaw_infra::session_queue::SessionActorQueue;
4556        let queue = Arc::new(SessionActorQueue::new(4, 10, 60));
4557        let sessions = Arc::new(crate::rpc::session::SessionStore::new(16, queue));
4558        let ctx = RpcContext::minimal(config, Arc::clone(&sessions));
4559        let (tx, rx) = tokio::sync::mpsc::channel(64);
4560        let dispatcher = RpcDispatcher::new(ctx, tx, "test-peer-cap:pid=1".into());
4561        (dispatcher, rx, sessions)
4562    }
4563
4564    /// RED guard: a `session/prompt` for a session that no longer exists
4565    /// (e.g. evicted by the reaper while the TUI thought the session was
4566    /// still live) MUST emit a `session/update TurnComplete::Failed`
4567    /// notification so the TUI can exit `working` state. Silently dropping
4568    /// the request — the production behaviour — leaves the TUI parked
4569    /// forever with no `TurnComplete` ever arriving. This is the second
4570    /// half of the freeze: a reaped session + a fresh prompt = hang.
4571    #[tokio::test]
4572    async fn session_prompt_on_missing_session_emits_turn_complete_failed() {
4573        let tmp = tempfile::TempDir::new().unwrap();
4574        let config = make_acp_test_config(&tmp);
4575        let (dispatcher, mut rx, _sessions) = make_dispatcher_with_capture(config);
4576
4577        let result = dispatcher
4578            .handle_session_prompt(&json!({
4579                "session_id": "gone-id",
4580                "prompt": "anything",
4581            }))
4582            .await;
4583        assert!(
4584            result.is_err(),
4585            "missing session must still produce an RPC error for legacy \
4586             request-form callers; the new behaviour is the additional \
4587             notification, not replacing the error"
4588        );
4589
4590        // The notification must already be queued on the writer channel by
4591        // the time `handle_session_prompt` returns. `try_recv` rules out
4592        // any test flakiness from racing with a spawned task.
4593        let raw = rx.try_recv().expect(
4594            "handle_session_prompt must emit a session/update TurnComplete \
4595             notification before returning on missing-session — without it \
4596             the TUI's `working` state never clears and the next prompt is \
4597             the production freeze",
4598        );
4599        let v: serde_json::Value = serde_json::from_str(&raw).expect("notification must be JSON");
4600        assert_eq!(v["method"], notification::SESSION_UPDATE);
4601        assert_eq!(v["params"]["session_id"], "gone-id");
4602        assert_eq!(
4603            v["params"]["outcome"], "failed",
4604            "missing-session is not Completed and not Cancelled — it is a \
4605             distinct Failed verdict. Folding it into Cancelled would lie \
4606             about whether the user pressed Esc."
4607        );
4608    }
4609
4610    /// Cross-TUI cancel from a distinct dispatcher (separate connection,
4611    /// separate `tui_id`) targeting a session owned by another TUI. The
4612    /// fixed daemon must refuse and leave the owner's token un-fired.
4613    #[tokio::test]
4614    async fn session_cancel_from_distinct_non_owner_dispatcher_is_rejected() {
4615        let tmp = tempfile::TempDir::new().unwrap();
4616        let config = make_acp_test_config(&tmp);
4617        let (mut dispatcher_a, mut dispatcher_b, sessions) =
4618            make_two_dispatchers_sharing_context(config);
4619
4620        let token =
4621            create_session_with_owner(&mut dispatcher_a, &sessions, "sess-owned-by-tui-A", "tui-A")
4622                .await;
4623
4624        dispatcher_b.set_tui_id_for_test(Some("tui-B".to_string()));
4625        let result = dispatcher_b
4626            .handle_session_cancel(&json!({
4627                "session_id": "sess-owned-by-tui-A",
4628            }))
4629            .await;
4630
4631        let err = result.expect_err(
4632            "a cancel from a dispatcher whose tui_id does not match the \
4633             session's owner_tui_id must be refused",
4634        );
4635        assert_ne!(
4636            err.code, SESSION_NOT_FOUND,
4637            "the rejection must NOT be reported as SESSION_NOT_FOUND — the \
4638             session DOES exist; reporting NOT_FOUND would hide the \
4639             ownership violation behind a benign-looking error"
4640        );
4641        assert!(
4642            !token.is_cancelled(),
4643            "the owner's cancel token must remain un-fired — the rightful \
4644             owner's turn must survive a mis-targeted cancel from another TUI"
4645        );
4646    }
4647
4648    /// Cancel from a dispatcher that never completed the `initialize`
4649    /// handshake (no `tui_id`) must be refused. An unauthenticated caller
4650    /// has no provable ownership claim over any session.
4651    #[tokio::test]
4652    async fn session_cancel_from_anonymous_dispatcher_is_rejected() {
4653        let tmp = tempfile::TempDir::new().unwrap();
4654        let config = make_acp_test_config(&tmp);
4655        let (mut dispatcher_a, mut dispatcher_b, sessions) =
4656            make_two_dispatchers_sharing_context(config);
4657
4658        let token =
4659            create_session_with_owner(&mut dispatcher_a, &sessions, "sess-owned-by-tui-A", "tui-A")
4660                .await;
4661
4662        // dispatcher_b never set its tui_id — fresh connection, no
4663        // initialize handshake yet.
4664        dispatcher_b.set_tui_id_for_test(None);
4665        let result = dispatcher_b
4666            .handle_session_cancel(&json!({
4667                "session_id": "sess-owned-by-tui-A",
4668            }))
4669            .await;
4670
4671        let err = result.expect_err("anonymous cancel must be refused");
4672        assert_ne!(err.code, SESSION_NOT_FOUND);
4673        assert!(
4674            !token.is_cancelled(),
4675            "anonymous cancel must not fire the token"
4676        );
4677    }
4678
4679    /// Regression guard: the legitimate owner must still be able to cancel
4680    /// its own session via its OWN dispatcher. A fix that over-rejects and
4681    /// breaks the user-pressed-Esc path is unacceptable.
4682    #[tokio::test]
4683    async fn session_cancel_from_owner_dispatcher_still_works() {
4684        let tmp = tempfile::TempDir::new().unwrap();
4685        let config = make_acp_test_config(&tmp);
4686        let (mut dispatcher_a, _dispatcher_b, sessions) =
4687            make_two_dispatchers_sharing_context(config);
4688
4689        let token =
4690            create_session_with_owner(&mut dispatcher_a, &sessions, "sess-owned-by-tui-A", "tui-A")
4691                .await;
4692
4693        // Same dispatcher, same tui_id that created the session.
4694        let result = dispatcher_a
4695            .handle_session_cancel(&json!({
4696                "session_id": "sess-owned-by-tui-A",
4697            }))
4698            .await;
4699
4700        assert!(
4701            result.is_ok(),
4702            "owner cancel must succeed; got: {:?}",
4703            result.err()
4704        );
4705        assert!(
4706            token.is_cancelled(),
4707            "owner cancel must fire the session's cancel token"
4708        );
4709    }
4710}