1use anyhow::Result;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::collections::{HashMap, HashSet};
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
28use tokio::sync::{Mutex, mpsc};
29use uuid::Uuid;
30pub use zeroclaw_api::jsonrpc::RpcOutbound;
31use zeroclaw_api::jsonrpc::error_codes::*;
32use zeroclaw_api::jsonrpc::{
33 ACP_PROTOCOL_VERSION, JsonRpcError, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse,
34};
35use zeroclaw_api::model_provider::ConversationMessage;
36use zeroclaw_config::schema::Config;
37use zeroclaw_infra::acp_session_store::AcpSessionStore;
38use zeroclaw_runtime::agent::agent::{Agent, TurnEvent};
39use zeroclaw_runtime::tools::CanvasStore;
40
41use crate::acp_channel::AcpChannel;
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
47#[serde(default)]
48pub struct AcpServerConfig {
49 pub max_sessions: usize,
51 pub session_timeout_secs: u64,
53}
54
55impl Default for AcpServerConfig {
56 fn default() -> Self {
57 Self {
58 max_sessions: 10,
59 session_timeout_secs: 3600,
60 }
61 }
62}
63
64struct Session {
67 agent: Agent,
68 #[allow(dead_code)] created_at: Instant,
70 last_active: Instant,
71 agent_alias: String,
73 model_provider: String,
75 model: String,
77}
78
79pub struct AcpServer {
82 config: Config,
83 acp_config: AcpServerConfig,
84 sessions: Arc<Mutex<HashMap<String, Arc<Mutex<Session>>>>>,
85 rpc: Arc<RpcOutbound>,
86 writer_rx: std::sync::Mutex<Option<mpsc::Receiver<String>>>,
89 cancel_tokens: Arc<std::sync::Mutex<HashMap<String, tokio_util::sync::CancellationToken>>>,
100 loading_sessions: Arc<tokio::sync::Mutex<HashSet<String>>>,
105 store: Option<Arc<AcpSessionStore>>,
106 canvas_store: Option<CanvasStore>,
111}
112
113impl AcpServer {
114 pub fn new(config: Config, acp_config: AcpServerConfig) -> Self {
115 let (writer_tx, writer_rx) = mpsc::channel::<String>(256);
116 Self::with_writer(config, acp_config, writer_tx, Some(writer_rx), None)
117 }
118
119 pub fn new_with_writer(
120 config: Config,
121 acp_config: AcpServerConfig,
122 writer_tx: mpsc::Sender<String>,
123 ) -> Self {
124 Self::with_writer(config, acp_config, writer_tx, None, None)
125 }
126
127 pub fn new_with_store(
128 config: Config,
129 acp_config: AcpServerConfig,
130 store: Arc<AcpSessionStore>,
131 ) -> Self {
132 let (writer_tx, writer_rx) = mpsc::channel::<String>(256);
133 Self::with_writer(config, acp_config, writer_tx, Some(writer_rx), Some(store))
134 }
135
136 pub fn new_with_writer_and_store(
137 config: Config,
138 acp_config: AcpServerConfig,
139 writer_tx: mpsc::Sender<String>,
140 store: Arc<AcpSessionStore>,
141 ) -> Self {
142 Self::with_writer(config, acp_config, writer_tx, None, Some(store))
143 }
144
145 fn with_writer(
146 config: Config,
147 acp_config: AcpServerConfig,
148 writer_tx: mpsc::Sender<String>,
149 writer_rx: Option<mpsc::Receiver<String>>,
150 store: Option<Arc<AcpSessionStore>>,
151 ) -> Self {
152 Self {
153 config,
154 acp_config,
155 sessions: Arc::new(Mutex::new(HashMap::new())),
156 rpc: Arc::new(RpcOutbound::new(writer_tx)),
157 writer_rx: std::sync::Mutex::new(writer_rx),
158 cancel_tokens: Arc::new(std::sync::Mutex::new(HashMap::new())),
159 loading_sessions: Arc::new(tokio::sync::Mutex::new(HashSet::new())),
160 store,
161 canvas_store: None,
162 }
163 }
164
165 pub fn with_canvas_store(mut self, canvas_store: CanvasStore) -> Self {
169 self.canvas_store = Some(canvas_store);
170 self
171 }
172
173 pub async fn run(self: Arc<Self>) -> Result<()> {
176 ::zeroclaw_log::record!(
177 DEBUG,
178 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
179 .with_category(::zeroclaw_log::EventCategory::Channel),
180 &format!(
181 "ACP server starting (max_sessions={}, timeout={}s)",
182 self.acp_config.max_sessions, self.acp_config.session_timeout_secs
183 )
184 );
185
186 let writer_rx = self
190 .writer_rx
191 .lock()
192 .unwrap_or_else(|e| e.into_inner())
193 .take()
194 .ok_or_else(|| {
195 ::zeroclaw_log::record!(
196 ERROR,
197 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
198 .with_category(::zeroclaw_log::EventCategory::Channel)
199 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
200 "ACP server writer already started"
201 );
202 anyhow::Error::msg("ACP server writer already started")
203 })?;
204 zeroclaw_spawn::spawn!(writer_task(writer_rx));
205
206 let stdin = tokio::io::stdin();
207 let mut reader = BufReader::new(stdin);
208 let mut line = String::new();
209
210 let sessions = Arc::clone(&self.sessions);
212 let timeout = Duration::from_secs(self.acp_config.session_timeout_secs);
213 zeroclaw_spawn::spawn!(async move {
214 let mut interval = tokio::time::interval(Duration::from_secs(60));
215 loop {
216 interval.tick().await;
217 let mut sessions = sessions.lock().await;
218 let before = sessions.len();
219 sessions.retain(|id, session_arc| {
220 match session_arc.try_lock() {
223 Ok(session) => {
224 let expired = session.last_active.elapsed() > timeout;
225 if expired {
226 ::zeroclaw_log::record!(
227 DEBUG,
228 ::zeroclaw_log::Event::new(
229 module_path!(),
230 ::zeroclaw_log::Action::Note
231 )
232 .with_category(::zeroclaw_log::EventCategory::Channel)
233 .with_attrs(
234 ::serde_json::json!({
235 "id": id,
236 "agent_alias": session.agent_alias,
237 "model_provider": session.model_provider,
238 "model": session.model,
239 })
240 ),
241 "Session expired after inactivity"
242 );
243 }
244 !expired
245 }
246 Err(_) => true,
247 }
248 });
249 let reaped = before - sessions.len();
250 if reaped > 0 {
251 ::zeroclaw_log::record!(
252 DEBUG,
253 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
254 .with_category(::zeroclaw_log::EventCategory::Channel)
255 .with_attrs(::serde_json::json!({"reaped": reaped})),
256 "Reaped expired session(s)"
257 );
258 }
259 }
260 });
261
262 loop {
263 line.clear();
264 let bytes_read = reader.read_line(&mut line).await?;
265 if bytes_read == 0 {
266 ::zeroclaw_log::record!(
267 DEBUG,
268 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
269 .with_category(::zeroclaw_log::EventCategory::Channel),
270 "ACP server: stdin closed, shutting down"
271 );
272 break;
273 }
274
275 let trimmed = line.trim();
276 if trimmed.is_empty() {
277 continue;
278 }
279
280 self.process_line(trimmed).await;
281 }
282
283 Ok(())
284 }
285
286 pub async fn run_messages(self: Arc<Self>, mut input_rx: mpsc::Receiver<String>) -> Result<()> {
293 ::zeroclaw_log::record!(
294 DEBUG,
295 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
296 .with_category(::zeroclaw_log::EventCategory::Channel),
297 "ACP server starting (WebSocket/framed mode)"
298 );
299 while let Some(line) = input_rx.recv().await {
300 let trimmed = line.trim();
301 if trimmed.is_empty() {
302 continue;
303 }
304 self.process_line(trimmed).await;
305 }
306
307 Ok(())
308 }
309
310 async fn process_line(self: &Arc<Self>, trimmed: &str) {
311 if let Ok(value) = serde_json::from_str::<Value>(trimmed)
315 && value.is_object()
316 && (value.get("result").is_some() || value.get("error").is_some())
317 && let Some(id) = value.get("id")
318 {
319 let id_str = id
320 .as_str()
321 .map(String::from)
322 .unwrap_or_else(|| id.to_string());
323 let result = value.get("result").cloned();
324 let error: Option<JsonRpcError> = value
325 .get("error")
326 .and_then(|e| serde_json::from_value(e.clone()).ok());
327 self.rpc.dispatch_response(&id_str, result, error);
328 return;
329 }
330
331 match serde_json::from_str::<JsonRpcRequest>(trimmed) {
332 Ok(request) => {
333 if request.jsonrpc != "2.0" {
334 if let Some(id) = request.id {
335 self.write_error(id, INVALID_REQUEST, "Invalid JSON-RPC version")
336 .await;
337 }
338 return;
339 }
340 let server = Arc::clone(self);
348 ::zeroclaw_spawn::spawn!(async move {
349 server.handle_request(request).await;
350 });
351 }
352 Err(e) => {
353 ::zeroclaw_log::record!(
354 WARN,
355 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
356 .with_category(::zeroclaw_log::EventCategory::Channel)
357 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
358 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
359 "Failed to parse JSON-RPC request"
360 );
361 self.write_error(Value::Null, PARSE_ERROR, &format!("Parse error: {e}"))
362 .await;
363 }
364 }
365 }
366
367 async fn handle_request(&self, request: JsonRpcRequest) {
368 let id = request.id.clone().unwrap_or(Value::Null);
369 let is_notification = request.id.is_none();
370
371 let result = match request.method.as_str() {
372 "initialize" => self.handle_initialize(&request.params),
373 "session/new" => self.handle_session_new(&request.params).await,
374 "session/load" => self.handle_session_load(&request.params).await,
375 "session/resume" => self.handle_session_resume(&request.params).await,
376 "session/close" => self.handle_session_close(&request.params).await,
377 "session/prompt" => self.handle_session_prompt(&request.params, &id).await,
378 "session/stop" => self.handle_session_stop(&request.params).await,
379 "session/cancel" => self.handle_session_cancel(&request.params).await,
380 "session/event" | "session/update" => self.handle_session_event(&request.params).await,
381 _ => {
382 ::zeroclaw_log::record!(
383 WARN,
384 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
385 .with_category(::zeroclaw_log::EventCategory::Channel)
386 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
387 .with_attrs(::serde_json::json!({"method": request.method})),
388 "ACP method not found"
389 );
390 Err(RpcError {
391 code: METHOD_NOT_FOUND,
392 message: format!("Method not found: {}", request.method),
393 data: None,
394 })
395 }
396 };
397
398 if !is_notification {
400 match result {
401 Ok(value) => self.write_result(id, value).await,
402 Err(e) => {
403 ::zeroclaw_log::record!(
404 WARN,
405 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
406 .with_category(::zeroclaw_log::EventCategory::Channel)
407 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
408 .with_attrs(::serde_json::json!({
409 "method": request.method,
410 "error_code": e.code,
411 "error": e.message,
412 })),
413 "ACP request failed"
414 );
415 self.write_error(id, e.code, &e.message).await;
416 }
417 }
418 }
419 }
420
421 fn handle_initialize(&self, _params: &Value) -> RpcResult {
424 let default_model = self
425 .config
426 .providers
427 .models
428 .iter_entries()
429 .find_map(|(_, _, e)| e.model.clone());
430
431 let mut zeroclaw_meta = serde_json::json!({
432 "maxSessions": self.acp_config.max_sessions,
433 "sessionTimeoutSecs": self.acp_config.session_timeout_secs,
434 });
435 if let Some(model) = default_model {
436 zeroclaw_meta["defaultModel"] = serde_json::json!(model);
437 }
438
439 let session_capabilities = if self.store.is_some() {
440 serde_json::json!({ "resume": {}, "close": {} })
441 } else {
442 serde_json::json!({})
443 };
444
445 Ok(serde_json::json!({
446 "protocolVersion": ACP_PROTOCOL_VERSION,
447 "agentCapabilities": {
448 "loadSession": self.store.is_some(),
449 "promptCapabilities": {
450 "image": false,
451 "audio": false,
452 "embeddedContext": false,
453 },
454 "mcpCapabilities": {
455 "http": false,
456 "sse": false,
457 },
458 "sessionCapabilities": session_capabilities,
459 },
460 "agentInfo": {
461 "name": "zeroclaw-acp",
462 "title": "ZeroClaw ACP",
463 "version": env!("CARGO_PKG_VERSION"),
464 },
465 "authMethods": [],
466 "_meta": {
467 "zeroclaw": zeroclaw_meta,
468 }
469 }))
470 }
471
472 async fn handle_session_new(&self, params: &Value) -> RpcResult {
473 let mut sessions = self.sessions.lock().await;
474
475 let loading_count = self.loading_sessions.lock().await.len();
476 if sessions.len() + loading_count >= self.acp_config.max_sessions {
477 ::zeroclaw_log::record!(
478 WARN,
479 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
480 .with_category(::zeroclaw_log::EventCategory::Channel)
481 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
482 .with_attrs(::serde_json::json!({
483 "active": sessions.len(),
484 "loading": loading_count,
485 "max": self.acp_config.max_sessions,
486 })),
487 "ACP session/new rejected: session limit reached"
488 );
489 return Err(RpcError {
490 code: SESSION_LIMIT_REACHED,
491 message: format!(
492 "Maximum session limit reached ({})",
493 self.acp_config.max_sessions
494 ),
495 data: None,
496 });
497 }
498
499 let requested_cwd = self.requested_session_cwd(params);
500
501 let workspace_dir = std::fs::canonicalize(&requested_cwd)
502 .map_err(|e| RpcError {
503 code: INVALID_PARAMS,
504 message: format!(
505 "cwd is not a usable directory ({}): {e}",
506 requested_cwd.display()
507 ),
508 data: None,
509 })?
510 .to_string_lossy()
511 .into_owned();
512
513 let agent_alias = params
518 .get("agentAlias")
519 .or_else(|| params.get("agent_alias"))
520 .or_else(|| params.get("agent"))
521 .and_then(Value::as_str)
522 .map(str::trim)
523 .filter(|s| !s.is_empty())
524 .map(str::to_string)
525 .or_else(|| self.config.acp.default_agent.clone())
526 .or_else(|| {
527 let mut keys = self.config.agents.keys();
528 if self.config.agents.len() == 1 {
529 keys.next().cloned()
530 } else {
531 None
532 }
533 })
534 .ok_or_else(|| RpcError {
535 code: INVALID_PARAMS,
536 message: "session/new requires `agentAlias` (alias of a configured \
537 [agents.<alias>] entry)"
538 .to_string(),
539 data: None,
540 })?;
541 if self.config.agent(&agent_alias).is_none() {
542 return Err(RpcError {
543 code: INVALID_PARAMS,
544 message: format!(
545 "Unknown agent `{agent_alias}` — no [agents.{agent_alias}] entry configured"
546 ),
547 data: None,
548 });
549 }
550
551 let session_id = Uuid::new_v4().to_string();
552
553 let agent = Agent::from_config_with_session_cwd_and_mcp_backchannel(
559 &self.config,
560 &agent_alias,
561 Some(std::path::Path::new(&workspace_dir)),
562 false,
563 true,
564 )
565 .await
566 .map_err(|e| RpcError {
567 code: INTERNAL_ERROR,
568 message: format!("Failed to create agent: {e}"),
569 data: None,
570 })?;
571
572 let acp_channel = Arc::new(AcpChannel::new(
577 "acp",
578 session_id.clone(),
579 Arc::clone(&self.rpc),
580 Duration::from_secs(self.acp_config.session_timeout_secs),
581 ));
582 agent.channel_handles().register_channel("acp", acp_channel);
583
584 let now = Instant::now();
585 sessions.insert(
586 session_id.clone(),
587 Arc::new(Mutex::new(Session {
588 agent,
589 created_at: now,
590 last_active: now,
591 agent_alias: agent_alias.clone(),
592 model_provider: self
593 .config
594 .agent(&agent_alias)
595 .map(|a| a.model_provider.to_string())
596 .unwrap_or_default(),
597 model: self
598 .config
599 .model_provider_for_agent(&agent_alias)
600 .and_then(|mp| mp.model.clone())
601 .unwrap_or_default(),
602 })),
603 );
604
605 if let Some(store) = &self.store {
606 let store = store.clone();
607 let sid = session_id.clone();
608 let alias = agent_alias.clone();
609 let wsd = workspace_dir.clone();
610 let created =
611 tokio::task::spawn_blocking(move || store.create_session(&sid, &alias, &wsd)).await;
612 let error = match created {
613 Ok(Ok(_)) => None,
614 Ok(Err(e)) => Some(e.to_string()),
615 Err(join) => Some(join.to_string()),
616 };
617 if let Some(detail) = error {
618 sessions.remove(&session_id);
620 return Err(RpcError {
621 code: INTERNAL_ERROR,
622 message: format!("Failed to persist session: {detail}"),
623 data: None,
624 });
625 }
626 }
627
628 let mp = self
629 .config
630 .agent(&agent_alias)
631 .map(|a| a.model_provider.to_string())
632 .unwrap_or_default();
633 let model_name = self
634 .config
635 .model_provider_for_agent(&agent_alias)
636 .and_then(|mp| mp.model.clone())
637 .unwrap_or_default();
638 ::zeroclaw_log::record!(
639 INFO,
640 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Start)
641 .with_category(::zeroclaw_log::EventCategory::Channel)
642 .with_outcome(::zeroclaw_log::EventOutcome::Success)
643 .with_attrs(::serde_json::json!({
644 "session_id": session_id,
645 "workspace_dir": workspace_dir,
646 "agent_alias": agent_alias,
647 "model_provider": mp,
648 "model": model_name,
649 })),
650 "ACP session created"
651 );
652
653 Ok(serde_json::json!({
654 "sessionId": session_id,
655 "workspaceDir": workspace_dir,
656 }))
657 }
658
659 async fn handle_session_load(&self, params: &Value) -> RpcResult {
660 let session_id = params
661 .get("sessionId")
662 .or_else(|| params.get("session_id"))
663 .and_then(|v| v.as_str())
664 .ok_or_else(|| RpcError {
665 code: INVALID_PARAMS,
666 message: "Missing required parameter: sessionId".to_string(),
667 data: None,
668 })?
669 .to_string();
670
671 let store = self.store.as_ref().ok_or_else(|| RpcError {
672 code: SESSION_NOT_FOUND,
673 message: format!("Session not found: {session_id}"),
674 data: None,
675 })?;
676
677 {
679 let sessions = self.sessions.lock().await;
680 let mut loading = self.loading_sessions.lock().await;
681 if sessions.len() + loading.len() >= self.acp_config.max_sessions {
682 ::zeroclaw_log::record!(
683 WARN,
684 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
685 .with_category(::zeroclaw_log::EventCategory::Channel)
686 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
687 .with_attrs(::serde_json::json!({
688 "session_id": session_id,
689 "active": sessions.len(),
690 "loading": loading.len(),
691 "max": self.acp_config.max_sessions,
692 })),
693 "ACP session/load rejected: session limit reached"
694 );
695 return Err(RpcError {
696 code: SESSION_LIMIT_REACHED,
697 message: format!(
698 "Maximum session limit reached ({})",
699 self.acp_config.max_sessions
700 ),
701 data: None,
702 });
703 }
704 if sessions.contains_key(&session_id) || loading.contains(&session_id) {
705 ::zeroclaw_log::record!(
706 WARN,
707 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
708 .with_category(::zeroclaw_log::EventCategory::Channel)
709 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
710 .with_attrs(::serde_json::json!({"session_id": session_id})),
711 "ACP session/load rejected: session already active"
712 );
713 return Err(RpcError {
714 code: INVALID_PARAMS,
715 message: format!(
716 "Session already active: {session_id}. Call session/close first."
717 ),
718 data: None,
719 });
720 }
721 loading.insert(session_id.clone());
722 }
723
724 let data = store
728 .load_session(&session_id)
729 .map_err(|e| RpcError {
730 code: INTERNAL_ERROR,
731 message: format!("Failed to load session: {e}"),
732 data: None,
733 })
734 .and_then(|opt| {
735 opt.ok_or_else(|| RpcError {
736 code: SESSION_NOT_FOUND,
737 message: format!("Session not found: {session_id}"),
738 data: None,
739 })
740 });
741
742 let data = match data {
744 Ok(d) => d,
745 Err(e) => {
746 self.loading_sessions.lock().await.remove(&session_id);
747 return Err(e);
748 }
749 };
750
751 let workspace_dir = std::path::PathBuf::from(&data.workspace_dir);
752
753 let restore_alias = self
754 .config
755 .acp
756 .default_agent
757 .clone()
758 .or_else(|| {
759 let mut keys = self.config.agents.keys();
760 if self.config.agents.len() == 1 {
761 keys.next().cloned()
762 } else {
763 None
764 }
765 })
766 .unwrap_or_else(|| "default".to_string());
767
768 let agent_result = Agent::from_config_with_session_cwd_and_mcp_backchannel(
769 &self.config,
770 &restore_alias,
771 Some(&workspace_dir),
772 false,
773 true,
774 )
775 .await
776 .map_err(|e| RpcError {
777 code: INTERNAL_ERROR,
778 message: format!("Failed to create agent: {e}"),
779 data: None,
780 });
781
782 let mut agent = match agent_result {
783 Ok(a) => a,
784 Err(e) => {
785 self.loading_sessions.lock().await.remove(&session_id);
786 return Err(e);
787 }
788 };
789
790 agent.seed_conversation_history(data.messages.clone());
791
792 let acp_channel = Arc::new(AcpChannel::new(
793 "acp",
794 session_id.clone(),
795 Arc::clone(&self.rpc),
796 Duration::from_secs(self.acp_config.session_timeout_secs),
797 ));
798 agent.channel_handles().register_channel("acp", acp_channel);
799
800 let now = Instant::now();
801 {
803 let mut sessions = self.sessions.lock().await;
804 let mut loading = self.loading_sessions.lock().await;
805 loading.remove(&session_id);
806 sessions.insert(
807 session_id.clone(),
808 Arc::new(Mutex::new(Session {
809 agent,
810 created_at: now,
811 last_active: now,
812 agent_alias: restore_alias.clone(),
813 model_provider: self
814 .config
815 .agent(&restore_alias)
816 .map(|a| a.model_provider.to_string())
817 .unwrap_or_default(),
818 model: self
819 .config
820 .model_provider_for_agent(&restore_alias)
821 .and_then(|mp| mp.model.clone())
822 .unwrap_or_default(),
823 })),
824 );
825 }
826
827 for msg in &data.messages {
829 for notification in history_notifications_for_message(&session_id, msg) {
830 self.write_notification(¬ification).await;
831 }
832 }
833
834 let mp = self
835 .config
836 .agent(&restore_alias)
837 .map(|a| a.model_provider.to_string())
838 .unwrap_or_default();
839 let model_name = self
840 .config
841 .model_provider_for_agent(&restore_alias)
842 .and_then(|mp| mp.model.clone())
843 .unwrap_or_default();
844 ::zeroclaw_log::record!(
845 INFO,
846 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Start)
847 .with_category(::zeroclaw_log::EventCategory::Channel)
848 .with_outcome(::zeroclaw_log::EventOutcome::Success)
849 .with_attrs(::serde_json::json!({
850 "session_id": session_id,
851 "message_count": data.messages.len(),
852 "agent_alias": restore_alias,
853 "model_provider": mp,
854 "model": model_name,
855 })),
856 "ACP session loaded"
857 );
858 Ok(serde_json::json!({}))
859 }
860
861 async fn handle_session_resume(&self, params: &Value) -> RpcResult {
862 let session_id = params
863 .get("sessionId")
864 .or_else(|| params.get("session_id"))
865 .and_then(|v| v.as_str())
866 .ok_or_else(|| RpcError {
867 code: INVALID_PARAMS,
868 message: "Missing required parameter: sessionId".to_string(),
869 data: None,
870 })?
871 .to_string();
872
873 let store = self.store.as_ref().ok_or_else(|| RpcError {
874 code: SESSION_NOT_FOUND,
875 message: format!("Session not found: {session_id}"),
876 data: None,
877 })?;
878
879 {
881 let sessions = self.sessions.lock().await;
882 let mut loading = self.loading_sessions.lock().await;
883 if sessions.len() + loading.len() >= self.acp_config.max_sessions {
884 ::zeroclaw_log::record!(
885 WARN,
886 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
887 .with_category(::zeroclaw_log::EventCategory::Channel)
888 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
889 .with_attrs(::serde_json::json!({
890 "session_id": session_id,
891 "active": sessions.len(),
892 "loading": loading.len(),
893 "max": self.acp_config.max_sessions,
894 })),
895 "ACP session/resume rejected: session limit reached"
896 );
897 return Err(RpcError {
898 code: SESSION_LIMIT_REACHED,
899 message: format!(
900 "Maximum session limit reached ({})",
901 self.acp_config.max_sessions
902 ),
903 data: None,
904 });
905 }
906 if sessions.contains_key(&session_id) || loading.contains(&session_id) {
907 ::zeroclaw_log::record!(
908 WARN,
909 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
910 .with_category(::zeroclaw_log::EventCategory::Channel)
911 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
912 .with_attrs(::serde_json::json!({"session_id": session_id})),
913 "ACP session/resume rejected: session already active"
914 );
915 return Err(RpcError {
916 code: INVALID_PARAMS,
917 message: format!(
918 "Session already active: {session_id}. Call session/close first."
919 ),
920 data: None,
921 });
922 }
923 loading.insert(session_id.clone());
924 }
925
926 let data = store
927 .load_session(&session_id)
928 .map_err(|e| RpcError {
929 code: INTERNAL_ERROR,
930 message: format!("Failed to load session: {e}"),
931 data: None,
932 })
933 .and_then(|opt| {
934 opt.ok_or_else(|| RpcError {
935 code: SESSION_NOT_FOUND,
936 message: format!("Session not found: {session_id}"),
937 data: None,
938 })
939 });
940
941 let data = match data {
943 Ok(d) => d,
944 Err(e) => {
945 self.loading_sessions.lock().await.remove(&session_id);
946 return Err(e);
947 }
948 };
949
950 let workspace_dir = std::path::PathBuf::from(&data.workspace_dir);
951
952 let restore_alias = self
953 .config
954 .acp
955 .default_agent
956 .clone()
957 .or_else(|| {
958 let mut keys = self.config.agents.keys();
959 if self.config.agents.len() == 1 {
960 keys.next().cloned()
961 } else {
962 None
963 }
964 })
965 .unwrap_or_else(|| "default".to_string());
966
967 let agent_result = Agent::from_config_with_session_cwd_and_mcp_backchannel(
968 &self.config,
969 &restore_alias,
970 Some(&workspace_dir),
971 false,
972 true,
973 )
974 .await
975 .map_err(|e| RpcError {
976 code: INTERNAL_ERROR,
977 message: format!("Failed to create agent: {e}"),
978 data: None,
979 });
980
981 let mut agent = match agent_result {
982 Ok(a) => a,
983 Err(e) => {
984 self.loading_sessions.lock().await.remove(&session_id);
985 return Err(e);
986 }
987 };
988
989 agent.seed_conversation_history(data.messages);
990
991 let acp_channel = Arc::new(AcpChannel::new(
992 "acp",
993 session_id.clone(),
994 Arc::clone(&self.rpc),
995 Duration::from_secs(self.acp_config.session_timeout_secs),
996 ));
997 agent.channel_handles().register_channel("acp", acp_channel);
998
999 let now = Instant::now();
1000 {
1002 let mut sessions = self.sessions.lock().await;
1003 let mut loading = self.loading_sessions.lock().await;
1004 loading.remove(&session_id);
1005 sessions.insert(
1006 session_id.clone(),
1007 Arc::new(Mutex::new(Session {
1008 agent,
1009 created_at: now,
1010 last_active: now,
1011 agent_alias: restore_alias.clone(),
1012 model_provider: self
1013 .config
1014 .agent(&restore_alias)
1015 .map(|a| a.model_provider.to_string())
1016 .unwrap_or_default(),
1017 model: self
1018 .config
1019 .model_provider_for_agent(&restore_alias)
1020 .and_then(|mp| mp.model.clone())
1021 .unwrap_or_default(),
1022 })),
1023 );
1024 }
1025
1026 let mp = self
1027 .config
1028 .agent(&restore_alias)
1029 .map(|a| a.model_provider.to_string())
1030 .unwrap_or_default();
1031 let model_name = self
1032 .config
1033 .model_provider_for_agent(&restore_alias)
1034 .and_then(|mp| mp.model.clone())
1035 .unwrap_or_default();
1036 ::zeroclaw_log::record!(
1037 INFO,
1038 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Start)
1039 .with_category(::zeroclaw_log::EventCategory::Channel)
1040 .with_outcome(::zeroclaw_log::EventOutcome::Success)
1041 .with_attrs(::serde_json::json!({
1042 "session_id": session_id,
1043 "agent_alias": restore_alias,
1044 "model_provider": mp,
1045 "model": model_name,
1046 })),
1047 "ACP session resumed"
1048 );
1049 Ok(serde_json::json!({}))
1050 }
1051
1052 async fn handle_session_close(&self, params: &Value) -> RpcResult {
1061 let session_id = params
1062 .get("sessionId")
1063 .or_else(|| params.get("session_id"))
1064 .and_then(|v| v.as_str())
1065 .ok_or_else(|| RpcError {
1066 code: INVALID_PARAMS,
1067 message: "Missing required parameter: sessionId".to_string(),
1068 data: None,
1069 })?;
1070
1071 let token = self
1073 .cancel_tokens
1074 .lock()
1075 .expect("cancel_tokens lock poisoned — invariant: all guarded critical sections are short, infallible HashMap ops")
1076 .get(session_id)
1077 .cloned();
1078 if let Some(token) = token {
1079 token.cancel();
1080 ::zeroclaw_log::record!(
1081 INFO,
1082 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1083 .with_category(::zeroclaw_log::EventCategory::Channel)
1084 .with_attrs(::serde_json::json!({"session_id": session_id})),
1085 "ACP session/close: cancelled active turn"
1086 );
1087 }
1088
1089 let session_arc = {
1090 let mut sessions = self.sessions.lock().await;
1091 sessions.remove(session_id).ok_or_else(|| RpcError {
1092 code: SESSION_NOT_FOUND,
1093 message: format!("Session not found: {session_id}"),
1094 data: None,
1095 })?
1096 };
1097
1098 let session = session_arc.lock().await;
1100 let agent_alias = session.agent_alias.clone();
1101 let model_provider = session.model_provider.clone();
1102 let model = session.model.clone();
1103 session.agent.channel_handles().unregister_channel("acp");
1104 ::zeroclaw_log::record!(
1105 INFO,
1106 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete)
1107 .with_category(::zeroclaw_log::EventCategory::Channel)
1108 .with_outcome(::zeroclaw_log::EventOutcome::Success)
1109 .with_attrs(::serde_json::json!({
1110 "session_id": session_id,
1111 "agent_alias": agent_alias,
1112 "model_provider": model_provider,
1113 "model": model,
1114 })),
1115 "ACP session closed"
1116 );
1117
1118 Ok(serde_json::json!({}))
1119 }
1120
1121 fn requested_session_cwd(&self, params: &Value) -> PathBuf {
1122 params
1123 .get("cwd")
1124 .or_else(|| params.get("workspaceDir"))
1125 .or_else(|| params.get("workspace_dir"))
1126 .and_then(|v| v.as_str())
1127 .map(PathBuf::from)
1128 .unwrap_or_else(|| {
1129 std::env::current_dir().unwrap_or_else(|_| self.config.data_dir.clone())
1130 })
1131 }
1132
1133 async fn handle_session_prompt(&self, params: &Value, _request_id: &Value) -> RpcResult {
1134 let session_id = params
1135 .get("sessionId")
1136 .or_else(|| params.get("session_id"))
1137 .and_then(|v| v.as_str())
1138 .ok_or_else(|| RpcError {
1139 code: INVALID_PARAMS,
1140 message: "Missing required parameter: sessionId".to_string(),
1141 data: None,
1142 })?
1143 .to_string();
1144
1145 let prompt = Self::parse_prompt(params)?;
1146
1147 let session_arc = {
1151 let sessions = self.sessions.lock().await;
1152 sessions.get(&session_id).cloned().ok_or_else(|| RpcError {
1153 code: SESSION_NOT_FOUND,
1154 message: format!("Session not found: {session_id}"),
1155 data: None,
1156 })?
1157 };
1158
1159 let (agent_alias, model_provider, model) = {
1161 if let Ok(s) = session_arc.try_lock() {
1165 (
1166 s.agent_alias.clone(),
1167 s.model_provider.clone(),
1168 s.model.clone(),
1169 )
1170 } else {
1171 (String::new(), String::new(), String::new())
1172 }
1173 };
1174
1175 let session_id_s = session_id.clone();
1183 let agent_alias_s = agent_alias.clone();
1184 let model_provider_s = model_provider.clone();
1185 let model_s = model.clone();
1186 ::zeroclaw_log::scope!(
1187 agent_alias: agent_alias_s.as_str(),
1188 model_provider: model_provider_s.as_str(),
1189 model: model_s.as_str(),
1190 session_key: session_id_s.as_str(),
1191 channel: "acp",
1192 => async move {
1193
1194 ::zeroclaw_log::record!(
1195 INFO,
1196 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Start).with_category(::zeroclaw_log::EventCategory::Channel)
1197 .with_attrs(::serde_json::json!({
1198 "prompt_len": prompt.len(),
1199 })),
1200 "ACP session/prompt turn starting"
1201 );
1202
1203 let cancel_token = tokio_util::sync::CancellationToken::new();
1210 self.register_cancel_token(&session_id, cancel_token.clone())?;
1211 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(100);
1212
1213 let session_id_for_task = session_id.clone();
1218 let turn_handle = zeroclaw_spawn::spawn!(async move {
1219 let mut session = session_arc.lock().await;
1220 let (turn_alias, turn_provider, turn_model) = session.agent.attribution_fields();
1221 let span_session = session_id_for_task.clone();
1222 let result = {
1223 use ::zeroclaw_log::Instrument as _;
1224 let span = ::zeroclaw_log::info_span!(
1225 target: "zeroclaw_log_internal_scope",
1226 "zeroclaw_scope",
1227 session_key = %span_session,
1228 agent_alias = %turn_alias,
1229 model_provider = %turn_provider,
1230 model = %turn_model,
1231 channel = "acp",
1232 );
1233 zeroclaw_runtime::agent::loop_::scope_session_key(
1234 Some(session_id_for_task),
1235 session
1236 .agent
1237 .turn_streamed(&prompt, event_tx, Some(cancel_token))
1238 .instrument(span),
1239 )
1240 .await
1241 };
1242 session.last_active = Instant::now();
1243 result
1244 });
1246
1247 let mut accumulated_text = String::new();
1253 let mut tool_call_count: u32 = 0;
1254 while let Some(event) = event_rx.recv().await {
1255 if let TurnEvent::Usage { input_tokens, .. } = &event {
1262 if let (Some(store), Some(it)) = (&self.store, input_tokens) {
1273 let store = store.clone();
1274 let sid = session_id.clone();
1275 let it = *it;
1276 zeroclaw_spawn::spawn!(async move {
1277 let persisted =
1278 tokio::task::spawn_blocking(move || store.set_token_count(&sid, it))
1279 .await;
1280 let error = match persisted {
1281 Ok(Ok(())) => return,
1282 Ok(Err(e)) => e.to_string(),
1283 Err(join) => join.to_string(),
1284 };
1285 ::zeroclaw_log::record!(
1286 WARN,
1287 ::zeroclaw_log::Event::new(
1288 module_path!(),
1289 ::zeroclaw_log::Action::Write,
1290 )
1291 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1292 .with_attrs(::serde_json::json!({
1293 "input_tokens": it,
1294 "error": error,
1295 })),
1296 "Failed to persist ACP session token_count"
1297 );
1298 });
1299 }
1300 continue;
1301 }
1302 match &event {
1306 TurnEvent::ToolCall { id, name, args } => {
1307 tool_call_count += 1;
1308 ::zeroclaw_log::record!(
1309 DEBUG,
1310 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Start).with_category(::zeroclaw_log::EventCategory::Channel)
1311 .with_attrs(::serde_json::json!({
1312 "tool_call_id": id,
1313 "tool": name,
1314 "args_len": args.to_string().len(),
1315 })),
1316 "ACP tool call dispatched"
1317 );
1318 }
1319 TurnEvent::ToolResult { id, name, output } => {
1320 ::zeroclaw_log::record!(
1321 DEBUG,
1322 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete).with_category(::zeroclaw_log::EventCategory::Channel)
1323 .with_outcome(::zeroclaw_log::EventOutcome::Success)
1324 .with_attrs(::serde_json::json!({
1325 "tool_call_id": id,
1326 "tool": name,
1327 "output_len": output.len(),
1328 })),
1329 "ACP tool call completed"
1330 );
1331 }
1332 TurnEvent::Chunk { delta } => {
1333 accumulated_text.push_str(delta);
1334 }
1335 _ => {}
1336 }
1337 if let Some(notification) = notification_for_turn_event(&session_id, &event) {
1338 self.write_notification(¬ification).await;
1339 }
1340 }
1341
1342 self.remove_cancel_token(&session_id);
1345
1346 let turn_result = turn_handle.await.map_err(|e| RpcError {
1347 code: INTERNAL_ERROR,
1348 message: format!("Agent task panicked: {e}"),
1349 data: None,
1350 })?;
1351
1352 let was_cancelled = match &turn_result {
1355 Err(e) => zeroclaw_runtime::agent::loop_::is_tool_loop_cancelled(e),
1356 Ok(_) => false,
1357 };
1358
1359 if was_cancelled {
1360 ::zeroclaw_log::record!(
1361 INFO,
1362 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete).with_category(::zeroclaw_log::EventCategory::Channel)
1363 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1364 .with_attrs(::serde_json::json!({
1365 "tool_calls": tool_call_count,
1366 "stop_reason": "cancelled",
1367 })),
1368 "ACP session/prompt turn cancelled"
1369 );
1370 return Ok(Self::cancelled_prompt_result(session_id, &accumulated_text));
1371 }
1372
1373 let (result_text, new_turn_msgs) = turn_result.map_err(|e| {
1374 ::zeroclaw_log::record!(
1375 ERROR,
1376 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail).with_category(::zeroclaw_log::EventCategory::Channel)
1377 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1378 .with_attrs(::serde_json::json!({
1379 "error": e.to_string(),
1380 })),
1381 "ACP session/prompt turn failed"
1382 );
1383 RpcError {
1384 code: INTERNAL_ERROR,
1385 message: format!("Agent turn failed: {e}"),
1386 data: None,
1387 }
1388 })?;
1389
1390 if let Some(store) = &self.store
1392 && !new_turn_msgs.is_empty()
1393 {
1394 let store = store.clone();
1395 let sid = session_id.clone();
1396 let msgs = new_turn_msgs;
1397 let persisted =
1398 tokio::task::spawn_blocking(move || store.append_turn(&sid, &msgs)).await;
1399 let error = match persisted {
1400 Ok(Ok(())) => None,
1401 Ok(Err(e)) => Some(e.to_string()),
1402 Err(join) => Some(join.to_string()),
1403 };
1404 if let Some(detail) = error {
1405 ::zeroclaw_log::record!(
1406 WARN,
1407 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_category(::zeroclaw_log::EventCategory::Channel)
1408 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1409 .with_attrs(::serde_json::json!({
1410 "error": detail,
1411 })),
1412 "Failed to persist turn; session continues in memory"
1413 );
1414 }
1415 }
1416
1417 ::zeroclaw_log::record!(
1418 INFO,
1419 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete).with_category(::zeroclaw_log::EventCategory::Channel)
1420 .with_outcome(::zeroclaw_log::EventOutcome::Success)
1421 .with_attrs(::serde_json::json!({
1422 "tool_calls": tool_call_count,
1423 "response_len": result_text.len(),
1424 "stop_reason": "end_turn",
1425 })),
1426 "ACP session/prompt turn complete"
1427 );
1428
1429 Ok(Self::prompt_result(session_id, "end_turn", result_text))
1430
1431 }).await
1432 }
1433
1434 fn register_cancel_token(
1435 &self,
1436 session_id: &str,
1437 cancel_token: tokio_util::sync::CancellationToken,
1438 ) -> std::result::Result<(), RpcError> {
1439 let mut tokens = self
1440 .cancel_tokens
1441 .lock()
1442 .expect("cancel_tokens lock poisoned — invariant: all guarded critical sections are short, infallible HashMap ops");
1443 if tokens.contains_key(session_id) {
1444 ::zeroclaw_log::record!(
1445 WARN,
1446 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1447 .with_category(::zeroclaw_log::EventCategory::Channel)
1448 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1449 .with_attrs(::serde_json::json!({"session_id": session_id})),
1450 "ACP session/prompt rejected: session already has an active turn"
1451 );
1452 return Err(RpcError {
1453 code: SESSION_BUSY,
1454 message: format!("Session already has an active prompt turn: {session_id}"),
1455 data: None,
1456 });
1457 }
1458 tokens.insert(session_id.to_string(), cancel_token);
1459 Ok(())
1460 }
1461
1462 fn remove_cancel_token(&self, session_id: &str) {
1463 self.cancel_tokens
1464 .lock()
1465 .expect("cancel_tokens lock poisoned — invariant: all guarded critical sections are short, infallible HashMap ops")
1466 .remove(session_id);
1467 }
1468
1469 fn prompt_result(session_id: String, stop_reason: &'static str, text: String) -> Value {
1470 serde_json::json!({
1471 "sessionId": session_id,
1472 "stopReason": stop_reason,
1473 "content": text,
1474 })
1475 }
1476
1477 fn cancelled_prompt_result(session_id: String, accumulated_text: &str) -> Value {
1478 let content = if accumulated_text.is_empty() {
1479 "[interrupted by user]".to_string()
1480 } else {
1481 format!("{accumulated_text}\n\n[interrupted by user]")
1482 };
1483 Self::prompt_result(session_id, "cancelled", content)
1484 }
1485
1486 fn parse_prompt(params: &Value) -> std::result::Result<String, RpcError> {
1487 match params.get("prompt") {
1488 Some(Value::String(s)) => Ok(s.clone()),
1489 Some(Value::Array(arr)) => {
1490 let mut joined = String::new();
1491 for part in arr {
1492 let mut added = false;
1493 if let Some(text) = part.get("text").and_then(|v| v.as_str()) {
1494 if !joined.is_empty() {
1495 joined.push_str("\n\n");
1496 }
1497 joined.push_str(text);
1498 added = true;
1499 }
1500 if let Some(res) = part.get("resource")
1503 && let Some(text) = res.get("text").and_then(|v| v.as_str())
1504 {
1505 if added || !joined.is_empty() {
1506 joined.push_str("\n\n");
1507 }
1508 joined.push_str(text);
1509 }
1510 }
1511 if joined.is_empty() {
1512 return Err(RpcError {
1513 code: INVALID_PARAMS,
1514 message: "Parameter 'prompt' array must contain at least one text part"
1515 .to_string(),
1516 data: None,
1517 });
1518 }
1519 Ok(joined)
1520 }
1521 _ => Err(RpcError {
1522 code: INVALID_PARAMS,
1523 message: "Missing required parameter: prompt (must be string or array of parts)"
1524 .to_string(),
1525 data: None,
1526 }),
1527 }
1528 }
1529
1530 async fn handle_session_stop(&self, params: &Value) -> RpcResult {
1531 let session_id = params
1532 .get("sessionId")
1533 .or_else(|| params.get("session_id"))
1534 .and_then(|v| v.as_str())
1535 .ok_or_else(|| RpcError {
1536 code: INVALID_PARAMS,
1537 message: "Missing required parameter: sessionId".to_string(),
1538 data: None,
1539 })?;
1540
1541 let session_arc = {
1542 let mut sessions = self.sessions.lock().await;
1543 sessions.remove(session_id).ok_or_else(|| RpcError {
1544 code: SESSION_NOT_FOUND,
1545 message: format!("Session not found: {session_id}"),
1546 data: None,
1547 })?
1548 };
1549
1550 let session = session_arc.lock().await;
1553 let agent_alias = session.agent_alias.clone();
1554 let model_provider = session.model_provider.clone();
1555 let model = session.model.clone();
1556 session.agent.channel_handles().unregister_channel("acp");
1559 ::zeroclaw_log::record!(
1560 INFO,
1561 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete)
1562 .with_category(::zeroclaw_log::EventCategory::Channel)
1563 .with_outcome(::zeroclaw_log::EventOutcome::Success)
1564 .with_attrs(::serde_json::json!({
1565 "session_id": session_id,
1566 "agent_alias": agent_alias,
1567 "model_provider": model_provider,
1568 "model": model,
1569 })),
1570 "ACP session stopped"
1571 );
1572 Ok(serde_json::json!({
1573 "sessionId": session_id,
1574 "stopped": true,
1575 }))
1576 }
1577
1578 async fn handle_session_cancel(&self, params: &Value) -> RpcResult {
1591 let session_id = params
1592 .get("sessionId")
1593 .or_else(|| params.get("session_id"))
1594 .and_then(|v| v.as_str())
1595 .ok_or_else(|| RpcError {
1596 code: INVALID_PARAMS,
1597 message: "Missing required parameter: sessionId".to_string(),
1598 data: None,
1599 })?;
1600
1601 let token = self
1602 .cancel_tokens
1603 .lock()
1604 .expect("cancel_tokens lock poisoned — invariant: all guarded critical sections are short, infallible HashMap ops")
1605 .get(session_id)
1606 .cloned();
1607
1608 if let Some(token) = token {
1609 token.cancel();
1610 ::zeroclaw_log::record!(
1611 INFO,
1612 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1613 .with_category(::zeroclaw_log::EventCategory::Channel)
1614 .with_attrs(::serde_json::json!({"session_id": session_id})),
1615 "ACP session/cancel: fired cancel token for active turn"
1616 );
1617 }
1618
1619 Ok(serde_json::json!({}))
1620 }
1621
1622 async fn handle_session_event(&self, params: &Value) -> RpcResult {
1629 let session_id = params
1630 .get("sessionId")
1631 .or_else(|| params.get("session_id"))
1632 .and_then(|v| v.as_str())
1633 .ok_or_else(|| RpcError {
1634 code: INVALID_PARAMS,
1635 message: "Missing required parameter: sessionId".to_string(),
1636 data: None,
1637 })?
1638 .to_string();
1639
1640 let event_type = params
1641 .get("type")
1642 .or_else(|| params.get("update").and_then(|u| u.get("sessionUpdate")))
1643 .and_then(|v| v.as_str())
1644 .unwrap_or("unknown")
1645 .to_string();
1646
1647 ::zeroclaw_log::record!(
1648 DEBUG,
1649 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1650 .with_category(::zeroclaw_log::EventCategory::Channel)
1651 .with_attrs(
1652 ::serde_json::json!({"event_type": event_type, "session_id": session_id})
1653 ),
1654 "Received session update (type=) for session"
1655 );
1656
1657 let session_arc = {
1658 let sessions = self.sessions.lock().await;
1659 sessions.get(&session_id).cloned()
1660 };
1661
1662 if let Some(session_arc) = session_arc {
1663 if let Ok(mut session) = session_arc.try_lock() {
1666 session.last_active = Instant::now();
1667 }
1668 Ok(serde_json::json!({
1669 "sessionId": session_id,
1670 "type": event_type,
1671 "status": "processed"
1672 }))
1673 } else {
1674 Err(RpcError {
1675 code: SESSION_NOT_FOUND,
1676 message: format!("Session not found: {session_id}"),
1677 data: None,
1678 })
1679 }
1680 }
1681
1682 async fn write_result(&self, id: Value, result: Value) {
1685 let response = JsonRpcResponse {
1686 jsonrpc: "2.0",
1687 result: Some(result),
1688 error: None,
1689 id,
1690 };
1691 self.write_json(&response).await;
1692 }
1693
1694 async fn write_error(&self, id: Value, code: i32, message: &str) {
1695 let response = JsonRpcResponse {
1696 jsonrpc: "2.0",
1697 result: None,
1698 error: Some(JsonRpcError {
1699 code,
1700 message: message.to_string(),
1701 data: None,
1702 }),
1703 id,
1704 };
1705 self.write_json(&response).await;
1706 }
1707
1708 async fn write_notification(&self, notification: &JsonRpcNotification) {
1709 self.write_json(notification).await;
1710 }
1711
1712 async fn write_json<T: Serialize>(&self, value: &T) {
1713 match serde_json::to_string(value) {
1714 Ok(json) => {
1715 if !self.rpc.send_raw(json).await {
1716 ::zeroclaw_log::record!(
1717 ERROR,
1718 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1719 .with_category(::zeroclaw_log::EventCategory::Channel)
1720 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
1721 "ACP writer task closed; dropping outbound message"
1722 );
1723 }
1724 }
1725 Err(e) => {
1726 ::zeroclaw_log::record!(
1727 ERROR,
1728 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1729 .with_category(::zeroclaw_log::EventCategory::Channel)
1730 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1731 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1732 "Failed to serialize JSON-RPC message"
1733 );
1734 }
1735 }
1736 }
1737}
1738
1739async fn writer_task(mut rx: mpsc::Receiver<String>) {
1743 let mut stdout = tokio::io::stdout();
1744 while let Some(line) = rx.recv().await {
1745 if let Err(e) = stdout.write_all(line.as_bytes()).await {
1746 ::zeroclaw_log::record!(
1747 ERROR,
1748 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1749 .with_category(::zeroclaw_log::EventCategory::Channel)
1750 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1751 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1752 "Failed to write to stdout"
1753 );
1754 continue;
1755 }
1756 if let Err(e) = stdout.write_all(b"\n").await {
1757 ::zeroclaw_log::record!(
1758 ERROR,
1759 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1760 .with_category(::zeroclaw_log::EventCategory::Channel)
1761 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1762 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1763 "Failed to write newline to stdout"
1764 );
1765 continue;
1766 }
1767 if let Err(e) = stdout.flush().await {
1768 ::zeroclaw_log::record!(
1769 ERROR,
1770 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1771 .with_category(::zeroclaw_log::EventCategory::Channel)
1772 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1773 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1774 "Failed to flush stdout"
1775 );
1776 }
1777 }
1778}
1779
1780fn to_acp_raw_input(name: &str, args: &Value) -> Value {
1788 match name {
1789 "file_edit" => {
1790 let path = args.get("path").cloned().unwrap_or(Value::Null);
1791 let old_text = args.get("old_string").cloned().unwrap_or(Value::Null);
1792 let new_text = args.get("new_string").cloned().unwrap_or(Value::Null);
1793 serde_json::json!({ "path": path, "oldText": old_text, "newText": new_text })
1794 }
1795 "file_write" => {
1796 let path = args.get("path").cloned().unwrap_or(Value::Null);
1797 let new_text = args.get("content").cloned().unwrap_or(Value::Null);
1798 serde_json::json!({ "path": path, "newText": new_text })
1799 }
1800 _ => args.clone(),
1801 }
1802}
1803
1804fn to_acp_content(name: &str, args: &Value) -> Value {
1811 match name {
1812 "file_edit" => {
1813 let path = args.get("path").cloned().unwrap_or(Value::Null);
1814 let old_text = args.get("old_string").cloned().unwrap_or(Value::Null);
1815 let new_text = args.get("new_string").cloned().unwrap_or(Value::Null);
1816 serde_json::json!([{ "type": "diff", "path": path, "oldText": old_text, "newText": new_text }])
1817 }
1818 "file_write" => {
1819 let path = args.get("path").cloned().unwrap_or(Value::Null);
1820 let new_text = args.get("content").cloned().unwrap_or(Value::Null);
1821 serde_json::json!([{ "type": "diff", "path": path, "newText": new_text }])
1822 }
1823 _ => serde_json::json!([]),
1824 }
1825}
1826
1827fn map_tool_kind(name: &str) -> &'static str {
1828 match name {
1829 "ask_user" | "calculator" | "claude_code" | "claude_code_runner" | "codex_cli"
1830 | "composio" | "delegate" | "escalate_to_human" | "execute_pipeline" | "gemini_cli"
1831 | "jira" | "llm_task" | "opencode_cli" | "schedule" | "security_ops" | "shell"
1832 | "sop_advance" | "sop_approve" | "sop_execute" | "vi_verify" => "execute",
1833 "backup" | "browser_open" | "canvas" | "cloud_ops" | "file_edit" | "file_write"
1834 | "memory_export" | "memory_store" | "report_template" => "edit",
1835 "cron_add" | "poll" | "reaction" => "edit",
1836 "memory_forget" | "memory_purge" => "delete",
1837 "content_search" | "discord_search" | "glob_search" | "knowledge" | "search"
1842 | "tool_search" | "web_search_tool" => "other",
1843 "browser"
1844 | "browser_delegate"
1845 | "cloud_patterns"
1846 | "data_management"
1847 | "file_read"
1848 | "git_operations"
1849 | "google_workspace"
1850 | "hardware_board_info"
1851 | "hardware_memory_map"
1852 | "hardware_memory_read"
1853 | "image_info"
1854 | "linkedin"
1855 | "microsoft365"
1856 | "model_routing_config"
1857 | "model_switch"
1858 | "pdf_read"
1859 | "project_intel"
1860 | "proxy_config"
1861 | "read_skill"
1862 | "sessions_history"
1863 | "sessions_list"
1864 | "sop_list"
1865 | "sop_status"
1866 | "text_browser"
1867 | "weather"
1868 | "workspace" => "other",
1869 "cron_list" | "cron_runs" | "memory_recall" => "other",
1870 "http_request" | "web_fetch" => "other",
1871 "image_gen" => "other",
1872 "cron_remove" => "delete",
1873 "cron_run" => "execute",
1874 "sessions_send" => "execute",
1875 _ => "other",
1876 }
1877}
1878
1879fn notification_for_turn_event(session_id: &str, event: &TurnEvent) -> Option<JsonRpcNotification> {
1880 Some(match event {
1881 TurnEvent::Chunk { delta } => JsonRpcNotification {
1882 jsonrpc: "2.0",
1883 method: "session/update",
1884 params: serde_json::json!({
1885 "sessionId": session_id,
1886 "update": {
1887 "sessionUpdate": "agent_message_chunk",
1888 "content": {
1889 "type": "text",
1890 "text": delta
1891 }
1892 }
1893 }),
1894 },
1895 TurnEvent::ToolCall { id, name, args } => {
1896 let acp_content = to_acp_content(name, args);
1897 let mut update = serde_json::json!({
1898 "sessionUpdate": "tool_call",
1899 "toolCallId": id,
1900 "name": name,
1901 "title": name,
1902 "kind": map_tool_kind(name),
1903 "rawInput": to_acp_raw_input(name, args),
1904 "status": "pending"
1905 });
1906 if acp_content
1907 .as_array()
1908 .is_some_and(|items| !items.is_empty())
1909 {
1910 update["content"] = acp_content;
1911 }
1912 JsonRpcNotification {
1913 jsonrpc: "2.0",
1914 method: "session/update",
1915 params: serde_json::json!({
1916 "sessionId": session_id,
1917 "update": update
1918 }),
1919 }
1920 }
1921 TurnEvent::ToolResult { id, name, output } => JsonRpcNotification {
1922 jsonrpc: "2.0",
1923 method: "session/update",
1924 params: serde_json::json!({
1925 "sessionId": session_id,
1926 "update": {
1927 "sessionUpdate": "tool_call_update",
1928 "toolCallId": id,
1929 "name": name,
1930 "title": name,
1931 "kind": map_tool_kind(name),
1932 "status": "completed",
1933 "rawOutput": output,
1934 "body": output,
1935 "content": [{
1936 "type": "content",
1937 "content": {
1938 "type": "text",
1939 "text": output
1940 }
1941 }]
1942 }
1943 }),
1944 },
1945 TurnEvent::Thinking { delta } => JsonRpcNotification {
1946 jsonrpc: "2.0",
1947 method: "session/update",
1948 params: serde_json::json!({
1949 "sessionId": session_id,
1950 "update": {
1951 "sessionUpdate": "agent_thought_chunk",
1952 "content": {
1953 "type": "text",
1954 "text": delta
1955 }
1956 }
1957 }),
1958 },
1959 TurnEvent::ApprovalRequest { .. } => return None,
1965 TurnEvent::Usage { .. } => unreachable!(
1969 "TurnEvent::Usage must be filtered before notification_for_turn_event; \
1970 ACP has no session/update notification for token usage"
1971 ),
1972 })
1973}
1974
1975fn history_notifications_for_message(
1976 session_id: &str,
1977 msg: &ConversationMessage,
1978) -> Vec<JsonRpcNotification> {
1979 match msg {
1980 ConversationMessage::Chat(chat) => {
1981 let update_type = match chat.role.as_str() {
1982 "user" => "user_message_chunk",
1983 "assistant" => "agent_message_chunk",
1984 _ => return vec![],
1985 };
1986 vec![JsonRpcNotification {
1987 jsonrpc: "2.0",
1988 method: "session/update",
1989 params: serde_json::json!({
1990 "sessionId": session_id,
1991 "update": {
1992 "sessionUpdate": update_type,
1993 "content": { "type": "text", "text": &chat.content }
1994 }
1995 }),
1996 }]
1997 }
1998 ConversationMessage::AssistantToolCalls {
1999 text, tool_calls, ..
2000 } => {
2001 let mut notifications = Vec::new();
2002 if let Some(t) = text
2003 && !t.is_empty()
2004 {
2005 notifications.push(JsonRpcNotification {
2006 jsonrpc: "2.0",
2007 method: "session/update",
2008 params: serde_json::json!({
2009 "sessionId": session_id,
2010 "update": {
2011 "sessionUpdate": "agent_message_chunk",
2012 "content": { "type": "text", "text": t }
2013 }
2014 }),
2015 });
2016 }
2017 for tc in tool_calls {
2018 let args: serde_json::Value =
2019 serde_json::from_str(&tc.arguments).unwrap_or(serde_json::Value::Null);
2020 let acp_content = to_acp_content(&tc.name, &args);
2021 let mut update = serde_json::json!({
2022 "sessionUpdate": "tool_call",
2023 "toolCallId": &tc.id,
2024 "name": &tc.name,
2025 "title": &tc.name,
2026 "kind": map_tool_kind(&tc.name),
2027 "rawInput": to_acp_raw_input(&tc.name, &args),
2028 "status": "completed"
2029 });
2030 if acp_content
2031 .as_array()
2032 .is_some_and(|items| !items.is_empty())
2033 {
2034 update["content"] = acp_content;
2035 }
2036 notifications.push(JsonRpcNotification {
2037 jsonrpc: "2.0",
2038 method: "session/update",
2039 params: serde_json::json!({
2040 "sessionId": session_id,
2041 "update": update
2042 }),
2043 });
2044 }
2045 notifications
2046 }
2047 ConversationMessage::ToolResults(results) => results
2048 .iter()
2049 .map(|r| JsonRpcNotification {
2050 jsonrpc: "2.0",
2051 method: "session/update",
2052 params: serde_json::json!({
2053 "sessionId": session_id,
2054 "update": {
2055 "sessionUpdate": "tool_call_update",
2056 "toolCallId": &r.tool_call_id,
2057 "status": "completed",
2058 "rawOutput": &r.content,
2059 "body": &r.content,
2060 "content": [{
2061 "type": "content",
2062 "content": { "type": "text", "text": &r.content }
2063 }]
2064 }
2065 }),
2066 })
2067 .collect(),
2068 }
2069}
2070
2071#[derive(Debug)]
2074struct RpcError {
2075 code: i32,
2076 message: String,
2077 #[allow(dead_code)] data: Option<Value>,
2079}
2080
2081type RpcResult = std::result::Result<Value, RpcError>;
2082
2083#[cfg(test)]
2084mod tests {
2085 use super::*;
2086
2087 #[test]
2088 fn acp_server_config_defaults() {
2089 let cfg = AcpServerConfig::default();
2090 assert_eq!(cfg.max_sessions, 10);
2091 assert_eq!(cfg.session_timeout_secs, 3600);
2092 }
2093
2094 #[test]
2095 fn acp_server_config_deserialize() {
2096 let json = r#"{"max_sessions": 5, "session_timeout_secs": 1800}"#;
2097 let cfg: AcpServerConfig = serde_json::from_str(json).unwrap();
2098 assert_eq!(cfg.max_sessions, 5);
2099 assert_eq!(cfg.session_timeout_secs, 1800);
2100 }
2101
2102 #[test]
2103 fn acp_server_config_deserialize_partial() {
2104 let json = r#"{"max_sessions": 3}"#;
2105 let cfg: AcpServerConfig = serde_json::from_str(json).unwrap();
2106 assert_eq!(cfg.max_sessions, 3);
2107 assert_eq!(cfg.session_timeout_secs, 3600);
2108 }
2109
2110 #[test]
2111 fn json_rpc_request_parse() {
2112 let json = r#"{"jsonrpc":"2.0","method":"initialize","params":{},"id":1}"#;
2113 let req: JsonRpcRequest = serde_json::from_str(json).unwrap();
2114 assert_eq!(req.method, "initialize");
2115 assert_eq!(req.id, Some(Value::Number(1.into())));
2116 }
2117
2118 #[test]
2119 fn json_rpc_request_parse_notification() {
2120 let json = r#"{"jsonrpc":"2.0","method":"session/update","params":{}}"#;
2121 let req: JsonRpcRequest = serde_json::from_str(json).unwrap();
2122 assert_eq!(req.method, "session/update");
2123 assert!(req.id.is_none());
2124 }
2125
2126 #[test]
2127 fn json_rpc_response_serialize() {
2128 let resp = JsonRpcResponse {
2129 jsonrpc: "2.0",
2130 result: Some(serde_json::json!({"status": "ok"})),
2131 error: None,
2132 id: Value::Number(1.into()),
2133 };
2134 let json = serde_json::to_string(&resp).unwrap();
2135 let parsed: Value = serde_json::from_str(&json).unwrap();
2136 assert_eq!(parsed["jsonrpc"], "2.0");
2137 assert!(parsed.get("result").is_some());
2138 assert!(parsed.get("error").is_none());
2139 assert_eq!(parsed["id"], 1);
2140 }
2141
2142 #[tokio::test]
2143 async fn rpc_request_timeout_drop_removes_pending_responder() {
2144 let (tx, mut rx) = mpsc::channel::<String>(16);
2145 let rpc = RpcOutbound::new(tx);
2146
2147 let result = tokio::time::timeout(
2148 Duration::from_millis(10),
2149 rpc.request("session/request_permission", serde_json::json!({})),
2150 )
2151 .await;
2152
2153 assert!(result.is_err());
2154 assert!(rx.recv().await.is_some());
2155 assert_eq!(rpc.pending_count(), 0);
2156 }
2157
2158 #[test]
2159 fn initialize_response_uses_acp_v1_shape() {
2160 let server = AcpServer::new(Config::default(), AcpServerConfig::default());
2161 let result = server
2162 .handle_initialize(&serde_json::json!({
2163 "protocolVersion": 1,
2164 "clientCapabilities": {},
2165 "clientInfo": {
2166 "name": "test-client",
2167 "version": "1.0.0"
2168 }
2169 }))
2170 .unwrap();
2171
2172 assert_eq!(result["protocolVersion"], 1);
2173 assert_eq!(result["agentInfo"]["name"], "zeroclaw-acp");
2174 assert_eq!(result["agentInfo"]["title"], "ZeroClaw ACP");
2175 assert_eq!(result["agentInfo"]["version"], env!("CARGO_PKG_VERSION"));
2176 assert_eq!(result["authMethods"], serde_json::json!([]));
2177 assert_eq!(result["agentCapabilities"]["loadSession"], false);
2178 assert_eq!(
2179 result["agentCapabilities"]["promptCapabilities"]["image"],
2180 false
2181 );
2182 assert_eq!(
2183 result["agentCapabilities"]["mcpCapabilities"]["http"],
2184 false
2185 );
2186 assert!(result.get("serverInfo").is_none());
2187 assert!(result.get("capabilities").is_none());
2188 }
2189
2190 #[test]
2191 fn initialize_advertises_load_session_when_store_present() {
2192 let cwd = tempfile::tempdir().unwrap();
2193 let store =
2194 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
2195 let server = AcpServer::new_with_store(
2196 make_test_config(cwd.path()),
2197 AcpServerConfig::default(),
2198 store,
2199 );
2200 let result = server.handle_initialize(&serde_json::json!({})).unwrap();
2201 assert_eq!(result["agentCapabilities"]["loadSession"], true);
2202 assert_eq!(
2203 result["agentCapabilities"]["sessionCapabilities"]["resume"],
2204 serde_json::json!({})
2205 );
2206 assert_eq!(
2207 result["agentCapabilities"]["sessionCapabilities"]["close"],
2208 serde_json::json!({})
2209 );
2210 }
2211
2212 #[test]
2213 fn session_new_defaults_to_launch_cwd_when_client_omits_cwd() {
2214 let config = Config {
2215 data_dir: PathBuf::from("/not/the/project"),
2216 ..Default::default()
2217 };
2218 let server = AcpServer::new(config, AcpServerConfig::default());
2219 let expected = std::env::current_dir().unwrap();
2220
2221 assert_eq!(
2222 server.requested_session_cwd(&serde_json::json!({})),
2223 expected
2224 );
2225 }
2226
2227 #[test]
2228 fn session_new_respects_client_cwd_when_present() {
2229 let server = AcpServer::new(Config::default(), AcpServerConfig::default());
2230 let cwd = std::env::current_dir().unwrap();
2231
2232 assert_eq!(
2233 server.requested_session_cwd(&serde_json::json!({"cwd": cwd})),
2234 cwd
2235 );
2236 }
2237
2238 #[tokio::test]
2239 async fn session_new_does_not_wait_for_configured_mcp_servers() {
2240 let cwd = tempfile::tempdir().unwrap();
2241 let mut config = Config {
2242 data_dir: cwd.path().to_path_buf(),
2243 providers: {
2244 let mut p = zeroclaw_config::providers::Providers::default();
2245 p.models.openrouter.insert(
2246 "default".to_string(),
2247 zeroclaw_config::schema::OpenRouterModelProviderConfig {
2248 base: zeroclaw_config::schema::ModelProviderConfig {
2249 model: Some("test-model".to_string()),
2250 ..Default::default()
2251 },
2252 },
2253 );
2254 p
2255 },
2256 mcp: zeroclaw_config::schema::McpConfig {
2257 enabled: true,
2258 servers: vec![zeroclaw_config::schema::McpServerConfig {
2259 name: "slow".to_string(),
2260 transport: zeroclaw_config::schema::McpTransport::Stdio,
2261 command: "/bin/sh".to_string(),
2262 args: vec!["-c".to_string(), "sleep 60".to_string()],
2263 ..Default::default()
2264 }],
2265 ..Default::default()
2266 },
2267 ..Default::default()
2268 };
2269 config.risk_profiles.insert(
2270 "default".to_string(),
2271 zeroclaw_config::schema::RiskProfileConfig::default(),
2272 );
2273 config.agents.insert(
2274 "test-agent".to_string(),
2275 zeroclaw_config::schema::AliasedAgentConfig {
2276 model_provider: "openrouter.default".into(),
2277 risk_profile: "default".to_string(),
2278 ..Default::default()
2279 },
2280 );
2281 let server = AcpServer::new(config, AcpServerConfig::default());
2282
2283 let result = tokio::time::timeout(
2284 Duration::from_secs(2),
2285 server.handle_session_new(&serde_json::json!({
2286 "cwd": cwd.path().to_string_lossy(),
2287 "agentAlias": "test-agent",
2288 "mcpServers": []
2289 })),
2290 )
2291 .await
2292 .expect("session/new should not block on configured MCP startup")
2293 .expect("session/new should create a session");
2294
2295 assert!(result["sessionId"].as_str().is_some());
2296 }
2297
2298 #[tokio::test]
2299 async fn session_new_auto_selects_sole_configured_agent_when_alias_omitted() {
2300 let cwd = tempfile::tempdir().unwrap();
2301 let mut config = Config {
2302 data_dir: cwd.path().to_path_buf(),
2303 providers: {
2304 let mut p = zeroclaw_config::providers::Providers::default();
2305 p.models.openrouter.insert(
2306 "default".to_string(),
2307 zeroclaw_config::schema::OpenRouterModelProviderConfig {
2308 base: zeroclaw_config::schema::ModelProviderConfig {
2309 api_key: Some("test-key".to_string()),
2310 model: Some("test-model".to_string()),
2311 ..Default::default()
2312 },
2313 },
2314 );
2315 p
2316 },
2317 ..Default::default()
2318 };
2319 config.risk_profiles.insert(
2320 "default".to_string(),
2321 zeroclaw_config::schema::RiskProfileConfig::default(),
2322 );
2323 config.agents.insert(
2324 "only-agent".to_string(),
2325 zeroclaw_config::schema::AliasedAgentConfig {
2326 model_provider: "openrouter.default".into(),
2327 risk_profile: "default".to_string(),
2328 ..Default::default()
2329 },
2330 );
2331 let server = AcpServer::new(config, AcpServerConfig::default());
2332
2333 let result = tokio::time::timeout(
2334 Duration::from_secs(2),
2335 server.handle_session_new(&serde_json::json!({
2336 "cwd": cwd.path().to_string_lossy(),
2337 "mcpServers": []
2338 })),
2339 )
2340 .await
2341 .expect("session/new should not block")
2342 .expect("session/new should auto-select the sole configured agent");
2343
2344 assert!(result["sessionId"].as_str().is_some());
2345 }
2346
2347 #[tokio::test]
2348 async fn session_new_requires_alias_when_multiple_agents_configured() {
2349 let mut config = Config::default();
2350 config.agents.insert(
2351 "agent-one".to_string(),
2352 zeroclaw_config::schema::AliasedAgentConfig::default(),
2353 );
2354 config.agents.insert(
2355 "agent-two".to_string(),
2356 zeroclaw_config::schema::AliasedAgentConfig::default(),
2357 );
2358 let server = AcpServer::new(config, AcpServerConfig::default());
2359
2360 let err = server
2361 .handle_session_new(&serde_json::json!({"mcpServers": []}))
2362 .await
2363 .expect_err("session/new without agentAlias should fail when multiple agents exist");
2364
2365 assert_eq!(err.code, INVALID_PARAMS);
2366 assert!(
2367 err.message.contains("agentAlias"),
2368 "error should mention agentAlias, got: {}",
2369 err.message
2370 );
2371 }
2372
2373 #[tokio::test]
2374 async fn session_new_uses_config_default_agent_when_alias_omitted_and_multiple_agents() {
2375 let cwd = tempfile::tempdir().unwrap();
2376 let mut config = Config {
2377 data_dir: cwd.path().to_path_buf(),
2378 providers: {
2379 let mut p = zeroclaw_config::providers::Providers::default();
2380 p.models.openrouter.insert(
2381 "default".to_string(),
2382 zeroclaw_config::schema::OpenRouterModelProviderConfig {
2383 base: zeroclaw_config::schema::ModelProviderConfig {
2384 api_key: Some("test-key".to_string()),
2385 model: Some("test-model".to_string()),
2386 ..Default::default()
2387 },
2388 },
2389 );
2390 p
2391 },
2392 ..Default::default()
2393 };
2394 config.risk_profiles.insert(
2395 "default".to_string(),
2396 zeroclaw_config::schema::RiskProfileConfig::default(),
2397 );
2398 config.agents.insert(
2399 "agent-alpha".to_string(),
2400 zeroclaw_config::schema::AliasedAgentConfig {
2401 model_provider: "openrouter.default".into(),
2402 risk_profile: "default".to_string(),
2403 ..Default::default()
2404 },
2405 );
2406 config.agents.insert(
2407 "agent-beta".to_string(),
2408 zeroclaw_config::schema::AliasedAgentConfig {
2409 model_provider: "openrouter.default".into(),
2410 risk_profile: "default".to_string(),
2411 ..Default::default()
2412 },
2413 );
2414 config.acp.default_agent = Some("agent-alpha".to_string());
2415 let server = AcpServer::new(config, AcpServerConfig::default());
2416
2417 let result = tokio::time::timeout(
2418 Duration::from_secs(2),
2419 server.handle_session_new(&serde_json::json!({
2420 "cwd": cwd.path().to_string_lossy(),
2421 "mcpServers": []
2422 })),
2423 )
2424 .await
2425 .expect("should not block")
2426 .expect("should select agent-alpha from config.acp.default_agent");
2427
2428 assert!(result["sessionId"].as_str().is_some());
2429 }
2430
2431 #[tokio::test]
2432 async fn session_new_explicit_alias_overrides_config_default_agent() {
2433 let cwd = tempfile::tempdir().unwrap();
2434 let mut config = Config {
2435 data_dir: cwd.path().to_path_buf(),
2436 providers: {
2437 let mut p = zeroclaw_config::providers::Providers::default();
2438 p.models.openrouter.insert(
2439 "default".to_string(),
2440 zeroclaw_config::schema::OpenRouterModelProviderConfig {
2441 base: zeroclaw_config::schema::ModelProviderConfig {
2442 api_key: Some("test-key".to_string()),
2443 model: Some("test-model".to_string()),
2444 ..Default::default()
2445 },
2446 },
2447 );
2448 p
2449 },
2450 ..Default::default()
2451 };
2452 config.risk_profiles.insert(
2453 "default".to_string(),
2454 zeroclaw_config::schema::RiskProfileConfig::default(),
2455 );
2456 config.agents.insert(
2457 "agent-alpha".to_string(),
2458 zeroclaw_config::schema::AliasedAgentConfig {
2459 model_provider: "openrouter.default".into(),
2460 risk_profile: "default".to_string(),
2461 ..Default::default()
2462 },
2463 );
2464 config.agents.insert(
2465 "agent-beta".to_string(),
2466 zeroclaw_config::schema::AliasedAgentConfig {
2467 model_provider: "openrouter.default".into(),
2468 risk_profile: "default".to_string(),
2469 ..Default::default()
2470 },
2471 );
2472 config.acp.default_agent = Some("agent-alpha".to_string());
2473 let server = AcpServer::new(config, AcpServerConfig::default());
2474
2475 let result = tokio::time::timeout(
2477 Duration::from_secs(2),
2478 server.handle_session_new(&serde_json::json!({
2479 "agentAlias": "agent-beta",
2480 "cwd": cwd.path().to_string_lossy(),
2481 "mcpServers": []
2482 })),
2483 )
2484 .await
2485 .expect("should not block")
2486 .expect("should use agent-beta despite default_agent = agent-alpha");
2487
2488 assert!(result["sessionId"].as_str().is_some());
2489 }
2490
2491 #[test]
2492 fn json_rpc_error_response_serialize() {
2493 let resp = JsonRpcResponse {
2494 jsonrpc: "2.0",
2495 result: None,
2496 error: Some(JsonRpcError {
2497 code: METHOD_NOT_FOUND,
2498 message: "Method not found".to_string(),
2499 data: None,
2500 }),
2501 id: Value::Number(1.into()),
2502 };
2503 let json = serde_json::to_string(&resp).unwrap();
2504 let parsed: Value = serde_json::from_str(&json).unwrap();
2505 assert!(parsed.get("error").is_some());
2506 assert_eq!(parsed["error"]["code"], -32601);
2507 assert!(parsed.get("result").is_none());
2508 }
2509
2510 #[test]
2511 fn json_rpc_notification_serialize() {
2512 let notif = JsonRpcNotification {
2513 jsonrpc: "2.0",
2514 method: "session/update",
2515 params: serde_json::json!({
2516 "sessionId": "test-sid",
2517 "update": {
2518 "sessionUpdate": "agent_message_chunk",
2519 "content": { "type": "text", "text": "hello" }
2520 }
2521 }),
2522 };
2523 let json = serde_json::to_string(¬if).unwrap();
2524 assert!(json.contains(r#""method":"session/update""#));
2525 assert!(json.contains(r#""sessionUpdate":"agent_message_chunk""#));
2526 assert!(json.contains(r#""text":"hello""#));
2527 }
2528
2529 #[test]
2530 fn test_prompt_parsing() {
2531 let string_params = serde_json::json!({"prompt": "hello world"});
2533 let result = AcpServer::parse_prompt(&string_params).unwrap();
2534 assert_eq!(result, "hello world");
2535
2536 let array_params = serde_json::json!({
2538 "prompt": [
2539 {"type": "text", "text": "part 1"},
2540 {"type": "text", "text": "part 2"}
2541 ]
2542 });
2543 let result = AcpServer::parse_prompt(&array_params).unwrap();
2544 assert_eq!(result, "part 1\n\npart 2");
2545
2546 let empty_array_params = serde_json::json!({"prompt": []});
2548 let result = AcpServer::parse_prompt(&empty_array_params);
2549 assert!(result.is_err());
2550 assert_eq!(result.unwrap_err().code, INVALID_PARAMS);
2551
2552 let no_text_params = serde_json::json!({
2553 "prompt": [
2554 {"type": "image", "data": "..."}
2555 ]
2556 });
2557 let result = AcpServer::parse_prompt(&no_text_params);
2558 assert!(result.is_err());
2559
2560 let resource_params = serde_json::json!({
2562 "prompt": [
2563 {"type": "text", "text": "analyze this file:"},
2564 {"type": "resource", "resource": {"uri": "file:///tmp/example.rs", "text": "fn main() { println!(\"hi\"); }", "mimeType": "text/rust"}}
2565 ]
2566 });
2567 let result = AcpServer::parse_prompt(&resource_params).unwrap();
2568 assert!(result.contains("analyze this file:"));
2569 assert!(result.contains("fn main() { println!(\"hi\"); }"));
2570 }
2571
2572 #[test]
2573 fn handle_initialize_default_model_absent_when_unconfigured() {
2574 let server = AcpServer::new(Config::default(), AcpServerConfig::default());
2575 let result = server.handle_initialize(&serde_json::json!({})).unwrap();
2576 assert!(
2577 result["_meta"]["zeroclaw"].get("defaultModel").is_none(),
2578 "defaultModel must be absent when no model_provider is configured, got: {}",
2579 result["_meta"]["zeroclaw"]["defaultModel"]
2580 );
2581 }
2582
2583 #[test]
2584 fn handle_initialize_default_model_reflects_configured_provider() {
2585 use zeroclaw_config::schema::{ModelProviderConfig, OllamaModelProviderConfig};
2586 let mut config = Config::default();
2587 config.providers.models.ollama.insert(
2588 "default".to_string(),
2589 OllamaModelProviderConfig {
2590 base: ModelProviderConfig {
2591 model: Some("llama3.2".to_string()),
2592 ..Default::default()
2593 },
2594 ..OllamaModelProviderConfig::default()
2595 },
2596 );
2597 let server = AcpServer::new(config, AcpServerConfig::default());
2598 let result = server.handle_initialize(&serde_json::json!({})).unwrap();
2599 assert_eq!(result["_meta"]["zeroclaw"]["defaultModel"], "llama3.2");
2600 }
2601
2602 #[test]
2603 fn prompt_result_preserves_content_string_shape() {
2604 let result = AcpServer::prompt_result("test-sid".to_string(), "end_turn", "hello".into());
2605 assert_eq!(result["sessionId"], "test-sid");
2606 assert_eq!(result["stopReason"], "end_turn");
2607 assert_eq!(result["content"], "hello");
2608 }
2609
2610 #[test]
2611 fn cancelled_prompt_result_preserves_content_string_shape() {
2612 let with_partial =
2613 AcpServer::cancelled_prompt_result("test-sid".to_string(), "partial text");
2614 assert_eq!(with_partial["sessionId"], "test-sid");
2615 assert_eq!(with_partial["stopReason"], "cancelled");
2616 assert_eq!(
2617 with_partial["content"],
2618 "partial text\n\n[interrupted by user]"
2619 );
2620
2621 let marker_only = AcpServer::cancelled_prompt_result("test-sid".to_string(), "");
2622 assert_eq!(marker_only["content"], "[interrupted by user]");
2623 }
2624
2625 #[test]
2626 fn test_tool_call_and_update_serialization() {
2627 let tool_call_notif = JsonRpcNotification {
2629 jsonrpc: "2.0",
2630 method: "session/update",
2631 params: serde_json::json!({
2632 "sessionId": "test-sid",
2633 "update": {
2634 "sessionUpdate": "tool_call",
2635 "toolCallId": "tc-12345",
2636 "name": "shell",
2637 "title": "shell",
2638 "kind": "execute",
2639 "rawInput": {"command": "ls -la"},
2640 "status": "pending"
2641 }
2642 }),
2643 };
2644 let json1 = serde_json::to_string(&tool_call_notif).unwrap();
2645 assert!(json1.contains("\"sessionUpdate\":\"tool_call\""));
2646 assert!(json1.contains("\"toolCallId\":\"tc-12345\""));
2647 assert!(json1.contains("\"name\":\"shell\""));
2648 assert!(json1.contains("\"title\":\"shell\""));
2649 assert!(json1.contains("\"kind\":\"execute\""));
2650 assert!(json1.contains("\"status\":\"pending\""));
2651 assert!(json1.contains("\"rawInput\""));
2652
2653 let tool_update_notif = JsonRpcNotification {
2655 jsonrpc: "2.0",
2656 method: "session/update",
2657 params: serde_json::json!({
2658 "sessionId": "test-sid",
2659 "update": {
2660 "sessionUpdate": "tool_call_update",
2661 "toolCallId": "tc-12345",
2662 "name": "shell",
2663 "title": "shell",
2664 "kind": "execute",
2665 "status": "completed",
2666 "rawOutput": "file1.txt\nfile2.txt",
2667 "body": "file1.txt\nfile2.txt",
2668 "content": [{
2669 "type": "content",
2670 "content": {
2671 "type": "text",
2672 "text": "file1.txt\nfile2.txt"
2673 }
2674 }]
2675 }
2676 }),
2677 };
2678 let json2 = serde_json::to_string(&tool_update_notif).unwrap();
2679 assert!(json2.contains("\"sessionUpdate\":\"tool_call_update\""));
2680 assert!(json2.contains("\"toolCallId\":\"tc-12345\""));
2681 assert!(json2.contains("\"name\":\"shell\""));
2682 assert!(json2.contains("\"status\":\"completed\""));
2683 assert!(json2.contains("\"rawOutput\""));
2684 assert!(json2.contains("\"body\""));
2685 assert!(json2.contains("\"content\""));
2686 assert!(json2.contains("\"type\":\"content\""));
2687 assert!(json2.contains("file1.txt"));
2688 assert!(json1.contains("tc-12345") && json2.contains("tc-12345"));
2690 }
2691
2692 #[test]
2693 fn file_edit_raw_input_uses_acp_diff_field_names() {
2694 let call = notification_for_turn_event(
2695 "sid",
2696 &TurnEvent::ToolCall {
2697 id: "tc-1".to_string(),
2698 name: "file_edit".to_string(),
2699 args: serde_json::json!({
2700 "path": "src/foo.rs",
2701 "old_string": "let x = 1;",
2702 "new_string": "let x = 2;"
2703 }),
2704 },
2705 );
2706 let v = serde_json::to_value(call.unwrap()).unwrap();
2707 let raw = &v["params"]["update"]["rawInput"];
2708 assert_eq!(raw["path"], "src/foo.rs");
2709 assert_eq!(raw["oldText"], "let x = 1;");
2710 assert_eq!(raw["newText"], "let x = 2;");
2711 assert!(
2712 raw.get("old_string").is_none(),
2713 "old_string must not appear in rawInput"
2714 );
2715 assert!(
2716 raw.get("new_string").is_none(),
2717 "new_string must not appear in rawInput"
2718 );
2719
2720 let content = &v["params"]["update"]["content"];
2721 assert!(content.is_array(), "file_edit must emit a content array");
2722 let diff = &content[0];
2723 assert_eq!(diff["type"], "diff");
2724 assert_eq!(diff["path"], "src/foo.rs");
2725 assert_eq!(diff["oldText"], "let x = 1;");
2726 assert_eq!(diff["newText"], "let x = 2;");
2727 }
2728
2729 #[test]
2730 fn file_write_raw_input_uses_acp_diff_field_names() {
2731 let call = notification_for_turn_event(
2732 "sid",
2733 &TurnEvent::ToolCall {
2734 id: "tc-2".to_string(),
2735 name: "file_write".to_string(),
2736 args: serde_json::json!({
2737 "path": "src/new.rs",
2738 "content": "fn main() {}"
2739 }),
2740 },
2741 );
2742 let v = serde_json::to_value(call.unwrap()).unwrap();
2743 let raw = &v["params"]["update"]["rawInput"];
2744 assert_eq!(raw["path"], "src/new.rs");
2745 assert_eq!(raw["newText"], "fn main() {}");
2746 assert!(
2747 raw.get("oldText").is_none(),
2748 "oldText must not appear in file_write rawInput"
2749 );
2750 assert!(
2751 raw.get("content").is_none(),
2752 "content must not appear in rawInput"
2753 );
2754
2755 let content = &v["params"]["update"]["content"];
2756 assert!(content.is_array(), "file_write must emit a content array");
2757 let diff = &content[0];
2758 assert_eq!(diff["type"], "diff");
2759 assert_eq!(diff["path"], "src/new.rs");
2760 assert_eq!(diff["newText"], "fn main() {}");
2761 assert!(
2762 diff.get("oldText").is_none(),
2763 "oldText must be absent for file_write diff"
2764 );
2765 }
2766
2767 #[test]
2768 fn map_tool_kind_uses_explicit_tool_names() {
2769 assert_eq!(map_tool_kind("memory_forget"), "delete");
2770 assert_eq!(map_tool_kind("memory_purge"), "delete");
2771 assert_eq!(map_tool_kind("cron_run"), "execute");
2772 assert_eq!(map_tool_kind("file_read"), "other");
2773 assert_eq!(map_tool_kind("knowledge"), "other");
2774 assert_eq!(map_tool_kind("web_fetch"), "other");
2775 assert_eq!(map_tool_kind("file_write"), "edit");
2776 assert_eq!(map_tool_kind("unknown_tool"), "other");
2777 }
2778
2779 #[test]
2780 fn turn_tool_events_include_client_visible_tool_fields() {
2781 let call = notification_for_turn_event(
2782 "test-sid",
2783 &TurnEvent::ToolCall {
2784 id: "tc-12345".to_string(),
2785 name: "shell".to_string(),
2786 args: serde_json::json!({"command": "ls -la"}),
2787 },
2788 );
2789 let call_value =
2790 serde_json::to_value(call.expect("ToolCall maps to a notification")).unwrap();
2791 assert_eq!(call_value["method"], "session/update");
2792 assert_eq!(call_value["params"]["update"]["sessionUpdate"], "tool_call");
2793 assert_eq!(call_value["params"]["update"]["toolCallId"], "tc-12345");
2794 assert_eq!(call_value["params"]["update"]["name"], "shell");
2795 assert_eq!(call_value["params"]["update"]["title"], "shell");
2796 assert_eq!(call_value["params"]["update"]["kind"], "execute");
2797 assert_eq!(
2798 call_value["params"]["update"]["rawInput"],
2799 serde_json::json!({"command": "ls -la"})
2800 );
2801
2802 let result = notification_for_turn_event(
2803 "test-sid",
2804 &TurnEvent::ToolResult {
2805 id: "tc-12345".to_string(),
2806 name: "shell".to_string(),
2807 output: "file1.txt\nfile2.txt".to_string(),
2808 },
2809 );
2810 let result_value =
2811 serde_json::to_value(result.expect("ToolResult maps to a notification")).unwrap();
2812 assert_eq!(
2813 result_value["params"]["update"]["sessionUpdate"],
2814 "tool_call_update"
2815 );
2816 assert_eq!(result_value["params"]["update"]["toolCallId"], "tc-12345");
2817 assert_eq!(result_value["params"]["update"]["name"], "shell");
2818 assert_eq!(result_value["params"]["update"]["title"], "shell");
2819 assert_eq!(result_value["params"]["update"]["kind"], "execute");
2820 assert_eq!(result_value["params"]["update"]["status"], "completed");
2821 assert_eq!(
2822 result_value["params"]["update"]["rawOutput"],
2823 "file1.txt\nfile2.txt"
2824 );
2825 assert_eq!(
2826 result_value["params"]["update"]["body"],
2827 "file1.txt\nfile2.txt"
2828 );
2829 assert_eq!(
2830 result_value["params"]["update"]["content"][0]["content"]["text"],
2831 "file1.txt\nfile2.txt"
2832 );
2833 }
2834
2835 #[tokio::test]
2843 async fn session_stop_finds_session_during_active_prompt_turn() {
2844 let cwd = tempfile::tempdir().unwrap();
2845 let mut config = Config {
2846 data_dir: cwd.path().to_path_buf(),
2847 providers: {
2848 let mut p = zeroclaw_config::providers::Providers::default();
2849 p.models.anthropic.insert(
2850 "default".to_string(),
2851 zeroclaw_config::schema::AnthropicModelProviderConfig {
2852 base: zeroclaw_config::schema::ModelProviderConfig {
2853 model: Some("claude-haiku-4-5".to_string()),
2854 ..Default::default()
2855 },
2856 },
2857 );
2858 p
2859 },
2860 ..Default::default()
2861 };
2862 config.risk_profiles.insert(
2863 "default".to_string(),
2864 zeroclaw_config::schema::RiskProfileConfig::default(),
2865 );
2866 config.agents.insert(
2867 "test-agent".to_string(),
2868 zeroclaw_config::schema::AliasedAgentConfig {
2869 model_provider: "anthropic.default".into(),
2870 risk_profile: "default".to_string(),
2871 ..Default::default()
2872 },
2873 );
2874 let server = Arc::new(AcpServer::new(config, AcpServerConfig::default()));
2875
2876 let new_result = server
2878 .handle_session_new(&serde_json::json!({
2879 "cwd": cwd.path().to_string_lossy(),
2880 "agentAlias": "test-agent"
2881 }))
2882 .await
2883 .expect("session/new must succeed");
2884 let session_id = new_result["sessionId"].as_str().unwrap().to_string();
2885
2886 let session_arc = {
2888 let sessions = server.sessions.lock().await;
2889 sessions.get(&session_id).cloned().unwrap()
2890 };
2891 let _guard = session_arc.lock().await;
2892
2893 let server_clone = Arc::clone(&server);
2897 let sid_clone = session_id.clone();
2898 let stop_result = tokio::time::timeout(Duration::from_millis(100), async move {
2899 server_clone
2900 .handle_session_stop(&serde_json::json!({ "sessionId": sid_clone }))
2901 .await
2902 })
2903 .await;
2904
2905 match stop_result {
2906 Err(_timeout) => {} Ok(Ok(_)) => panic!("stop returned Ok without the lock being released"),
2908 Ok(Err(e)) => {
2909 assert_ne!(
2910 e.code, SESSION_NOT_FOUND,
2911 "session/stop must not return SESSION_NOT_FOUND while a turn is in flight"
2912 );
2913 }
2914 }
2915 }
2916
2917 #[tokio::test]
2918 async fn session_new_persists_to_store() {
2919 let cwd = tempfile::tempdir().unwrap();
2920 let store =
2921 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
2922 let server = Arc::new(AcpServer::new_with_store(
2923 make_test_config(cwd.path()),
2924 AcpServerConfig::default(),
2925 Arc::clone(&store),
2926 ));
2927
2928 let result = server
2929 .handle_session_new(&serde_json::json!({
2930 "cwd": cwd.path().to_string_lossy()
2931 }))
2932 .await
2933 .expect("session/new must succeed");
2934
2935 let session_id = result["sessionId"].as_str().unwrap();
2936
2937 let data = store.load_session(session_id).unwrap();
2939 assert!(
2940 data.is_some(),
2941 "session/new must persist to AcpSessionStore"
2942 );
2943 }
2944
2945 #[tokio::test]
2946 async fn session_new_without_store_still_works() {
2947 let cwd = tempfile::tempdir().unwrap();
2948 let server = Arc::new(AcpServer::new(
2949 make_test_config(cwd.path()),
2950 AcpServerConfig::default(),
2951 ));
2952
2953 let result = server
2954 .handle_session_new(&serde_json::json!({
2955 "cwd": cwd.path().to_string_lossy()
2956 }))
2957 .await
2958 .expect("session/new must succeed without a store");
2959
2960 let session_id = result["sessionId"].as_str().unwrap();
2961 assert!(server.sessions.lock().await.contains_key(session_id));
2962 }
2963
2964 fn make_test_config(cwd: &std::path::Path) -> Config {
2965 let mut cfg = Config {
2966 data_dir: cwd.to_path_buf(),
2967 ..Default::default()
2968 };
2969 cfg.providers.models.anthropic.insert(
2970 "default".to_string(),
2971 zeroclaw_config::schema::AnthropicModelProviderConfig {
2972 base: zeroclaw_config::schema::ModelProviderConfig {
2973 model: Some("claude-haiku-4-5".to_string()),
2974 ..Default::default()
2975 },
2976 },
2977 );
2978 cfg.risk_profiles.insert(
2979 "default".to_string(),
2980 zeroclaw_config::schema::RiskProfileConfig::default(),
2981 );
2982 cfg.agents.insert(
2983 "test-agent".to_string(),
2984 zeroclaw_config::schema::AliasedAgentConfig {
2985 model_provider: "anthropic.default".into(),
2986 risk_profile: "default".to_string(),
2987 ..Default::default()
2988 },
2989 );
2990 cfg
2991 }
2992
2993 #[tokio::test]
2995 async fn session_cancel_idle_session_is_noop() {
2996 let cwd = tempfile::tempdir().unwrap();
2997 let server = Arc::new(AcpServer::new(
2998 make_test_config(cwd.path()),
2999 AcpServerConfig::default(),
3000 ));
3001
3002 let new_result = server
3003 .handle_session_new(&serde_json::json!({
3004 "cwd": cwd.path().to_string_lossy(),
3005 "agentAlias": "test-agent"
3006 }))
3007 .await
3008 .expect("session/new must succeed");
3009 let session_id = new_result["sessionId"].as_str().unwrap().to_string();
3010
3011 let result = server
3013 .handle_session_cancel(&serde_json::json!({ "sessionId": session_id }))
3014 .await;
3015 assert!(result.is_ok(), "idle cancel must succeed: {result:?}");
3016 }
3017
3018 #[tokio::test]
3021 async fn session_cancel_unknown_session_is_noop() {
3022 let cwd = tempfile::tempdir().unwrap();
3023 let server = Arc::new(AcpServer::new(
3024 make_test_config(cwd.path()),
3025 AcpServerConfig::default(),
3026 ));
3027
3028 let result = server
3029 .handle_session_cancel(&serde_json::json!({ "sessionId": "sess_does_not_exist" }))
3030 .await;
3031 assert!(
3032 result.is_ok(),
3033 "unknown-session cancel must succeed: {result:?}"
3034 );
3035 }
3036
3037 #[tokio::test]
3038 async fn session_cancel_accepts_snake_case_session_id() {
3039 let cwd = tempfile::tempdir().unwrap();
3040 let server = Arc::new(AcpServer::new(
3041 make_test_config(cwd.path()),
3042 AcpServerConfig::default(),
3043 ));
3044
3045 let session_id = "sess_snake_case_cancel";
3046 let active_token = tokio_util::sync::CancellationToken::new();
3047 server
3048 .register_cancel_token(session_id, active_token.clone())
3049 .expect("active turn should register token");
3050
3051 server
3052 .handle_session_cancel(&serde_json::json!({ "session_id": session_id }))
3053 .await
3054 .expect("snake_case session_id should cancel the active turn");
3055
3056 assert!(active_token.is_cancelled());
3057 }
3058
3059 #[tokio::test]
3062 async fn register_cancel_token_rejects_concurrent_prompt_for_session() {
3063 let cwd = tempfile::tempdir().unwrap();
3064 let server = Arc::new(AcpServer::new(
3065 make_test_config(cwd.path()),
3066 AcpServerConfig::default(),
3067 ));
3068
3069 let session_id = "sess_active_turn";
3070 let active_token = tokio_util::sync::CancellationToken::new();
3071 let queued_token = tokio_util::sync::CancellationToken::new();
3072
3073 server
3074 .register_cancel_token(session_id, active_token.clone())
3075 .expect("first prompt should register its token");
3076 let err = server
3077 .register_cancel_token(session_id, queued_token.clone())
3078 .expect_err("second prompt must not overwrite active token");
3079
3080 assert_eq!(err.code, SESSION_BUSY);
3081 assert!(
3082 err.message.contains("active prompt turn"),
3083 "error should explain why prompt was rejected: {}",
3084 err.message
3085 );
3086
3087 server
3088 .handle_session_cancel(&serde_json::json!({ "sessionId": session_id }))
3089 .await
3090 .expect("cancel should still target active token");
3091
3092 assert!(active_token.is_cancelled());
3093 assert!(
3094 !queued_token.is_cancelled(),
3095 "rejected prompt's token must not become the active cancel target"
3096 );
3097 }
3098
3099 #[tokio::test]
3100 async fn session_prompt_rejects_concurrent_turn_before_agent_starts() {
3101 let cwd = tempfile::tempdir().unwrap();
3102 let server = Arc::new(AcpServer::new(
3103 make_test_config(cwd.path()),
3104 AcpServerConfig::default(),
3105 ));
3106
3107 let new_result = server
3108 .handle_session_new(&serde_json::json!({
3109 "cwd": cwd.path().to_string_lossy(),
3110 "agentAlias": "test-agent"
3111 }))
3112 .await
3113 .expect("session/new must succeed");
3114 let session_id = new_result["sessionId"].as_str().unwrap().to_string();
3115 let active_token = tokio_util::sync::CancellationToken::new();
3116 server
3117 .register_cancel_token(&session_id, active_token.clone())
3118 .expect("simulated active turn should register token");
3119
3120 let err = server
3121 .handle_session_prompt(
3122 &serde_json::json!({
3123 "sessionId": session_id.clone(),
3124 "prompt": "queued prompt"
3125 }),
3126 &serde_json::json!(2),
3127 )
3128 .await
3129 .expect_err("concurrent prompt must be rejected before model_provider work starts");
3130
3131 assert_eq!(err.code, SESSION_BUSY);
3132 server
3133 .handle_session_cancel(&serde_json::json!({ "sessionId": session_id }))
3134 .await
3135 .expect("cancel should still target the original active token");
3136 assert!(active_token.is_cancelled());
3137 }
3138
3139 #[tokio::test]
3144 async fn cancel_tokens_map_remove_works() {
3145 let cwd = tempfile::tempdir().unwrap();
3146 let config = Config {
3147 data_dir: cwd.path().to_path_buf(),
3148 ..Default::default()
3149 };
3150 let server = Arc::new(AcpServer::new(config, AcpServerConfig::default()));
3151
3152 let session_id = "sess_token_leak_test".to_string();
3154 let token = tokio_util::sync::CancellationToken::new();
3155 server
3156 .cancel_tokens
3157 .lock()
3158 .expect("cancel_tokens lock poisoned")
3159 .insert(session_id.clone(), token);
3160
3161 server
3163 .cancel_tokens
3164 .lock()
3165 .expect("cancel_tokens lock poisoned")
3166 .remove(&session_id);
3167
3168 let remaining = server
3169 .cancel_tokens
3170 .lock()
3171 .expect("cancel_tokens lock poisoned")
3172 .len();
3173 assert_eq!(remaining, 0, "cancel token must be removed after turn ends");
3174 }
3175
3176 #[tokio::test]
3177 async fn session_load_restores_history_and_streams_notifications() {
3178 use zeroclaw_api::model_provider::{ChatMessage, ConversationMessage};
3179 let cwd = tempfile::tempdir().unwrap();
3180 let store =
3181 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3182
3183 let session_id = "sess-load-test";
3184 store
3185 .create_session(session_id, "test-agent", &cwd.path().to_string_lossy())
3186 .unwrap();
3187 store
3188 .append_turn(
3189 session_id,
3190 &[
3191 ConversationMessage::Chat(ChatMessage::user("hello")),
3192 ConversationMessage::Chat(ChatMessage::assistant("hi there")),
3193 ],
3194 )
3195 .unwrap();
3196
3197 let (writer_tx, mut writer_rx) = tokio::sync::mpsc::channel::<String>(64);
3198 let server = Arc::new(AcpServer::new_with_writer_and_store(
3199 make_test_config(cwd.path()),
3200 AcpServerConfig::default(),
3201 writer_tx,
3202 Arc::clone(&store),
3203 ));
3204
3205 let result = server
3206 .handle_session_load(&serde_json::json!({
3207 "sessionId": session_id,
3208 "cwd": cwd.path().to_string_lossy()
3209 }))
3210 .await
3211 .expect("session/load must succeed");
3212
3213 assert_eq!(result, serde_json::json!({}));
3214
3215 assert!(server.sessions.lock().await.contains_key(session_id));
3217
3218 let mut notifications = Vec::new();
3220 while let Ok(msg) = writer_rx.try_recv() {
3221 notifications.push(msg);
3222 }
3223
3224 assert_eq!(
3226 notifications.len(),
3227 2,
3228 "expected 2 notifications, got: {notifications:?}"
3229 );
3230 let n0: serde_json::Value = serde_json::from_str(¬ifications[0]).unwrap();
3231 assert_eq!(
3232 n0["params"]["update"]["sessionUpdate"],
3233 "user_message_chunk"
3234 );
3235 assert_eq!(n0["params"]["update"]["content"]["text"], "hello");
3236 let n1: serde_json::Value = serde_json::from_str(¬ifications[1]).unwrap();
3237 assert_eq!(
3238 n1["params"]["update"]["sessionUpdate"],
3239 "agent_message_chunk"
3240 );
3241 assert_eq!(n1["params"]["update"]["content"]["text"], "hi there");
3242 }
3243
3244 #[tokio::test]
3245 async fn session_load_returns_not_found_for_unknown_id() {
3246 let cwd = tempfile::tempdir().unwrap();
3247 let store =
3248 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3249 let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3250 let server = AcpServer::new_with_writer_and_store(
3251 make_test_config(cwd.path()),
3252 AcpServerConfig::default(),
3253 writer_tx,
3254 store,
3255 );
3256
3257 let err = server
3258 .handle_session_load(&serde_json::json!({ "sessionId": "ghost" }))
3259 .await
3260 .expect_err("unknown session must fail");
3261
3262 assert_eq!(err.code, SESSION_NOT_FOUND);
3263 }
3264
3265 #[tokio::test]
3266 async fn session_load_rejects_already_active_session() {
3267 let cwd = tempfile::tempdir().unwrap();
3268 let store =
3269 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3270 let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3271 let server = Arc::new(AcpServer::new_with_writer_and_store(
3272 make_test_config(cwd.path()),
3273 AcpServerConfig::default(),
3274 writer_tx,
3275 Arc::clone(&store),
3276 ));
3277
3278 let session_id = "sess-already-active";
3280 store
3281 .create_session(session_id, "test-agent", &cwd.path().to_string_lossy())
3282 .unwrap();
3283 server
3284 .handle_session_load(&serde_json::json!({
3285 "sessionId": session_id,
3286 "cwd": cwd.path().to_string_lossy()
3287 }))
3288 .await
3289 .unwrap();
3290
3291 let err = server
3293 .handle_session_load(&serde_json::json!({ "sessionId": session_id }))
3294 .await
3295 .expect_err("session/load for active session must fail");
3296
3297 assert_eq!(err.code, INVALID_PARAMS);
3298 }
3299
3300 #[tokio::test]
3301 async fn session_resume_restores_without_replay() {
3302 use zeroclaw_api::model_provider::{ChatMessage, ConversationMessage};
3303 let cwd = tempfile::tempdir().unwrap();
3304 let store =
3305 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3306
3307 let session_id = "sess-resume-test";
3308 store
3309 .create_session(session_id, "test-agent", &cwd.path().to_string_lossy())
3310 .unwrap();
3311 store
3312 .append_turn(
3313 session_id,
3314 &[ConversationMessage::Chat(ChatMessage::user("hello"))],
3315 )
3316 .unwrap();
3317
3318 let (writer_tx, mut writer_rx) = tokio::sync::mpsc::channel::<String>(64);
3319 let server = Arc::new(AcpServer::new_with_writer_and_store(
3320 make_test_config(cwd.path()),
3321 AcpServerConfig::default(),
3322 writer_tx,
3323 Arc::clone(&store),
3324 ));
3325
3326 let result = server
3327 .handle_session_resume(&serde_json::json!({
3328 "sessionId": session_id,
3329 "cwd": cwd.path().to_string_lossy()
3330 }))
3331 .await
3332 .expect("session/resume must succeed");
3333
3334 assert_eq!(result, serde_json::json!({}));
3336
3337 assert!(server.sessions.lock().await.contains_key(session_id));
3339
3340 assert!(
3342 writer_rx.try_recv().is_err(),
3343 "session/resume must not emit session/update notifications"
3344 );
3345 }
3346
3347 #[tokio::test]
3348 async fn session_close_releases_memory_but_keeps_store_record() {
3349 let cwd = tempfile::tempdir().unwrap();
3350 let store =
3351 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3352 let server = Arc::new(AcpServer::new_with_store(
3353 make_test_config(cwd.path()),
3354 AcpServerConfig::default(),
3355 Arc::clone(&store),
3356 ));
3357
3358 let new_result = server
3359 .handle_session_new(&serde_json::json!({
3360 "cwd": cwd.path().to_string_lossy()
3361 }))
3362 .await
3363 .expect("session/new must succeed");
3364 let session_id = new_result["sessionId"].as_str().unwrap().to_string();
3365
3366 assert!(server.sessions.lock().await.contains_key(&session_id));
3367
3368 let result = server
3369 .handle_session_close(&serde_json::json!({ "sessionId": &session_id }))
3370 .await
3371 .expect("session/close must succeed");
3372
3373 assert_eq!(result, serde_json::json!({}));
3374
3375 assert!(!server.sessions.lock().await.contains_key(&session_id));
3377
3378 let data = store.load_session(&session_id).unwrap();
3380 assert!(
3381 data.is_some(),
3382 "session/close must not delete the DB record"
3383 );
3384 }
3385
3386 #[tokio::test]
3387 async fn session_close_returns_not_found_for_unknown_session() {
3388 let cwd = tempfile::tempdir().unwrap();
3389 let server = AcpServer::new(make_test_config(cwd.path()), AcpServerConfig::default());
3390
3391 let err = server
3392 .handle_session_close(&serde_json::json!({ "sessionId": "ghost" }))
3393 .await
3394 .expect_err("unknown session must fail");
3395
3396 assert_eq!(err.code, SESSION_NOT_FOUND);
3397 }
3398
3399 #[tokio::test]
3402 async fn session_load_respects_max_sessions() {
3403 let cwd = tempfile::tempdir().unwrap();
3404 let store =
3405 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3406
3407 let stored_id = "sess-load-limit-test";
3409 store
3410 .create_session(stored_id, "test-agent", &cwd.path().to_string_lossy())
3411 .unwrap();
3412
3413 let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3414 let server = Arc::new(AcpServer::new_with_writer_and_store(
3415 make_test_config(cwd.path()),
3416 AcpServerConfig {
3417 max_sessions: 1,
3418 ..AcpServerConfig::default()
3419 },
3420 writer_tx,
3421 Arc::clone(&store),
3422 ));
3423
3424 server
3426 .handle_session_new(&serde_json::json!({
3427 "cwd": cwd.path().to_string_lossy()
3428 }))
3429 .await
3430 .expect("session/new must succeed when under limit");
3431
3432 let err = server
3434 .handle_session_load(&serde_json::json!({ "sessionId": stored_id }))
3435 .await
3436 .expect_err("session/load must fail when max_sessions reached");
3437
3438 assert_eq!(
3439 err.code, SESSION_LIMIT_REACHED,
3440 "expected SESSION_LIMIT_REACHED, got: {:?}",
3441 err
3442 );
3443 }
3444
3445 #[tokio::test]
3448 async fn session_resume_respects_max_sessions() {
3449 let cwd = tempfile::tempdir().unwrap();
3450 let store =
3451 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3452
3453 let stored_id = "sess-resume-limit-test";
3455 store
3456 .create_session(stored_id, "test-agent", &cwd.path().to_string_lossy())
3457 .unwrap();
3458
3459 let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3460 let server = Arc::new(AcpServer::new_with_writer_and_store(
3461 make_test_config(cwd.path()),
3462 AcpServerConfig {
3463 max_sessions: 1,
3464 ..AcpServerConfig::default()
3465 },
3466 writer_tx,
3467 Arc::clone(&store),
3468 ));
3469
3470 server
3472 .handle_session_new(&serde_json::json!({
3473 "cwd": cwd.path().to_string_lossy()
3474 }))
3475 .await
3476 .expect("session/new must succeed when under limit");
3477
3478 let err = server
3480 .handle_session_resume(&serde_json::json!({ "sessionId": stored_id }))
3481 .await
3482 .expect_err("session/resume must fail when max_sessions reached");
3483
3484 assert_eq!(
3485 err.code, SESSION_LIMIT_REACHED,
3486 "expected SESSION_LIMIT_REACHED, got: {:?}",
3487 err
3488 );
3489 }
3490
3491 #[tokio::test]
3494 async fn session_load_releases_reservation_on_store_error() {
3495 let cwd = tempfile::tempdir().unwrap();
3496 let store =
3497 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3498
3499 let session_id = "sess-load-store-err";
3500 store
3501 .create_session(session_id, "test-agent", &cwd.path().to_string_lossy())
3502 .unwrap();
3503
3504 let db_path = cwd.path().join("sessions/acp-sessions.db");
3507 {
3508 let second =
3509 rusqlite::Connection::open(&db_path).expect("second conn must open same db");
3510 second
3511 .execute_batch(
3512 "DROP TABLE IF EXISTS acp_messages; DROP TABLE IF EXISTS acp_sessions;",
3513 )
3514 .expect("schema drop must succeed on second conn");
3515 }
3516
3517 let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3518 let server = Arc::new(AcpServer::new_with_writer_and_store(
3519 make_test_config(cwd.path()),
3520 AcpServerConfig::default(),
3521 writer_tx,
3522 Arc::clone(&store),
3523 ));
3524
3525 let first_err = server
3527 .handle_session_load(&serde_json::json!({ "sessionId": session_id }))
3528 .await
3529 .expect_err("session/load must fail when store returns Err");
3530 assert_eq!(
3531 first_err.code, INTERNAL_ERROR,
3532 "expected INTERNAL_ERROR from store failure, got: {:?}",
3533 first_err
3534 );
3535
3536 let second_err = server
3540 .handle_session_load(&serde_json::json!({ "sessionId": session_id }))
3541 .await
3542 .expect_err("second session/load must also fail");
3543 assert_eq!(
3544 second_err.code, INTERNAL_ERROR,
3545 "second load must fail with INTERNAL_ERROR, not INVALID_PARAMS (leaked slot); got: {:?}",
3546 second_err
3547 );
3548 }
3549
3550 #[tokio::test]
3553 async fn session_resume_releases_reservation_on_store_error() {
3554 let cwd = tempfile::tempdir().unwrap();
3555 let store =
3556 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3557
3558 let session_id = "sess-resume-store-err";
3559 store
3560 .create_session(session_id, "test-agent", &cwd.path().to_string_lossy())
3561 .unwrap();
3562
3563 let db_path = cwd.path().join("sessions/acp-sessions.db");
3564 {
3565 let second =
3566 rusqlite::Connection::open(&db_path).expect("second conn must open same db");
3567 second
3568 .execute_batch(
3569 "DROP TABLE IF EXISTS acp_messages; DROP TABLE IF EXISTS acp_sessions;",
3570 )
3571 .expect("schema drop must succeed on second conn");
3572 }
3573
3574 let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3575 let server = Arc::new(AcpServer::new_with_writer_and_store(
3576 make_test_config(cwd.path()),
3577 AcpServerConfig::default(),
3578 writer_tx,
3579 Arc::clone(&store),
3580 ));
3581
3582 let first_err = server
3583 .handle_session_resume(&serde_json::json!({ "sessionId": session_id }))
3584 .await
3585 .expect_err("session/resume must fail when store returns Err");
3586 assert_eq!(
3587 first_err.code, INTERNAL_ERROR,
3588 "expected INTERNAL_ERROR from store failure, got: {:?}",
3589 first_err
3590 );
3591
3592 let second_err = server
3593 .handle_session_resume(&serde_json::json!({ "sessionId": session_id }))
3594 .await
3595 .expect_err("second session/resume must also fail");
3596 assert_eq!(
3597 second_err.code, INTERNAL_ERROR,
3598 "second resume must fail with INTERNAL_ERROR, not INVALID_PARAMS (leaked slot); got: {:?}",
3599 second_err
3600 );
3601 }
3602}