1use super::context::RpcContext;
8use super::transport::RpcTransport;
9use super::turn::{TurnAttribution, TurnOutcome, execute_turn};
10use super::types::*;
11use crate::agent::agent::TurnEvent;
12use serde::Serialize;
13use serde::de::DeserializeOwned;
14use serde_json::Value;
15use std::sync::Arc;
16use tokio::sync::mpsc;
17
18use zeroclaw_api::jsonrpc::error_codes::*;
19use zeroclaw_api::jsonrpc::{
20 JSONRPC_VERSION, JsonRpcError, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse,
21 RpcOutbound,
22};
23use zeroclaw_api::model_provider::ChatMessage;
24
/// Protocol revision spoken by this dispatcher. `initialize` rejects clients that
/// announce a different value, and both `initialize` and `status` echo it back.
25pub const RPC_PROTOCOL_VERSION: u64 = 1;
27
/// Wire-name constants grouped under `notification`.
///
/// NOTE(review): presumably these are the method names of outbound notifications;
/// the code that sends them is outside this view — confirm against the senders.
28mod notification {
    /// Wire name `session/update`.
29 pub const SESSION_UPDATE: &str = "session/update";
    /// Wire name `logs/event`.
30 pub const LOGS_EVENT: &str = "logs/event";
31}
32
/// Every JSON-RPC method the dispatcher can route.
///
/// The wire spelling of each variant lives in [`Method::ALL`]; keep the two in
/// sync when adding a variant, because [`Method::wire_name`] panics for a
/// variant that is missing from the table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Method {
    // Handshake and server diagnostics.
    Initialize,
    Status,
    Health,

    // Session lifecycle and inspection.
    SessionNew,
    SessionClose,
    SessionPrompt,
    SessionConfigure,
    SessionCancel,
    SessionGitBranch,
    SessionList,
    SessionListAcp,
    SessionMessages,
    SessionState,
    SessionDelete,
    SessionApprove,
    SessionKill,

    // Memory entries.
    MemoryList,
    MemorySearch,
    MemoryGet,
    MemoryStore,
    MemoryDelete,

    // Cron jobs.
    CronList,
    CronGet,
    CronAdd,
    CronPatch,
    CronDelete,
    CronRuns,
    CronTrigger,
    CronSettings,

    // Configuration properties and map keys.
    ConfigGet,
    ConfigSet,
    ConfigValidate,
    ConfigReload,
    ConfigList,
    ConfigDelete,
    ConfigMapKeys,
    ConfigMapKeyCreate,
    ConfigMapKeyDelete,
    ConfigMapKeyRename,
    ConfigTemplates,

    // Agents.
    AgentsList,
    AgentsStatus,

    // Cost reporting.
    CostQuery,

    // Skills.
    SkillsBundles,
    SkillsList,
    SkillsRead,
    SkillsWrite,
    SkillsDelete,

    // Personality files.
    PersonalityList,
    PersonalityGet,
    PersonalityPut,
    PersonalityTemplates,

    // Configuration metadata.
    ConfigSections,
    ConfigStatus,
    ConfigCatalog,
    ConfigCatalogModels,

    // Logs.
    LogsSubscribe,
    LogsQuery,
    LogsGet,

    // Connected TUIs.
    TuiList,

    // Files and filesystem.
    FileAttach,
    FsListDir,

    // Locales.
    LocalesList,
    LocalesFetch,

    // Quickstart flow.
    QuickstartState,
    QuickstartFields,
    QuickstartValidate,
    QuickstartApply,
    QuickstartDismiss,
}

impl Method {
    /// Single source of truth pairing each variant with its wire name.
    ///
    /// `initialize` advertises the names in this table as the server's
    /// capability list, in this order.
    pub const ALL: &'static [(Method, &'static str)] = &[
        (Method::Initialize, "initialize"),
        (Method::Status, "status"),
        (Method::Health, "health"),
        (Method::SessionNew, "session/new"),
        (Method::SessionClose, "session/close"),
        (Method::SessionPrompt, "session/prompt"),
        (Method::SessionConfigure, "session/configure"),
        (Method::SessionCancel, "session/cancel"),
        (Method::SessionGitBranch, "session/git_branch"),
        (Method::SessionList, "session/list"),
        (Method::SessionListAcp, "session/list-acp"),
        (Method::SessionMessages, "session/messages"),
        (Method::SessionState, "session/state"),
        (Method::SessionDelete, "session/delete"),
        (Method::SessionApprove, "session/approve"),
        (Method::SessionKill, "session/kill"),
        (Method::MemoryList, "memory/list"),
        (Method::MemorySearch, "memory/search"),
        (Method::MemoryGet, "memory/get"),
        (Method::MemoryStore, "memory/store"),
        (Method::MemoryDelete, "memory/delete"),
        (Method::CronList, "cron/list"),
        (Method::CronGet, "cron/get"),
        (Method::CronAdd, "cron/add"),
        (Method::CronPatch, "cron/patch"),
        (Method::CronDelete, "cron/delete"),
        (Method::CronRuns, "cron/runs"),
        (Method::CronTrigger, "cron/trigger"),
        (Method::CronSettings, "cron/settings"),
        (Method::ConfigGet, "config/get"),
        (Method::ConfigSet, "config/set"),
        (Method::ConfigValidate, "config/validate"),
        (Method::ConfigReload, "config/reload"),
        (Method::ConfigList, "config/list"),
        (Method::ConfigDelete, "config/delete"),
        (Method::ConfigMapKeys, "config/map-keys"),
        (Method::ConfigMapKeyCreate, "config/map-key-create"),
        (Method::ConfigMapKeyDelete, "config/map-key-delete"),
        (Method::ConfigMapKeyRename, "config/map-key-rename"),
        (Method::ConfigTemplates, "config/templates"),
        (Method::AgentsList, "agents/list"),
        (Method::AgentsStatus, "agents/status"),
        (Method::CostQuery, "cost/query"),
        (Method::SkillsBundles, "skills/bundles"),
        (Method::SkillsList, "skills/list"),
        (Method::SkillsRead, "skills/read"),
        (Method::SkillsWrite, "skills/write"),
        (Method::SkillsDelete, "skills/delete"),
        (Method::PersonalityList, "personality/list"),
        (Method::PersonalityGet, "personality/get"),
        (Method::PersonalityPut, "personality/put"),
        (Method::PersonalityTemplates, "personality/templates"),
        (Method::ConfigSections, "config/sections"),
        (Method::ConfigStatus, "config/status"),
        (Method::ConfigCatalog, "config/catalog"),
        (Method::ConfigCatalogModels, "config/catalog-models"),
        (Method::LogsSubscribe, "logs/subscribe"),
        (Method::LogsQuery, "logs/query"),
        (Method::LogsGet, "logs/get"),
        (Method::TuiList, "tui/list"),
        (Method::FileAttach, "file/attach"),
        (Method::FsListDir, "fs/list_dir"),
        (Method::LocalesList, "locales/list"),
        (Method::LocalesFetch, "locales/fetch"),
        (Method::QuickstartState, "quickstart/state"),
        (Method::QuickstartFields, "quickstart/fields"),
        (Method::QuickstartValidate, "quickstart/validate"),
        (Method::QuickstartApply, "quickstart/apply"),
        (Method::QuickstartDismiss, "quickstart/dismiss"),
    ];

    /// Resolves a wire name to its variant; `None` when the name is not in
    /// [`Method::ALL`]. The comparison is exact (case-sensitive).
    pub fn from_wire(s: &str) -> Option<Self> {
        for &(method, wire) in Self::ALL {
            if wire == s {
                return Some(method);
            }
        }
        None
    }

    /// Returns the wire name registered for this variant.
    ///
    /// # Panics
    ///
    /// Panics if the variant has no row in [`Method::ALL`].
    pub fn wire_name(self) -> &'static str {
        Self::ALL
            .iter()
            .find_map(|&(method, wire)| (method == self).then_some(wire))
            .expect("every variant is in ALL")
    }
}
248
/// Return type shared by the RPC handlers: a JSON value on success, or the
/// JSON-RPC error object to report on failure.
249type RpcResult = Result<Value, JsonRpcError>;
250
251fn rpc_err(code: i32, msg: impl Into<String>) -> JsonRpcError {
252 JsonRpcError {
253 code,
254 message: msg.into(),
255 data: None,
256 }
257}
258
259fn not_yet_implemented(method: Method) -> RpcResult {
260 Err(rpc_err(
261 INTERNAL_ERROR,
262 format!("{}: not yet implemented", method.wire_name()),
263 ))
264}
265
266fn personality_template_context(
267 config: &zeroclaw_config::schema::Config,
268 req: &PersonalityTemplatesParams,
269) -> crate::agent::personality_templates::TemplateContext {
270 let agent_requested = req.agent.is_some();
271 let requested_agent = req
272 .agent
273 .as_deref()
274 .map(str::trim)
275 .filter(|s| !s.is_empty());
276 let agent_alias = requested_agent.unwrap_or("default");
277 let configured_agent_exists = config.agent(agent_alias).is_some();
278
279 crate::agent::personality_templates::TemplateContext {
280 agent: requested_agent
281 .map(str::to_string)
282 .or_else(|| configured_agent_exists.then(|| agent_alias.to_string()))
283 .unwrap_or_else(|| "ZeroClaw".to_string()),
284 include_memory: configured_agent_exists || agent_requested,
290 ..Default::default()
291 }
292}
293
/// Extracts `"<type>.<alias>"` from a property path shaped like
/// `providers.models.<type>.<alias>.<field>`.
///
/// `<field>` may itself contain dots. Returns `None` when the prefix is
/// missing, when fewer than three segments follow it, or when any of the three
/// segments is empty.
fn model_provider_ref_from_provider_profile_prop(prop: &str) -> Option<String> {
    let tail = prop.strip_prefix("providers.models.")?;

    // Split into at most three pieces so everything after the alias stays
    // together as the field path.
    let mut segments = tail.splitn(3, '.');
    let provider_type = segments.next()?;
    let provider_alias = segments.next()?;
    let field = segments.next()?;

    let all_present = [provider_type, provider_alias, field]
        .iter()
        .all(|segment| !segment.is_empty());
    if !all_present {
        return None;
    }
    Some(format!("{provider_type}.{provider_alias}"))
}
304
/// Per-connection JSON-RPC dispatcher: reads frames from a transport, routes
/// them by [`Method`], and writes replies through `rpc`.
305pub struct RpcDispatcher {
    /// Shared server state (config, sessions, TUI registry, stores) used by the handlers.
307 ctx: Arc<RpcContext>,
    /// Outbound side of the connection, built in `new` from the writer channel.
308 rpc: Arc<RpcOutbound>,
    /// Starts `false`; set by a successful `initialize`. Until then every other
    /// method is refused with `AUTH_REQUIRED`.
309 authenticated: bool,
    /// TUI identity confirmed or generated during `initialize`; `None` before that.
310 tui_id: Option<String>,
    /// Label describing the peer. The text before the first `:` is recorded as
    /// the transport name when the TUI is registered.
313 peer_label: String,
315}
316
317impl RpcDispatcher {
318 pub fn new(ctx: Arc<RpcContext>, writer_tx: mpsc::Sender<String>, peer_label: String) -> Self {
319 Self {
320 ctx,
321 rpc: Arc::new(RpcOutbound::new(writer_tx)),
322 authenticated: false,
323 tui_id: None,
324 peer_label,
325 }
326 }
327
328 pub fn tui_id(&self) -> Option<&str> {
330 self.tui_id.as_deref()
331 }
332
/// Test-only hook that overwrites the connection's TUI identity without
/// going through `initialize`.
#[cfg(test)]
pub fn set_tui_id_for_test(&mut self, tui_id: Option<String>) {
    let _previous = std::mem::replace(&mut self.tui_id, tui_id);
}
340
341 fn spawn_handle(&self) -> Self {
345 Self {
346 ctx: Arc::clone(&self.ctx),
347 rpc: Arc::clone(&self.rpc),
348 authenticated: true,
349 tui_id: self.tui_id.clone(),
350 peer_label: self.peer_label.clone(),
351 }
352 }
353
354 async fn flush_config(&self) -> Result<(), JsonRpcError> {
358 let mut snapshot = self.ctx.config.read().clone();
359 snapshot
360 .save_dirty()
361 .await
362 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Config save failed: {e}")))?;
363 *self.ctx.config.write() = snapshot;
364 Ok(())
365 }
366
367 pub async fn run(&mut self, transport: &mut (dyn RpcTransport + Send)) {
369 while let Some(line) = transport.next_frame().await {
370 let trimmed = line.trim();
371 if trimmed.is_empty() {
372 continue;
373 }
374 self.process_line(trimmed).await;
375 }
376 }
377
378 async fn process_line(&mut self, line: &str) {
379 let req: JsonRpcRequest = match serde_json::from_str(line) {
380 Ok(r) => r,
381 Err(e) => {
382 self.send_error(Value::Null, PARSE_ERROR, &format!("Parse error: {e}"))
383 .await;
384 return;
385 }
386 };
387
388 if req.method.is_empty() {
390 if let Some(id) = req.id.as_ref().and_then(Value::as_str) {
391 self.rpc.dispatch_response(id, Some(req.params), None);
392 }
393 return;
394 }
395
396 let id = req.id.clone().unwrap_or(Value::Null);
397 let is_notification = req.id.is_none();
398
399 let method = match Method::from_wire(&req.method) {
400 Some(m) => m,
401 None => {
402 if !is_notification {
403 self.send_error(
404 id,
405 METHOD_NOT_FOUND,
406 &format!("Unknown method: {}", req.method),
407 )
408 .await;
409 }
410 return;
411 }
412 };
413
414 if !self.authenticated && method != Method::Initialize {
415 if !is_notification {
416 self.send_error(id, AUTH_REQUIRED, "First call must be 'initialize'")
417 .await;
418 }
419 return;
420 }
421
422 let result = match method {
424 Method::Initialize => self.handle_initialize(&req.params).await,
426 Method::Status => self.handle_status().await,
427 Method::Health => self.handle_health(),
428
429 Method::SessionNew => self.handle_session_new(&req.params).await,
431 Method::SessionClose => self.handle_session_close(&req.params).await,
432 Method::SessionPrompt => {
433 let handle = self.spawn_handle();
438 let id_clone = id;
439 let params_clone = req.params.clone();
440 let is_notif = is_notification;
441 zeroclaw_spawn::spawn!(async move {
442 let result = handle.handle_session_prompt(¶ms_clone).await;
443 if !is_notif {
444 match result {
445 Ok(_) => handle.send_result(id_clone, serde_json::json!({})).await,
446 Err(e) => handle.send_error(id_clone, e.code, &e.message).await,
447 }
448 }
449 });
450 return;
451 }
452 Method::SessionConfigure => self.handle_session_configure(&req.params).await,
453 Method::SessionCancel => self.handle_session_cancel(&req.params).await,
454 Method::SessionGitBranch => self.handle_session_git_branch(&req.params).await,
455 Method::SessionList => self.handle_session_list(&req.params).await,
456 Method::SessionListAcp => self.handle_session_list_acp(&req.params).await,
457 Method::SessionMessages => self.handle_session_messages(&req.params).await,
458 Method::SessionState => self.handle_session_state(&req.params).await,
459 Method::SessionDelete => self.handle_session_delete(&req.params).await,
460 Method::SessionApprove => self.handle_session_approve(&req.params),
461 Method::SessionKill => self.handle_session_kill(&req.params).await,
462
463 Method::MemoryList => self.handle_memory_list(&req.params).await,
465 Method::MemorySearch => self.handle_memory_search(&req.params).await,
466 Method::MemoryGet => self.handle_memory_get(&req.params).await,
467 Method::MemoryStore => self.handle_memory_store(&req.params).await,
468 Method::MemoryDelete => self.handle_memory_delete(&req.params).await,
469
470 Method::CronList => self.handle_cron_list().await,
472 Method::CronGet => self.handle_cron_get(&req.params).await,
473 Method::CronAdd => self.handle_cron_add(&req.params).await,
474 Method::CronPatch => self.handle_cron_patch(&req.params).await,
475 Method::CronDelete => self.handle_cron_delete(&req.params).await,
476 Method::CronRuns => self.handle_cron_runs(&req.params).await,
477 Method::CronTrigger => self.handle_cron_trigger(&req.params).await,
478 Method::CronSettings => self.handle_cron_settings(&req.params).await,
479
480 Method::ConfigGet => self.handle_config_get(&req.params),
482 Method::ConfigSet => self.handle_config_set(&req.params).await,
483 Method::ConfigValidate => self.handle_config_validate(),
484 Method::ConfigReload => self.handle_config_reload(),
485 Method::ConfigList => self.handle_config_list(&req.params),
486 Method::ConfigDelete => self.handle_config_delete(&req.params).await,
487 Method::ConfigMapKeys => self.handle_config_map_keys(&req.params),
488 Method::ConfigMapKeyCreate => self.handle_config_map_key_create(&req.params).await,
489 Method::ConfigMapKeyDelete => self.handle_config_map_key_delete(&req.params).await,
490 Method::ConfigMapKeyRename => self.handle_config_map_key_rename(&req.params).await,
491 Method::ConfigTemplates => self.handle_config_templates(),
492
493 Method::AgentsList => self.handle_agents_list(),
495 Method::AgentsStatus => self.handle_agents_status().await,
496
497 Method::CostQuery => self.handle_cost_query(&req.params),
499
500 Method::SkillsBundles => self.handle_skills_bundles(),
502 Method::SkillsList => self.handle_skills_list(&req.params),
503 Method::SkillsRead => self.handle_skills_read(&req.params),
504 Method::SkillsWrite => self.handle_skills_write(&req.params),
505 Method::SkillsDelete => self.handle_skills_delete(&req.params),
506
507 Method::PersonalityList => self.handle_personality_list(&req.params),
509 Method::PersonalityGet => self.handle_personality_get(&req.params),
510 Method::PersonalityPut => self.handle_personality_put(&req.params),
511 Method::PersonalityTemplates => self.handle_personality_templates(&req.params),
512
513 Method::ConfigSections => self.handle_config_sections(),
515 Method::ConfigStatus => self.handle_config_status(),
516 Method::ConfigCatalog => self.handle_config_catalog(),
517 Method::ConfigCatalogModels => self.handle_config_catalog_models(&req.params).await,
518
519 Method::LogsSubscribe => self.handle_logs_subscribe().await,
521 Method::LogsQuery => self.handle_logs_query(&req.params).await,
522 Method::LogsGet => self.handle_logs_get(&req.params).await,
523
524 Method::TuiList => self.handle_tui_list(),
526
527 Method::FileAttach => self.handle_file_attach(&req.params).await,
529 Method::FsListDir => super::fs::handle_fs_list_dir(&req.params).await,
530
531 Method::LocalesList => super::locales::handle_locales_list(self.tui_id()),
533 Method::LocalesFetch => {
534 super::locales::handle_locales_fetch(&req.params, self.tui_id()).await
535 }
536
537 Method::QuickstartState => self.handle_quickstart_state(),
539 Method::QuickstartFields => self.handle_quickstart_fields(&req.params),
540 Method::QuickstartValidate => self.handle_quickstart_validate(&req.params),
541 Method::QuickstartApply => self.handle_quickstart_apply(&req.params).await,
542 Method::QuickstartDismiss => self.handle_quickstart_dismiss(&req.params),
543 };
544
545 if is_notification {
546 return;
547 }
548
549 match result {
550 Ok(v) => self.send_result(id, v).await,
551 Err(e) => self.send_error(id, e.code, &e.message).await,
552 }
553 }
554
555 async fn handle_initialize(&mut self, params: &Value) -> RpcResult {
558 let req: InitializeParams = parse_params(params)?;
559
560 if req.protocol_version != RPC_PROTOCOL_VERSION {
561 return Err(rpc_err(
562 VERSION_MISMATCH,
563 format!(
564 "Protocol version mismatch: server={RPC_PROTOCOL_VERSION}, client={}",
565 req.protocol_version,
566 ),
567 ));
568 }
569
570 let tui_id = if let (Some(claimed_id), Some(sig)) =
572 (req.tui_id.as_deref(), req.tui_sig.as_deref())
573 {
574 if !self.ctx.tui_registry.verify(claimed_id, sig) {
576 return Err(rpc_err(AUTH_REQUIRED, "Invalid TUI signature"));
577 }
578 self.ctx.tui_registry.unregister(claimed_id);
580 claimed_id.to_string()
581 } else if let Some(claimed_id) = req.tui_id.as_deref() {
582 if self.ctx.tui_registry.signing_is_enabled() {
584 return Err(rpc_err(AUTH_REQUIRED, "TUI signature required"));
585 }
586 self.ctx.tui_registry.unregister(claimed_id);
587 claimed_id.to_string()
588 } else {
589 self.ctx.tui_registry.generate_unique_tui_id()
591 };
592
593 let tui_sig = self.ctx.tui_registry.sign(&tui_id);
594 self.ctx
595 .tui_registry
596 .register(super::tui_identity::TuiEntry {
597 tui_id: tui_id.clone(),
598 connected_at: chrono::Utc::now(),
599 transport: self
600 .peer_label
601 .split_once(':')
602 .map_or("unknown", |(proto, _)| proto)
603 .to_string(),
604 peer_label: self.peer_label.clone(),
605 env: req.env,
606 });
607 self.tui_id = Some(tui_id.clone());
608
609 self.authenticated = true;
610
611 let capabilities: Vec<String> = Method::ALL
612 .iter()
613 .map(|(_, name)| (*name).to_string())
614 .collect();
615
616 to_result(InitializeResult {
617 protocol_version: RPC_PROTOCOL_VERSION,
618 server_version: env!("CARGO_PKG_VERSION").to_string(),
619 tui_id: Some(tui_id),
620 tui_sig,
621 capabilities,
622 })
623 }
624
625 async fn handle_status(&self) -> RpcResult {
626 let ids = self.ctx.sessions.list_ids().await;
627 let persisted_count = self
630 .ctx
631 .session_backend
632 .as_ref()
633 .map(|b| b.list_sessions_with_metadata().len())
634 .unwrap_or(0);
635 let total = ids.len().max(persisted_count);
636 to_result(StatusResult {
637 server_version: env!("CARGO_PKG_VERSION").to_string(),
638 protocol_version: RPC_PROTOCOL_VERSION,
639 active_sessions: total,
640 session_ids: ids,
641 })
642 }
643
644 fn handle_health(&self) -> RpcResult {
645 let mut val = crate::health::snapshot_json();
646 if let Some(obj) = val.as_object_mut() {
647 let stats = crate::process_stats::sample();
648 obj.insert(
649 "process".to_string(),
650 serde_json::to_value(&stats).unwrap_or_default(),
651 );
652 }
653 Ok(val)
654 }
655
656 fn handle_tui_list(&self) -> RpcResult {
659 let entries = self.ctx.tui_registry.list();
660 to_result(TuiListResult {
661 tuis: entries
662 .into_iter()
663 .map(|e| TuiListEntry {
664 tui_id: e.tui_id,
665 connected_at: e.connected_at.to_rfc3339(),
666 connected_at_unix: e.connected_at.timestamp(),
667 peer_label: e.peer_label,
668 transport: e.transport,
669 })
670 .collect(),
671 })
672 }
673
/// Test-only entry point that calls the private `session/new` handler
/// directly, without going through frame parsing or the auth gate.
#[cfg(test)]
pub async fn handle_session_new_for_test(&self, params: &Value) -> RpcResult {
    Self::handle_session_new(self, params).await
}
683
/// Handles `session/new`: creates a live session (or resumes one by id) and
/// seeds it with any stored history.
///
/// Order of operations:
/// 1. Use the caller's `session_id` or mint a UUID v4; default the chat mode to `Chat`.
/// 2. When resuming an ACP session without an explicit `cwd`, load the stored
///    session first so its workspace directory can be reused.
/// 3. Build the agent, register the `"rpc"` approval channel, and insert the session.
/// 4. If this connection has a TUI id, evict its other session(s) of the same mode.
/// 5. Restore history: ACP from the ACP session store (creating the row if it
///    is missing), Chat from the session backend under the key `rpc_<id>`.
///
/// Errors: `INTERNAL_ERROR` when the agent cannot be built or the ACP store is
/// unavailable or fails; `SESSION_LIMIT_REACHED` when the insert is refused;
/// `SESSION_NOT_FOUND` when the ACP store reports the session as killed. Every
/// ACP failure after the insert removes the session again before returning.
684 async fn handle_session_new(&self, params: &Value) -> RpcResult {
685 let req: SessionNewParams = parse_params(params)?;
686 let resuming = req.session_id.is_some();
687 let session_id = req
688 .session_id
689 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
690
    // Work on a clone so the config lock is not held across the awaits below.
691 let config = self.ctx.config.read().clone();
692 let chat_mode = req
693 .chat_mode
694 .clone()
695 .unwrap_or(crate::rpc::types::ChatMode::Chat);
696
    // Best-effort early load: any store or join error here is ignored and the
    // regular restore path further down runs instead.
697 let mut preloaded_acp: Option<zeroclaw_infra::acp_session_store::AcpSessionData> = None;
702 if resuming
703 && req.cwd.is_none()
704 && matches!(chat_mode, crate::rpc::types::ChatMode::Acp)
705 && let Some(ref store) = self.ctx.acp_session_store
706 {
707 let store_cloned = store.clone();
708 let sid = session_id.clone();
709 if let Ok(Ok(Some(data))) =
710 tokio::task::spawn_blocking(move || store_cloned.load_session(&sid)).await
711 {
712 preloaded_acp = Some(data);
713 }
714 }
715
    // Workspace precedence: request `cwd`, then the stored ACP workspace, then
    // the agent's configured workspace directory.
716 let cwd = req
719 .cwd
720 .clone()
721 .or_else(|| preloaded_acp.as_ref().map(|d| d.workspace_dir.clone()))
722 .unwrap_or_else(|| {
723 config
724 .agent_workspace_dir(&req.agent_alias)
725 .to_string_lossy()
726 .to_string()
727 });
728
729 let cwd_path = Some(std::path::Path::new(&cwd));
    // The environment is looked up by the `tui_id` in the request, not by the
    // id bound to this connection.
730 let tui_env = req
731 .tui_id
732 .as_deref()
733 .and_then(|id| self.ctx.tui_registry.get_env(id));
    // NOTE(review): this recomputes the same value as the `chat_mode` binding
    // above and shadows it; it looks redundant.
734 let chat_mode = req
735 .chat_mode
736 .clone()
737 .unwrap_or(crate::rpc::types::ChatMode::Chat);
    // ACP sessions always exclude memory; other modes only on request.
738 let exclude_memory = matches!(chat_mode, crate::rpc::types::ChatMode::Acp)
744 || req.exclude_memory == Some(true);
745 let agent = crate::agent::agent::Agent::from_config_with_tui_env(
746 &config,
747 &req.agent_alias,
748 cwd_path,
749 false,
750 exclude_memory,
751 tui_env,
752 )
753 .await
754 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Failed to create agent: {e}")))?;
755
    // Register an approval channel for this session under the name "rpc".
756 let approval_ch = Arc::new(crate::rpc::approval_channel::RpcApprovalChannel::new(
757 "rpc",
758 session_id.clone(),
759 Arc::clone(&self.rpc),
760 Arc::clone(&self.ctx.approval_pending),
761 ));
762 agent.channel_handles().register_channel("rpc", approval_ch);
763
    // Any insert failure is reported to the client as SESSION_LIMIT_REACHED.
764 self.ctx
765 .sessions
766 .insert(
767 session_id.clone(),
768 super::session::RpcSession::new(agent, &req.agent_alias, &cwd, chat_mode.clone())
769 .with_owner(self.tui_id.clone()),
770 )
771 .await
772 .map_err(|_| rpc_err(SESSION_LIMIT_REACHED, "Session limit reached"))?;
773
    // Evict this TUI's other same-mode session(s), passing the new session id
    // so it is left alone, then release freed heap if anything was evicted.
774 if let Some(ref tui_id) = self.tui_id {
775 let evicted = self
776 .ctx
777 .sessions
778 .evict_same_mode_sibling(tui_id, &chat_mode, &session_id)
779 .await;
780 if !evicted.is_empty() {
781 let span = ::zeroclaw_log::info_span!(
782 target: "zeroclaw_log_internal_scope",
783 "zeroclaw_scope",
784 session_key = %session_id,
785 agent_alias = %req.agent_alias,
786 channel = "rpc",
787 );
788 let _guard = span.enter();
789 ::zeroclaw_log::record!(
790 DEBUG,
791 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
792 .with_category(::zeroclaw_log::EventCategory::Agent)
793 .with_outcome(::zeroclaw_log::EventOutcome::Success)
794 .with_attrs(::serde_json::json!({
795 "tui_id": tui_id,
796 "evicted": evicted.iter().map(|(id, _)| id).collect::<Vec<_>>(),
797 })),
798 "Evicted abandoned same-mode session(s) on session/new"
799 );
800 crate::util::release_freed_heap();
804 ::zeroclaw_log::record!(
805 DEBUG,
806 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
807 .with_category(::zeroclaw_log::EventCategory::Agent)
808 .with_outcome(::zeroclaw_log::EventOutcome::Success)
809 .with_attrs(::serde_json::json!({
810 "evicted_count": evicted.len(),
811 })),
812 "Trimmed glibc arenas after same-mode session eviction"
813 );
814 }
815 }
816
    // Outcome of the blocking ACP store lookup below.
817 enum AcpSessionNewLoad {
818 Restored(zeroclaw_infra::acp_session_store::AcpSessionData),
819 Created,
820 Killed,
821 }
822
823 let mut message_count = 0;
824 match chat_mode {
825 crate::rpc::types::ChatMode::Acp => {
        // Reuse the early load when there is one; it is wrapped in
        // `Ok(Ok(..))` so both paths share the match below.
826 let loaded = if let Some(data) = preloaded_acp.take() {
830 Ok(Ok(AcpSessionNewLoad::Restored(data)))
831 } else {
832 let Some(ref store) = self.ctx.acp_session_store else {
833 self.ctx.sessions.remove(&session_id).await;
834 return Err(rpc_err(
835 INTERNAL_ERROR,
836 "ACP session store is not available",
837 ));
838 };
839
840 let store_cloned = store.clone();
841 let sid = session_id.clone();
842 let alias = req.agent_alias.clone();
843 let cwd_owned = cwd.clone();
        // Store access runs on the blocking pool; a missing row is created
        // here, a killed row is reported back without being recreated.
844 tokio::task::spawn_blocking(move || -> anyhow::Result<AcpSessionNewLoad> {
845 match store_cloned.load_session_for_restore(&sid)? {
846 zeroclaw_infra::acp_session_store::AcpSessionRestore::Restorable(
847 data,
848 ) => Ok(AcpSessionNewLoad::Restored(data)),
849 zeroclaw_infra::acp_session_store::AcpSessionRestore::Missing => {
850 store_cloned.create_session(&sid, &alias, &cwd_owned)?;
851 Ok(AcpSessionNewLoad::Created)
852 }
853 zeroclaw_infra::acp_session_store::AcpSessionRestore::Killed => {
854 Ok(AcpSessionNewLoad::Killed)
855 }
856 }
857 })
858 .await
859 };
        // Outer `Result` is the join result of the blocking task, inner one
        // is the store result. Every failure arm removes the session that was
        // inserted above.
860 match loaded {
861 Ok(Ok(AcpSessionNewLoad::Restored(data))) => {
862 message_count = data.messages.len();
863 self.ctx
864 .sessions
865 .seed_conversation_history(&session_id, data.messages)
866 .await;
867 }
868 Ok(Ok(AcpSessionNewLoad::Created)) => {}
869 Ok(Ok(AcpSessionNewLoad::Killed)) => {
870 self.ctx.sessions.remove(&session_id).await;
871 return Err(rpc_err(SESSION_NOT_FOUND, "Session not found"));
872 }
873 Ok(Err(e)) => {
874 self.ctx.sessions.remove(&session_id).await;
875 ::zeroclaw_log::record!(
876 WARN,
877 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
878 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
879 .with_attrs(::serde_json::json!({"session_id": session_id, "error": e.to_string()})),
880 "Failed to load or create ACP session"
881 );
882 return Err(rpc_err(
883 INTERNAL_ERROR,
884 format!("Failed to load or create ACP session: {e}"),
885 ));
886 }
887 Err(join) => {
888 self.ctx.sessions.remove(&session_id).await;
889 ::zeroclaw_log::record!(
890 WARN,
891 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
892 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
893 .with_attrs(::serde_json::json!({"session_id": session_id, "error": join.to_string()})),
894 "ACP session load task failed"
895 );
896 return Err(rpc_err(
897 INTERNAL_ERROR,
898 format!("ACP session load task failed: {join}"),
899 ));
900 }
901 }
902 }
903 crate::rpc::types::ChatMode::Chat => {
        // Chat history is stored under the key `rpc_<session_id>`. The result
        // of recording the agent alias is deliberately ignored.
904 if let Some(ref backend) = self.ctx.session_backend {
905 let session_key = format!("rpc_{session_id}");
906 let _ = backend.set_session_agent_alias(&session_key, &req.agent_alias);
907 let stored = backend.load(&session_key);
908 if !stored.is_empty() {
909 self.ctx.sessions.seed_history(&session_id, &stored).await;
910 message_count = stored.len();
911 }
912 }
913 }
914 }
915
916 to_result(SessionNewResult {
917 session_id,
918 agent_alias: req.agent_alias,
919 message_count,
920 workspace_dir: cwd,
921 })
922 }
923
/// Handles `session/close`: detaches the `"rpc"` channel from the session's
/// agent, removes the session, and releases freed heap.
///
/// Returns `SESSION_NOT_FOUND` when no session with that id could be removed;
/// otherwise replies with `closed: true`.
924 async fn handle_session_close(&self, params: &Value) -> RpcResult {
925 let req: SessionIdParams = parse_params(params)?;
926 if let Some(agent) = self.ctx.sessions.get_agent(&req.session_id).await {
    // The agent lock is only held for this one statement.
927 agent
928 .lock()
929 .await
930 .channel_handles()
931 .unregister_channel("rpc");
    // Sampled for the log entry below; the count includes the local `agent` handle.
932 let strong = std::sync::Arc::strong_count(&agent);
933 let agent_alias = self
934 .ctx
935 .sessions
936 .get_agent_alias(&req.session_id)
937 .await
938 .unwrap_or_default();
939 let span = ::zeroclaw_log::info_span!(
940 target: "zeroclaw_log_internal_scope",
941 "zeroclaw_scope",
942 session_key = %req.session_id,
943 agent_alias = %agent_alias,
944 channel = "rpc",
945 );
946 let _guard = span.enter();
947 ::zeroclaw_log::record!(
948 INFO,
949 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
950 .with_category(::zeroclaw_log::EventCategory::Agent)
951 .with_attrs(::serde_json::json!({
952 "agent_arc_strong_count_before_remove": strong,
953 })),
954 "session close: dropping local Agent handle before remove"
955 );
    // Give up our own reference before the session is removed below.
956 drop(agent);
961 }
    // `remove` returning false is reported to the client as an unknown session.
962 if !self.ctx.sessions.remove(&req.session_id).await {
963 return Err(rpc_err(SESSION_NOT_FOUND, "Session not found"));
964 }
965 crate::util::release_freed_heap();
966 {
967 let span = ::zeroclaw_log::info_span!(
968 target: "zeroclaw_log_internal_scope",
969 "zeroclaw_scope",
970 session_key = %req.session_id,
971 channel = "rpc",
972 );
973 let _guard = span.enter();
974 ::zeroclaw_log::record!(
975 DEBUG,
976 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
977 .with_category(::zeroclaw_log::EventCategory::Agent)
978 .with_outcome(::zeroclaw_log::EventOutcome::Success),
979 "Trimmed glibc arenas after session close"
980 );
981 }
982 to_result(SessionCloseResult {
983 session_id: req.session_id,
984 closed: true,
985 })
986 }
987
/// Handles `session/kill`: terminates a live session. For ACP sessions the
/// stored row is marked as killed first.
///
/// Errors: `SESSION_NOT_FOUND` when the session is not live; `INTERNAL_ERROR`
/// when an ACP session has no store available or marking it killed fails. In
/// both error cases the in-memory session is left untouched.
///
/// The reply's `killed` flag is whatever `kill_session` returned; it is
/// `false` when the session disappeared between the lookup and the kill.
988 async fn handle_session_kill(&self, params: &Value) -> RpcResult {
989 let req: SessionKillParams = parse_params(params)?;
990 let sid = &req.session_id;
991
    // This lookup doubles as the existence check.
992 let chat_mode = self
993 .ctx
994 .sessions
995 .chat_mode(sid)
996 .await
997 .ok_or_else(|| rpc_err(SESSION_NOT_FOUND, "Session not found"))?;
998
999 let agent_alias = self
1000 .ctx
1001 .sessions
1002 .get_agent_alias(sid)
1003 .await
1004 .unwrap_or_default();
1005 let span = ::zeroclaw_log::info_span!(
1006 target: "zeroclaw_log_internal_scope",
1007 "zeroclaw_scope",
1008 session_key = %sid,
1009 agent_alias = %agent_alias,
1010 channel = "rpc",
1011 );
    // NOTE(review): `_guard` lives until the function returns, so the span
    // stays entered across the `.await`s below — confirm this is intended for
    // the logging backend in use.
1012 let _guard = span.enter();
1013
    // ACP only: mark the stored row killed before touching the live session.
1014 if matches!(chat_mode, ChatMode::Acp) {
1015 let store = self
1016 .ctx
1017 .acp_session_store
1018 .clone()
1019 .ok_or_else(|| rpc_err(INTERNAL_ERROR, "ACP session store is not available"))?;
1020 let sid_owned = sid.to_string();
1021 let marked =
1022 tokio::task::spawn_blocking(move || store.mark_session_killed(&sid_owned)).await;
1023 match marked {
1024 Ok(Ok(true)) => {}
    // No stored row to mark: log it and still kill the live session.
1025 Ok(Ok(false)) => {
1026 ::zeroclaw_log::record!(
1027 WARN,
1028 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1029 .with_category(::zeroclaw_log::EventCategory::Agent)
1030 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1031 "session/kill: live ACP session had no durable row to tombstone"
1032 );
1033 }
    // Store error and join error are reported with the same message.
1034 Ok(Err(e)) => {
1035 return Err(rpc_err(
1036 INTERNAL_ERROR,
1037 format!("Failed to mark ACP session killed: {e}"),
1038 ));
1039 }
1040 Err(e) => {
1041 return Err(rpc_err(
1042 INTERNAL_ERROR,
1043 format!("Failed to mark ACP session killed: {e}"),
1044 ));
1045 }
1046 }
1047 }
1048
1049 let killed = self.ctx.sessions.kill_session(sid).await;
1050 if killed {
1051 crate::util::release_freed_heap();
1052 ::zeroclaw_log::record!(
1053 INFO,
1054 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1055 .with_category(::zeroclaw_log::EventCategory::Agent)
1056 .with_outcome(::zeroclaw_log::EventOutcome::Success),
1057 "session/kill: session terminated by admin"
1058 );
1059 } else {
1060 ::zeroclaw_log::record!(
1061 DEBUG,
1062 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1063 .with_category(::zeroclaw_log::EventCategory::Agent)
1064 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1065 "session/kill: session vanished between existence check and kill (concurrent close?)"
1066 );
1067 }
1068
1069 to_result(SessionKillResult {
1070 session_id: req.session_id,
1071 killed,
1072 })
1073 }
1074
/// Rebuilds an in-memory ACP session from the ACP session store, for a
/// `session/prompt` that arrives when the session is no longer live.
///
/// Returns the agent handle of the rebuilt session, or `None` when:
/// * no ACP session store is configured;
/// * the store reports the session as killed or missing;
/// * the store lookup or its blocking task fails (logged at WARN);
/// * the agent cannot be constructed or the session insert is refused.
///
/// The rebuilt session is always in ACP mode with memory excluded, is owned by
/// this connection's TUI id, and is seeded with the stored messages.
1075 async fn rehydrate_reaped_session(
1080 &self,
1081 sid: &str,
1082 ) -> Option<Arc<tokio::sync::Mutex<crate::agent::agent::Agent>>> {
1083 let store = self.ctx.acp_session_store.clone()?;
1084 let sid_owned = sid.to_string();
    // Store access runs on the blocking pool. Outer `Result` is the join
    // result, inner one is the store result.
1085 let loaded =
1086 tokio::task::spawn_blocking(move || store.load_session_for_restore(&sid_owned)).await;
1087 let data = match loaded {
1088 Ok(Ok(zeroclaw_infra::acp_session_store::AcpSessionRestore::Restorable(data))) => data,
    // A killed session is not brought back.
1089 Ok(Ok(zeroclaw_infra::acp_session_store::AcpSessionRestore::Killed)) => {
1090 ::zeroclaw_log::record!(
1091 INFO,
1092 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1093 .with_category(::zeroclaw_log::EventCategory::Agent)
1094 .with_outcome(::zeroclaw_log::EventOutcome::Success),
1095 "session/prompt: refusing to rehydrate admin-killed ACP session"
1096 );
1097 return None;
1098 }
1099 Ok(Err(e)) => {
1100 ::zeroclaw_log::record!(
1101 WARN,
1102 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1103 .with_category(::zeroclaw_log::EventCategory::Agent)
1104 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1105 .with_attrs(::serde_json::json!({
1106 "session_id": sid,
1107 "error": e.to_string(),
1108 })),
1109 "session/prompt: failed to query ACP killed marker before rehydrate"
1110 );
1111 return None;
1112 }
    // Nothing stored for this id: nothing to rebuild.
1113 Ok(Ok(zeroclaw_infra::acp_session_store::AcpSessionRestore::Missing)) => return None,
1114 Err(e) => {
1115 ::zeroclaw_log::record!(
1116 WARN,
1117 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1118 .with_category(::zeroclaw_log::EventCategory::Agent)
1119 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1120 .with_attrs(::serde_json::json!({
1121 "session_id": sid,
1122 "error": e.to_string(),
1123 })),
1124 "session/prompt: ACP killed-marker query task failed before rehydrate"
1125 );
1126 return None;
1127 }
1128 };
1129
    // Rebuild the agent from the stored alias and workspace, using the current
    // config and the TUI environment of this connection.
1130 let config = self.ctx.config.read().clone();
1131 let cwd_path = Some(std::path::Path::new(&data.workspace_dir));
1132 let tui_env = self
1133 .tui_id
1134 .as_deref()
1135 .and_then(|id| self.ctx.tui_registry.get_env(id));
    // Matches `session/new`, which always excludes memory for ACP sessions.
1136 let exclude_memory = true;
1142 let agent = crate::agent::agent::Agent::from_config_with_tui_env(
1143 &config,
1144 &data.agent_alias,
1145 cwd_path,
1146 false,
1147 exclude_memory,
1148 tui_env,
1149 )
1150 .await
    // The construction error is discarded; the caller only sees `None`.
1151 .ok()?;
1152
1153 let approval_ch = Arc::new(crate::rpc::approval_channel::RpcApprovalChannel::new(
1154 "rpc",
1155 sid.to_string(),
1156 Arc::clone(&self.rpc),
1157 Arc::clone(&self.ctx.approval_pending),
1158 ));
1159 agent.channel_handles().register_channel("rpc", approval_ch);
1160
    // Taken now because `data.messages` is moved into the session below.
1161 let message_count = data.messages.len();
1162 self.ctx
1163 .sessions
1164 .insert(
1165 sid.to_string(),
1166 super::session::RpcSession::new(
1167 agent,
1168 &data.agent_alias,
1169 &data.workspace_dir,
1170 crate::rpc::types::ChatMode::Acp,
1171 )
1172 .with_owner(self.tui_id.clone()),
1173 )
1174 .await
    // An insert failure also collapses to `None`.
1175 .ok()?;
1176 self.ctx
1177 .sessions
1178 .seed_conversation_history(sid, data.messages)
1179 .await;
1180 self.ctx.sessions.touch(sid).await;
1181
1182 ::zeroclaw_log::record!(
1183 INFO,
1184 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1185 .with_category(::zeroclaw_log::EventCategory::Agent)
1186 .with_outcome(::zeroclaw_log::EventOutcome::Success)
1187 .with_attrs(::serde_json::json!({
1188 "session_id": sid,
1189 "agent_alias": data.agent_alias,
1190 "messages_restored": message_count,
1191 })),
1192 "rehydrated reaped session from durable store; turn continues on a working session"
1193 );
1194
    // Return the handle as stored in the session map.
1195 self.ctx.sessions.get_agent(sid).await
1196 }
1197
/// Handles a session prompt request: runs one agent turn and returns its result.
///
/// Steps, in the order the code performs them:
/// 1. Parse params; reject a blank prompt with no attachments (`INVALID_PARAMS`).
/// 2. Resolve the session's agent from memory, falling back to
///    `rehydrate_reaped_session`; if both fail, emit a `Failed` TurnComplete
///    and return `SESSION_NOT_FOUND`.
/// 3. Process attachments and append their markers to the prompt text.
/// 4. Acquire the per-session queue guard (`SESSION_BUSY` on failure) and
///    register a cancellation token.
/// 5. Run `execute_turn`, forwarding each turn event to the client as a
///    notification.
/// 6. Persist the turn (ACP store or chat session backend, depending on the
///    session's chat mode).
/// 7. Emit a TurnComplete notification and return the `SessionPromptResult`,
///    or `INTERNAL_ERROR` when the turn itself failed.
async fn handle_session_prompt(&self, params: &Value) -> RpcResult {
    let req: SessionPromptParams = parse_params(params)?;
    let sid = &req.session_id;

    // A prompt must carry either non-whitespace text or at least one attachment.
    if req.prompt.trim().is_empty() && req.attachments.is_empty() {
        return Err(rpc_err(
            INVALID_PARAMS,
            "session/prompt requires a non-empty `prompt` or at least one attachment",
        ));
    }

    // Look the agent up in the live session map first; only when it is absent
    // try to rebuild the session via `rehydrate_reaped_session`.
    let agent = match self.ctx.sessions.get_agent(sid).await {
        Some(a) => a,
        None => {
            match self.rehydrate_reaped_session(sid).await {
                Some(a) => a,
                None => {
                    ::zeroclaw_log::record!(
                        WARN,
                        ::zeroclaw_log::Event::new(
                            module_path!(),
                            ::zeroclaw_log::Action::Fail,
                        )
                        .with_category(::zeroclaw_log::EventCategory::Agent)
                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
                        .with_attrs(::serde_json::json!({ "session_id": sid })),
                        "session/prompt on a session absent from memory and the durable store; emitting TurnComplete so the client exits the working state"
                    );
                    // The notification is sent before the error response is returned.
                    self.emit_turn_complete(
                        sid,
                        crate::rpc::types::TurnCompletionOutcome::Failed,
                        "turn cancelled by daemon: session_not_found".to_string(),
                    )
                    .await;
                    return Err(rpc_err(SESSION_NOT_FOUND, "Session not found"));
                }
            }
        }
    };

    // Working copy of the prompt; attachment markers are appended to it below.
    let mut prompt = req.prompt.clone();
    if !req.attachments.is_empty() {
        use super::attachments::process_file_entry;

        let agent_alias = self
            .ctx
            .sessions
            .get_agent_alias(sid)
            .await
            .ok_or_else(|| rpc_err(SESSION_NOT_FOUND, "Session not found"))?;
        // The agent's workspace directory is passed to `process_file_entry` as the upload root.
        let upload_root = self
            .ctx
            .config
            .read()
            .agent_workspace_dir(&agent_alias)
            .to_string_lossy()
            .to_string();
        // Derived purely from the peer label's `wss:` prefix.
        let is_wss = self.peer_label.starts_with("wss:");
        // Separate the user's text from the first marker with a newline.
        if !prompt.is_empty() {
            prompt.push('\n');
        }
        for (idx, entry) in req.attachments.iter().enumerate() {
            // Any attachment failure aborts the request via `?` before a turn starts.
            let result =
                process_file_entry(entry, sid, &upload_root, is_wss, &self.ctx.sessions)
                    .await?;
            // One marker per line.
            if idx > 0 {
                prompt.push('\n');
            }
            prompt.push_str(&result.marker);
        }
    }

    // Named `_guard` (not `_`) so the value lives until the function returns.
    // NOTE(review): presumably this serializes turns per session — confirm
    // against `session_queue`.
    let _guard = self
        .ctx
        .sessions
        .session_queue
        .acquire(sid)
        .await
        .map_err(|e| rpc_err(SESSION_BUSY, format!("Session busy: {e}")))?;

    // Register a fresh cancellation token; the returned generation is passed
    // back to `remove_cancel_token` once the turn finishes.
    let cancel = tokio_util::sync::CancellationToken::new();
    let cancel_generation = self.ctx.sessions.register_cancel_token(sid, cancel.clone());
    self.ctx.sessions.touch(sid).await;
    ::zeroclaw_log::record!(
        INFO,
        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Invoke)
            .with_category(::zeroclaw_log::EventCategory::Agent)
            .with_attrs(::serde_json::json!({ "session_id": sid })),
        "turn dispatch: registered cancel token, starting turn"
    );

    // Sessions with no recorded chat mode are treated as `Chat`.
    let chat_mode = self
        .ctx
        .sessions
        .chat_mode(sid)
        .await
        .unwrap_or(crate::rpc::types::ChatMode::Chat);
    // For ACP sessions, remember the history length before the turn so that
    // only the messages added by this turn are persisted afterwards.
    let pre_history_len = if matches!(chat_mode, crate::rpc::types::ChatMode::Acp) {
        self.ctx.sessions.history_len(sid).await.unwrap_or(0)
    } else {
        0
    };

    // Attribution fields for the turn and the log records below; each falls
    // back to its default value when the lookup yields nothing.
    let (agent_alias, model_provider, model, max_ctx) = {
        let alias = self
            .ctx
            .sessions
            .get_agent_alias(sid)
            .await
            .unwrap_or_default();
        let cfg = self.ctx.config.read().clone();
        let mp = cfg
            .agent(&alias)
            .map(|a| a.model_provider.to_string())
            .unwrap_or_default();
        let m = cfg
            .model_provider_for_agent(&alias)
            .and_then(|p| p.model.clone())
            .unwrap_or_default();
        let max_ctx = Some(cfg.effective_max_context_tokens(&alias) as u64);
        (alias, mp, m, max_ctx)
    };

    let rpc = self.rpc.clone();
    let sid_owned = sid.to_string();
    // Only ACP sessions record token counts in the ACP session store.
    let acp_token_store = if matches!(chat_mode, crate::rpc::types::ChatMode::Acp) {
        self.ctx.acp_session_store.clone()
    } else {
        None
    };
    // Copies kept for logging, because the originals are moved into `TurnAttribution`.
    let attribution_agent_alias = agent_alias.clone();
    let attribution_model_provider = model_provider.clone();
    let attribution_model = model.clone();
    let outcome = execute_turn(
        agent,
        prompt.clone(),
        cancel,
        TurnAttribution {
            session_key: Some(sid.to_string()),
            agent_alias,
            model_provider,
            model,
            channel: "rpc",
        },
        // Per-event callback: clone the captured handles so each returned
        // future owns its own copies.
        move |event| {
            let rpc = rpc.clone();
            let sid = sid_owned.clone();
            let acp_token_store = acp_token_store.clone();
            async move {
                // On a usage event that carries an input-token count, store it
                // in the ACP session store (when one is present); the result
                // of the write is ignored.
                if let (
                    Some(store),
                    TurnEvent::Usage {
                        input_tokens: Some(it),
                        ..
                    },
                ) = (acp_token_store.as_ref(), &event)
                {
                    let store = store.clone();
                    let sid = sid.clone();
                    let it = *it;
                    let _ =
                        tokio::task::spawn_blocking(move || store.set_token_count(&sid, it))
                            .await;
                }
                // Forward the event to the client when it maps to a
                // notification; send errors are ignored.
                if let Some(n) = notification_for_turn_event(&sid, &event, max_ctx) {
                    let _ = rpc.send_raw(n).await;
                }
            }
        },
    )
    .await;

    // Read the recorded cancel cause (if any), then drop this turn's token registration.
    let cancel_cause = self.ctx.sessions.take_cancel_cause(sid);
    self.ctx
        .sessions
        .remove_cancel_token(sid, cancel_generation);

    // ACP sessions: append a turn-verdict audit event to the ACP store from a
    // spawned task, so the response does not wait on the write.
    if matches!(chat_mode, crate::rpc::types::ChatMode::Acp)
        && let Some(store) = self.ctx.acp_session_store.clone()
    {
        // Map the turn outcome to (action, outcome, optional JSON payload string).
        let (action, event_outcome, payload) = match &outcome {
            Ok(crate::rpc::turn::TurnOutcome::Completed { .. }) => (
                ::zeroclaw_log::Action::Complete,
                ::zeroclaw_log::EventOutcome::Success,
                None,
            ),
            Ok(crate::rpc::turn::TurnOutcome::Cancelled { .. }) => (
                ::zeroclaw_log::Action::Cancel,
                ::zeroclaw_log::EventOutcome::Unknown,
                Some(
                    ::serde_json::json!({
                        "cancel_cause": cancel_cause.map(|c| c.as_str()),
                    })
                    .to_string(),
                ),
            ),
            Err(e) => (
                ::zeroclaw_log::Action::Fail,
                ::zeroclaw_log::EventOutcome::Failure,
                Some(::serde_json::json!({ "error": e.to_string() }).to_string()),
            ),
        };
        let sid_owned = sid.to_string();
        let span_session = sid.to_string();
        let span_alias = attribution_agent_alias.clone();
        let span_provider = attribution_model_provider.clone();
        let span_model = attribution_model.clone();
        zeroclaw_spawn::spawn!(async move {
            use ::zeroclaw_log::Instrument as _;
            // The span carries the turn's attribution fields for the spawned
            // task, which is instrumented with it below.
            let span = ::zeroclaw_log::info_span!(
                target: "zeroclaw_log_internal_scope",
                "zeroclaw_scope",
                session_key = %span_session,
                agent_alias = %span_alias,
                model_provider = %span_provider,
                model = %span_model,
                channel = "rpc",
            );
            async move {
                let persisted = tokio::task::spawn_blocking(move || {
                    store.append_event(&sid_owned, action, event_outcome, payload.as_deref())
                })
                .await;
                // Success returns early; both the store error and the join
                // error fall through to the WARN record.
                let error = match persisted {
                    Ok(Ok(())) => return,
                    Ok(Err(e)) => e.to_string(),
                    Err(join) => join.to_string(),
                };
                ::zeroclaw_log::record!(
                    WARN,
                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Write)
                        .with_category(::zeroclaw_log::EventCategory::Agent)
                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
                        .with_attrs(::serde_json::json!({ "error": error })),
                    "Failed to persist ACP turn-verdict audit event"
                );
            }
            .instrument(span)
            .await;
        });
    }

    // Persist the conversation according to the session's chat mode.
    match chat_mode {
        crate::rpc::types::ChatMode::Acp => {
            // Completed and cancelled turns persist the messages added since
            // `pre_history_len`; failed turns and empty slices persist nothing.
            if let Some(ref store) = self.ctx.acp_session_store
                && matches!(
                    outcome,
                    Ok(TurnOutcome::Completed { .. }) | Ok(TurnOutcome::Cancelled { .. })
                )
                && let Some(new_msgs) = self
                    .ctx
                    .sessions
                    .history_slice_from(sid, pre_history_len)
                    .await
                && !new_msgs.is_empty()
            {
                let store = store.clone();
                let sid_owned = sid.to_string();
                let persisted = tokio::task::spawn_blocking(move || {
                    store.append_turn(&sid_owned, &new_msgs)
                })
                .await;
                let error = match persisted {
                    Ok(Ok(())) => None,
                    Ok(Err(e)) => Some(e.to_string()),
                    Err(join) => Some(join.to_string()),
                };
                // A persistence failure is logged and does not fail the request.
                if let Some(detail) = error {
                    ::zeroclaw_log::record!(
                        WARN,
                        ::zeroclaw_log::Event::new(
                            module_path!(),
                            ::zeroclaw_log::Action::Note
                        )
                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
                        .with_attrs(::serde_json::json!({"session_id": sid, "error": detail})),
                        "Failed to persist ACP turn"
                    );
                }
            }
        }
        crate::rpc::types::ChatMode::Chat => {
            if let Some(ref backend) = self.ctx.session_backend {
                // Chat transcripts are stored under the `rpc_`-prefixed key.
                let key = format!("rpc_{sid}");
                // The user message is appended whatever the outcome; append
                // results are ignored.
                let _ = backend.append(&key, &ChatMessage::user(&prompt));
                match &outcome {
                    Ok(TurnOutcome::Completed { text, .. }) => {
                        let _ = backend.append(&key, &ChatMessage::assistant(text));
                    }
                    // A cancelled turn keeps its partial reply only when non-empty.
                    Ok(TurnOutcome::Cancelled { partial_text, .. })
                        if !partial_text.is_empty() =>
                    {
                        let _ = backend.append(&key, &ChatMessage::assistant(partial_text));
                    }
                    _ => {}
                }
            }
        }
    }

    // Every arm emits a TurnComplete notification before building the response.
    match outcome {
        Ok(TurnOutcome::Completed { text, .. }) => {
            self.emit_turn_complete(
                &req.session_id,
                crate::rpc::types::TurnCompletionOutcome::Completed,
                text.clone(),
            )
            .await;
            to_result(SessionPromptResult {
                session_id: req.session_id,
                stop_reason: "end_turn".to_string(),
                content: text,
            })
        }
        Ok(TurnOutcome::Cancelled { partial_text, .. }) => {
            // The notification text names who cancelled; the response content
            // carries the partial reply instead.
            let cancel_message = match cancel_cause {
                Some(crate::rpc::session::CancelCause::ClientRpc) => {
                    format!("turn cancelled by user in RPC_SESSION {}", req.session_id)
                }
                Some(cause) => {
                    format!("turn cancelled by daemon: {}", cause.as_str())
                }
                None => "turn cancelled by daemon: unattributed".to_string(),
            };
            ::zeroclaw_log::record!(
                INFO,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Cancel)
                    .with_category(::zeroclaw_log::EventCategory::Agent)
                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
                    .with_attrs(::serde_json::json!({
                        "session_id": req.session_id,
                        "agent_alias": attribution_agent_alias,
                        "model_provider": attribution_model_provider,
                        "model": attribution_model,
                        "chat_mode": format!("{chat_mode:?}"),
                        "cancel_cause": cancel_cause.map(|c| c.as_str()),
                    })),
                "turn cancelled; emitting attributed TurnComplete so the client exits the working state"
            );
            self.emit_turn_complete(
                &req.session_id,
                crate::rpc::types::TurnCompletionOutcome::Cancelled,
                cancel_message,
            )
            .await;
            to_result(SessionPromptResult {
                session_id: req.session_id,
                stop_reason: "cancelled".to_string(),
                content: partial_text,
            })
        }
        Err(e) => {
            ::zeroclaw_log::record!(
                ERROR,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
                    .with_category(::zeroclaw_log::EventCategory::Agent)
                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
                    .with_attrs(::serde_json::json!({
                        "session_id": req.session_id,
                        "agent_alias": attribution_agent_alias,
                        "model_provider": attribution_model_provider,
                        "model": attribution_model,
                        "chat_mode": format!("{chat_mode:?}"),
                        "error": e.to_string(),
                    })),
                "turn failed; emitting TurnComplete so the client exits the working state"
            );
            self.emit_turn_complete(
                &req.session_id,
                crate::rpc::types::TurnCompletionOutcome::Failed,
                format!("turn failed: {e}"),
            )
            .await;
            Err(rpc_err(INTERNAL_ERROR, e.to_string()))
        }
    }
}
1613
1614 async fn emit_turn_complete(
1618 &self,
1619 session_id: &str,
1620 outcome: crate::rpc::types::TurnCompletionOutcome,
1621 content: String,
1622 ) {
1623 let update = SessionUpdateEvent::TurnComplete {
1624 session_id: session_id.to_string(),
1625 outcome,
1626 content,
1627 };
1628 if let Ok(params) = serde_json::to_value(update) {
1629 let n = JsonRpcNotification::new(notification::SESSION_UPDATE, params);
1630 if let Ok(s) = serde_json::to_string(&n) {
1631 let _ = self.rpc.send_raw(s).await;
1632 }
1633 }
1634 }
1635
1636 async fn handle_session_configure(&self, params: &Value) -> RpcResult {
1637 let req: SessionConfigureParams = parse_params(params)?;
1638
1639 let merged = self
1640 .ctx
1641 .sessions
1642 .set_overrides(&req.session_id, req.overrides)
1643 .await
1644 .ok_or_else(|| rpc_err(SESSION_NOT_FOUND, "Session not found"))?;
1645
1646 if let Some(ref model_provider_ref) = merged.model_provider {
1651 let built = {
1652 let config = self.ctx.config.read();
1653 crate::agent::agent::build_session_model_provider(
1654 &config,
1655 model_provider_ref,
1656 merged.model.as_deref(),
1657 )
1658 .map_err(|e| rpc_err(INVALID_PARAMS, e.to_string()))?
1659 };
1660 let (model_provider, model_provider_name, model_name) = built;
1661 self.ctx
1662 .sessions
1663 .apply_model_provider(&req.session_id, model_provider, model_provider_name)
1664 .await
1665 .then_some(())
1666 .ok_or_else(|| rpc_err(SESSION_NOT_FOUND, "Session not found"))?;
1667 if let Some(agent) = self.ctx.sessions.get_agent(&req.session_id).await {
1669 agent.lock().await.set_model_name(model_name);
1670 }
1671 }
1672
1673 to_result(SessionConfigureResult {
1674 session_id: req.session_id,
1675 overrides: merged,
1676 })
1677 }
1678
1679 async fn handle_session_cancel(&self, params: &Value) -> RpcResult {
1680 let req: SessionIdParams = parse_params(params)?;
1681 let owner = self
1682 .ctx
1683 .sessions
1684 .session_owner_tui_id(&req.session_id)
1685 .await;
1686 let allowed = match (
1687 owner.as_ref().and_then(|o| o.as_deref()),
1688 self.tui_id.as_deref(),
1689 ) {
1690 (Some(o), Some(c)) => o == c,
1691 _ => false,
1692 };
1693 if !allowed {
1694 let (agent_alias, model_provider, model) =
1695 match self.ctx.sessions.get_agent(&req.session_id).await {
1696 Some(agent) => agent.lock().await.attribution_fields(),
1697 None => (String::new(), String::new(), String::new()),
1698 };
1699 let span = ::zeroclaw_log::info_span!(
1700 target: "zeroclaw_log_internal_scope",
1701 "zeroclaw_scope",
1702 session_key = %req.session_id,
1703 agent_alias = %agent_alias,
1704 model_provider = %model_provider,
1705 model = %model,
1706 channel = "rpc",
1707 );
1708 let _guard = span.enter();
1709 ::zeroclaw_log::record!(
1710 WARN,
1711 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1712 .with_category(::zeroclaw_log::EventCategory::Channel)
1713 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1714 .with_attrs(::serde_json::json!({
1715 "caller_tui_id": self.tui_id.as_deref().unwrap_or("<none>"),
1716 "owner_tui_id": owner
1717 .as_ref()
1718 .and_then(|o| o.as_deref())
1719 .unwrap_or("<none>"),
1720 "peer_label": &self.peer_label,
1721 })),
1722 "session/cancel refused: caller does not own the session"
1723 );
1724 return Err(rpc_err(
1725 SESSION_NOT_OWNED,
1726 "Caller does not own this session",
1727 ));
1728 }
1729 if self.ctx.sessions.cancel_session(&req.session_id) {
1730 to_result(SessionCancelResult {
1731 session_id: req.session_id,
1732 cancelled: true,
1733 })
1734 } else {
1735 Err(rpc_err(
1736 SESSION_NOT_FOUND,
1737 "No active turn for this session",
1738 ))
1739 }
1740 }
1741
1742 async fn handle_session_git_branch(&self, params: &Value) -> RpcResult {
1743 let req: SessionIdParams = parse_params(params)?;
1744 let cwd = self
1745 .ctx
1746 .sessions
1747 .get_workspace_dir(&req.session_id)
1748 .await
1749 .ok_or_else(|| rpc_err(SESSION_NOT_FOUND, "session not found"))?;
1750 let info = crate::rpc::git::head_info(std::path::Path::new(&cwd)).unwrap_or_default();
1751 to_result(SessionGitBranchResult {
1752 session_id: req.session_id,
1753 branch: info.branch,
1754 hash: info.hash,
1755 })
1756 }
1757
1758 async fn handle_session_list(&self, params: &Value) -> RpcResult {
1759 let backend = self
1760 .ctx
1761 .session_backend
1762 .as_ref()
1763 .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Session persistence is disabled"))?;
1764 let req: SessionListParams = parse_params(params)?;
1765 let config = self.ctx.config.read().clone();
1766
1767 let all = if let Some(ref keyword) = req.query {
1769 if keyword.trim().is_empty() {
1770 backend.list_sessions_with_metadata()
1771 } else {
1772 use zeroclaw_infra::session_backend::SessionQuery;
1773 backend.search(&SessionQuery {
1774 keyword: Some(keyword.clone()),
1775 limit: req.limit,
1776 })
1777 }
1778 } else {
1779 backend.list_sessions_with_metadata()
1780 };
1781
1782 let sessions: Vec<SessionEntry> = all
1783 .into_iter()
1784 .filter(|meta| meta.agent_alias.is_some() || meta.channel_id.is_some())
1785 .map(|meta| {
1786 let agent_alias = meta.agent_alias.clone().or_else(|| {
1787 meta.channel_id
1788 .as_deref()
1789 .and_then(|c| config.agent_for_channel(c))
1790 .map(str::to_string)
1791 });
1792 let session_id = meta
1793 .key
1794 .strip_prefix("rpc_")
1795 .or_else(|| meta.key.strip_prefix("gw_"))
1796 .map(str::to_string)
1797 .unwrap_or_else(|| meta.key.clone());
1798 SessionEntry {
1799 session_id,
1800 session_key: meta.key,
1801 created_at: meta.created_at.to_rfc3339(),
1802 last_activity: meta.last_activity.to_rfc3339(),
1803 message_count: meta.message_count,
1804 agent_alias,
1805 channel_id: meta.channel_id,
1806 name: meta.name,
1807 }
1808 })
1809 .collect();
1810 to_result(SessionListResult { sessions })
1811 }
1812
1813 async fn handle_session_list_acp(&self, _params: &Value) -> RpcResult {
1818 let store = self
1819 .ctx
1820 .acp_session_store
1821 .as_ref()
1822 .ok_or_else(|| rpc_err(INTERNAL_ERROR, "ACP session store is not available"))?;
1823
1824 let summaries = store
1825 .list_sessions()
1826 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("acp session list failed: {e}")))?;
1827
1828 let sessions: Vec<SessionEntry> = summaries
1829 .into_iter()
1830 .map(|s| SessionEntry {
1831 session_id: s.session_uuid.clone(),
1832 session_key: s.session_uuid,
1835 created_at: s.created_at.to_rfc3339(),
1836 last_activity: s.last_activity.to_rfc3339(),
1837 message_count: s.message_count,
1838 agent_alias: Some(s.agent_alias),
1839 channel_id: None,
1840 name: None,
1843 })
1844 .collect();
1845
1846 to_result(SessionListResult { sessions })
1847 }
1848
1849 async fn handle_session_messages(&self, params: &Value) -> RpcResult {
1850 let req: SessionMessagesParams = parse_params(params)?;
1851 let backend = self
1852 .ctx
1853 .session_backend
1854 .as_ref()
1855 .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Session persistence is disabled"))?;
1856
1857 let candidates = [
1860 req.session_id.clone(),
1861 format!("rpc_{}", req.session_id),
1862 format!("gw_{}", req.session_id),
1863 ];
1864 let mut raw: Vec<zeroclaw_api::model_provider::ChatMessage> = Vec::new();
1865 for key in &candidates {
1866 let loaded = backend.load(key);
1867 if !loaded.is_empty() {
1868 raw = loaded;
1869 break;
1870 }
1871 }
1872
1873 let total = raw.len();
1880 let limit = req.limit.unwrap_or(total);
1881 let end = req.before_index.map(|i| i.min(total)).unwrap_or(total);
1882 let start = end.saturating_sub(limit);
1883 let messages: Vec<MessageEntry> = raw[start..end]
1884 .iter()
1885 .map(|m| MessageEntry {
1886 role: m.role.clone(),
1887 content: m.content.clone(),
1888 })
1889 .collect();
1890
1891 to_result(SessionMessagesResult {
1892 session_id: req.session_id,
1893 messages,
1894 total,
1895 start,
1896 })
1897 }
1898
1899 async fn handle_session_state(&self, params: &Value) -> RpcResult {
1900 let req: SessionIdParams = parse_params(params)?;
1901 let backend = self
1902 .ctx
1903 .session_backend
1904 .as_ref()
1905 .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Session persistence is disabled"))?;
1906 let candidates = [
1907 req.session_id.clone(),
1908 format!("rpc_{}", req.session_id),
1909 format!("gw_{}", req.session_id),
1910 ];
1911 for key in &candidates {
1912 match backend.get_session_state(key) {
1913 Ok(Some(ss)) => {
1914 return to_result(SessionStateResult {
1915 session_id: req.session_id,
1916 state: ss.state,
1917 turn_id: ss.turn_id,
1918 turn_started_at: ss.turn_started_at.map(|t| t.to_rfc3339()),
1919 });
1920 }
1921 Ok(None) => continue,
1922 Err(e) => {
1923 return Err(rpc_err(
1924 INTERNAL_ERROR,
1925 format!("Failed to get session state: {e}"),
1926 ));
1927 }
1928 }
1929 }
1930 Err(rpc_err(SESSION_NOT_FOUND, "Session not found"))
1931 }
1932
1933 async fn handle_session_delete(&self, params: &Value) -> RpcResult {
1934 let req: SessionIdParams = parse_params(params)?;
1935 if let Some(agent) = self.ctx.sessions.get_agent(&req.session_id).await {
1936 agent
1937 .lock()
1938 .await
1939 .channel_handles()
1940 .unregister_channel("rpc");
1941 }
1942 self.ctx.sessions.remove(&req.session_id).await;
1943 if let Some(ref backend) = self.ctx.session_backend {
1945 for key in &[
1946 req.session_id.clone(),
1947 format!("rpc_{}", req.session_id),
1948 format!("gw_{}", req.session_id),
1949 ] {
1950 let _ = backend.delete_session(key);
1951 }
1952 }
1953 to_result(SessionDeleteResult {
1954 session_id: req.session_id,
1955 deleted: true,
1956 })
1957 }
1958
1959 fn handle_session_approve(&self, params: &Value) -> RpcResult {
1960 let p: SessionApproveParams = parse_params(params)?;
1961
1962 let response = match p.decision.as_str() {
1963 "allow_once" => zeroclaw_api::channel::ChannelApprovalResponse::Approve,
1964 "allow_always" => zeroclaw_api::channel::ChannelApprovalResponse::AlwaysApprove,
1965 "reject" | "reject_once" => zeroclaw_api::channel::ChannelApprovalResponse::Deny,
1966 "reject_with_edit" => {
1967 let replacement = p.replacement.unwrap_or_default();
1968 zeroclaw_api::channel::ChannelApprovalResponse::DenyWithEdit { replacement }
1969 }
1970 other => {
1971 return Err(rpc_err(
1972 INVALID_PARAMS,
1973 format!("unknown decision: {other}"),
1974 ));
1975 }
1976 };
1977
1978 self.ctx.approval_pending.resolve(&p.request_id, response);
1979
1980 to_result(SessionApproveResult {
1981 session_id: p.session_id,
1982 request_id: p.request_id,
1983 acknowledged: true,
1984 })
1985 }
1986
1987 async fn handle_memory_list(&self, params: &Value) -> RpcResult {
1990 let mem = self
1991 .ctx
1992 .memory
1993 .as_ref()
1994 .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Memory subsystem is not available"))?;
1995 let req: MemoryListParams = parse_params(params)?;
1996 let category = req
1997 .category
1998 .as_deref()
1999 .map(|s| MemoryCategory::Custom(s.to_string()));
2000 let entries = mem
2001 .list(category.as_ref(), req.session_id.as_deref())
2002 .await
2003 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Memory list failed: {e}")))?;
2004 let count = entries.len();
2005 let entries = truncate_memory_previews(entries);
2006 to_result(MemoryListResult { entries, count })
2007 }
2008
2009 async fn handle_memory_search(&self, params: &Value) -> RpcResult {
2010 let mem = self
2011 .ctx
2012 .memory
2013 .as_ref()
2014 .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Memory subsystem is not available"))?;
2015 let req: MemorySearchParams = parse_params(params)?;
2016 let entries = mem
2017 .recall(
2018 &req.query,
2019 req.limit,
2020 req.session_id.as_deref(),
2021 req.since.as_deref(),
2022 req.until.as_deref(),
2023 )
2024 .await
2025 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Memory search failed: {e}")))?;
2026 let count = entries.len();
2027 let entries = truncate_memory_previews(entries);
2028 to_result(MemorySearchResult { entries, count })
2029 }
2030
2031 async fn handle_memory_get(&self, params: &Value) -> RpcResult {
2036 let mem = self
2037 .ctx
2038 .memory
2039 .as_ref()
2040 .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Memory subsystem is not available"))?;
2041 let req: MemoryGetParams = parse_params(params)?;
2042 let entry = mem
2043 .get(&req.key)
2044 .await
2045 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Memory get failed: {e}")))?;
2046 match entry {
2047 Some(e) => to_result(MemoryGetResult { entry: Some(e) }),
2048 None => Err(rpc_err(
2049 INTERNAL_ERROR,
2050 format!("Memory key `{}` not found", req.key),
2051 )),
2052 }
2053 }
2054
2055 async fn handle_memory_store(&self, params: &Value) -> RpcResult {
2056 let mem = self
2057 .ctx
2058 .memory
2059 .as_ref()
2060 .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Memory subsystem is not available"))?;
2061 let req: MemoryStoreParams = parse_params(params)?;
2062 let category = req
2063 .category
2064 .as_deref()
2065 .map(|s| MemoryCategory::Custom(s.to_string()))
2066 .unwrap_or(MemoryCategory::Custom("user".into()));
2067 mem.store(&req.key, &req.content, category, req.session_id.as_deref())
2068 .await
2069 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Memory store failed: {e}")))?;
2070 to_result(MemoryStoreResult {
2071 key: req.key,
2072 stored: true,
2073 })
2074 }
2075
2076 async fn handle_memory_delete(&self, params: &Value) -> RpcResult {
2077 let mem = self
2078 .ctx
2079 .memory
2080 .as_ref()
2081 .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Memory subsystem is not available"))?;
2082 let req: MemoryDeleteParams = parse_params(params)?;
2083 mem.forget(&req.key)
2084 .await
2085 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Memory delete failed: {e}")))?;
2086 to_result(MemoryDeleteResult {
2087 key: req.key,
2088 deleted: true,
2089 })
2090 }
2091
2092 async fn handle_cron_list(&self) -> RpcResult {
2095 let config = self.ctx.config.read().clone();
2096 let jobs = crate::cron::list_jobs(&config)
2097 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Cron list failed: {e}")))?;
2098 to_result(CronListResult { jobs })
2099 }
2100
2101 async fn handle_cron_get(&self, params: &Value) -> RpcResult {
2102 let req: CronIdParams = parse_params(params)?;
2103 let config = self.ctx.config.read().clone();
2104 let job = crate::cron::get_job(&config, &req.id)
2105 .map_err(|e| rpc_err(INVALID_PARAMS, format!("Cron job not found: {e}")))?;
2106 to_result(job)
2107 }
2108
2109 async fn handle_cron_add(&self, params: &Value) -> RpcResult {
2110 let req: CronAddParams = parse_params(params)?;
2111 let config = self.ctx.config.read().clone();
2112 let schedule = Schedule::Cron {
2113 expr: req.schedule,
2114 tz: req.tz,
2115 };
2116 let job = crate::cron::add_shell_job_with_approval(
2117 &config,
2118 &req.agent,
2119 req.name,
2120 schedule,
2121 req.command.as_deref().unwrap_or(""),
2122 req.delivery,
2123 true, )
2125 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Cron add failed: {e}")))?;
2126 to_result(job)
2127 }
2128
2129 async fn handle_cron_patch(&self, params: &Value) -> RpcResult {
2130 let req: CronPatchParams = parse_params(params)?;
2131 let config = self.ctx.config.read().clone();
2132 let patch = CronJobPatch {
2133 schedule: req.schedule.map(|s| Schedule::Cron {
2134 expr: s,
2135 tz: if req.clear_tz == Some(true) {
2136 None
2137 } else {
2138 req.tz
2139 },
2140 }),
2141 command: req.command,
2142 prompt: req.prompt,
2143 name: req.name,
2144 ..Default::default()
2145 };
2146 let job = crate::cron::update_job(&config, &req.id, patch)
2147 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Cron patch failed: {e}")))?;
2148 to_result(job)
2149 }
2150
2151 async fn handle_cron_delete(&self, params: &Value) -> RpcResult {
2152 let req: CronIdParams = parse_params(params)?;
2153 let config = self.ctx.config.read().clone();
2154 crate::cron::remove_job(&config, &req.id)
2155 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Cron delete failed: {e}")))?;
2156 to_result(CronDeleteResult {
2157 id: req.id,
2158 deleted: true,
2159 })
2160 }
2161
2162 async fn handle_cron_runs(&self, params: &Value) -> RpcResult {
2163 let req: CronRunsParams = parse_params(params)?;
2164 let config = self.ctx.config.read().clone();
2165 let limit = req.limit.unwrap_or(20) as usize;
2166 let runs = crate::cron::list_runs(&config, &req.id, limit)
2167 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Cron runs failed: {e}")))?;
2168 to_result(CronRunsResult { runs })
2169 }
2170
2171 async fn handle_cron_trigger(&self, params: &Value) -> RpcResult {
2172 let req: CronIdParams = parse_params(params)?;
2173 let config = self.ctx.config.read().clone();
2174 let job = crate::cron::get_job(&config, &req.id)
2175 .map_err(|e| rpc_err(INVALID_PARAMS, format!("Cron job not found: {e}")))?;
2176 let (success, output) = crate::cron::scheduler::execute_job_now(&config, &job).await;
2177 to_result(CronTriggerResult {
2178 id: req.id,
2179 success,
2180 output,
2181 })
2182 }
2183
2184 async fn handle_cron_settings(&self, params: &Value) -> RpcResult {
2185 let config = self.ctx.config.read().clone();
2186 if params.get("patch").is_some() {
2188 not_yet_implemented(Method::CronSettings)
2189 } else {
2190 Ok(serde_json::to_value(&config.scheduler).unwrap_or(Value::Null))
2191 }
2192 }
2193
2194 fn handle_config_get(&self, params: &Value) -> RpcResult {
2197 use zeroclaw_config::traits::MaskSecrets;
2198 let req: ConfigGetParams = parse_params(params)?;
2199 let config = self.ctx.config.read().clone();
2200 if let Some(prop) = req.prop {
2201 let val = config
2202 .get_prop(&prop)
2203 .map_err(|e| rpc_err(INVALID_PARAMS, format!("Unknown prop: {e}")))?;
2204 to_result(ConfigGetPropResult { prop, value: val })
2205 } else {
2206 let mut masked = config;
2208 masked.mask_secrets();
2209 Ok(serde_json::to_value(&masked).unwrap_or(Value::Null))
2210 }
2211 }
2212
    /// Sets one config property, persists it, and flushes the config.
    ///
    /// `params` is deserialized into `ConfigSetParams` (`prop`, `value`).
    /// String values are stored as-is; any other JSON value is coerced to a
    /// string via `coerce_for_set_prop`, using the property's kind when the
    /// property is known.
    ///
    /// # Errors
    /// * `INVALID_PARAMS` if `params` does not parse, the value cannot be
    ///   coerced, or a secret property would be overwritten with a masked or
    ///   empty value.
    /// * `INTERNAL_ERROR` if `set_prop_persistent` fails.
    /// * Whatever `flush_config` returns on failure.
    async fn handle_config_set(&self, params: &Value) -> RpcResult {
        let req: ConfigSetParams = parse_params(params)?;
        // Computed up front: `Some(ref)` means live sessions get refreshed once
        // the change has been flushed.
        let refresh_model_provider_ref = model_provider_ref_from_provider_profile_prop(&req.prop);
        {
            // The write guard lives only inside this block, so it is released
            // before the `.await` on `flush_config` below.
            let mut config = self.ctx.config.write();
            config.ensure_map_key_for_path(&req.prop);
            // Field metadata for the target property, if it is a known field.
            let info = config
                .prop_fields()
                .into_iter()
                .find(|f| f.name == req.prop);
            let value_str = match &req.value {
                Value::String(s) => s.clone(),
                other => zeroclaw_config::typed_value::coerce_for_set_prop(
                    other,
                    info.as_ref().map(|i| i.kind),
                )
                .map_err(|e| rpc_err(INVALID_PARAMS, e.message))?,
            };
            // A property counts as secret if its field metadata says so or if
            // the schema-level check does.
            let is_secret_prop = info
                .as_ref()
                .is_some_and(|i| i.is_secret || i.derived_from_secret)
                || zeroclaw_config::schema::Config::prop_is_secret(&req.prop);
            // Reject mask placeholders and empty strings for secrets so a
            // masked value cannot replace the stored secret.
            if is_secret_prop
                && (value_str == zeroclaw_config::traits::MASKED_SECRET
                    || value_str == "****"
                    || value_str.is_empty())
            {
                return Err(rpc_err(
                    INVALID_PARAMS,
                    format!(
                        "Refusing to overwrite secret `{}` with a masked or empty value",
                        req.prop
                    ),
                ));
            }
            config
                .set_prop_persistent(&req.prop, &value_str)
                .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Config set failed: {e}")))?;
        }
        self.flush_config().await?;
        if let Some(model_provider_ref) = refresh_model_provider_ref {
            self.schedule_live_sessions_refresh_for_model_provider(model_provider_ref);
        }
        to_result(ConfigSetResult {
            prop: req.prop,
            set: true,
        })
    }
2266
2267 fn schedule_live_sessions_refresh_for_model_provider(&self, model_provider_ref: String) {
2268 let ctx = Arc::clone(&self.ctx);
2269 zeroclaw_spawn::spawn!(async move {
2270 Self::refresh_live_sessions_for_model_provider(ctx, &model_provider_ref).await;
2271 });
2272 }
2273
    /// Rebuilds the model provider of every live session that currently uses
    /// `model_provider_ref` and applies it to the session.
    ///
    /// For each session id from `ctx.sessions.list_ids()`:
    /// 1. skip it if its agent alias or overrides cannot be fetched;
    /// 2. skip it unless its effective provider ref (session override, else the
    ///    agent's configured `model_provider`) equals `model_provider_ref`;
    /// 3. build a new provider with `build_session_model_provider`; on failure
    ///    log a WARN event and move on to the next session;
    /// 4. apply the provider and, if that succeeds and the agent can be
    ///    fetched, update the agent's model name and temperature.
    async fn refresh_live_sessions_for_model_provider(
        ctx: Arc<RpcContext>,
        model_provider_ref: &str,
    ) {
        let session_ids = ctx.sessions.list_ids().await;
        for session_id in session_ids {
            let Some(agent_alias) = ctx.sessions.get_agent_alias(&session_id).await else {
                continue;
            };
            let Some(overrides) = ctx.sessions.get_overrides(&session_id).await else {
                continue;
            };
            // The config guard is confined to this block so it is not held
            // across any `.await`.
            let uses_provider = {
                let config = ctx.config.read();
                // A per-session override wins over the agent's configured provider.
                let effective_ref = overrides.model_provider.as_deref().or_else(|| {
                    config
                        .agent(&agent_alias)
                        .map(|agent| agent.model_provider.as_str())
                });
                effective_ref == Some(model_provider_ref)
            };
            if !uses_provider {
                continue;
            }

            let (model_provider, model_provider_name, model_name, temperature) = {
                let config = ctx.config.read();
                // The ref is split at its first '.' into (provider type, alias)
                // to look up the provider entry's temperature, if any.
                let provider_temperature = model_provider_ref.split_once('.').and_then(
                    |(provider_type, provider_alias)| {
                        config
                            .providers
                            .models
                            .find(provider_type, provider_alias)
                            .and_then(|entry| entry.temperature)
                    },
                );
                match crate::agent::agent::build_session_model_provider(
                    &config,
                    model_provider_ref,
                    overrides.model.as_deref(),
                ) {
                    Ok((model_provider, model_provider_name, model_name)) => (
                        model_provider,
                        model_provider_name,
                        model_name,
                        // Session override first, provider-level value as fallback.
                        overrides.temperature.or(provider_temperature),
                    ),
                    Err(e) => {
                        // The config change is already saved; only this
                        // session's refresh is abandoned.
                        ::zeroclaw_log::record!(
                            WARN,
                            ::zeroclaw_log::Event::new(
                                module_path!(),
                                ::zeroclaw_log::Action::Note
                            )
                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
                            .with_attrs(::serde_json::json!({
                                "session_id": session_id,
                                "agent_alias": agent_alias,
                                "model_provider": model_provider_ref,
                                "error": e.to_string(),
                            })),
                            "config/set saved provider profile but live session refresh failed"
                        );
                        continue;
                    }
                }
            };
            // Model name and temperature are only updated when
            // `apply_model_provider` returned true and the agent is available.
            if ctx
                .sessions
                .apply_model_provider(&session_id, model_provider, model_provider_name)
                .await
                && let Some(agent) = ctx.sessions.get_agent(&session_id).await
            {
                let mut agent = agent.lock().await;
                agent.set_model_name(model_name);
                agent.set_temperature(temperature);
            }
        }
    }
2353
2354 fn handle_config_validate(&self) -> RpcResult {
2355 let config = self.ctx.config.read().clone();
2356 match config.validate() {
2357 Ok(()) => to_result(ConfigValidateResult {
2358 valid: true,
2359 error: None,
2360 }),
2361 Err(e) => to_result(ConfigValidateResult {
2362 valid: false,
2363 error: Some(e.to_string()),
2364 }),
2365 }
2366 }
2367
2368 fn handle_config_reload(&self) -> RpcResult {
2369 let Some(reload_tx) = self.ctx.reload_tx.clone() else {
2370 return Err(rpc_err(INTERNAL_ERROR, "Reload not available"));
2371 };
2372 zeroclaw_spawn::spawn!(async move {
2374 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2375 let _ = reload_tx.send(true);
2376 });
2377 to_result(ConfigReloadResult { reloading: true })
2378 }
2379
2380 fn handle_config_list(&self, params: &Value) -> RpcResult {
2381 use zeroclaw_config::field_visibility;
2382 use zeroclaw_config::traits::ConfigFieldEntry;
2383 let req: ConfigListParams = parse_params(params)?;
2384 let config = self.ctx.config.read().clone();
2385 let prefix = req.prefix.as_deref();
2386 let excluded = field_visibility::excluded_paths(&config, prefix.unwrap_or(""));
2387 let entries: Vec<ConfigFieldEntry> = config
2388 .prop_fields()
2389 .into_iter()
2390 .filter(|info| match prefix {
2391 Some(p) => field_visibility::path_matches_prefix(&info.name, p),
2392 None => true,
2393 })
2394 .filter(|info| !field_visibility::is_excluded(&info.name, &excluded))
2395 .map(|info| {
2396 let env = config.prop_is_env_overridden(&info.name);
2397 ConfigFieldEntry::from_prop_field(info, env)
2398 })
2399 .collect();
2400 to_result(ConfigListResult { entries })
2401 }
2402
2403 async fn handle_config_delete(&self, params: &Value) -> RpcResult {
2404 let req: ConfigDeleteParams = parse_params(params)?;
2405 let refresh_model_provider_ref = model_provider_ref_from_provider_profile_prop(&req.prop);
2406 {
2407 let mut config = self.ctx.config.write();
2408 config
2409 .set_prop_persistent(&req.prop, "")
2410 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Config delete failed: {e}")))?;
2411 }
2412 self.flush_config().await?;
2413 if let Some(model_provider_ref) = refresh_model_provider_ref {
2414 self.schedule_live_sessions_refresh_for_model_provider(model_provider_ref);
2415 }
2416 to_result(ConfigDeleteResult {
2417 prop: req.prop,
2418 deleted: true,
2419 })
2420 }
2421
2422 fn handle_config_map_keys(&self, params: &Value) -> RpcResult {
2423 let req: ConfigMapKeysParams = parse_params(params)?;
2424 let config = self.ctx.config.read().clone();
2425 let keys = config.get_map_keys(&req.path).ok_or_else(|| {
2426 rpc_err(
2427 INVALID_PARAMS,
2428 format!("No map-keyed section at `{}`", req.path),
2429 )
2430 })?;
2431 to_result(ConfigMapKeysResult {
2432 path: req.path,
2433 keys,
2434 })
2435 }
2436
2437 async fn handle_config_map_key_create(&self, params: &Value) -> RpcResult {
2438 let req: ConfigMapKeyCreateParams = parse_params(params)?;
2439 let created = {
2440 let mut config = self.ctx.config.write();
2441 let created = config
2442 .create_map_key(&req.path, &req.key)
2443 .map_err(|e| rpc_err(INVALID_PARAMS, e))?;
2444 if created {
2445 config.mark_dirty(&format!("{}.{}", req.path, req.key));
2446 }
2447 created
2448 };
2449 if created {
2450 self.flush_config().await?;
2451 }
2452 to_result(ConfigMapKeyCreateResult {
2453 path: req.path,
2454 key: req.key,
2455 created,
2456 })
2457 }
2458
2459 async fn handle_config_map_key_delete(&self, params: &Value) -> RpcResult {
2460 let req: ConfigMapKeyDeleteParams = parse_params(params)?;
2461 let deleted = {
2462 let mut config = self.ctx.config.write();
2463 let deleted = config
2464 .delete_map_key(&req.path, &req.key)
2465 .map_err(|e| rpc_err(INVALID_PARAMS, e))?;
2466 if deleted {
2467 config.mark_dirty(&format!("{}.{}", req.path, req.key));
2468 }
2469 deleted
2470 };
2471 if deleted {
2472 self.flush_config().await?;
2473 }
2474 to_result(ConfigMapKeyDeleteResult {
2475 path: req.path,
2476 key: req.key,
2477 deleted,
2478 })
2479 }
2480
2481 async fn handle_config_map_key_rename(&self, params: &Value) -> RpcResult {
2482 let req: ConfigMapKeyRenameParams = parse_params(params)?;
2483 let renamed = {
2484 let mut config = self.ctx.config.write();
2485 let renamed = config
2486 .rename_map_key(&req.path, &req.from, &req.to)
2487 .map_err(|e| rpc_err(INVALID_PARAMS, e))?;
2488 if renamed {
2489 config.mark_dirty(&format!("{}.{}", req.path, req.from));
2490 config.mark_dirty(&format!("{}.{}", req.path, req.to));
2491 }
2492 renamed
2493 };
2494 if renamed {
2495 self.flush_config().await?;
2496 }
2497 to_result(ConfigMapKeyRenameResult {
2498 path: req.path,
2499 from: req.from,
2500 to: req.to,
2501 renamed,
2502 })
2503 }
2504
2505 fn handle_config_templates(&self) -> RpcResult {
2506 use zeroclaw_config::schema::Config;
2507 let templates: Vec<ConfigTemplateEntry> = Config::map_key_sections()
2508 .into_iter()
2509 .map(Into::into)
2510 .collect();
2511 to_result(ConfigTemplatesResult { templates })
2512 }
2513
2514 fn handle_agents_list(&self) -> RpcResult {
2517 let config = self.ctx.config.read().clone();
2518 let agents: Vec<AgentEntry> = config
2519 .agents
2520 .iter()
2521 .map(|(alias, agent_cfg)| AgentEntry {
2522 alias: alias.clone(),
2523 enabled: agent_cfg.enabled,
2524 channels: agent_cfg.channels.iter().map(|c| c.to_string()).collect(),
2525 })
2526 .collect();
2527 to_result(AgentsListResult { agents })
2528 }
2529
2530 async fn handle_agents_status(&self) -> RpcResult {
2531 let config = self.ctx.config.read().clone();
2532
2533 let rpc_counts = self.ctx.sessions.count_by_agent().await;
2536 let mut backend_counts = std::collections::HashMap::<String, usize>::new();
2537 if let Some(ref backend) = self.ctx.session_backend {
2538 for meta in backend.list_sessions_with_metadata() {
2539 let alias = meta.agent_alias.or_else(|| {
2540 meta.channel_id
2541 .as_deref()
2542 .and_then(|c| config.agent_for_channel(c))
2543 .map(str::to_string)
2544 });
2545 if let Some(a) = alias {
2546 *backend_counts.entry(a).or_default() += 1;
2547 }
2548 }
2549 }
2550
2551 let agents: Vec<AgentStatusEntry> = config
2552 .agents
2553 .iter()
2554 .map(|(alias, agent_cfg)| {
2555 let rpc = *rpc_counts.get(alias).unwrap_or(&0);
2556 let persisted = *backend_counts.get(alias).unwrap_or(&0);
2557 AgentStatusEntry {
2558 alias: alias.clone(),
2559 enabled: agent_cfg.enabled,
2560 live_sessions: rpc,
2561 persisted_sessions: persisted,
2562 channels: agent_cfg.channels.iter().map(|c| c.to_string()).collect(),
2563 }
2564 })
2565 .collect();
2566 to_result(AgentsStatusResult { agents })
2567 }
2568
2569 fn handle_cost_query(&self, params: &Value) -> RpcResult {
2572 let tracker = self
2573 .ctx
2574 .cost_tracker
2575 .as_ref()
2576 .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Cost tracking is not available"))?;
2577 let req: CostQueryParams = parse_params(params)?;
2578 let summary = if let Some(agent) = req.agent {
2579 tracker
2580 .get_summary_for_agent(&agent)
2581 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Cost query failed: {e}")))?
2582 } else {
2583 tracker
2584 .get_summary()
2585 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Cost query failed: {e}")))?
2586 };
2587 to_result(summary)
2588 }
2589
2590 fn handle_skills_bundles(&self) -> RpcResult {
2593 let config = self.ctx.config.read().clone();
2594 let root = config.install_root_dir();
2595 let svc = crate::skills::service::SkillsService::new(&config, &root);
2596 let bundles: Vec<SkillBundleEntry> = svc
2597 .list_bundles()
2598 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Skills bundles failed: {e}")))?
2599 .into_iter()
2600 .map(|b| SkillBundleEntry {
2601 alias: b.alias,
2602 directory: b.directory.to_string_lossy().to_string(),
2603 include: b.include,
2604 exclude: b.exclude,
2605 })
2606 .collect();
2607 to_result(SkillsBundlesResult { bundles })
2608 }
2609
2610 fn handle_skills_list(&self, params: &Value) -> RpcResult {
2611 let req: SkillsListParams = parse_params(params)?;
2612 let config = self.ctx.config.read().clone();
2613 let root = config.install_root_dir();
2614 let svc = crate::skills::service::SkillsService::new(&config, &root);
2615 let skills: Vec<SkillListEntry> = svc
2616 .list_skills(req.bundle.as_deref())
2617 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Skills list failed: {e}")))?
2618 .into_iter()
2619 .map(|s| SkillListEntry {
2620 bundle: s.r#ref.bundle().to_string(),
2621 name: s.r#ref.name().to_string(),
2622 directory: s.directory.to_string_lossy().to_string(),
2623 frontmatter: s.frontmatter,
2624 })
2625 .collect();
2626 to_result(SkillsListResult { skills })
2627 }
2628
2629 fn handle_skills_read(&self, params: &Value) -> RpcResult {
2630 let req: SkillsReadParams = parse_params(params)?;
2631 let config = self.ctx.config.read().clone();
2632 let root = config.install_root_dir();
2633 let svc = crate::skills::service::SkillsService::new(&config, &root);
2634 let skill_ref = svc
2635 .resolve_ref(&req.name, Some(&req.bundle))
2636 .map_err(|e| rpc_err(INVALID_PARAMS, format!("Invalid skill ref: {e}")))?;
2637 let doc = svc
2638 .read_skill(&skill_ref)
2639 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Skill read failed: {e}")))?;
2640 to_result(SkillsReadResult {
2641 bundle: req.bundle,
2642 name: req.name,
2643 frontmatter: doc.frontmatter,
2644 body: doc.body,
2645 })
2646 }
2647
2648 fn handle_skills_write(&self, params: &Value) -> RpcResult {
2649 let req: SkillsWriteParams = parse_params(params)?;
2650 let config = self.ctx.config.read().clone();
2651 let root = config.install_root_dir();
2652 let svc = crate::skills::service::SkillsService::new(&config, &root);
2653 let skill_ref = svc
2654 .resolve_ref(&req.name, Some(&req.bundle))
2655 .map_err(|e| rpc_err(INVALID_PARAMS, format!("Invalid skill ref: {e}")))?;
2656 let doc = crate::skills::document::SkillDocument {
2657 frontmatter: req.frontmatter,
2658 body: req.body,
2659 };
2660 svc.write_skill(&skill_ref, &doc)
2661 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Skill write failed: {e}")))?;
2662 to_result(SkillsWriteResult {
2663 bundle: req.bundle,
2664 name: req.name,
2665 written: true,
2666 })
2667 }
2668
2669 fn handle_skills_delete(&self, params: &Value) -> RpcResult {
2670 let req: SkillsDeleteParams = parse_params(params)?;
2671 let config = self.ctx.config.read().clone();
2672 let root = config.install_root_dir();
2673 let svc = crate::skills::service::SkillsService::new(&config, &root);
2674 let skill_ref = svc
2675 .resolve_ref(&req.name, Some(&req.bundle))
2676 .map_err(|e| rpc_err(INVALID_PARAMS, format!("Invalid skill ref: {e}")))?;
2677 svc.remove_skill(&skill_ref, crate::skills::service::RemoveMode::Archive)
2678 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Skill delete failed: {e}")))?;
2679 to_result(SkillsDeleteResult {
2680 bundle: req.bundle,
2681 name: req.name,
2682 deleted: true,
2683 })
2684 }
2685
2686 fn handle_personality_list(&self, params: &Value) -> RpcResult {
2689 let req: PersonalityListParams = parse_params(params)?;
2690 let config = self.ctx.config.read().clone();
2691 let workspace = req.agent.as_deref().map(|a| config.agent_workspace_dir(a));
2692 let files: Vec<PersonalityFileEntry> =
2693 crate::agent::personality::EDITABLE_PERSONALITY_FILES
2694 .iter()
2695 .map(|&filename| {
2696 let (exists, size, mtime_ms) = workspace
2697 .as_ref()
2698 .and_then(|dir| {
2699 let path = dir.join(filename);
2700 let meta = std::fs::metadata(&path).ok()?;
2701 let mtime = meta
2702 .modified()
2703 .ok()
2704 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
2705 .map(|d| d.as_millis() as i64);
2706 Some((true, meta.len(), mtime))
2707 })
2708 .unwrap_or((false, 0, None));
2709 PersonalityFileEntry {
2710 filename: filename.to_string(),
2711 exists,
2712 size,
2713 mtime_ms,
2714 }
2715 })
2716 .collect();
2717 to_result(PersonalityListResult {
2718 files,
2719 max_chars: crate::agent::personality::MAX_FILE_CHARS,
2720 })
2721 }
2722
2723 fn handle_personality_get(&self, params: &Value) -> RpcResult {
2724 let req: PersonalityGetParams = parse_params(params)?;
2725 let config = self.ctx.config.read().clone();
2726
2727 if !crate::agent::personality::EDITABLE_PERSONALITY_FILES.contains(&req.filename.as_str()) {
2729 return Err(rpc_err(
2730 INVALID_PARAMS,
2731 format!("Not an editable file: {}", req.filename),
2732 ));
2733 }
2734 let workspace = config.agent_workspace_dir(&req.agent);
2735 let path = workspace.join(&req.filename);
2736 match std::fs::read_to_string(&path) {
2737 Ok(content) => {
2738 let mtime_ms = std::fs::metadata(&path)
2739 .ok()
2740 .and_then(|m| m.modified().ok())
2741 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
2742 .map(|d| d.as_millis() as i64);
2743 let truncated = content.chars().count() > crate::agent::personality::MAX_FILE_CHARS;
2744 to_result(PersonalityGetResult {
2745 filename: req.filename,
2746 content: Some(content),
2747 exists: true,
2748 truncated,
2749 mtime_ms,
2750 })
2751 }
2752 Err(e) if e.kind() == std::io::ErrorKind::NotFound => to_result(PersonalityGetResult {
2753 filename: req.filename,
2754 content: None,
2755 exists: false,
2756 truncated: false,
2757 mtime_ms: None,
2758 }),
2759 Err(e) => Err(rpc_err(INTERNAL_ERROR, format!("Read failed: {e}"))),
2760 }
2761 }
2762
2763 fn handle_personality_put(&self, params: &Value) -> RpcResult {
2764 let req: PersonalityPutParams = parse_params(params)?;
2765 let config = self.ctx.config.read().clone();
2766
2767 if !crate::agent::personality::EDITABLE_PERSONALITY_FILES.contains(&req.filename.as_str()) {
2768 return Err(rpc_err(
2769 INVALID_PARAMS,
2770 format!("Not an editable file: {}", req.filename),
2771 ));
2772 }
2773 if req.content.chars().count() > crate::agent::personality::MAX_FILE_CHARS {
2774 return Err(rpc_err(
2775 INVALID_PARAMS,
2776 format!(
2777 "Content exceeds {} char limit",
2778 crate::agent::personality::MAX_FILE_CHARS
2779 ),
2780 ));
2781 }
2782 let workspace = config.agent_workspace_dir(&req.agent);
2783 let path = workspace.join(&req.filename);
2784 if let Some(parent) = path.parent() {
2785 let _ = std::fs::create_dir_all(parent);
2786 }
2787 std::fs::write(&path, &req.content)
2788 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Write failed: {e}")))?;
2789 let bytes_written = req.content.len() as u64;
2790 let mtime_ms = std::fs::metadata(&path)
2791 .ok()
2792 .and_then(|m| m.modified().ok())
2793 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
2794 .map(|d| d.as_millis() as i64);
2795 to_result(PersonalityPutResult {
2796 bytes_written,
2797 mtime_ms,
2798 })
2799 }
2800
2801 fn handle_personality_templates(&self, params: &Value) -> RpcResult {
2802 let req: PersonalityTemplatesParams = parse_params(params)?;
2803 let config = self.ctx.config.read().clone();
2804 let ctx = personality_template_context(&config, &req);
2805 let templates = crate::agent::personality_templates::render_preset_default(&ctx);
2806 let files: Vec<TemplateFileEntry> = templates
2807 .into_iter()
2808 .map(|(name, content)| TemplateFileEntry {
2809 filename: name.to_string(),
2810 content,
2811 })
2812 .collect();
2813 to_result(PersonalityTemplatesResult {
2814 preset: "default".to_string(),
2815 files,
2816 })
2817 }
2818
    /// Builds the ordered list of top-level config sections.
    ///
    /// The key set is assembled in steps: first path segment of every prop
    /// field, minus internal keys, plus the first segment of every map-key
    /// section path, plus every quickstart section key. Bare parent keys that
    /// also appear as the prefix of a dotted key are dropped, as is anything
    /// starting with `cost.rates`. Keys known to `section_index_for_key` come
    /// first in index order; the rest follow alphabetically.
    fn handle_config_sections(&self) -> RpcResult {
        use zeroclaw_config::schema::Config;
        use zeroclaw_config::sections::{
            QUICKSTART_SECTIONS, Section, SectionShape, section_help, section_index_for_key,
        };

        let config = self.ctx.config.read().clone();

        // Start from the first dot-separated segment of every prop field name.
        let mut roots: std::collections::BTreeSet<String> = config
            .prop_fields()
            .iter()
            .filter_map(|f| f.name.split('.').next().map(str::to_string))
            .collect();

        // Keys that are never listed as sections.
        const HIDDEN: &[&str] = &[
            "schema_version",
            "onboard_state",
            "onboard-state",
            "config_path",
            "workspace_dir",
            "env_overridden_paths",
            "pre_override_snapshots",
        ];
        for h in HIDDEN {
            roots.remove(*h);
        }

        // Add the first segment of every map-key section path.
        let all_map_paths: Vec<&'static str> =
            Config::map_key_sections().iter().map(|s| s.path).collect();
        for &prefix in &all_map_paths
            .iter()
            .filter_map(|p| p.split('.').next())
            .collect::<std::collections::HashSet<_>>()
        {
            roots.insert(prefix.to_string());
        }

        // Quickstart section keys are inserted verbatim (these are the only
        // inserted keys that are not reduced to their first segment).
        for s in QUICKSTART_SECTIONS {
            roots.insert(s.as_str().to_string());
        }

        // Drop a bare parent key when some dotted key in the set has it as
        // its first segment; dotted keys themselves are always kept.
        let parents_with_children: std::collections::HashSet<String> = roots
            .iter()
            .filter_map(|k| k.split_once('.').map(|(p, _)| p.to_string()))
            .collect();
        roots.retain(|k| k.contains('.') || !parents_with_children.contains(k));

        roots.retain(|k| !k.starts_with("cost.rates"));

        // Indexed keys first (by index), then un-indexed keys alphabetically.
        let mut ordered: Vec<String> = roots.into_iter().collect();
        ordered.sort_by(
            |a, b| match (section_index_for_key(a), section_index_for_key(b)) {
                (Some(ai), Some(bi)) => ai.cmp(&bi),
                (Some(_), None) => std::cmp::Ordering::Less,
                (None, Some(_)) => std::cmp::Ordering::Greater,
                (None, None) => a.cmp(b),
            },
        );

        // For keys without a `Section`: true when a map path equals the key or
        // is exactly one segment below it.
        let section_has_picker_for_key = |key: &str| -> bool {
            let key_dot = format!("{key}.");
            all_map_paths.iter().any(|p| {
                *p == key
                    || p.strip_prefix(&key_dot)
                        .is_some_and(|rest| !rest.contains('.'))
            })
        };

        let sections: Vec<ConfigSectionEntry> = ordered
            .into_iter()
            .map(|key| {
                let wizard = Section::from_key(&key);
                // Keys with a `Section` decide by shape; others by map paths.
                let has_picker = match wizard {
                    Some(w) => matches!(
                        w.shape(),
                        SectionShape::TypedFamilyMap | SectionShape::OneTierAliasMap
                    ),
                    None => section_has_picker_for_key(&key),
                };
                let completed = wizard
                    .map(|w| zeroclaw_config::sections::section_has_signal(&config, w))
                    .unwrap_or(false);
                let label = zeroclaw_config::sections::humanize_section_key(&key);
                ConfigSectionEntry {
                    help: section_help(&key).to_string(),
                    has_picker,
                    completed,
                    // `ready` and `group` are always emitted with these fixed
                    // values by this handler.
                    ready: false,
                    group: String::new(),
                    is_quickstart: wizard.is_some(),
                    shape: wizard.map(Section::shape),
                    label,
                    key,
                }
            })
            .collect();
        to_result(ConfigSectionsResult { sections })
    }
2930
2931 fn handle_config_status(&self) -> RpcResult {
2932 use zeroclaw_config::sections::QUICKSTART_SECTIONS;
2933 let config = self.ctx.config.read().clone();
2934 let missing: Vec<String> = QUICKSTART_SECTIONS
2935 .iter()
2936 .filter(|&&s| !zeroclaw_config::sections::section_has_signal(&config, s))
2937 .map(|s| s.as_str().to_string())
2938 .collect();
2939 let needs_quickstart = !missing.is_empty();
2940 let reason = if needs_quickstart {
2941 format!("{} section(s) incomplete", missing.len())
2942 } else {
2943 "all sections complete".to_string()
2944 };
2945 to_result(ConfigStatusResult {
2946 needs_quickstart,
2947 reason,
2948 has_partial_state: false,
2949 missing,
2950 })
2951 }
2952
2953 fn handle_config_catalog(&self) -> RpcResult {
2954 let providers: Vec<CatalogModelProvider> = zeroclaw_providers::list_model_providers()
2955 .into_iter()
2956 .map(|p| CatalogModelProvider {
2957 name: p.name.to_string(),
2958 display_name: p.display_name.to_string(),
2959 local: p.local,
2960 })
2961 .collect();
2962 to_result(CatalogResponse {
2963 model_providers: providers,
2964 })
2965 }
2966
2967 async fn handle_config_catalog_models(&self, params: &Value) -> RpcResult {
2968 let req: CatalogModelsParams = parse_params(params)?;
2969 let local = crate::quickstart::model_provider_is_local(&req.model_provider);
2970 let (models, pricing, live) = crate::quickstart::model_catalog(&req.model_provider).await;
2971 to_result(CatalogModelsResult {
2972 model_provider: req.model_provider,
2973 models,
2974 pricing,
2975 local,
2976 live,
2977 })
2978 }
2979
2980 async fn handle_logs_subscribe(&self) -> RpcResult {
2983 let event_tx = self
2984 .ctx
2985 .event_tx
2986 .as_ref()
2987 .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Event streaming is not available"))?;
2988 let mut rx = event_tx.subscribe();
2989 let rpc = self.rpc.clone();
2990 zeroclaw_spawn::spawn!(async move {
2991 loop {
2992 tokio::select! {
2993 biased;
2994 _ = rpc.closed() => break,
2995 event = rx.recv() => match event {
2996 Ok(event) => {
2997 let notification =
2998 JsonRpcNotification::new(notification::LOGS_EVENT, event);
2999 if let Ok(json) = serde_json::to_string(¬ification)
3000 && !rpc.send_raw(json).await
3001 {
3002 break;
3003 }
3004 }
3005 Err(_) => break,
3006 },
3007 }
3008 }
3009 });
3010 to_result(LogsSubscribeResult { subscribed: true })
3011 }
3012
3013 async fn handle_logs_query(&self, params: &Value) -> RpcResult {
3014 let p: LogsQueryParams = parse_params(params)?;
3015
3016 let path = zeroclaw_log::current_log_path()
3017 .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Log persistence is not enabled"))?;
3018
3019 let filter = zeroclaw_log::LogFilter {
3020 since_ts: p.since_ts,
3021 until_ts: p.until_ts,
3022 until_id: p.until_id,
3023 action: p.action,
3024 category: p.category,
3025 outcome: p.outcome,
3026 severity_min: p.severity_min,
3027 trace_id: p.trace_id,
3028 q: p.q,
3029 hide_internal: p.hide_internal,
3030 field_eq: std::collections::BTreeMap::new(),
3031 };
3032
3033 let limit = p.limit.unwrap_or(200);
3034
3035 let page = zeroclaw_log::load_page(&path, &filter, limit)
3036 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Log read failed: {e:#}")))?;
3037
3038 let events: Vec<serde_json::Value> = page
3039 .events
3040 .into_iter()
3041 .filter_map(|evt| serde_json::to_value(evt).ok())
3042 .collect();
3043
3044 to_result(LogsQueryResult {
3045 events,
3046 next_cursor: page.next_cursor,
3047 at_end: page.at_end,
3048 })
3049 }
3050
3051 async fn handle_logs_get(&self, params: &Value) -> RpcResult {
3056 let p: LogsGetParams = parse_params(params)?;
3057 let path = zeroclaw_log::current_log_path()
3058 .ok_or_else(|| rpc_err(INTERNAL_ERROR, "Log persistence is not enabled"))?;
3059 let event = zeroclaw_log::find_event_by_id(&path, &p.id)
3060 .map_err(|e| rpc_err(INTERNAL_ERROR, format!("Log read failed: {e:#}")))?;
3061 match event {
3062 Some(evt) => {
3063 let event = serde_json::to_value(evt).map_err(|e| {
3064 rpc_err(INTERNAL_ERROR, format!("Failed to serialize event: {e}"))
3065 })?;
3066 to_result(LogsGetResult { event })
3067 }
3068 None => Err(rpc_err(
3069 INTERNAL_ERROR,
3070 format!("Log id `{}` not found", p.id),
3071 )),
3072 }
3073 }
3074
3075 async fn handle_file_attach(&self, params: &Value) -> RpcResult {
3078 use super::attachments::{MAX_REQUEST_BYTES, process_file_entry};
3079
3080 let req: FileAttachParams = parse_params(params)?;
3081 let sid = &req.session_id;
3082
3083 let agent_alias = self
3086 .ctx
3087 .sessions
3088 .get_agent_alias(sid)
3089 .await
3090 .ok_or_else(|| rpc_err(SESSION_NOT_FOUND, "Session not found"))?;
3091 let upload_root = self
3092 .ctx
3093 .config
3094 .read()
3095 .agent_workspace_dir(&agent_alias)
3096 .to_string_lossy()
3097 .to_string();
3098
3099 let is_wss = self.peer_label.starts_with("wss:");
3100
3101 let mut total_bytes: u64 = 0;
3102 let mut results = Vec::with_capacity(req.files.len());
3103
3104 for entry in &req.files {
3105 let result =
3106 process_file_entry(entry, sid, &upload_root, is_wss, &self.ctx.sessions).await?;
3107 total_bytes += result.size_bytes;
3108 if total_bytes > MAX_REQUEST_BYTES {
3109 return Err(rpc_err(
3110 INVALID_PARAMS,
3111 format!(
3112 "Total attachment size exceeds {} MB limit",
3113 MAX_REQUEST_BYTES / (1024 * 1024)
3114 ),
3115 ));
3116 }
3117 results.push(result);
3118 }
3119
3120 to_result(FileAttachResult { files: results })
3121 }
3122
3123 async fn send_result(&self, id: Value, result: Value) {
3126 let resp = JsonRpcResponse {
3127 jsonrpc: JSONRPC_VERSION,
3128 result: Some(result),
3129 error: None,
3130 id,
3131 };
3132 if let Ok(json) = serde_json::to_string(&resp) {
3133 let _ = self.rpc.send_raw(json).await;
3134 }
3135 }
3136
3137 async fn send_error(&self, id: Value, code: i32, message: &str) {
3138 let resp = JsonRpcResponse {
3139 jsonrpc: JSONRPC_VERSION,
3140 result: None,
3141 error: Some(JsonRpcError {
3142 code,
3143 message: message.to_string(),
3144 data: None,
3145 }),
3146 id,
3147 };
3148 if let Ok(json) = serde_json::to_string(&resp) {
3149 let _ = self.rpc.send_raw(json).await;
3150 }
3151 }
3152
3153 fn handle_quickstart_state(&self) -> RpcResult {
3162 let cfg = self.ctx.config.read().clone();
3163 to_result(crate::quickstart::snapshot_state(&cfg))
3164 }
3165
3166 fn handle_quickstart_fields(&self, params: &Value) -> RpcResult {
3167 let req: QuickstartFieldsParams = parse_params(params)?;
3168 let descriptors = crate::quickstart::field_shape(req.section, &req.type_key);
3169 to_result(QuickstartFieldsResult {
3170 fields: descriptors,
3171 })
3172 }
3173
3174 fn handle_quickstart_validate(&self, params: &Value) -> RpcResult {
3175 let req: QuickstartValidateParams = parse_params(params)?;
3176 let cfg = self.ctx.config.read().clone();
3177 let body = match crate::quickstart::validate_only_with_surface(
3178 &req.submission,
3179 &cfg,
3180 crate::quickstart::Surface::Tui,
3181 ) {
3182 Ok(()) => QuickstartValidateResult::Ok,
3183 Err(errors) => QuickstartValidateResult::Errors { errors },
3184 };
3185 to_result(body)
3186 }
3187
3188 async fn handle_quickstart_apply(&self, params: &Value) -> RpcResult {
3189 let req: QuickstartApplyParams = parse_params(params)?;
3190 let mut working = self.ctx.config.read().clone();
3194 let result = crate::quickstart::apply_with_surface(
3195 req.submission,
3196 &mut working,
3197 crate::quickstart::Surface::Tui,
3198 )
3199 .await;
3200 let body = match result {
3201 Ok(agent) => {
3202 *self.ctx.config.write() = working;
3203 let reload_signalled = self.signal_daemon_reload();
3204 QuickstartApplyResult::Applied {
3205 agent,
3206 daemon_restarted: reload_signalled,
3207 }
3208 }
3209 Err(errors) => QuickstartApplyResult::Errors { errors },
3210 };
3211 to_result(body)
3212 }
3213
3214 fn handle_quickstart_dismiss(&self, params: &Value) -> RpcResult {
3215 let req: QuickstartDismissParams = parse_params(params)?;
3216 crate::quickstart::record_dismissed(&req.run_id, req.surface, req.last_step);
3217 to_result(QuickstartDismissResult { recorded: true })
3218 }
3219
    /// Schedules a daemon reload signal after a quickstart apply.
    ///
    /// Returns `false` (after logging a WARN event) when the context has no
    /// `reload_tx`. Otherwise logs an INFO "signalled" event, spawns a task
    /// that sleeps 200 ms, sends `true` on `reload_tx`, logs a "dispatched"
    /// event with the elapsed time, and returns `true` immediately.
    fn signal_daemon_reload(&self) -> bool {
        let Some(reload_tx) = self.ctx.reload_tx.clone() else {
            ::zeroclaw_log::record!(
                WARN,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
                    .with_attrs(::serde_json::json!({
                        "reason": "no_supervisor",
                        "surface": crate::quickstart::Surface::Tui.as_str(),
                    })),
                "quickstart: daemon reload not available (standalone daemon)"
            );
            return false;
        };
        ::zeroclaw_log::record!(
            INFO,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Start).with_attrs(
                ::serde_json::json!({
                    "surface": crate::quickstart::Surface::Tui.as_str(),
                })
            ),
            "quickstart: daemon reload signalled"
        );
        // Captured by the task below to report how long the dispatch took.
        let started = std::time::Instant::now();
        zeroclaw_spawn::spawn!(async move {
            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
            // The send result is ignored; the "dispatched" event is logged as
            // a success either way.
            let _ = reload_tx.send(true);
            ::zeroclaw_log::record!(
                INFO,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete)
                    .with_outcome(::zeroclaw_log::EventOutcome::Success)
                    .with_attrs(::serde_json::json!({
                        "elapsed_ms": started.elapsed().as_millis() as u64,
                        "surface": crate::quickstart::Surface::Tui.as_str(),
                    })),
                "quickstart: daemon reload dispatched"
            );
        });
        true
    }
3264}
3265
3266fn parse_params<T: DeserializeOwned>(params: &Value) -> Result<T, JsonRpcError> {
3269 serde_json::from_value(params.clone()).map_err(|e| rpc_err(INVALID_PARAMS, e.to_string()))
3270}
3271
3272fn to_result<T: Serialize>(val: T) -> RpcResult {
3273 serde_json::to_value(val).map_err(|e| rpc_err(INTERNAL_ERROR, e.to_string()))
3274}
3275
/// Maximum number of content bytes `truncate_memory_previews` keeps per entry
/// before appending an ellipsis.
const MEMORY_PREVIEW_CONTENT_BYTES: usize = 200;
3283
3284fn truncate_memory_previews(
3287 mut entries: Vec<zeroclaw_api::memory_traits::MemoryEntry>,
3288) -> Vec<zeroclaw_api::memory_traits::MemoryEntry> {
3289 for entry in &mut entries {
3290 if entry.content.len() > MEMORY_PREVIEW_CONTENT_BYTES {
3291 let mut end = MEMORY_PREVIEW_CONTENT_BYTES;
3293 while end > 0 && !entry.content.is_char_boundary(end) {
3294 end -= 1;
3295 }
3296 entry.content.truncate(end);
3297 entry.content.push('…');
3298 }
3299 }
3300 entries
3301}
3302
3303fn notification_for_turn_event(
3304 session_id: &str,
3305 event: &TurnEvent,
3306 max_context_tokens: Option<u64>,
3307) -> Option<String> {
3308 let update = match event {
3309 TurnEvent::Chunk { delta } => SessionUpdateEvent::AgentMessageChunk {
3310 session_id: session_id.to_string(),
3311 text: delta.clone(),
3312 },
3313 TurnEvent::Thinking { delta } => SessionUpdateEvent::AgentThoughtChunk {
3314 session_id: session_id.to_string(),
3315 text: delta.clone(),
3316 },
3317 TurnEvent::ToolCall { id, name, args } => SessionUpdateEvent::ToolCall {
3318 session_id: session_id.to_string(),
3319 tool_call_id: id.clone(),
3320 name: name.clone(),
3321 raw_input: args.clone(),
3322 },
3323 TurnEvent::ToolResult { id, name, output } => SessionUpdateEvent::ToolResult {
3324 session_id: session_id.to_string(),
3325 tool_call_id: id.clone(),
3326 name: name.clone(),
3327 raw_output: output.clone(),
3328 },
3329 TurnEvent::ApprovalRequest {
3330 request_id,
3331 tool_name,
3332 arguments_summary,
3333 timeout_secs,
3334 } => SessionUpdateEvent::ApprovalRequest {
3335 session_id: session_id.to_string(),
3336 request_id: request_id.clone(),
3337 tool_name: tool_name.clone(),
3338 arguments_summary: arguments_summary.clone(),
3339 timeout_secs: *timeout_secs,
3340 },
3341 TurnEvent::Usage {
3342 input_tokens,
3343 cached_input_tokens: _,
3344 output_tokens: _,
3345 ..
3346 } => {
3347 SessionUpdateEvent::ContextUsage {
3353 session_id: session_id.to_string(),
3354 input_tokens: *input_tokens,
3355 max_context_tokens,
3356 }
3357 }
3358 };
3359
3360 let params = serde_json::to_value(update).ok()?;
3361 let n = JsonRpcNotification::new(notification::SESSION_UPDATE, params);
3362 serde_json::to_string(&n).ok()
3363}
3364
3365#[cfg(test)]
3366mod tests {
3367 use super::*;
3368 use serde_json::json;
3369
3370 fn parse(s: &str) -> Value {
3371 serde_json::from_str(s).unwrap()
3372 }
3373
3374 #[test]
3375 fn method_from_wire_roundtrip() {
3376 for (method, wire) in Method::ALL {
3377 assert_eq!(
3378 Method::from_wire(wire),
3379 Some(*method),
3380 "from_wire({wire}) should resolve"
3381 );
3382 assert_eq!(method.wire_name(), *wire, "wire_name roundtrip for {wire}");
3383 }
3384 }
3385
3386 #[test]
3387 fn method_from_wire_unknown() {
3388 assert_eq!(Method::from_wire("nonexistent/method"), None);
3389 }
3390
3391 #[test]
3392 fn method_all_no_duplicates() {
3393 let mut seen = std::collections::HashSet::new();
3394 for (_, wire) in Method::ALL {
3395 assert!(seen.insert(*wire), "duplicate wire name: {wire}");
3396 }
3397 }
3398
3399 #[test]
3400 fn personality_templates_use_requested_agent_name_before_config_exists() {
3401 let req = PersonalityTemplatesParams {
3402 agent: Some(" bob ".to_string()),
3403 };
3404 let ctx = personality_template_context(&zeroclaw_config::schema::Config::default(), &req);
3405
3406 assert_eq!(ctx.agent, "bob");
3407 assert!(ctx.include_memory);
3408 }
3409
3410 #[test]
3411 fn personality_templates_without_agent_stay_generic_and_memoryless() {
3412 let req = PersonalityTemplatesParams { agent: None };
3413 let ctx = personality_template_context(&zeroclaw_config::schema::Config::default(), &req);
3414
3415 assert_eq!(ctx.agent, "ZeroClaw");
3416 assert!(!ctx.include_memory);
3417 }
3418
3419 #[test]
3420 fn chunk_notification() {
3421 let event = TurnEvent::Chunk {
3422 delta: "hello".into(),
3423 };
3424 let json = notification_for_turn_event("s1", &event, None).unwrap();
3425 let v = parse(&json);
3426 assert_eq!(v["jsonrpc"], JSONRPC_VERSION);
3427 assert_eq!(v["method"], notification::SESSION_UPDATE);
3428 assert_eq!(v["params"]["session_id"], "s1");
3429 assert_eq!(v["params"]["type"], "agent_message_chunk");
3430 assert_eq!(v["params"]["text"], "hello");
3431 }
3432
3433 #[test]
3434 fn thinking_notification() {
3435 let event = TurnEvent::Thinking {
3436 delta: "hmm".into(),
3437 };
3438 let json = notification_for_turn_event("s1", &event, None).unwrap();
3439 let v = parse(&json);
3440 assert_eq!(v["params"]["type"], "agent_thought_chunk");
3441 assert_eq!(v["params"]["text"], "hmm");
3442 }
3443
3444 #[test]
3445 fn tool_call_notification() {
3446 let event = TurnEvent::ToolCall {
3447 id: "tc_1".into(),
3448 name: "bash".into(),
3449 args: json!({"cmd": "ls"}),
3450 };
3451 let json = notification_for_turn_event("s1", &event, None).unwrap();
3452 let v = parse(&json);
3453 assert_eq!(v["params"]["type"], "tool_call");
3454 assert_eq!(v["params"]["tool_call_id"], "tc_1");
3455 assert_eq!(v["params"]["name"], "bash");
3456 assert_eq!(v["params"]["raw_input"]["cmd"], "ls");
3457 }
3458
3459 #[test]
3460 fn tool_result_notification() {
3461 let event = TurnEvent::ToolResult {
3462 id: "tc_1".into(),
3463 name: "bash".into(),
3464 output: "file.txt".into(),
3465 };
3466 let json = notification_for_turn_event("s1", &event, None).unwrap();
3467 let v = parse(&json);
3468 assert_eq!(v["params"]["type"], "tool_result");
3469 assert_eq!(v["params"]["tool_call_id"], "tc_1");
3470 assert_eq!(v["params"]["raw_output"], "file.txt");
3471 }
3472
3473 #[test]
3474 fn approval_request_notification() {
3475 let event = TurnEvent::ApprovalRequest {
3476 request_id: "ar_1".into(),
3477 tool_name: "bash".into(),
3478 arguments_summary: "rm -rf /".into(),
3479 timeout_secs: 30,
3480 };
3481 let json = notification_for_turn_event("s1", &event, None).unwrap();
3482 let v = parse(&json);
3483 assert_eq!(v["params"]["type"], "approval_request");
3484 assert_eq!(v["params"]["request_id"], "ar_1");
3485 assert_eq!(v["params"]["tool_name"], "bash");
3486 assert_eq!(v["params"]["timeout_secs"], 30);
3487 }
3488
3489 #[test]
3490 fn usage_event_emits_context_usage_notification() {
3491 let event = TurnEvent::Usage {
3492 input_tokens: Some(100),
3493 cached_input_tokens: None,
3494 output_tokens: Some(50),
3495 cost_usd: Some(0.01),
3496 };
3497 let json = notification_for_turn_event("s1", &event, Some(32_000)).unwrap();
3498 let v = parse(&json);
3499 assert_eq!(v["params"]["type"], "context_usage");
3500 assert_eq!(v["params"]["session_id"], "s1");
3501 assert_eq!(v["params"]["input_tokens"], 100);
3506 assert_eq!(v["params"]["max_context_tokens"], 32_000);
3507 }
3508
3509 #[test]
3510 fn usage_event_without_input_tokens_emits_null() {
3511 let event = TurnEvent::Usage {
3512 input_tokens: None,
3513 cached_input_tokens: None,
3514 output_tokens: Some(50),
3515 cost_usd: None,
3516 };
3517 let json = notification_for_turn_event("s1", &event, None).unwrap();
3518 let v = parse(&json);
3519 assert_eq!(v["params"]["type"], "context_usage");
3520 assert!(
3522 v["params"].get("input_tokens").is_none(),
3523 "absent input_tokens should not be synthesized from output_tokens"
3524 );
3525 }
3526
3527 #[test]
3528 fn usage_event_does_not_double_count_cached_subset() {
3529 let event = TurnEvent::Usage {
3536 input_tokens: Some(25_000),
3537 cached_input_tokens: Some(15_000),
3538 output_tokens: Some(200),
3539 cost_usd: None,
3540 };
3541 let json = notification_for_turn_event("s1", &event, Some(200_000)).unwrap();
3542 let v = parse(&json);
3543 assert_eq!(v["params"]["type"], "context_usage");
3544 assert_eq!(
3545 v["params"]["input_tokens"], 25_000,
3546 "input_tokens is reported as-is — cached subset must not be added"
3547 );
3548 }
3549
3550 #[test]
3551 fn usage_event_only_cached_tokens_emits_null() {
3552 let event = TurnEvent::Usage {
3555 input_tokens: None,
3556 cached_input_tokens: Some(80_000),
3557 output_tokens: Some(100),
3558 cost_usd: None,
3559 };
3560 let json = notification_for_turn_event("s1", &event, Some(100_000)).unwrap();
3561 let v = parse(&json);
3562 assert!(
3563 v["params"].get("input_tokens").is_none(),
3564 "cached-only is ambiguous; do not fabricate a total"
3565 );
3566 }
3567
3568 #[test]
3569 fn parse_params_valid() {
3570 let v = json!({"session_id": "s1"});
3571 let p: SessionIdParams = parse_params(&v).unwrap();
3572 assert_eq!(p.session_id, "s1");
3573 }
3574
3575 #[test]
3576 fn parse_params_missing_required() {
3577 let v = json!({});
3578 let err = parse_params::<SessionIdParams>(&v).unwrap_err();
3579 assert_eq!(err.code, INVALID_PARAMS);
3580 }
3581
3582 #[test]
3583 fn to_result_roundtrip() {
3584 let r = InitializeResult {
3585 protocol_version: 1,
3586 server_version: "0.1.0".into(),
3587 tui_id: None,
3588 tui_sig: None,
3589 capabilities: vec![],
3590 };
3591 let val = to_result(r).unwrap();
3592 assert_eq!(val["protocol_version"], 1);
3593 assert_eq!(val["server_version"], "0.1.0");
3594 }
3595
3596 use zeroclaw_tools::MEMORY_TOOL_NAMES as MEMORY_TOOLS;
3609
3610 fn make_acp_test_config(tmp: &tempfile::TempDir) -> zeroclaw_config::schema::Config {
3611 use std::collections::HashMap;
3612 use zeroclaw_config::schema::{AliasedAgentConfig, RiskProfileConfig};
3613
3614 let workspace_dir = tmp.path().join("workspace");
3615 std::fs::create_dir_all(&workspace_dir).unwrap();
3616
3617 let mut providers = zeroclaw_config::providers::Providers::default();
3618 {
3619 let base = providers
3620 .models
3621 .ensure("openai", "test-provider")
3622 .expect("`openai` slot must exist");
3623 base.api_key = Some("test-key".into());
3624 base.model = Some("test-model".into());
3625 base.uri = Some("http://127.0.0.1:1".into());
3626 }
3627
3628 let mut agents = HashMap::new();
3629 agents.insert(
3630 "test-agent".to_string(),
3631 AliasedAgentConfig {
3632 enabled: true,
3633 model_provider: "openai.test-provider".into(),
3634 risk_profile: "test-profile".to_string(),
3635 ..Default::default()
3636 },
3637 );
3638
3639 let mut risk_profiles = HashMap::new();
3640 risk_profiles.insert("test-profile".to_string(), RiskProfileConfig::default());
3641
3642 zeroclaw_config::schema::Config {
3643 data_dir: workspace_dir,
3644 config_path: tmp.path().join("config.toml"),
3645 providers,
3646 agents,
3647 risk_profiles,
3648 ..zeroclaw_config::schema::Config::default()
3649 }
3650 }
3651
3652 fn make_acp_test_dispatcher(
3653 config: zeroclaw_config::schema::Config,
3654 ) -> (RpcDispatcher, Arc<crate::rpc::session::SessionStore>) {
3655 use zeroclaw_infra::session_queue::SessionActorQueue;
3656 let queue = Arc::new(SessionActorQueue::new(4, 10, 60));
3657 let sessions = Arc::new(crate::rpc::session::SessionStore::new(16, queue));
3658 let ctx = RpcContext::minimal(config, Arc::clone(&sessions));
3659 let (tx, _rx) = tokio::sync::mpsc::channel(64);
3660 let dispatcher = RpcDispatcher::new(ctx, tx, "test-peer".into());
3661 (dispatcher, sessions)
3662 }
3663
3664 #[tokio::test]
3665 async fn acp_session_new_exposes_no_memory_tools() {
3666 let tmp = tempfile::TempDir::new().unwrap();
3667 let config = make_acp_test_config(&tmp);
3668 let (dispatcher, sessions) = make_acp_test_dispatcher(config);
3669
3670 let params = json!({
3671 "agent_alias": "test-agent",
3672 "exclude_memory": true,
3673 "session_id": "acp-test-session-001"
3674 });
3675
3676 let result = dispatcher.handle_session_new_for_test(¶ms).await;
3677 assert!(
3678 result.is_ok(),
3679 "session/new should succeed; got: {:?}",
3680 result.err()
3681 );
3682
3683 let agent_arc = sessions
3684 .get_agent("acp-test-session-001")
3685 .await
3686 .expect("session must be registered in the store after session/new");
3687
3688 let agent = agent_arc.lock().await;
3689 let tool_names = agent.tool_names();
3690
3691 for &mem_tool in MEMORY_TOOLS {
3692 assert!(
3693 !tool_names.contains(&mem_tool),
3694 "ACP session must NOT expose `{mem_tool}` — found in tool list: {tool_names:?}"
3695 );
3696 }
3697 }
3698
3699 #[tokio::test]
3700 async fn acp_chat_mode_strips_memory_tools_without_exclude_flag() {
3701 let tmp = tempfile::TempDir::new().unwrap();
3705 let config = make_acp_test_config(&tmp);
3706 let data_dir = config.data_dir.clone();
3707 let (dispatcher, sessions, _chat_backend, _acp_store) =
3708 make_persistence_test_dispatcher(config, &data_dir);
3709
3710 let params = json!({
3711 "agent_alias": "test-agent",
3712 "chat_mode": "acp",
3713 "session_id": "acp-no-flag-session-001"
3714 });
3715
3716 let result = dispatcher.handle_session_new_for_test(¶ms).await;
3717 assert!(
3718 result.is_ok(),
3719 "session/new should succeed; got: {:?}",
3720 result.err()
3721 );
3722
3723 let agent_arc = sessions
3724 .get_agent("acp-no-flag-session-001")
3725 .await
3726 .expect("session must be registered in the store after session/new");
3727
3728 let agent = agent_arc.lock().await;
3729 let tool_names = agent.tool_names();
3730
3731 for &mem_tool in MEMORY_TOOLS {
3732 assert!(
3733 !tool_names.contains(&mem_tool),
3734 "ACP chat_mode must strip `{mem_tool}` even without exclude_memory — \
3735 tool list: {tool_names:?}"
3736 );
3737 }
3738 }
3739
3740 #[tokio::test]
3741 async fn non_acp_session_new_exposes_memory_tools() {
3742 let tmp = tempfile::TempDir::new().unwrap();
3743 let config = make_acp_test_config(&tmp);
3744 let (dispatcher, sessions) = make_acp_test_dispatcher(config);
3745
3746 let params = json!({
3747 "agent_alias": "test-agent",
3748 "exclude_memory": false,
3749 "session_id": "chat-test-session-001"
3750 });
3751
3752 let result = dispatcher.handle_session_new_for_test(¶ms).await;
3753 assert!(
3754 result.is_ok(),
3755 "session/new should succeed; got: {:?}",
3756 result.err()
3757 );
3758
3759 let agent_arc = sessions
3760 .get_agent("chat-test-session-001")
3761 .await
3762 .expect("session must be registered in the store after session/new");
3763
3764 let agent = agent_arc.lock().await;
3765 let tool_names = agent.tool_names();
3766
3767 let has_any_memory_tool = MEMORY_TOOLS.iter().any(|&t| tool_names.contains(&t));
3768 assert!(
3769 has_any_memory_tool,
3770 "non-ACP session MUST expose at least one memory tool — tool list: {tool_names:?}"
3771 );
3772 }
3773
3774 use zeroclaw_infra::session_backend::SessionBackend;
3779
3780 fn make_persistence_test_dispatcher(
3781 config: zeroclaw_config::schema::Config,
3782 data_dir: &std::path::Path,
3783 ) -> (
3784 RpcDispatcher,
3785 Arc<crate::rpc::session::SessionStore>,
3786 Arc<zeroclaw_infra::session_sqlite::SqliteSessionBackend>,
3787 Arc<zeroclaw_infra::acp_session_store::AcpSessionStore>,
3788 ) {
3789 use zeroclaw_infra::session_queue::SessionActorQueue;
3790 let queue = Arc::new(SessionActorQueue::new(4, 10, 60));
3791 let sessions = Arc::new(crate::rpc::session::SessionStore::new(16, queue));
3792 let chat_backend =
3793 Arc::new(zeroclaw_infra::session_sqlite::SqliteSessionBackend::new(data_dir).unwrap());
3794 let acp_store =
3795 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(data_dir).unwrap());
3796 let ctx = RpcContext::for_persistence_tests(
3797 config,
3798 Arc::clone(&sessions),
3799 Some(chat_backend.clone() as Arc<dyn zeroclaw_infra::session_backend::SessionBackend>),
3800 Some(Arc::clone(&acp_store)),
3801 );
3802 let (tx, _rx) = tokio::sync::mpsc::channel(64);
3803 let dispatcher = RpcDispatcher::new(ctx, tx, "test-peer".into());
3804 (dispatcher, sessions, chat_backend, acp_store)
3805 }
3806
3807 #[tokio::test]
3810 async fn acp_session_new_writes_to_acp_store_only() {
3811 let tmp = tempfile::TempDir::new().unwrap();
3812 let config = make_acp_test_config(&tmp);
3813 let data_dir = config.data_dir.clone();
3814 let (dispatcher, _sessions, chat_backend, acp_store) =
3815 make_persistence_test_dispatcher(config, &data_dir);
3816
3817 let sid = "acp-routing-001";
3818 let params = json!({
3819 "agent_alias": "test-agent",
3820 "exclude_memory": true,
3821 "chat_mode": "acp",
3822 "session_id": sid,
3823 });
3824
3825 dispatcher
3826 .handle_session_new_for_test(¶ms)
3827 .await
3828 .expect("session/new should succeed");
3829
3830 assert!(
3831 acp_store.load_session(sid).unwrap().is_some(),
3832 "ACP session must be persisted to acp_session_store"
3833 );
3834
3835 assert!(
3836 chat_backend.load(&format!("rpc_{sid}")).is_empty(),
3837 "ACP session must NOT touch chat session_backend"
3838 );
3839 }
3840
3841 #[tokio::test]
3846 async fn reaped_acp_session_rehydrates_to_working_instead_of_failing() {
3847 let tmp = tempfile::TempDir::new().unwrap();
3848 let config = make_acp_test_config(&tmp);
3849 let data_dir = config.data_dir.clone();
3850 let (dispatcher, sessions, _chat_backend, acp_store) =
3851 make_persistence_test_dispatcher(config, &data_dir);
3852
3853 let sid = "acp-reaped-001";
3854 dispatcher
3855 .handle_session_new_for_test(&json!({
3856 "agent_alias": "test-agent",
3857 "exclude_memory": true,
3858 "chat_mode": "acp",
3859 "session_id": sid,
3860 }))
3861 .await
3862 .expect("session/new should succeed");
3863
3864 assert!(
3865 sessions.get_agent(sid).await.is_some(),
3866 "freshly created session must be live in memory"
3867 );
3868 assert!(
3869 acp_store.load_session(sid).unwrap().is_some(),
3870 "durable row must exist for the rehydrate source"
3871 );
3872
3873 assert!(
3876 sessions.remove(sid).await,
3877 "reap must remove the in-memory session"
3878 );
3879 assert!(
3880 sessions.get_agent(sid).await.is_none(),
3881 "post-reap the session must be absent from memory"
3882 );
3883
3884 let recovered = dispatcher.rehydrate_reaped_session(sid).await;
3885 assert!(
3886 recovered.is_some(),
3887 "a reaped session with a live durable row must rehydrate to a \
3888 working agent, not fail; failing here is the irrecoverable hang"
3889 );
3890 assert!(
3891 sessions.get_agent(sid).await.is_some(),
3892 "after rehydrate the session must be live in memory again so the \
3893 next prompt lands on a working session"
3894 );
3895 }
3896
3897 #[tokio::test]
3902 async fn acp_resume_recovers_persisted_cwd() {
3903 let tmp = tempfile::TempDir::new().unwrap();
3904 let config = make_acp_test_config(&tmp);
3905 let data_dir = config.data_dir.clone();
3906 let (dispatcher, _sessions, _chat_backend, acp_store) =
3907 make_persistence_test_dispatcher(config, &data_dir);
3908
3909 let sid = "acp-cwd-resume-001";
3910 let original_cwd = tmp.path().join("project-dir").to_string_lossy().to_string();
3911
3912 let created = dispatcher
3914 .handle_session_new_for_test(&json!({
3915 "agent_alias": "test-agent",
3916 "exclude_memory": true,
3917 "chat_mode": "acp",
3918 "session_id": sid,
3919 "cwd": original_cwd,
3920 }))
3921 .await
3922 .expect("initial session/new should succeed");
3923 assert_eq!(created["workspace_dir"], original_cwd);
3924 assert_eq!(
3925 acp_store.load_session(sid).unwrap().unwrap().workspace_dir,
3926 original_cwd
3927 );
3928
3929 let resumed = dispatcher
3932 .handle_session_new_for_test(&json!({
3933 "agent_alias": "test-agent",
3934 "exclude_memory": true,
3935 "chat_mode": "acp",
3936 "session_id": sid,
3937 }))
3938 .await
3939 .expect("resume session/new should succeed");
3940 assert_eq!(
3941 resumed["workspace_dir"], original_cwd,
3942 "resume must keep the retained session's cwd, not default it"
3943 );
3944 }
3945
3946 #[tokio::test]
3952 async fn reaped_acp_session_rehydrates_without_memory_tools() {
3953 let tmp = tempfile::TempDir::new().unwrap();
3954 let config = make_acp_test_config(&tmp);
3955 let data_dir = config.data_dir.clone();
3956 let (dispatcher, sessions, _chat_backend, _acp_store) =
3957 make_persistence_test_dispatcher(config, &data_dir);
3958
3959 let sid = "acp-reaped-mem-001";
3960 dispatcher
3961 .handle_session_new_for_test(&json!({
3962 "agent_alias": "test-agent",
3963 "chat_mode": "acp",
3964 "session_id": sid,
3965 }))
3966 .await
3967 .expect("session/new should succeed");
3968
3969 assert!(sessions.remove(sid).await, "reap must remove the session");
3971
3972 let recovered = dispatcher
3973 .rehydrate_reaped_session(sid)
3974 .await
3975 .expect("a reaped ACP session must rehydrate to a working agent");
3976
3977 let agent = recovered.lock().await;
3978 let tool_names = agent.tool_names();
3979 for &mem_tool in MEMORY_TOOLS {
3980 assert!(
3981 !tool_names.contains(&mem_tool),
3982 "rehydrated ACP session must NOT expose `{mem_tool}` — found in tool list: {tool_names:?}"
3983 );
3984 }
3985 }
3986
3987 #[tokio::test]
3991 async fn killed_acp_session_does_not_rehydrate_from_durable_store() {
3992 let tmp = tempfile::TempDir::new().unwrap();
3993 let config = make_acp_test_config(&tmp);
3994 let data_dir = config.data_dir.clone();
3995 let (dispatcher, sessions, _chat_backend, acp_store) =
3996 make_persistence_test_dispatcher(config, &data_dir);
3997
3998 let sid = "acp-killed-001";
3999 dispatcher
4000 .handle_session_new_for_test(&json!({
4001 "agent_alias": "test-agent",
4002 "exclude_memory": true,
4003 "chat_mode": "acp",
4004 "session_id": sid,
4005 }))
4006 .await
4007 .expect("session/new should succeed");
4008
4009 assert!(
4010 sessions.get_agent(sid).await.is_some(),
4011 "freshly created session must be live in memory"
4012 );
4013 assert!(
4014 acp_store.load_session(sid).unwrap().is_some(),
4015 "durable row must exist before kill"
4016 );
4017
4018 dispatcher
4019 .handle_session_kill(&json!({ "session_id": sid }))
4020 .await
4021 .expect("session/kill should succeed");
4022
4023 assert!(
4024 sessions.get_agent(sid).await.is_none(),
4025 "session/kill must remove the live in-memory agent"
4026 );
4027 assert!(
4028 acp_store.load_session(sid).unwrap().is_some(),
4029 "session/kill must preserve durable history"
4030 );
4031
4032 let recovered = dispatcher.rehydrate_reaped_session(sid).await;
4033 assert!(
4034 recovered.is_none(),
4035 "admin-killed ACP sessions must stay killed instead of rehydrating \
4036 from durable history on the next prompt"
4037 );
4038 assert!(
4039 sessions.get_agent(sid).await.is_none(),
4040 "failed rehydrate must leave the session absent from memory"
4041 );
4042 }
4043
4044 #[tokio::test]
4047 async fn chat_session_new_writes_to_chat_backend_only() {
4048 let tmp = tempfile::TempDir::new().unwrap();
4049 let config = make_acp_test_config(&tmp);
4050 let data_dir = config.data_dir.clone();
4051 let (dispatcher, _sessions, chat_backend, acp_store) =
4052 make_persistence_test_dispatcher(config, &data_dir);
4053
4054 let sid = "chat-routing-001";
4055 let params = json!({
4056 "agent_alias": "test-agent",
4057 "session_id": sid,
4058 });
4059
4060 dispatcher
4061 .handle_session_new_for_test(¶ms)
4062 .await
4063 .expect("session/new should succeed");
4064
4065 assert!(
4066 acp_store.load_session(sid).unwrap().is_none(),
4067 "Chat session must NOT touch acp_session_store"
4068 );
4069
4070 let key = format!("rpc_{sid}");
4071 let metadata = chat_backend.list_sessions_with_metadata();
4072 let entry = metadata
4073 .iter()
4074 .find(|m| m.key == key)
4075 .expect("Chat session must be registered in session_backend metadata");
4076 assert_eq!(
4077 entry.agent_alias.as_deref(),
4078 Some("test-agent"),
4079 "Chat session must stamp its agent_alias in session_backend (got: {:?})",
4080 entry.agent_alias
4081 );
4082 }
4083
4084 fn make_config_set_test_dispatcher(config: zeroclaw_config::schema::Config) -> RpcDispatcher {
4087 use zeroclaw_infra::session_queue::SessionActorQueue;
4088 let queue = Arc::new(SessionActorQueue::new(4, 10, 60));
4089 let sessions = Arc::new(crate::rpc::session::SessionStore::new(16, queue));
4090 let ctx = RpcContext::minimal(config, Arc::clone(&sessions));
4091 let (tx, _rx) = tokio::sync::mpsc::channel(64);
4092 let mut dispatcher = RpcDispatcher::new(ctx, tx, "test-peer".into());
4093 dispatcher.authenticated = true;
4094 dispatcher
4095 }
4096
4097 fn make_secret_test_config(tmp: &tempfile::TempDir) -> zeroclaw_config::schema::Config {
4104 let mut cfg = zeroclaw_config::schema::Config {
4105 config_path: tmp.path().join("config.toml"),
4106 data_dir: tmp.path().join("data"),
4107 ..Default::default()
4108 };
4109 cfg.create_map_key("providers.models.anthropic", "default")
4110 .expect("create anthropic.default");
4111 cfg
4112 }
4113
4114 #[tokio::test]
4115 async fn config_set_writes_real_secret_through_set_prop() {
4116 let tmp = tempfile::TempDir::new().unwrap();
4117 let dispatcher = make_config_set_test_dispatcher(make_secret_test_config(&tmp));
4118 let params = json!({
4119 "prop": "providers.models.anthropic.default.api_key",
4120 "value": "sk-real-test-key"
4121 });
4122 let res = dispatcher.handle_config_set(¶ms).await;
4123 assert!(res.is_ok(), "config/set must accept a real secret: {res:?}");
4124 let cfg = dispatcher.ctx.config.read().clone();
4125 let stored = cfg
4126 .providers
4127 .models
4128 .anthropic
4129 .get("default")
4130 .and_then(|e| e.base.api_key.clone());
4131 assert_eq!(
4132 stored.as_deref(),
4133 Some("sk-real-test-key"),
4134 "real secret must land in memory as plaintext"
4135 );
4136 }
4137
4138 #[tokio::test]
4139 async fn config_set_rejects_masked_secret_value() {
4140 let tmp = tempfile::TempDir::new().unwrap();
4141 let mut cfg = make_secret_test_config(&tmp);
4142 cfg.providers
4143 .models
4144 .anthropic
4145 .get_mut("default")
4146 .unwrap()
4147 .base
4148 .api_key = Some("sk-live-secret".into());
4149 let dispatcher = make_config_set_test_dispatcher(cfg);
4150
4151 for masked in [zeroclaw_config::traits::MASKED_SECRET, "****", ""] {
4152 let params = json!({
4153 "prop": "providers.models.anthropic.default.api_key",
4154 "value": masked
4155 });
4156 let res = dispatcher.handle_config_set(¶ms).await;
4157 assert!(
4158 res.is_err(),
4159 "config/set must refuse masked/empty secret (`{masked}`), got: {res:?}"
4160 );
4161 }
4162
4163 let cfg_after = dispatcher.ctx.config.read().clone();
4164 let stored = cfg_after
4165 .providers
4166 .models
4167 .anthropic
4168 .get("default")
4169 .and_then(|e| e.base.api_key.clone());
4170 assert_eq!(
4171 stored.as_deref(),
4172 Some("sk-live-secret"),
4173 "live secret must NOT be clobbered by a masked write"
4174 );
4175 }
4176
4177 #[tokio::test]
4178 async fn config_set_handles_dynamic_http_request_secret_paths() {
4179 let tmp = tempfile::TempDir::new().unwrap();
4180 let dispatcher = make_config_set_test_dispatcher(zeroclaw_config::schema::Config {
4181 config_path: tmp.path().join("config.toml"),
4182 data_dir: tmp.path().join("data"),
4183 ..Default::default()
4184 });
4185
4186 let params = json!({
4187 "prop": "http_request.secrets.api_token",
4188 "value": "Bearer runtime-secret"
4189 });
4190 let res = dispatcher.handle_config_set(¶ms).await;
4191 assert!(
4192 res.is_ok(),
4193 "config/set must accept a real dynamic http_request secret: {res:?}"
4194 );
4195 let cfg = dispatcher.ctx.config.read().clone();
4196 assert_eq!(
4197 cfg.http_request
4198 .secrets
4199 .get("api_token")
4200 .map(String::as_str),
4201 Some("Bearer runtime-secret")
4202 );
4203
4204 for masked in [zeroclaw_config::traits::MASKED_SECRET, "****", ""] {
4205 let params = json!({
4206 "prop": "http_request.secrets.next_token",
4207 "value": masked
4208 });
4209 let res = dispatcher.handle_config_set(¶ms).await;
4210 assert!(
4211 res.is_err(),
4212 "config/set must reject masked/empty dynamic secret (`{masked}`), got: {res:?}"
4213 );
4214 }
4215 let cfg_after = dispatcher.ctx.config.read().clone();
4216 assert!(
4217 !cfg_after.http_request.secrets.contains_key("next_token"),
4218 "masked dynamic writes must not materialize a secret key"
4219 );
4220 }
4221
4222 #[tokio::test]
4223 async fn config_set_non_secret_field_still_uses_set_prop() {
4224 let tmp = tempfile::TempDir::new().unwrap();
4225 let dispatcher = make_config_set_test_dispatcher(make_secret_test_config(&tmp));
4226 let params = json!({
4227 "prop": "providers.models.anthropic.default.model",
4228 "value": "claude-sonnet-4-5"
4229 });
4230 let res = dispatcher.handle_config_set(¶ms).await;
4231 assert!(res.is_ok(), "non-secret set must succeed: {res:?}");
4232 let cfg = dispatcher.ctx.config.read().clone();
4233 let stored = cfg
4234 .providers
4235 .models
4236 .anthropic
4237 .get("default")
4238 .and_then(|e| e.base.model.clone());
4239 assert_eq!(stored.as_deref(), Some("claude-sonnet-4-5"));
4240 }
4241
4242 fn make_model_refresh_test_config(tmp: &tempfile::TempDir) -> zeroclaw_config::schema::Config {
4243 use std::collections::HashMap;
4244 use zeroclaw_config::schema::{AliasedAgentConfig, Config, RiskProfileConfig};
4245
4246 let workspace_dir = tmp.path().join("workspace");
4247 std::fs::create_dir_all(&workspace_dir).unwrap();
4248
4249 let mut config = Config {
4250 config_path: tmp.path().join("config.toml"),
4251 data_dir: tmp.path().join("data"),
4252 ..Default::default()
4253 };
4254 let provider = config
4255 .providers
4256 .models
4257 .ensure("openai", "test-provider")
4258 .expect("openai provider slot exists");
4259 provider.api_key = Some("test-key".into());
4260 provider.uri = Some("http://127.0.0.1:1".into());
4261 provider.model = Some("old-model".into());
4262 provider.temperature = Some(0.2);
4263
4264 config.agents = HashMap::from([(
4265 "test-agent".to_string(),
4266 AliasedAgentConfig {
4267 enabled: true,
4268 model_provider: "openai.test-provider".into(),
4269 risk_profile: "test-profile".into(),
4270 ..Default::default()
4271 },
4272 )]);
4273 config
4274 .risk_profiles
4275 .insert("test-profile".into(), RiskProfileConfig::default());
4276 config
4277 .runtime_profiles
4278 .insert("default".into(), Default::default());
4279 config
4280 }
4281
4282 async fn create_model_refresh_test_session(
4283 dispatcher: &RpcDispatcher,
4284 tmp: &tempfile::TempDir,
4285 ) -> String {
4286 let session_res = dispatcher
4287 .handle_session_new_for_test(&json!({
4288 "agent_alias": "test-agent",
4289 "cwd": tmp.path().join("workspace"),
4290 }))
4291 .await
4292 .expect("session/new should create the agent");
4293 session_res
4294 .get("session_id")
4295 .and_then(|v| v.as_str())
4296 .expect("session/new result includes session_id")
4297 .to_string()
4298 }
4299
4300 async fn model_name_for_session(dispatcher: &RpcDispatcher, session_id: &str) -> String {
4301 let agent = dispatcher
4302 .ctx
4303 .sessions
4304 .get_agent(session_id)
4305 .await
4306 .expect("session agent exists");
4307 agent.lock().await.attribution_fields().2
4308 }
4309
4310 async fn temperature_for_session(dispatcher: &RpcDispatcher, session_id: &str) -> Option<f64> {
4311 let agent = dispatcher
4312 .ctx
4313 .sessions
4314 .get_agent(session_id)
4315 .await
4316 .expect("session agent exists");
4317 agent.lock().await.temperature_for_test()
4318 }
4319
4320 async fn wait_for_model_name(dispatcher: &RpcDispatcher, session_id: &str, expected: &str) {
4321 for _ in 0..50 {
4322 if model_name_for_session(dispatcher, session_id).await == expected {
4323 return;
4324 }
4325 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
4326 }
4327 assert_eq!(
4328 model_name_for_session(dispatcher, session_id).await,
4329 expected
4330 );
4331 }
4332
4333 async fn wait_for_temperature(
4334 dispatcher: &RpcDispatcher,
4335 session_id: &str,
4336 expected: Option<f64>,
4337 ) {
4338 for _ in 0..50 {
4339 if temperature_for_session(dispatcher, session_id).await == expected {
4340 return;
4341 }
4342 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
4343 }
4344 assert_eq!(
4345 temperature_for_session(dispatcher, session_id).await,
4346 expected
4347 );
4348 }
4349
4350 #[tokio::test]
4351 async fn config_set_provider_model_refreshes_matching_live_session() {
4352 let tmp = tempfile::TempDir::new().unwrap();
4353 let dispatcher = make_config_set_test_dispatcher(make_model_refresh_test_config(&tmp));
4354 let session_id = create_model_refresh_test_session(&dispatcher, &tmp).await;
4355 assert_eq!(
4356 model_name_for_session(&dispatcher, &session_id).await,
4357 "old-model"
4358 );
4359
4360 let res = dispatcher
4361 .handle_config_set(&json!({
4362 "prop": "providers.models.openai.test-provider.model",
4363 "value": "new-model"
4364 }))
4365 .await;
4366 assert!(res.is_ok(), "config/set must succeed: {res:?}");
4367
4368 wait_for_model_name(&dispatcher, &session_id, "new-model").await;
4369 }
4370
4371 #[tokio::test]
4372 async fn config_set_provider_refresh_failure_does_not_fail_saved_write() {
4373 let tmp = tempfile::TempDir::new().unwrap();
4374 let dispatcher = make_config_set_test_dispatcher(make_model_refresh_test_config(&tmp));
4375 let session_id = create_model_refresh_test_session(&dispatcher, &tmp).await;
4376 assert_eq!(
4377 model_name_for_session(&dispatcher, &session_id).await,
4378 "old-model"
4379 );
4380
4381 let res = dispatcher
4382 .handle_config_set(&json!({
4383 "prop": "providers.models.openai.test-provider.model",
4384 "value": ""
4385 }))
4386 .await;
4387 assert!(
4388 res.is_ok(),
4389 "config/set must report the saved write even if live refresh cannot rebuild: {res:?}"
4390 );
4391 let cfg = dispatcher.ctx.config.read().clone();
4392 let stored = cfg
4393 .providers
4394 .models
4395 .openai
4396 .get("test-provider")
4397 .and_then(|e| e.base.model.clone());
4398 assert_eq!(
4399 stored, None,
4400 "config/set must still persist the requested provider-profile clear"
4401 );
4402 assert_eq!(
4403 model_name_for_session(&dispatcher, &session_id).await,
4404 "old-model",
4405 "failed live refresh must leave the existing session provider intact"
4406 );
4407 }
4408
4409 #[tokio::test]
4410 async fn config_set_provider_temperature_refreshes_matching_live_session() {
4411 let tmp = tempfile::TempDir::new().unwrap();
4412 let dispatcher = make_config_set_test_dispatcher(make_model_refresh_test_config(&tmp));
4413 let session_id = create_model_refresh_test_session(&dispatcher, &tmp).await;
4414 assert_eq!(
4415 temperature_for_session(&dispatcher, &session_id).await,
4416 Some(0.2)
4417 );
4418
4419 let res = dispatcher
4420 .handle_config_set(&json!({
4421 "prop": "providers.models.openai.test-provider.temperature",
4422 "value": 0.4
4423 }))
4424 .await;
4425 assert!(res.is_ok(), "config/set must succeed: {res:?}");
4426
4427 wait_for_temperature(&dispatcher, &session_id, Some(0.4)).await;
4428 }
4429
4430 #[tokio::test]
4431 async fn config_set_provider_refresh_preserves_session_temperature_override() {
4432 let tmp = tempfile::TempDir::new().unwrap();
4433 let dispatcher = make_config_set_test_dispatcher(make_model_refresh_test_config(&tmp));
4434 let session_id = create_model_refresh_test_session(&dispatcher, &tmp).await;
4435 let merged = dispatcher
4436 .ctx
4437 .sessions
4438 .set_overrides(
4439 &session_id,
4440 crate::rpc::session::SessionOverrides {
4441 temperature: Some(0.6),
4442 ..Default::default()
4443 },
4444 )
4445 .await
4446 .expect("session override applies");
4447 assert_eq!(merged.temperature, Some(0.6));
4448
4449 let res = dispatcher
4450 .handle_config_set(&json!({
4451 "prop": "providers.models.openai.test-provider.model",
4452 "value": "new-model"
4453 }))
4454 .await;
4455 assert!(res.is_ok(), "config/set must succeed: {res:?}");
4456
4457 wait_for_model_name(&dispatcher, &session_id, "new-model").await;
4458 assert_eq!(
4459 temperature_for_session(&dispatcher, &session_id).await,
4460 Some(0.6),
4461 "session temperature override must win over provider profile temperature"
4462 );
4463 }
4464
4465 #[tokio::test]
4466 async fn config_delete_provider_temperature_refreshes_matching_live_session() {
4467 let tmp = tempfile::TempDir::new().unwrap();
4468 let dispatcher = make_config_set_test_dispatcher(make_model_refresh_test_config(&tmp));
4469 let session_id = create_model_refresh_test_session(&dispatcher, &tmp).await;
4470 assert_eq!(
4471 temperature_for_session(&dispatcher, &session_id).await,
4472 Some(0.2)
4473 );
4474
4475 let res = dispatcher
4476 .handle_config_delete(&json!({
4477 "prop": "providers.models.openai.test-provider.temperature",
4478 }))
4479 .await;
4480 assert!(res.is_ok(), "config/delete must succeed: {res:?}");
4481
4482 wait_for_temperature(&dispatcher, &session_id, None).await;
4483 }
4484
4485 fn make_two_dispatchers_sharing_context(
4493 config: zeroclaw_config::schema::Config,
4494 ) -> (
4495 RpcDispatcher,
4496 RpcDispatcher,
4497 Arc<crate::rpc::session::SessionStore>,
4498 ) {
4499 use zeroclaw_infra::session_queue::SessionActorQueue;
4500 let queue = Arc::new(SessionActorQueue::new(4, 10, 60));
4501 let sessions = Arc::new(crate::rpc::session::SessionStore::new(16, queue));
4502 let ctx = RpcContext::minimal(config, Arc::clone(&sessions));
4503 let (tx_a, _rx_a) = tokio::sync::mpsc::channel(64);
4504 let (tx_b, _rx_b) = tokio::sync::mpsc::channel(64);
4505 let dispatcher_a = RpcDispatcher::new(Arc::clone(&ctx), tx_a, "test-peer-a:pid=1".into());
4506 let dispatcher_b = RpcDispatcher::new(ctx, tx_b, "test-peer-b:pid=2".into());
4507 (dispatcher_a, dispatcher_b, sessions)
4508 }
4509
4510 async fn create_session_with_owner(
4511 dispatcher: &mut RpcDispatcher,
4512 sessions: &Arc<crate::rpc::session::SessionStore>,
4513 session_id: &str,
4514 owner_tui_id: &str,
4515 ) -> tokio_util::sync::CancellationToken {
4516 dispatcher.set_tui_id_for_test(Some(owner_tui_id.to_string()));
4517 let params = json!({
4518 "agent_alias": "test-agent",
4519 "session_id": session_id,
4520 });
4521 dispatcher
4522 .handle_session_new_for_test(¶ms)
4523 .await
4524 .expect("session/new must succeed");
4525
4526 let stamped_owner = sessions
4527 .session_owner_tui_id(session_id)
4528 .await
4529 .expect("session must exist after session/new");
4530 assert_eq!(
4531 stamped_owner.as_deref(),
4532 Some(owner_tui_id),
4533 "harness invariant: session/new must stamp owner_tui_id from the \
4534 caller's tui_id; if this fails, the ownership tests below are \
4535 measuring nothing"
4536 );
4537
4538 let token = tokio_util::sync::CancellationToken::new();
4539 sessions.register_cancel_token(session_id, token.clone());
4540 token
4541 }
4542
4543 fn make_dispatcher_with_capture(
4549 config: zeroclaw_config::schema::Config,
4550 ) -> (
4551 RpcDispatcher,
4552 tokio::sync::mpsc::Receiver<String>,
4553 Arc<crate::rpc::session::SessionStore>,
4554 ) {
4555 use zeroclaw_infra::session_queue::SessionActorQueue;
4556 let queue = Arc::new(SessionActorQueue::new(4, 10, 60));
4557 let sessions = Arc::new(crate::rpc::session::SessionStore::new(16, queue));
4558 let ctx = RpcContext::minimal(config, Arc::clone(&sessions));
4559 let (tx, rx) = tokio::sync::mpsc::channel(64);
4560 let dispatcher = RpcDispatcher::new(ctx, tx, "test-peer-cap:pid=1".into());
4561 (dispatcher, rx, sessions)
4562 }
4563
4564 #[tokio::test]
4572 async fn session_prompt_on_missing_session_emits_turn_complete_failed() {
4573 let tmp = tempfile::TempDir::new().unwrap();
4574 let config = make_acp_test_config(&tmp);
4575 let (dispatcher, mut rx, _sessions) = make_dispatcher_with_capture(config);
4576
4577 let result = dispatcher
4578 .handle_session_prompt(&json!({
4579 "session_id": "gone-id",
4580 "prompt": "anything",
4581 }))
4582 .await;
4583 assert!(
4584 result.is_err(),
4585 "missing session must still produce an RPC error for legacy \
4586 request-form callers; the new behaviour is the additional \
4587 notification, not replacing the error"
4588 );
4589
4590 let raw = rx.try_recv().expect(
4594 "handle_session_prompt must emit a session/update TurnComplete \
4595 notification before returning on missing-session — without it \
4596 the TUI's `working` state never clears and the next prompt is \
4597 the production freeze",
4598 );
4599 let v: serde_json::Value = serde_json::from_str(&raw).expect("notification must be JSON");
4600 assert_eq!(v["method"], notification::SESSION_UPDATE);
4601 assert_eq!(v["params"]["session_id"], "gone-id");
4602 assert_eq!(
4603 v["params"]["outcome"], "failed",
4604 "missing-session is not Completed and not Cancelled — it is a \
4605 distinct Failed verdict. Folding it into Cancelled would lie \
4606 about whether the user pressed Esc."
4607 );
4608 }
4609
4610 #[tokio::test]
4614 async fn session_cancel_from_distinct_non_owner_dispatcher_is_rejected() {
4615 let tmp = tempfile::TempDir::new().unwrap();
4616 let config = make_acp_test_config(&tmp);
4617 let (mut dispatcher_a, mut dispatcher_b, sessions) =
4618 make_two_dispatchers_sharing_context(config);
4619
4620 let token =
4621 create_session_with_owner(&mut dispatcher_a, &sessions, "sess-owned-by-tui-A", "tui-A")
4622 .await;
4623
4624 dispatcher_b.set_tui_id_for_test(Some("tui-B".to_string()));
4625 let result = dispatcher_b
4626 .handle_session_cancel(&json!({
4627 "session_id": "sess-owned-by-tui-A",
4628 }))
4629 .await;
4630
4631 let err = result.expect_err(
4632 "a cancel from a dispatcher whose tui_id does not match the \
4633 session's owner_tui_id must be refused",
4634 );
4635 assert_ne!(
4636 err.code, SESSION_NOT_FOUND,
4637 "the rejection must NOT be reported as SESSION_NOT_FOUND — the \
4638 session DOES exist; reporting NOT_FOUND would hide the \
4639 ownership violation behind a benign-looking error"
4640 );
4641 assert!(
4642 !token.is_cancelled(),
4643 "the owner's cancel token must remain un-fired — the rightful \
4644 owner's turn must survive a mis-targeted cancel from another TUI"
4645 );
4646 }
4647
4648 #[tokio::test]
4652 async fn session_cancel_from_anonymous_dispatcher_is_rejected() {
4653 let tmp = tempfile::TempDir::new().unwrap();
4654 let config = make_acp_test_config(&tmp);
4655 let (mut dispatcher_a, mut dispatcher_b, sessions) =
4656 make_two_dispatchers_sharing_context(config);
4657
4658 let token =
4659 create_session_with_owner(&mut dispatcher_a, &sessions, "sess-owned-by-tui-A", "tui-A")
4660 .await;
4661
4662 dispatcher_b.set_tui_id_for_test(None);
4665 let result = dispatcher_b
4666 .handle_session_cancel(&json!({
4667 "session_id": "sess-owned-by-tui-A",
4668 }))
4669 .await;
4670
4671 let err = result.expect_err("anonymous cancel must be refused");
4672 assert_ne!(err.code, SESSION_NOT_FOUND);
4673 assert!(
4674 !token.is_cancelled(),
4675 "anonymous cancel must not fire the token"
4676 );
4677 }
4678
4679 #[tokio::test]
4683 async fn session_cancel_from_owner_dispatcher_still_works() {
4684 let tmp = tempfile::TempDir::new().unwrap();
4685 let config = make_acp_test_config(&tmp);
4686 let (mut dispatcher_a, _dispatcher_b, sessions) =
4687 make_two_dispatchers_sharing_context(config);
4688
4689 let token =
4690 create_session_with_owner(&mut dispatcher_a, &sessions, "sess-owned-by-tui-A", "tui-A")
4691 .await;
4692
4693 let result = dispatcher_a
4695 .handle_session_cancel(&json!({
4696 "session_id": "sess-owned-by-tui-A",
4697 }))
4698 .await;
4699
4700 assert!(
4701 result.is_ok(),
4702 "owner cancel must succeed; got: {:?}",
4703 result.err()
4704 );
4705 assert!(
4706 token.is_cancelled(),
4707 "owner cancel must fire the session's cancel token"
4708 );
4709 }
4710}