1use anyhow::Result;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::collections::{HashMap, HashSet};
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::time::{Duration, Instant};
28use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
29use tokio::sync::{Mutex, mpsc, oneshot};
30use uuid::Uuid;
31use zeroclaw_api::model_provider::ConversationMessage;
32use zeroclaw_config::schema::Config;
33use zeroclaw_infra::acp_session_store::AcpSessionStore;
34use zeroclaw_runtime::agent::agent::{Agent, TurnEvent};
35use zeroclaw_runtime::tools::CanvasStore;
36
37use crate::acp_channel::AcpChannel;
38
/// Tunables for the ACP server.
///
/// `#[serde(default)]` is applied at the container level, so any field that
/// is missing from the serialized input takes its value from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AcpServerConfig {
    /// Upper bound on sessions; active and currently-loading sessions are
    /// both counted against it by the session handlers.
    pub max_sessions: usize,
    /// Inactivity timeout in seconds, converted with `Duration::from_secs`
    /// where it is used.
    pub session_timeout_secs: u64,
}
50
51impl Default for AcpServerConfig {
52 fn default() -> Self {
53 Self {
54 max_sessions: 10,
55 session_timeout_secs: 3600,
56 }
57 }
58}
59
/// Incoming JSON-RPC message (request or notification) as parsed from one
/// input line.
#[derive(Debug, Deserialize)]
struct JsonRpcRequest {
    /// Protocol version string; `process_line` rejects anything but `"2.0"`.
    jsonrpc: String,
    /// Method name that `handle_request` dispatches on.
    method: String,
    /// Method parameters; takes `Value`'s default when the field is absent.
    #[serde(default)]
    params: Value,
    /// Request id. `None` marks a notification: `handle_request` sends no
    /// reply for it.
    id: Option<Value>,
}
70
/// Outgoing JSON-RPC response envelope.
///
/// `result` and `error` are each left out of the serialized form when `None`.
#[derive(Debug, Serialize)]
struct JsonRpcResponse {
    /// Protocol version marker.
    jsonrpc: &'static str,
    /// Success payload, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    result: Option<Value>,
    /// Failure payload, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<JsonRpcError>,
    /// Id carried on the response (presumably the id of the request being
    /// answered; the code that fills it is outside this view).
    id: Value,
}
80
/// Outgoing JSON-RPC notification: a message without an `id` field.
#[derive(Debug, Serialize)]
struct JsonRpcNotification {
    /// Protocol version marker.
    jsonrpc: &'static str,
    /// Notification method name.
    method: &'static str,
    /// Notification payload.
    params: Value,
}
87
/// JSON-RPC error object, used both for replies this server sends and for
/// error replies received to its own outbound requests.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcError {
    /// Numeric error code (see the `*_ERROR` / `SESSION_*` constants).
    pub code: i32,
    /// Human-readable description.
    pub message: String,
    /// Optional extra detail; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<Value>,
}
95
// Standard JSON-RPC 2.0 error codes.
const PARSE_ERROR: i32 = -32700;
const INVALID_REQUEST: i32 = -32600;
const METHOD_NOT_FOUND: i32 = -32601;
const INVALID_PARAMS: i32 = -32602;
const INTERNAL_ERROR: i32 = -32603;

// Server-defined codes returned by the session handlers.
const SESSION_NOT_FOUND: i32 = -32000;
const SESSION_LIMIT_REACHED: i32 = -32001;
const SESSION_BUSY: i32 = -32002;
// Value reported as `protocolVersion` in the `initialize` reply.
const ACP_PROTOCOL_VERSION: u64 = 1;

/// Sending half used to hand the outcome of an outbound request back to the
/// task awaiting it in `RpcOutbound::request`.
type PendingResponder = oneshot::Sender<std::result::Result<Value, JsonRpcError>>;
113
/// Outbound side of the JSON-RPC connection: sends notifications and
/// requests to the peer and matches incoming replies to waiting requests.
pub struct RpcOutbound {
    /// Queue of serialized messages waiting to be written out.
    writer_tx: mpsc::Sender<String>,
    /// Outbound requests still awaiting a reply, keyed by request id.
    pending: std::sync::Mutex<HashMap<String, PendingResponder>>,
    /// Counter used to build ids for outbound requests.
    next_id: AtomicU64,
}
125
/// Drop guard that removes one entry from the pending-request map when it
/// goes out of scope.
struct PendingRequestGuard<'a> {
    /// Map the entry lives in.
    pending: &'a std::sync::Mutex<HashMap<String, PendingResponder>>,
    /// Key of the entry to remove on drop.
    id: String,
}
130
131impl Drop for PendingRequestGuard<'_> {
132 fn drop(&mut self) {
133 self.pending
134 .lock()
135 .unwrap_or_else(|e| e.into_inner())
136 .remove(&self.id);
137 }
138}
139
140impl RpcOutbound {
141 fn new(writer_tx: mpsc::Sender<String>) -> Self {
142 Self {
143 writer_tx,
144 pending: std::sync::Mutex::new(HashMap::new()),
145 next_id: AtomicU64::new(0),
146 }
147 }
148
149 pub async fn notify(&self, method: &str, params: Value) {
151 let n = serde_json::json!({
152 "jsonrpc": "2.0",
153 "method": method,
154 "params": params,
155 });
156 if let Ok(s) = serde_json::to_string(&n)
157 && self.writer_tx.send(s).await.is_err()
158 {
159 ::zeroclaw_log::record!(
160 WARN,
161 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
162 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
163 "ACP writer task closed; dropping outbound notification"
164 );
165 }
166 }
167
168 pub async fn request(
170 &self,
171 method: &str,
172 params: Value,
173 ) -> std::result::Result<Value, JsonRpcError> {
174 let n = self.next_id.fetch_add(1, Ordering::Relaxed);
175 let id = format!("zc-out-{n}");
176 let (tx, rx) = oneshot::channel();
177 {
178 let mut pending = self.pending.lock().unwrap_or_else(|e| e.into_inner());
179 pending.insert(id.clone(), tx);
180 }
181 let _pending_guard = PendingRequestGuard {
182 pending: &self.pending,
183 id: id.clone(),
184 };
185 let req = serde_json::json!({
186 "jsonrpc": "2.0",
187 "method": method,
188 "params": params,
189 "id": id,
190 });
191 let body = match serde_json::to_string(&req) {
192 Ok(s) => s,
193 Err(e) => {
194 return Err(JsonRpcError {
195 code: INTERNAL_ERROR,
196 message: format!("Failed to encode request: {e}"),
197 data: None,
198 });
199 }
200 };
201 if self.writer_tx.send(body).await.is_err() {
202 return Err(JsonRpcError {
203 code: INTERNAL_ERROR,
204 message: "ACP writer task closed".to_string(),
205 data: None,
206 });
207 }
208 rx.await.unwrap_or_else(|_| {
209 Err(JsonRpcError {
210 code: INTERNAL_ERROR,
211 message: "Outbound RPC dropped".to_string(),
212 data: None,
213 })
214 })
215 }
216
217 pub(crate) fn dispatch_response(
220 &self,
221 id_str: &str,
222 result: Option<Value>,
223 error: Option<JsonRpcError>,
224 ) {
225 let responder = self
226 .pending
227 .lock()
228 .unwrap_or_else(|e| e.into_inner())
229 .remove(id_str);
230 if let Some(tx) = responder {
231 let payload = if let Some(err) = error {
232 Err(err)
233 } else {
234 Ok(result.unwrap_or(Value::Null))
235 };
236 let _ = tx.send(payload);
237 } else {
238 ::zeroclaw_log::record!(
239 DEBUG,
240 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
241 .with_attrs(::serde_json::json!({"id_str": id_str})),
242 "No pending outbound RPC matched response id="
243 );
244 }
245 }
246}
247
#[cfg(test)]
impl RpcOutbound {
    /// Test-only constructor; identical to the private `new`.
    pub fn for_testing(writer_tx: mpsc::Sender<String>) -> Self {
        Self::new(writer_tx)
    }

    /// Test-only entry point that forwards to `dispatch_response`.
    pub fn dispatch_response_for_test(
        &self,
        id_str: &str,
        result: Option<Value>,
        error: Option<JsonRpcError>,
    ) {
        Self::dispatch_response(self, id_str, result, error);
    }

    /// Number of outbound requests currently awaiting a reply.
    pub fn pending_count_for_test(&self) -> usize {
        let table = match self.pending.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        table.len()
    }
}
271
/// State kept for one live session.
struct Session {
    /// Agent that executes this session's prompt turns.
    agent: Agent,
    /// When the session entry was created (currently never read).
    #[allow(dead_code)]
    created_at: Instant,
    /// Refreshed after each prompt turn; the reaper in `run` compares its
    /// age against the configured timeout.
    last_active: Instant,
}
280
/// ACP JSON-RPC server: owns the session table and the outbound RPC endpoint.
pub struct AcpServer {
    /// Application configuration used when constructing agents.
    config: Config,
    /// Session limit and timeout settings.
    acp_config: AcpServerConfig,
    /// Live sessions keyed by session id; each has its own lock.
    sessions: Arc<Mutex<HashMap<String, Arc<Mutex<Session>>>>>,
    /// Outbound endpoint shared with each session's `AcpChannel`.
    rpc: Arc<RpcOutbound>,
    /// Receiving half of the writer queue, taken exactly once by `run`.
    /// `None` when the caller supplied its own sender at construction.
    writer_rx: std::sync::Mutex<Option<mpsc::Receiver<String>>>,
    /// One cancellation token per session that has a prompt turn in flight.
    cancel_tokens: Arc<std::sync::Mutex<HashMap<String, tokio_util::sync::CancellationToken>>>,
    /// Ids of sessions being restored by load/resume; they count toward
    /// `max_sessions` while the restore is in progress.
    loading_sessions: Arc<tokio::sync::Mutex<HashSet<String>>>,
    /// Optional persistent session store; load/resume require it.
    store: Option<Arc<AcpSessionStore>>,
    /// Optional canvas store handed to each agent on construction.
    canvas_store: Option<CanvasStore>,
}
314
315impl AcpServer {
316 pub fn new(config: Config, acp_config: AcpServerConfig) -> Self {
317 let (writer_tx, writer_rx) = mpsc::channel::<String>(256);
318 Self::with_writer(config, acp_config, writer_tx, Some(writer_rx), None)
319 }
320
321 pub fn new_with_writer(
322 config: Config,
323 acp_config: AcpServerConfig,
324 writer_tx: mpsc::Sender<String>,
325 ) -> Self {
326 Self::with_writer(config, acp_config, writer_tx, None, None)
327 }
328
329 pub fn new_with_store(
330 config: Config,
331 acp_config: AcpServerConfig,
332 store: Arc<AcpSessionStore>,
333 ) -> Self {
334 let (writer_tx, writer_rx) = mpsc::channel::<String>(256);
335 Self::with_writer(config, acp_config, writer_tx, Some(writer_rx), Some(store))
336 }
337
338 pub fn new_with_writer_and_store(
339 config: Config,
340 acp_config: AcpServerConfig,
341 writer_tx: mpsc::Sender<String>,
342 store: Arc<AcpSessionStore>,
343 ) -> Self {
344 Self::with_writer(config, acp_config, writer_tx, None, Some(store))
345 }
346
347 fn with_writer(
348 config: Config,
349 acp_config: AcpServerConfig,
350 writer_tx: mpsc::Sender<String>,
351 writer_rx: Option<mpsc::Receiver<String>>,
352 store: Option<Arc<AcpSessionStore>>,
353 ) -> Self {
354 Self {
355 config,
356 acp_config,
357 sessions: Arc::new(Mutex::new(HashMap::new())),
358 rpc: Arc::new(RpcOutbound::new(writer_tx)),
359 writer_rx: std::sync::Mutex::new(writer_rx),
360 cancel_tokens: Arc::new(std::sync::Mutex::new(HashMap::new())),
361 loading_sessions: Arc::new(tokio::sync::Mutex::new(HashSet::new())),
362 store,
363 canvas_store: None,
364 }
365 }
366
367 pub fn with_canvas_store(mut self, canvas_store: CanvasStore) -> Self {
371 self.canvas_store = Some(canvas_store);
372 self
373 }
374
/// Runs the server over stdin until stdin reaches end-of-file.
///
/// Spawns the writer task (consuming the receiver stored at construction),
/// spawns a background reaper for idle sessions, then reads stdin line by
/// line and passes each non-blank line to `process_line`.
///
/// # Errors
/// Returns an error if no writer receiver is available (it was already
/// taken, or none was stored at construction) or if reading stdin fails.
pub async fn run(self: Arc<Self>) -> Result<()> {
    ::zeroclaw_log::record!(
        DEBUG,
        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
        &format!(
            "ACP server starting (max_sessions={}, timeout={}s)",
            self.acp_config.max_sessions, self.acp_config.session_timeout_secs
        )
    );

    // `take()` leaves `None` behind, so a second call to `run` fails here.
    let writer_rx = self
        .writer_rx
        .lock()
        .unwrap_or_else(|e| e.into_inner())
        .take()
        .ok_or_else(|| {
            ::zeroclaw_log::record!(
                ERROR,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
                    .with_outcome(::zeroclaw_log::EventOutcome::Failure),
                "ACP server writer already started"
            );
            anyhow::Error::msg("ACP server writer already started")
        })?;
    tokio::spawn(writer_task(writer_rx));

    let stdin = tokio::io::stdin();
    let mut reader = BufReader::new(stdin);
    let mut line = String::new();

    // Reaper: on a 60-second interval, drop sessions idle longer than the
    // configured timeout.
    let sessions = Arc::clone(&self.sessions);
    let timeout = Duration::from_secs(self.acp_config.session_timeout_secs);
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(60));
        loop {
            interval.tick().await;
            let mut sessions = sessions.lock().await;
            let before = sessions.len();
            sessions.retain(|id, session_arc| {
                match session_arc.try_lock() {
                    Ok(session) => {
                        let expired = session.last_active.elapsed() > timeout;
                        if expired {
                            ::zeroclaw_log::record!(
                                DEBUG,
                                ::zeroclaw_log::Event::new(
                                    module_path!(),
                                    ::zeroclaw_log::Action::Note
                                )
                                .with_attrs(::serde_json::json!({"id": id})),
                                "Session expired after inactivity"
                            );
                        }
                        !expired
                    }
                    // The session lock is held elsewhere (e.g. by a running
                    // turn), so the session is kept without inspection.
                    Err(_) => true,
                }
            });
            let reaped = before - sessions.len();
            if reaped > 0 {
                ::zeroclaw_log::record!(
                    DEBUG,
                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                        .with_attrs(::serde_json::json!({"reaped": reaped})),
                    "Reaped expired session(s)"
                );
            }
        }
    });

    loop {
        // The buffer is reused across iterations; `read_line` appends.
        line.clear();
        let bytes_read = reader.read_line(&mut line).await?;
        if bytes_read == 0 {
            ::zeroclaw_log::record!(
                DEBUG,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
                "ACP server: stdin closed, shutting down"
            );
            break;
        }

        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }

        self.process_line(trimmed).await;
    }

    Ok(())
}
475
476 pub async fn run_messages(self: Arc<Self>, mut input_rx: mpsc::Receiver<String>) -> Result<()> {
483 while let Some(line) = input_rx.recv().await {
484 let trimmed = line.trim();
485 if trimmed.is_empty() {
486 continue;
487 }
488 self.process_line(trimmed).await;
489 }
490
491 Ok(())
492 }
493
494 async fn process_line(self: &Arc<Self>, trimmed: &str) {
495 if let Ok(value) = serde_json::from_str::<Value>(trimmed)
499 && value.is_object()
500 && (value.get("result").is_some() || value.get("error").is_some())
501 && let Some(id) = value.get("id")
502 {
503 let id_str = id
504 .as_str()
505 .map(String::from)
506 .unwrap_or_else(|| id.to_string());
507 let result = value.get("result").cloned();
508 let error: Option<JsonRpcError> = value
509 .get("error")
510 .and_then(|e| serde_json::from_value(e.clone()).ok());
511 self.rpc.dispatch_response(&id_str, result, error);
512 return;
513 }
514
515 match serde_json::from_str::<JsonRpcRequest>(trimmed) {
516 Ok(request) => {
517 if request.jsonrpc != "2.0" {
518 if let Some(id) = request.id {
519 self.write_error(id, INVALID_REQUEST, "Invalid JSON-RPC version")
520 .await;
521 }
522 return;
523 }
524 let server = Arc::clone(self);
529 tokio::spawn(async move {
530 server.handle_request(request).await;
531 });
532 }
533 Err(e) => {
534 ::zeroclaw_log::record!(
535 WARN,
536 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
537 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
538 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
539 "Failed to parse JSON-RPC request"
540 );
541 self.write_error(Value::Null, PARSE_ERROR, &format!("Parse error: {e}"))
542 .await;
543 }
544 }
545 }
546
547 async fn handle_request(&self, request: JsonRpcRequest) {
548 let id = request.id.clone().unwrap_or(Value::Null);
549 let is_notification = request.id.is_none();
550
551 let result = match request.method.as_str() {
552 "initialize" => self.handle_initialize(&request.params),
553 "session/new" => self.handle_session_new(&request.params).await,
554 "session/load" => self.handle_session_load(&request.params).await,
555 "session/resume" => self.handle_session_resume(&request.params).await,
556 "session/close" => self.handle_session_close(&request.params).await,
557 "session/prompt" => self.handle_session_prompt(&request.params, &id).await,
558 "session/stop" => self.handle_session_stop(&request.params).await,
559 "session/cancel" => self.handle_session_cancel(&request.params).await,
560 "session/event" | "session/update" => self.handle_session_event(&request.params).await,
561 _ => Err(RpcError {
562 code: METHOD_NOT_FOUND,
563 message: format!("Method not found: {}", request.method),
564 data: None,
565 }),
566 };
567
568 if !is_notification {
570 match result {
571 Ok(value) => self.write_result(id, value).await,
572 Err(e) => self.write_error(id, e.code, &e.message).await,
573 }
574 }
575 }
576
577 fn handle_initialize(&self, _params: &Value) -> RpcResult {
580 let default_model = self
581 .config
582 .first_model_provider()
583 .and_then(|e| e.model.clone());
584
585 let mut zeroclaw_meta = serde_json::json!({
586 "maxSessions": self.acp_config.max_sessions,
587 "sessionTimeoutSecs": self.acp_config.session_timeout_secs,
588 });
589 if let Some(model) = default_model {
590 zeroclaw_meta["defaultModel"] = serde_json::json!(model);
591 }
592
593 let session_capabilities = if self.store.is_some() {
594 serde_json::json!({ "resume": {}, "close": {} })
595 } else {
596 serde_json::json!({})
597 };
598
599 Ok(serde_json::json!({
600 "protocolVersion": ACP_PROTOCOL_VERSION,
601 "agentCapabilities": {
602 "loadSession": self.store.is_some(),
603 "promptCapabilities": {
604 "image": false,
605 "audio": false,
606 "embeddedContext": false,
607 },
608 "mcpCapabilities": {
609 "http": false,
610 "sse": false,
611 },
612 "sessionCapabilities": session_capabilities,
613 },
614 "agentInfo": {
615 "name": "zeroclaw-acp",
616 "title": "ZeroClaw ACP",
617 "version": env!("CARGO_PKG_VERSION"),
618 },
619 "authMethods": [],
620 "_meta": {
621 "zeroclaw": zeroclaw_meta,
622 }
623 }))
624 }
625
626 async fn handle_session_new(&self, params: &Value) -> RpcResult {
627 let mut sessions = self.sessions.lock().await;
628
629 let loading_count = self.loading_sessions.lock().await.len();
630 if sessions.len() + loading_count >= self.acp_config.max_sessions {
631 return Err(RpcError {
632 code: SESSION_LIMIT_REACHED,
633 message: format!(
634 "Maximum session limit reached ({})",
635 self.acp_config.max_sessions
636 ),
637 data: None,
638 });
639 }
640
641 let requested_cwd = self.requested_session_cwd(params);
642
643 let workspace_dir = std::fs::canonicalize(&requested_cwd)
644 .map_err(|e| RpcError {
645 code: INVALID_PARAMS,
646 message: format!(
647 "cwd is not a usable directory ({}): {e}",
648 requested_cwd.display()
649 ),
650 data: None,
651 })?
652 .to_string_lossy()
653 .into_owned();
654
655 let agent_alias = params
660 .get("agentAlias")
661 .or_else(|| params.get("agent_alias"))
662 .or_else(|| params.get("agent"))
663 .and_then(Value::as_str)
664 .map(str::trim)
665 .filter(|s| !s.is_empty())
666 .map(str::to_string)
667 .or_else(|| self.config.acp.default_agent.clone())
668 .or_else(|| {
669 let mut keys = self.config.agents.keys();
670 if self.config.agents.len() == 1 {
671 keys.next().cloned()
672 } else {
673 None
674 }
675 })
676 .ok_or_else(|| RpcError {
677 code: INVALID_PARAMS,
678 message: "session/new requires `agentAlias` (alias of a configured \
679 [agents.<alias>] entry)"
680 .to_string(),
681 data: None,
682 })?;
683 if self.config.agent(&agent_alias).is_none() {
684 return Err(RpcError {
685 code: INVALID_PARAMS,
686 message: format!(
687 "Unknown agent `{agent_alias}` — no [agents.{agent_alias}] entry configured"
688 ),
689 data: None,
690 });
691 }
692
693 let session_id = Uuid::new_v4().to_string();
694
695 let agent = Agent::from_config_with_session_cwd_and_mcp_backchannel(
700 &self.config,
701 &agent_alias,
702 Some(std::path::Path::new(&workspace_dir)),
703 false,
704 self.canvas_store.clone(),
705 )
706 .await
707 .map_err(|e| RpcError {
708 code: INTERNAL_ERROR,
709 message: format!("Failed to create agent: {e}"),
710 data: None,
711 })?;
712
713 let acp_channel = Arc::new(AcpChannel::new(
718 "acp",
719 session_id.clone(),
720 Arc::clone(&self.rpc),
721 Duration::from_secs(self.acp_config.session_timeout_secs),
722 ));
723 agent.channel_handles().register_channel("acp", acp_channel);
724
725 let now = Instant::now();
726 sessions.insert(
727 session_id.clone(),
728 Arc::new(Mutex::new(Session {
729 agent,
730 created_at: now,
731 last_active: now,
732 })),
733 );
734
735 if let Some(store) = &self.store
736 && let Err(e) = store.create_session(&session_id, &workspace_dir)
737 {
738 sessions.remove(&session_id);
740 return Err(RpcError {
741 code: INTERNAL_ERROR,
742 message: format!("Failed to persist session: {e}"),
743 data: None,
744 });
745 }
746
747 ::zeroclaw_log::record!(
748 DEBUG,
749 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(
750 ::serde_json::json!({"session_id": session_id, "workspace_dir": workspace_dir})
751 ),
752 "Created session (workspace: )"
753 );
754
755 Ok(serde_json::json!({
756 "sessionId": session_id,
757 "workspaceDir": workspace_dir,
758 }))
759 }
760
761 async fn handle_session_load(&self, params: &Value) -> RpcResult {
762 let session_id = params
763 .get("sessionId")
764 .or_else(|| params.get("session_id"))
765 .and_then(|v| v.as_str())
766 .ok_or_else(|| RpcError {
767 code: INVALID_PARAMS,
768 message: "Missing required parameter: sessionId".to_string(),
769 data: None,
770 })?
771 .to_string();
772
773 let store = self.store.as_ref().ok_or_else(|| RpcError {
774 code: SESSION_NOT_FOUND,
775 message: format!("Session not found: {session_id}"),
776 data: None,
777 })?;
778
779 {
781 let sessions = self.sessions.lock().await;
782 let mut loading = self.loading_sessions.lock().await;
783 if sessions.len() + loading.len() >= self.acp_config.max_sessions {
784 return Err(RpcError {
785 code: SESSION_LIMIT_REACHED,
786 message: format!(
787 "Maximum session limit reached ({})",
788 self.acp_config.max_sessions
789 ),
790 data: None,
791 });
792 }
793 if sessions.contains_key(&session_id) || loading.contains(&session_id) {
794 return Err(RpcError {
795 code: INVALID_PARAMS,
796 message: format!(
797 "Session already active: {session_id}. Call session/close first."
798 ),
799 data: None,
800 });
801 }
802 loading.insert(session_id.clone());
803 }
804
805 let data = store
809 .load_session(&session_id)
810 .map_err(|e| RpcError {
811 code: INTERNAL_ERROR,
812 message: format!("Failed to load session: {e}"),
813 data: None,
814 })
815 .and_then(|opt| {
816 opt.ok_or_else(|| RpcError {
817 code: SESSION_NOT_FOUND,
818 message: format!("Session not found: {session_id}"),
819 data: None,
820 })
821 });
822
823 let data = match data {
825 Ok(d) => d,
826 Err(e) => {
827 self.loading_sessions.lock().await.remove(&session_id);
828 return Err(e);
829 }
830 };
831
832 let workspace_dir = std::path::PathBuf::from(&data.workspace_dir);
833
834 let restore_alias = self
835 .config
836 .acp
837 .default_agent
838 .clone()
839 .or_else(|| {
840 let mut keys = self.config.agents.keys();
841 if self.config.agents.len() == 1 {
842 keys.next().cloned()
843 } else {
844 None
845 }
846 })
847 .unwrap_or_else(|| "default".to_string());
848
849 let agent_result = Agent::from_config_with_session_cwd_and_mcp_backchannel(
850 &self.config,
851 &restore_alias,
852 Some(&workspace_dir),
853 false,
854 self.canvas_store.clone(),
855 )
856 .await
857 .map_err(|e| RpcError {
858 code: INTERNAL_ERROR,
859 message: format!("Failed to create agent: {e}"),
860 data: None,
861 });
862
863 let mut agent = match agent_result {
864 Ok(a) => a,
865 Err(e) => {
866 self.loading_sessions.lock().await.remove(&session_id);
867 return Err(e);
868 }
869 };
870
871 agent.seed_conversation_history(data.messages.clone());
872
873 let acp_channel = Arc::new(AcpChannel::new(
874 "acp",
875 session_id.clone(),
876 Arc::clone(&self.rpc),
877 Duration::from_secs(self.acp_config.session_timeout_secs),
878 ));
879 agent.channel_handles().register_channel("acp", acp_channel);
880
881 let now = Instant::now();
882 {
884 let mut sessions = self.sessions.lock().await;
885 let mut loading = self.loading_sessions.lock().await;
886 loading.remove(&session_id);
887 sessions.insert(
888 session_id.clone(),
889 Arc::new(Mutex::new(Session {
890 agent,
891 created_at: now,
892 last_active: now,
893 })),
894 );
895 }
896
897 for msg in &data.messages {
899 for notification in history_notifications_for_message(&session_id, msg) {
900 self.write_notification(¬ification).await;
901 }
902 }
903
904 ::zeroclaw_log::record!(
905 DEBUG,
906 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
907 .with_attrs(::serde_json::json!({"session_id": session_id, "message_count": data.messages.len()})),
908 "Loaded session"
909 );
910 Ok(serde_json::json!({}))
911 }
912
/// Handles `session/resume`: restores a persisted session into memory.
/// Unlike `session/load`, the stored history is not replayed to the client.
///
/// Reads `sessionId` (or `session_id`) from `params`.
///
/// # Errors
/// - `INVALID_PARAMS` when the id is missing or the session is already
///   active or being loaded.
/// - `SESSION_NOT_FOUND` when no store is attached or the store has no
///   such session.
/// - `SESSION_LIMIT_REACHED` when active plus loading sessions already
///   reach `max_sessions`.
/// - `INTERNAL_ERROR` when the store read or agent creation fails.
async fn handle_session_resume(&self, params: &Value) -> RpcResult {
    let session_id = params
        .get("sessionId")
        .or_else(|| params.get("session_id"))
        .and_then(|v| v.as_str())
        .ok_or_else(|| RpcError {
            code: INVALID_PARAMS,
            message: "Missing required parameter: sessionId".to_string(),
            data: None,
        })?
        .to_string();

    let store = self.store.as_ref().ok_or_else(|| RpcError {
        code: SESSION_NOT_FOUND,
        message: format!("Session not found: {session_id}"),
        data: None,
    })?;

    {
        // Reserve a slot under both locks; they are released at the end of
        // this block, before the restore work below.
        let sessions = self.sessions.lock().await;
        let mut loading = self.loading_sessions.lock().await;
        if sessions.len() + loading.len() >= self.acp_config.max_sessions {
            return Err(RpcError {
                code: SESSION_LIMIT_REACHED,
                message: format!(
                    "Maximum session limit reached ({})",
                    self.acp_config.max_sessions
                ),
                data: None,
            });
        }
        if sessions.contains_key(&session_id) || loading.contains(&session_id) {
            return Err(RpcError {
                code: INVALID_PARAMS,
                message: format!(
                    "Session already active: {session_id}. Call session/close first."
                ),
                data: None,
            });
        }
        loading.insert(session_id.clone());
    }

    let data = store
        .load_session(&session_id)
        .map_err(|e| RpcError {
            code: INTERNAL_ERROR,
            message: format!("Failed to load session: {e}"),
            data: None,
        })
        .and_then(|opt| {
            opt.ok_or_else(|| RpcError {
                code: SESSION_NOT_FOUND,
                message: format!("Session not found: {session_id}"),
                data: None,
            })
        });

    let data = match data {
        Ok(d) => d,
        Err(e) => {
            // Release the reserved slot on failure.
            self.loading_sessions.lock().await.remove(&session_id);
            return Err(e);
        }
    };

    let workspace_dir = std::path::PathBuf::from(&data.workspace_dir);

    // Agent used for the restored session: the configured default, else
    // the only configured agent, else the literal alias "default".
    let restore_alias = self
        .config
        .acp
        .default_agent
        .clone()
        .or_else(|| {
            let mut keys = self.config.agents.keys();
            if self.config.agents.len() == 1 {
                keys.next().cloned()
            } else {
                None
            }
        })
        .unwrap_or_else(|| "default".to_string());

    let agent_result = Agent::from_config_with_session_cwd_and_mcp_backchannel(
        &self.config,
        &restore_alias,
        Some(&workspace_dir),
        false,
        self.canvas_store.clone(),
    )
    .await
    .map_err(|e| RpcError {
        code: INTERNAL_ERROR,
        message: format!("Failed to create agent: {e}"),
        data: None,
    });

    let mut agent = match agent_result {
        Ok(a) => a,
        Err(e) => {
            // Release the reserved slot on failure.
            self.loading_sessions.lock().await.remove(&session_id);
            return Err(e);
        }
    };

    // The messages are moved into the agent; nothing is replayed here.
    agent.seed_conversation_history(data.messages);

    let acp_channel = Arc::new(AcpChannel::new(
        "acp",
        session_id.clone(),
        Arc::clone(&self.rpc),
        Duration::from_secs(self.acp_config.session_timeout_secs),
    ));
    agent.channel_handles().register_channel("acp", acp_channel);

    let now = Instant::now();
    {
        // Move the id from "loading" to "active" under both locks, taken
        // in the same order as the reservation above.
        let mut sessions = self.sessions.lock().await;
        let mut loading = self.loading_sessions.lock().await;
        loading.remove(&session_id);
        sessions.insert(
            session_id.clone(),
            Arc::new(Mutex::new(Session {
                agent,
                created_at: now,
                last_active: now,
            })),
        );
    }

    ::zeroclaw_log::record!(
        DEBUG,
        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
            .with_attrs(::serde_json::json!({"session_id": session_id})),
        "Resumed session"
    );
    Ok(serde_json::json!({}))
}
1054
1055 async fn handle_session_close(&self, params: &Value) -> RpcResult {
1064 let session_id = params
1065 .get("sessionId")
1066 .or_else(|| params.get("session_id"))
1067 .and_then(|v| v.as_str())
1068 .ok_or_else(|| RpcError {
1069 code: INVALID_PARAMS,
1070 message: "Missing required parameter: sessionId".to_string(),
1071 data: None,
1072 })?;
1073
1074 let token = self
1076 .cancel_tokens
1077 .lock()
1078 .expect("cancel_tokens lock poisoned — invariant: all guarded critical sections are short, infallible HashMap ops")
1079 .get(session_id)
1080 .cloned();
1081 if let Some(token) = token {
1082 token.cancel();
1083 ::zeroclaw_log::record!(
1084 DEBUG,
1085 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1086 .with_attrs(::serde_json::json!({"session_id": session_id})),
1087 "Cancelled active turn for closing session"
1088 );
1089 }
1090
1091 let session_arc = {
1092 let mut sessions = self.sessions.lock().await;
1093 sessions.remove(session_id).ok_or_else(|| RpcError {
1094 code: SESSION_NOT_FOUND,
1095 message: format!("Session not found: {session_id}"),
1096 data: None,
1097 })?
1098 };
1099
1100 let session = session_arc.lock().await;
1102 session.agent.channel_handles().unregister_channel("acp");
1103 ::zeroclaw_log::record!(
1104 DEBUG,
1105 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1106 .with_attrs(::serde_json::json!({"session_id": session_id})),
1107 "Closed session"
1108 );
1109
1110 Ok(serde_json::json!({}))
1111 }
1112
1113 fn requested_session_cwd(&self, params: &Value) -> PathBuf {
1114 params
1115 .get("cwd")
1116 .or_else(|| params.get("workspaceDir"))
1117 .or_else(|| params.get("workspace_dir"))
1118 .and_then(|v| v.as_str())
1119 .map(PathBuf::from)
1120 .unwrap_or_else(|| {
1121 std::env::current_dir().unwrap_or_else(|_| self.config.data_dir.clone())
1122 })
1123 }
1124
1125 async fn handle_session_prompt(&self, params: &Value, _request_id: &Value) -> RpcResult {
1126 let session_id = params
1127 .get("sessionId")
1128 .or_else(|| params.get("session_id"))
1129 .and_then(|v| v.as_str())
1130 .ok_or_else(|| RpcError {
1131 code: INVALID_PARAMS,
1132 message: "Missing required parameter: sessionId".to_string(),
1133 data: None,
1134 })?
1135 .to_string();
1136
1137 let prompt = Self::parse_prompt(params)?;
1138
1139 let session_arc = {
1143 let sessions = self.sessions.lock().await;
1144 sessions.get(&session_id).cloned().ok_or_else(|| RpcError {
1145 code: SESSION_NOT_FOUND,
1146 message: format!("Session not found: {session_id}"),
1147 data: None,
1148 })?
1149 };
1150
1151 let cancel_token = tokio_util::sync::CancellationToken::new();
1158 self.register_cancel_token(&session_id, cancel_token.clone())?;
1159 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(100);
1160
1161 let turn_handle = tokio::spawn(async move {
1166 let mut session = session_arc.lock().await;
1167 let result = session
1168 .agent
1169 .turn_streamed(&prompt, event_tx, Some(cancel_token))
1170 .await;
1171 session.last_active = Instant::now();
1172 result
1173 });
1175
1176 let mut accumulated_text = String::new();
1182 while let Some(event) = event_rx.recv().await {
1183 if matches!(event, TurnEvent::Usage { .. }) {
1188 continue;
1189 }
1190 if let TurnEvent::Chunk { ref delta } = event {
1192 accumulated_text.push_str(delta);
1193 }
1194 if let Some(notification) = notification_for_turn_event(&session_id, &event) {
1195 self.write_notification(¬ification).await;
1196 }
1197 }
1198
1199 self.remove_cancel_token(&session_id);
1202
1203 let turn_result = turn_handle.await.map_err(|e| RpcError {
1204 code: INTERNAL_ERROR,
1205 message: format!("Agent task panicked: {e}"),
1206 data: None,
1207 })?;
1208
1209 let was_cancelled = match &turn_result {
1212 Err(e) => zeroclaw_runtime::agent::loop_::is_tool_loop_cancelled(e),
1213 Ok(_) => false,
1214 };
1215
1216 if was_cancelled {
1217 return Ok(Self::cancelled_prompt_result(session_id, &accumulated_text));
1218 }
1219
1220 let (result_text, new_turn_msgs) = turn_result.map_err(|e| RpcError {
1221 code: INTERNAL_ERROR,
1222 message: format!("Agent turn failed: {e}"),
1223 data: None,
1224 })?;
1225
1226 if let Some(store) = &self.store
1228 && !new_turn_msgs.is_empty()
1229 && let Err(e) = store.append_turn(&session_id, &new_turn_msgs)
1230 {
1231 ::zeroclaw_log::record!(
1232 WARN,
1233 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1234 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1235 .with_attrs(
1236 ::serde_json::json!({"session_id": session_id, "error": e.to_string()})
1237 ),
1238 "Failed to persist turn; session continues in memory"
1239 );
1240 }
1241
1242 Ok(Self::prompt_result(session_id, "end_turn", result_text))
1243 }
1244
1245 fn register_cancel_token(
1246 &self,
1247 session_id: &str,
1248 cancel_token: tokio_util::sync::CancellationToken,
1249 ) -> std::result::Result<(), RpcError> {
1250 let mut tokens = self
1251 .cancel_tokens
1252 .lock()
1253 .expect("cancel_tokens lock poisoned — invariant: all guarded critical sections are short, infallible HashMap ops");
1254 if tokens.contains_key(session_id) {
1255 return Err(RpcError {
1256 code: SESSION_BUSY,
1257 message: format!("Session already has an active prompt turn: {session_id}"),
1258 data: None,
1259 });
1260 }
1261 tokens.insert(session_id.to_string(), cancel_token);
1262 Ok(())
1263 }
1264
1265 fn remove_cancel_token(&self, session_id: &str) {
1266 self.cancel_tokens
1267 .lock()
1268 .expect("cancel_tokens lock poisoned — invariant: all guarded critical sections are short, infallible HashMap ops")
1269 .remove(session_id);
1270 }
1271
1272 fn prompt_result(session_id: String, stop_reason: &'static str, text: String) -> Value {
1273 serde_json::json!({
1274 "sessionId": session_id,
1275 "stopReason": stop_reason,
1276 "content": text,
1277 })
1278 }
1279
1280 fn cancelled_prompt_result(session_id: String, accumulated_text: &str) -> Value {
1281 let content = if accumulated_text.is_empty() {
1282 "[interrupted by user]".to_string()
1283 } else {
1284 format!("{accumulated_text}\n\n[interrupted by user]")
1285 };
1286 Self::prompt_result(session_id, "cancelled", content)
1287 }
1288
1289 fn parse_prompt(params: &Value) -> std::result::Result<String, RpcError> {
1290 match params.get("prompt") {
1291 Some(Value::String(s)) => Ok(s.clone()),
1292 Some(Value::Array(arr)) => {
1293 let mut joined = String::new();
1294 for part in arr {
1295 let mut added = false;
1296 if let Some(text) = part.get("text").and_then(|v| v.as_str()) {
1297 if !joined.is_empty() {
1298 joined.push_str("\n\n");
1299 }
1300 joined.push_str(text);
1301 added = true;
1302 }
1303 if let Some(res) = part.get("resource")
1306 && let Some(text) = res.get("text").and_then(|v| v.as_str())
1307 {
1308 if added || !joined.is_empty() {
1309 joined.push_str("\n\n");
1310 }
1311 joined.push_str(text);
1312 }
1313 }
1314 if joined.is_empty() {
1315 return Err(RpcError {
1316 code: INVALID_PARAMS,
1317 message: "Parameter 'prompt' array must contain at least one text part"
1318 .to_string(),
1319 data: None,
1320 });
1321 }
1322 Ok(joined)
1323 }
1324 _ => Err(RpcError {
1325 code: INVALID_PARAMS,
1326 message: "Missing required parameter: prompt (must be string or array of parts)"
1327 .to_string(),
1328 data: None,
1329 }),
1330 }
1331 }
1332
1333 async fn handle_session_stop(&self, params: &Value) -> RpcResult {
1334 let session_id = params
1335 .get("sessionId")
1336 .or_else(|| params.get("session_id"))
1337 .and_then(|v| v.as_str())
1338 .ok_or_else(|| RpcError {
1339 code: INVALID_PARAMS,
1340 message: "Missing required parameter: sessionId".to_string(),
1341 data: None,
1342 })?;
1343
1344 let session_arc = {
1345 let mut sessions = self.sessions.lock().await;
1346 sessions.remove(session_id).ok_or_else(|| RpcError {
1347 code: SESSION_NOT_FOUND,
1348 message: format!("Session not found: {session_id}"),
1349 data: None,
1350 })?
1351 };
1352
1353 let session = session_arc.lock().await;
1356 session.agent.channel_handles().unregister_channel("acp");
1359 ::zeroclaw_log::record!(
1360 DEBUG,
1361 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1362 .with_attrs(::serde_json::json!({"session_id": session_id})),
1363 "Stopped session"
1364 );
1365 Ok(serde_json::json!({
1366 "sessionId": session_id,
1367 "stopped": true,
1368 }))
1369 }
1370
1371 async fn handle_session_cancel(&self, params: &Value) -> RpcResult {
1384 let session_id = params
1385 .get("sessionId")
1386 .or_else(|| params.get("session_id"))
1387 .and_then(|v| v.as_str())
1388 .ok_or_else(|| RpcError {
1389 code: INVALID_PARAMS,
1390 message: "Missing required parameter: sessionId".to_string(),
1391 data: None,
1392 })?;
1393
1394 let token = self
1395 .cancel_tokens
1396 .lock()
1397 .expect("cancel_tokens lock poisoned — invariant: all guarded critical sections are short, infallible HashMap ops")
1398 .get(session_id)
1399 .cloned();
1400
1401 if let Some(token) = token {
1402 token.cancel();
1403 ::zeroclaw_log::record!(
1404 DEBUG,
1405 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1406 .with_attrs(::serde_json::json!({"session_id": session_id})),
1407 "Cancelled active turn for session"
1408 );
1409 }
1410
1411 Ok(serde_json::json!({}))
1412 }
1413
1414 async fn handle_session_event(&self, params: &Value) -> RpcResult {
1421 let session_id = params
1422 .get("sessionId")
1423 .or_else(|| params.get("session_id"))
1424 .and_then(|v| v.as_str())
1425 .ok_or_else(|| RpcError {
1426 code: INVALID_PARAMS,
1427 message: "Missing required parameter: sessionId".to_string(),
1428 data: None,
1429 })?
1430 .to_string();
1431
1432 let event_type = params
1433 .get("type")
1434 .or_else(|| params.get("update").and_then(|u| u.get("sessionUpdate")))
1435 .and_then(|v| v.as_str())
1436 .unwrap_or("unknown")
1437 .to_string();
1438
1439 ::zeroclaw_log::record!(
1440 DEBUG,
1441 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(
1442 ::serde_json::json!({"event_type": event_type, "session_id": session_id})
1443 ),
1444 "Received session update (type=) for session"
1445 );
1446
1447 let session_arc = {
1448 let sessions = self.sessions.lock().await;
1449 sessions.get(&session_id).cloned()
1450 };
1451
1452 if let Some(session_arc) = session_arc {
1453 if let Ok(mut session) = session_arc.try_lock() {
1456 session.last_active = Instant::now();
1457 }
1458 Ok(serde_json::json!({
1459 "sessionId": session_id,
1460 "type": event_type,
1461 "status": "processed"
1462 }))
1463 } else {
1464 Err(RpcError {
1465 code: SESSION_NOT_FOUND,
1466 message: format!("Session not found: {session_id}"),
1467 data: None,
1468 })
1469 }
1470 }
1471
1472 async fn write_result(&self, id: Value, result: Value) {
1475 let response = JsonRpcResponse {
1476 jsonrpc: "2.0",
1477 result: Some(result),
1478 error: None,
1479 id,
1480 };
1481 self.write_json(&response).await;
1482 }
1483
1484 async fn write_error(&self, id: Value, code: i32, message: &str) {
1485 let response = JsonRpcResponse {
1486 jsonrpc: "2.0",
1487 result: None,
1488 error: Some(JsonRpcError {
1489 code,
1490 message: message.to_string(),
1491 data: None,
1492 }),
1493 id,
1494 };
1495 self.write_json(&response).await;
1496 }
1497
1498 async fn write_notification(&self, notification: &JsonRpcNotification) {
1499 self.write_json(notification).await;
1500 }
1501
1502 async fn write_json<T: Serialize>(&self, value: &T) {
1503 match serde_json::to_string(value) {
1504 Ok(json) => {
1505 if self.rpc.writer_tx.send(json).await.is_err() {
1506 ::zeroclaw_log::record!(
1507 ERROR,
1508 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1509 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
1510 "ACP writer task closed; dropping outbound message"
1511 );
1512 }
1513 }
1514 Err(e) => {
1515 ::zeroclaw_log::record!(
1516 ERROR,
1517 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1518 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1519 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1520 "Failed to serialize JSON-RPC message"
1521 );
1522 }
1523 }
1524 }
1525}
1526
1527async fn writer_task(mut rx: mpsc::Receiver<String>) {
1531 let mut stdout = tokio::io::stdout();
1532 while let Some(line) = rx.recv().await {
1533 if let Err(e) = stdout.write_all(line.as_bytes()).await {
1534 ::zeroclaw_log::record!(
1535 ERROR,
1536 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1537 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1538 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1539 "Failed to write to stdout"
1540 );
1541 continue;
1542 }
1543 if let Err(e) = stdout.write_all(b"\n").await {
1544 ::zeroclaw_log::record!(
1545 ERROR,
1546 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1547 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1548 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1549 "Failed to write newline to stdout"
1550 );
1551 continue;
1552 }
1553 if let Err(e) = stdout.flush().await {
1554 ::zeroclaw_log::record!(
1555 ERROR,
1556 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1557 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1558 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1559 "Failed to flush stdout"
1560 );
1561 }
1562 }
1563}
1564
1565fn to_acp_raw_input(name: &str, args: &Value) -> Value {
1573 match name {
1574 "file_edit" => {
1575 let path = args.get("path").cloned().unwrap_or(Value::Null);
1576 let old_text = args.get("old_string").cloned().unwrap_or(Value::Null);
1577 let new_text = args.get("new_string").cloned().unwrap_or(Value::Null);
1578 serde_json::json!({ "path": path, "oldText": old_text, "newText": new_text })
1579 }
1580 "file_write" => {
1581 let path = args.get("path").cloned().unwrap_or(Value::Null);
1582 let new_text = args.get("content").cloned().unwrap_or(Value::Null);
1583 serde_json::json!({ "path": path, "newText": new_text })
1584 }
1585 _ => args.clone(),
1586 }
1587}
1588
1589fn to_acp_content(name: &str, args: &Value) -> Value {
1596 match name {
1597 "file_edit" => {
1598 let path = args.get("path").cloned().unwrap_or(Value::Null);
1599 let old_text = args.get("old_string").cloned().unwrap_or(Value::Null);
1600 let new_text = args.get("new_string").cloned().unwrap_or(Value::Null);
1601 serde_json::json!([{ "type": "diff", "path": path, "oldText": old_text, "newText": new_text }])
1602 }
1603 "file_write" => {
1604 let path = args.get("path").cloned().unwrap_or(Value::Null);
1605 let new_text = args.get("content").cloned().unwrap_or(Value::Null);
1606 serde_json::json!([{ "type": "diff", "path": path, "newText": new_text }])
1607 }
1608 _ => serde_json::json!([]),
1609 }
1610}
1611
/// Maps a tool name to the `kind` string reported in tool-call updates.
///
/// Only three specific kinds are produced: `"execute"`, `"edit"` and
/// `"delete"`. Every other tool, including read/search/fetch-style tools
/// such as `file_read`, `web_search_tool` or `web_fetch` and any unknown
/// name, is reported as `"other"`.
fn map_tool_kind(name: &str) -> &'static str {
    match name {
        // Tools that run something or hand work to another actor.
        "ask_user" | "calculator" | "claude_code" | "claude_code_runner" | "codex_cli"
        | "composio" | "cron_run" | "delegate" | "escalate_to_human" | "execute_pipeline"
        | "gemini_cli" | "jira" | "llm_task" | "opencode_cli" | "schedule" | "security_ops"
        | "sessions_send" | "shell" | "sop_advance" | "sop_approve" | "sop_execute"
        | "vi_verify" => "execute",

        // Tools that create or modify state.
        "backup" | "browser_open" | "canvas" | "cloud_ops" | "cron_add" | "file_edit"
        | "file_write" | "memory_export" | "memory_store" | "poll" | "reaction"
        | "report_template" => "edit",

        // Tools that remove state.
        "cron_remove" | "memory_forget" | "memory_purge" => "delete",

        _ => "other",
    }
}
1663
1664fn notification_for_turn_event(session_id: &str, event: &TurnEvent) -> Option<JsonRpcNotification> {
1665 Some(match event {
1666 TurnEvent::Chunk { delta } => JsonRpcNotification {
1667 jsonrpc: "2.0",
1668 method: "session/update",
1669 params: serde_json::json!({
1670 "sessionId": session_id,
1671 "update": {
1672 "sessionUpdate": "agent_message_chunk",
1673 "content": {
1674 "type": "text",
1675 "text": delta
1676 }
1677 }
1678 }),
1679 },
1680 TurnEvent::ToolCall { id, name, args } => {
1681 let acp_content = to_acp_content(name, args);
1682 let mut update = serde_json::json!({
1683 "sessionUpdate": "tool_call",
1684 "toolCallId": id,
1685 "name": name,
1686 "title": name,
1687 "kind": map_tool_kind(name),
1688 "rawInput": to_acp_raw_input(name, args),
1689 "status": "pending"
1690 });
1691 if acp_content
1692 .as_array()
1693 .is_some_and(|items| !items.is_empty())
1694 {
1695 update["content"] = acp_content;
1696 }
1697 JsonRpcNotification {
1698 jsonrpc: "2.0",
1699 method: "session/update",
1700 params: serde_json::json!({
1701 "sessionId": session_id,
1702 "update": update
1703 }),
1704 }
1705 }
1706 TurnEvent::ToolResult { id, name, output } => JsonRpcNotification {
1707 jsonrpc: "2.0",
1708 method: "session/update",
1709 params: serde_json::json!({
1710 "sessionId": session_id,
1711 "update": {
1712 "sessionUpdate": "tool_call_update",
1713 "toolCallId": id,
1714 "name": name,
1715 "title": name,
1716 "kind": map_tool_kind(name),
1717 "status": "completed",
1718 "rawOutput": output,
1719 "body": output,
1720 "content": [{
1721 "type": "content",
1722 "content": {
1723 "type": "text",
1724 "text": output
1725 }
1726 }]
1727 }
1728 }),
1729 },
1730 TurnEvent::Thinking { delta } => JsonRpcNotification {
1731 jsonrpc: "2.0",
1732 method: "session/update",
1733 params: serde_json::json!({
1734 "sessionId": session_id,
1735 "update": {
1736 "sessionUpdate": "agent_thought_chunk",
1737 "content": {
1738 "type": "text",
1739 "text": delta
1740 }
1741 }
1742 }),
1743 },
1744 TurnEvent::ApprovalRequest { .. } => return None,
1750 TurnEvent::Usage { .. } => unreachable!(
1754 "TurnEvent::Usage must be filtered before notification_for_turn_event; \
1755 ACP has no session/update notification for token usage"
1756 ),
1757 })
1758}
1759
1760fn history_notifications_for_message(
1761 session_id: &str,
1762 msg: &ConversationMessage,
1763) -> Vec<JsonRpcNotification> {
1764 match msg {
1765 ConversationMessage::Chat(chat) => {
1766 let update_type = match chat.role.as_str() {
1767 "user" => "user_message_chunk",
1768 "assistant" => "agent_message_chunk",
1769 _ => return vec![],
1770 };
1771 vec![JsonRpcNotification {
1772 jsonrpc: "2.0",
1773 method: "session/update",
1774 params: serde_json::json!({
1775 "sessionId": session_id,
1776 "update": {
1777 "sessionUpdate": update_type,
1778 "content": { "type": "text", "text": &chat.content }
1779 }
1780 }),
1781 }]
1782 }
1783 ConversationMessage::AssistantToolCalls {
1784 text, tool_calls, ..
1785 } => {
1786 let mut notifications = Vec::new();
1787 if let Some(t) = text
1788 && !t.is_empty()
1789 {
1790 notifications.push(JsonRpcNotification {
1791 jsonrpc: "2.0",
1792 method: "session/update",
1793 params: serde_json::json!({
1794 "sessionId": session_id,
1795 "update": {
1796 "sessionUpdate": "agent_message_chunk",
1797 "content": { "type": "text", "text": t }
1798 }
1799 }),
1800 });
1801 }
1802 for tc in tool_calls {
1803 let args: serde_json::Value =
1804 serde_json::from_str(&tc.arguments).unwrap_or(serde_json::Value::Null);
1805 let acp_content = to_acp_content(&tc.name, &args);
1806 let mut update = serde_json::json!({
1807 "sessionUpdate": "tool_call",
1808 "toolCallId": &tc.id,
1809 "name": &tc.name,
1810 "title": &tc.name,
1811 "kind": map_tool_kind(&tc.name),
1812 "rawInput": to_acp_raw_input(&tc.name, &args),
1813 "status": "completed"
1814 });
1815 if acp_content
1816 .as_array()
1817 .is_some_and(|items| !items.is_empty())
1818 {
1819 update["content"] = acp_content;
1820 }
1821 notifications.push(JsonRpcNotification {
1822 jsonrpc: "2.0",
1823 method: "session/update",
1824 params: serde_json::json!({
1825 "sessionId": session_id,
1826 "update": update
1827 }),
1828 });
1829 }
1830 notifications
1831 }
1832 ConversationMessage::ToolResults(results) => results
1833 .iter()
1834 .map(|r| JsonRpcNotification {
1835 jsonrpc: "2.0",
1836 method: "session/update",
1837 params: serde_json::json!({
1838 "sessionId": session_id,
1839 "update": {
1840 "sessionUpdate": "tool_call_update",
1841 "toolCallId": &r.tool_call_id,
1842 "status": "completed",
1843 "rawOutput": &r.content,
1844 "body": &r.content,
1845 "content": [{
1846 "type": "content",
1847 "content": { "type": "text", "text": &r.content }
1848 }]
1849 }
1850 }),
1851 })
1852 .collect(),
1853 }
1854}
1855
1856#[derive(Debug)]
1859struct RpcError {
1860 code: i32,
1861 message: String,
1862 #[allow(dead_code)] data: Option<Value>,
1864}
1865
1866type RpcResult = std::result::Result<Value, RpcError>;
1867
1868#[cfg(test)]
1869mod tests {
1870 use super::*;
1871
/// The default server configuration allows 10 sessions with a one-hour timeout.
#[test]
fn acp_server_config_defaults() {
    let AcpServerConfig {
        max_sessions,
        session_timeout_secs,
    } = AcpServerConfig::default();

    assert_eq!(max_sessions, 10);
    assert_eq!(session_timeout_secs, 3600);
}
1878
/// Both fields are read from JSON when present.
#[test]
fn acp_server_config_deserialize() {
    let raw = r#"{"max_sessions": 5, "session_timeout_secs": 1800}"#;
    let parsed = serde_json::from_str::<AcpServerConfig>(raw).unwrap();

    assert_eq!(parsed.max_sessions, 5);
    assert_eq!(parsed.session_timeout_secs, 1800);
}
1886
/// A field omitted from the JSON falls back to its default value.
#[test]
fn acp_server_config_deserialize_partial() {
    let raw = r#"{"max_sessions": 3}"#;
    let parsed = serde_json::from_str::<AcpServerConfig>(raw).unwrap();

    assert_eq!(parsed.max_sessions, 3);
    // Not present in the input, so the default applies.
    assert_eq!(parsed.session_timeout_secs, 3600);
}
1894
/// A request with an `id` parses with its method and id intact.
#[test]
fn json_rpc_request_parse() {
    let raw = r#"{"jsonrpc":"2.0","method":"initialize","params":{},"id":1}"#;
    let request = serde_json::from_str::<JsonRpcRequest>(raw).unwrap();

    assert_eq!(request.method, "initialize");
    assert_eq!(request.id, Some(serde_json::json!(1)));
}
1902
/// A message without an `id` (a notification) parses with `id == None`.
#[test]
fn json_rpc_request_parse_notification() {
    let raw = r#"{"jsonrpc":"2.0","method":"session/update","params":{}}"#;
    let notification = serde_json::from_str::<JsonRpcRequest>(raw).unwrap();

    assert_eq!(notification.method, "session/update");
    assert!(notification.id.is_none());
}
1910
/// A success response serializes `result` and `id` and omits `error`.
#[test]
fn json_rpc_response_serialize() {
    let response = JsonRpcResponse {
        jsonrpc: "2.0",
        result: Some(serde_json::json!({"status": "ok"})),
        error: None,
        id: serde_json::json!(1),
    };

    let wire = serde_json::to_string(&response).unwrap();
    let decoded = serde_json::from_str::<Value>(&wire).unwrap();

    assert_eq!(decoded["jsonrpc"], "2.0");
    assert_eq!(decoded["id"], 1);
    assert!(decoded.get("result").is_some());
    assert!(decoded.get("error").is_none());
}
1926
1927 #[tokio::test]
1928 async fn rpc_request_timeout_drop_removes_pending_responder() {
1929 let (tx, mut rx) = mpsc::channel::<String>(16);
1930 let rpc = RpcOutbound::for_testing(tx);
1931
1932 let result = tokio::time::timeout(
1933 Duration::from_millis(10),
1934 rpc.request("session/request_permission", serde_json::json!({})),
1935 )
1936 .await;
1937
1938 assert!(result.is_err());
1939 assert!(rx.recv().await.is_some());
1940 assert_eq!(rpc.pending_count_for_test(), 0);
1941 }
1942
/// `initialize` answers with the ACP v1 field names (`agentInfo`,
/// `agentCapabilities`, `authMethods`) and none of the older
/// `serverInfo` / `capabilities` keys.
#[test]
fn initialize_response_uses_acp_v1_shape() {
    let server = AcpServer::new(Config::default(), AcpServerConfig::default());
    let request = serde_json::json!({
        "protocolVersion": 1,
        "clientCapabilities": {},
        "clientInfo": {
            "name": "test-client",
            "version": "1.0.0"
        }
    });
    let result = server.handle_initialize(&request).unwrap();

    assert_eq!(result["protocolVersion"], 1);

    let agent_info = &result["agentInfo"];
    assert_eq!(agent_info["name"], "zeroclaw-acp");
    assert_eq!(agent_info["title"], "ZeroClaw ACP");
    assert_eq!(agent_info["version"], env!("CARGO_PKG_VERSION"));

    assert_eq!(result["authMethods"], serde_json::json!([]));

    let capabilities = &result["agentCapabilities"];
    assert_eq!(capabilities["loadSession"], false);
    assert_eq!(capabilities["promptCapabilities"]["image"], false);
    assert_eq!(capabilities["mcpCapabilities"]["http"], false);

    for legacy_key in ["serverInfo", "capabilities"] {
        assert!(result.get(legacy_key).is_none());
    }
}
1974
/// With a session store attached, `initialize` advertises `loadSession` and
/// the `resume` / `close` session capabilities.
#[test]
fn initialize_advertises_load_session_when_store_present() {
    let cwd = tempfile::tempdir().unwrap();
    let session_store = zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path());
    let store = Arc::new(session_store.unwrap());
    let server = AcpServer::new_with_store(
        make_test_config(cwd.path()),
        AcpServerConfig::default(),
        store,
    );

    let result = server.handle_initialize(&serde_json::json!({})).unwrap();
    let capabilities = &result["agentCapabilities"];

    assert_eq!(capabilities["loadSession"], true);
    let session_caps = &capabilities["sessionCapabilities"];
    assert_eq!(session_caps["resume"], serde_json::json!({}));
    assert_eq!(session_caps["close"], serde_json::json!({}));
}
1996
/// Without a client-supplied `cwd`, the session cwd is the process launch
/// directory and not the configured `data_dir`.
#[test]
fn session_new_defaults_to_launch_cwd_when_client_omits_cwd() {
    let server = AcpServer::new(
        Config {
            data_dir: PathBuf::from("/not/the/project"),
            ..Default::default()
        },
        AcpServerConfig::default(),
    );
    let launch_dir = std::env::current_dir().unwrap();

    let resolved = server.requested_session_cwd(&serde_json::json!({}));

    assert_eq!(resolved, launch_dir);
}
2011
/// A `cwd` supplied by the client is used as the session cwd.
#[test]
fn session_new_respects_client_cwd_when_present() {
    let server = AcpServer::new(Config::default(), AcpServerConfig::default());
    let client_cwd = std::env::current_dir().unwrap();
    let params = serde_json::json!({"cwd": client_cwd});

    let resolved = server.requested_session_cwd(&params);

    assert_eq!(resolved, client_cwd);
}
2022
2023 #[tokio::test]
2024 async fn session_new_does_not_wait_for_configured_mcp_servers() {
2025 let cwd = tempfile::tempdir().unwrap();
2026 let mut config = Config {
2027 data_dir: cwd.path().to_path_buf(),
2028 providers: {
2029 let mut p = zeroclaw_config::providers::Providers::default();
2030 p.models.openrouter.insert(
2031 "default".to_string(),
2032 zeroclaw_config::schema::OpenRouterModelProviderConfig {
2033 base: zeroclaw_config::schema::ModelProviderConfig {
2034 model: Some("test-model".to_string()),
2035 ..Default::default()
2036 },
2037 },
2038 );
2039 p
2040 },
2041 mcp: zeroclaw_config::schema::McpConfig {
2042 enabled: true,
2043 servers: vec![zeroclaw_config::schema::McpServerConfig {
2044 name: "slow".to_string(),
2045 transport: zeroclaw_config::schema::McpTransport::Stdio,
2046 command: "/bin/sh".to_string(),
2047 args: vec!["-c".to_string(), "sleep 60".to_string()],
2048 ..Default::default()
2049 }],
2050 ..Default::default()
2051 },
2052 ..Default::default()
2053 };
2054 config.risk_profiles.insert(
2055 "default".to_string(),
2056 zeroclaw_config::schema::RiskProfileConfig::default(),
2057 );
2058 config.agents.insert(
2059 "test-agent".to_string(),
2060 zeroclaw_config::schema::AliasedAgentConfig {
2061 model_provider: "openrouter.default".into(),
2062 risk_profile: "default".to_string(),
2063 ..Default::default()
2064 },
2065 );
2066 let server = AcpServer::new(config, AcpServerConfig::default());
2067
2068 let result = tokio::time::timeout(
2069 Duration::from_secs(2),
2070 server.handle_session_new(&serde_json::json!({
2071 "cwd": cwd.path().to_string_lossy(),
2072 "agentAlias": "test-agent",
2073 "mcpServers": []
2074 })),
2075 )
2076 .await
2077 .expect("session/new should not block on configured MCP startup")
2078 .expect("session/new should create a session");
2079
2080 assert!(result["sessionId"].as_str().is_some());
2081 }
2082
2083 #[tokio::test]
2084 async fn session_new_auto_selects_sole_configured_agent_when_alias_omitted() {
2085 let cwd = tempfile::tempdir().unwrap();
2086 let mut config = Config {
2087 data_dir: cwd.path().to_path_buf(),
2088 providers: {
2089 let mut p = zeroclaw_config::providers::Providers::default();
2090 p.models.openrouter.insert(
2091 "default".to_string(),
2092 zeroclaw_config::schema::OpenRouterModelProviderConfig {
2093 base: zeroclaw_config::schema::ModelProviderConfig {
2094 api_key: Some("test-key".to_string()),
2095 model: Some("test-model".to_string()),
2096 ..Default::default()
2097 },
2098 },
2099 );
2100 p
2101 },
2102 ..Default::default()
2103 };
2104 config.risk_profiles.insert(
2105 "default".to_string(),
2106 zeroclaw_config::schema::RiskProfileConfig::default(),
2107 );
2108 config.agents.insert(
2109 "only-agent".to_string(),
2110 zeroclaw_config::schema::AliasedAgentConfig {
2111 model_provider: "openrouter.default".into(),
2112 risk_profile: "default".to_string(),
2113 ..Default::default()
2114 },
2115 );
2116 let server = AcpServer::new(config, AcpServerConfig::default());
2117
2118 let result = tokio::time::timeout(
2119 Duration::from_secs(2),
2120 server.handle_session_new(&serde_json::json!({
2121 "cwd": cwd.path().to_string_lossy(),
2122 "mcpServers": []
2123 })),
2124 )
2125 .await
2126 .expect("session/new should not block")
2127 .expect("session/new should auto-select the sole configured agent");
2128
2129 assert!(result["sessionId"].as_str().is_some());
2130 }
2131
2132 #[tokio::test]
2133 async fn session_new_requires_alias_when_multiple_agents_configured() {
2134 let mut config = Config::default();
2135 config.agents.insert(
2136 "agent-one".to_string(),
2137 zeroclaw_config::schema::AliasedAgentConfig::default(),
2138 );
2139 config.agents.insert(
2140 "agent-two".to_string(),
2141 zeroclaw_config::schema::AliasedAgentConfig::default(),
2142 );
2143 let server = AcpServer::new(config, AcpServerConfig::default());
2144
2145 let err = server
2146 .handle_session_new(&serde_json::json!({"mcpServers": []}))
2147 .await
2148 .expect_err("session/new without agentAlias should fail when multiple agents exist");
2149
2150 assert_eq!(err.code, INVALID_PARAMS);
2151 assert!(
2152 err.message.contains("agentAlias"),
2153 "error should mention agentAlias, got: {}",
2154 err.message
2155 );
2156 }
2157
2158 #[tokio::test]
2159 async fn session_new_uses_config_default_agent_when_alias_omitted_and_multiple_agents() {
2160 let cwd = tempfile::tempdir().unwrap();
2161 let mut config = Config {
2162 data_dir: cwd.path().to_path_buf(),
2163 providers: {
2164 let mut p = zeroclaw_config::providers::Providers::default();
2165 p.models.openrouter.insert(
2166 "default".to_string(),
2167 zeroclaw_config::schema::OpenRouterModelProviderConfig {
2168 base: zeroclaw_config::schema::ModelProviderConfig {
2169 api_key: Some("test-key".to_string()),
2170 model: Some("test-model".to_string()),
2171 ..Default::default()
2172 },
2173 },
2174 );
2175 p
2176 },
2177 ..Default::default()
2178 };
2179 config.risk_profiles.insert(
2180 "default".to_string(),
2181 zeroclaw_config::schema::RiskProfileConfig::default(),
2182 );
2183 config.agents.insert(
2184 "agent-alpha".to_string(),
2185 zeroclaw_config::schema::AliasedAgentConfig {
2186 model_provider: "openrouter.default".into(),
2187 risk_profile: "default".to_string(),
2188 ..Default::default()
2189 },
2190 );
2191 config.agents.insert(
2192 "agent-beta".to_string(),
2193 zeroclaw_config::schema::AliasedAgentConfig {
2194 model_provider: "openrouter.default".into(),
2195 risk_profile: "default".to_string(),
2196 ..Default::default()
2197 },
2198 );
2199 config.acp.default_agent = Some("agent-alpha".to_string());
2200 let server = AcpServer::new(config, AcpServerConfig::default());
2201
2202 let result = tokio::time::timeout(
2203 Duration::from_secs(2),
2204 server.handle_session_new(&serde_json::json!({
2205 "cwd": cwd.path().to_string_lossy(),
2206 "mcpServers": []
2207 })),
2208 )
2209 .await
2210 .expect("should not block")
2211 .expect("should select agent-alpha from config.acp.default_agent");
2212
2213 assert!(result["sessionId"].as_str().is_some());
2214 }
2215
2216 #[tokio::test]
2217 async fn session_new_explicit_alias_overrides_config_default_agent() {
2218 let cwd = tempfile::tempdir().unwrap();
2219 let mut config = Config {
2220 data_dir: cwd.path().to_path_buf(),
2221 providers: {
2222 let mut p = zeroclaw_config::providers::Providers::default();
2223 p.models.openrouter.insert(
2224 "default".to_string(),
2225 zeroclaw_config::schema::OpenRouterModelProviderConfig {
2226 base: zeroclaw_config::schema::ModelProviderConfig {
2227 api_key: Some("test-key".to_string()),
2228 model: Some("test-model".to_string()),
2229 ..Default::default()
2230 },
2231 },
2232 );
2233 p
2234 },
2235 ..Default::default()
2236 };
2237 config.risk_profiles.insert(
2238 "default".to_string(),
2239 zeroclaw_config::schema::RiskProfileConfig::default(),
2240 );
2241 config.agents.insert(
2242 "agent-alpha".to_string(),
2243 zeroclaw_config::schema::AliasedAgentConfig {
2244 model_provider: "openrouter.default".into(),
2245 risk_profile: "default".to_string(),
2246 ..Default::default()
2247 },
2248 );
2249 config.agents.insert(
2250 "agent-beta".to_string(),
2251 zeroclaw_config::schema::AliasedAgentConfig {
2252 model_provider: "openrouter.default".into(),
2253 risk_profile: "default".to_string(),
2254 ..Default::default()
2255 },
2256 );
2257 config.acp.default_agent = Some("agent-alpha".to_string());
2258 let server = AcpServer::new(config, AcpServerConfig::default());
2259
2260 let result = tokio::time::timeout(
2262 Duration::from_secs(2),
2263 server.handle_session_new(&serde_json::json!({
2264 "agentAlias": "agent-beta",
2265 "cwd": cwd.path().to_string_lossy(),
2266 "mcpServers": []
2267 })),
2268 )
2269 .await
2270 .expect("should not block")
2271 .expect("should use agent-beta despite default_agent = agent-alpha");
2272
2273 assert!(result["sessionId"].as_str().is_some());
2274 }
2275
/// An error response serializes `error.code` and omits `result`.
#[test]
fn json_rpc_error_response_serialize() {
    let failure = JsonRpcError {
        code: METHOD_NOT_FOUND,
        message: "Method not found".to_string(),
        data: None,
    };
    let response = JsonRpcResponse {
        jsonrpc: "2.0",
        result: None,
        error: Some(failure),
        id: serde_json::json!(1),
    };

    let wire = serde_json::to_string(&response).unwrap();
    let decoded = serde_json::from_str::<Value>(&wire).unwrap();

    assert!(decoded.get("error").is_some());
    assert_eq!(decoded["error"]["code"], -32601);
    assert!(decoded.get("result").is_none());
}
2294
/// A notification serializes its method and nested `update` payload.
#[test]
fn json_rpc_notification_serialize() {
    let notif = JsonRpcNotification {
        jsonrpc: "2.0",
        method: "session/update",
        params: serde_json::json!({
            "sessionId": "test-sid",
            "update": {
                "sessionUpdate": "agent_message_chunk",
                "content": { "type": "text", "text": "hello" }
            }
        }),
    };
    // The notification is passed by reference; the source text had this
    // argument mangled into a non-ASCII character.
    let json = serde_json::to_string(&notif).unwrap();
    assert!(json.contains(r#""method":"session/update""#));
    assert!(json.contains(r#""sessionUpdate":"agent_message_chunk""#));
    assert!(json.contains(r#""text":"hello""#));
}
2313
/// `parse_prompt` accepts a plain string or an array of parts, and rejects
/// arrays that contain no text.
#[test]
fn test_prompt_parsing() {
    // Plain string prompt.
    let plain = serde_json::json!({"prompt": "hello world"});
    assert_eq!(AcpServer::parse_prompt(&plain).unwrap(), "hello world");

    // Text parts are joined with a blank line.
    let two_parts = serde_json::json!({
        "prompt": [
            {"type": "text", "text": "part 1"},
            {"type": "text", "text": "part 2"}
        ]
    });
    assert_eq!(
        AcpServer::parse_prompt(&two_parts).unwrap(),
        "part 1\n\npart 2"
    );

    // An empty array is invalid.
    let empty = serde_json::json!({"prompt": []});
    let empty_outcome = AcpServer::parse_prompt(&empty);
    assert!(empty_outcome.is_err());
    assert_eq!(empty_outcome.unwrap_err().code, INVALID_PARAMS);

    // So is an array without any text-bearing part.
    let image_only = serde_json::json!({
        "prompt": [
            {"type": "image", "data": "..."}
        ]
    });
    assert!(AcpServer::parse_prompt(&image_only).is_err());

    // Embedded resource text is included alongside regular text parts.
    let with_resource = serde_json::json!({
        "prompt": [
            {"type": "text", "text": "analyze this file:"},
            {"type": "resource", "resource": {"uri": "file:///tmp/example.rs", "text": "fn main() { println!(\"hi\"); }", "mimeType": "text/rust"}}
        ]
    });
    let combined = AcpServer::parse_prompt(&with_resource).unwrap();
    assert!(combined.contains("analyze this file:"));
    assert!(combined.contains("fn main() { println!(\"hi\"); }"));
}
2356
/// Without a configured model provider, `_meta.zeroclaw.defaultModel` is
/// not present in the `initialize` result.
#[test]
fn handle_initialize_default_model_absent_when_unconfigured() {
    let server = AcpServer::new(Config::default(), AcpServerConfig::default());
    let result = server.handle_initialize(&serde_json::json!({})).unwrap();

    let zeroclaw_meta = &result["_meta"]["zeroclaw"];
    assert!(
        zeroclaw_meta.get("defaultModel").is_none(),
        "defaultModel must be absent when no model_provider is configured, got: {}",
        zeroclaw_meta["defaultModel"]
    );
}
2367
/// With a model provider configured, `_meta.zeroclaw.defaultModel` reports
/// that provider's model name.
#[test]
fn handle_initialize_default_model_reflects_configured_provider() {
    use zeroclaw_config::schema::{ModelProviderConfig, OllamaModelProviderConfig};

    let ollama = OllamaModelProviderConfig {
        base: ModelProviderConfig {
            model: Some("llama3.2".to_string()),
            ..Default::default()
        },
        ..OllamaModelProviderConfig::default()
    };
    let mut config = Config::default();
    config
        .providers
        .models
        .ollama
        .insert("default".to_string(), ollama);

    let server = AcpServer::new(config, AcpServerConfig::default());
    let result = server.handle_initialize(&serde_json::json!({})).unwrap();

    assert_eq!(result["_meta"]["zeroclaw"]["defaultModel"], "llama3.2");
}
2386
/// `prompt_result` reports the content as a plain JSON string.
#[test]
fn prompt_result_preserves_content_string_shape() {
    let payload =
        AcpServer::prompt_result(String::from("test-sid"), "end_turn", String::from("hello"));

    assert_eq!(payload["sessionId"], "test-sid");
    assert_eq!(payload["stopReason"], "end_turn");
    assert_eq!(payload["content"], "hello");
}
2394
/// A cancelled result keeps the partial text and appends the interruption
/// marker; with no partial text the content is the marker alone.
#[test]
fn cancelled_prompt_result_preserves_content_string_shape() {
    let partial = AcpServer::cancelled_prompt_result(String::from("test-sid"), "partial text");
    assert_eq!(partial["sessionId"], "test-sid");
    assert_eq!(partial["stopReason"], "cancelled");
    assert_eq!(
        partial["content"],
        "partial text\n\n[interrupted by user]"
    );

    let nothing_yet = AcpServer::cancelled_prompt_result(String::from("test-sid"), "");
    assert_eq!(nothing_yet["content"], "[interrupted by user]");
}
2409
2410 #[test]
2411 fn test_tool_call_and_update_serialization() {
2412 let tool_call_notif = JsonRpcNotification {
2414 jsonrpc: "2.0",
2415 method: "session/update",
2416 params: serde_json::json!({
2417 "sessionId": "test-sid",
2418 "update": {
2419 "sessionUpdate": "tool_call",
2420 "toolCallId": "tc-12345",
2421 "name": "shell",
2422 "title": "shell",
2423 "kind": "execute",
2424 "rawInput": {"command": "ls -la"},
2425 "status": "pending"
2426 }
2427 }),
2428 };
2429 let json1 = serde_json::to_string(&tool_call_notif).unwrap();
2430 assert!(json1.contains("\"sessionUpdate\":\"tool_call\""));
2431 assert!(json1.contains("\"toolCallId\":\"tc-12345\""));
2432 assert!(json1.contains("\"name\":\"shell\""));
2433 assert!(json1.contains("\"title\":\"shell\""));
2434 assert!(json1.contains("\"kind\":\"execute\""));
2435 assert!(json1.contains("\"status\":\"pending\""));
2436 assert!(json1.contains("\"rawInput\""));
2437
2438 let tool_update_notif = JsonRpcNotification {
2440 jsonrpc: "2.0",
2441 method: "session/update",
2442 params: serde_json::json!({
2443 "sessionId": "test-sid",
2444 "update": {
2445 "sessionUpdate": "tool_call_update",
2446 "toolCallId": "tc-12345",
2447 "name": "shell",
2448 "title": "shell",
2449 "kind": "execute",
2450 "status": "completed",
2451 "rawOutput": "file1.txt\nfile2.txt",
2452 "body": "file1.txt\nfile2.txt",
2453 "content": [{
2454 "type": "content",
2455 "content": {
2456 "type": "text",
2457 "text": "file1.txt\nfile2.txt"
2458 }
2459 }]
2460 }
2461 }),
2462 };
2463 let json2 = serde_json::to_string(&tool_update_notif).unwrap();
2464 assert!(json2.contains("\"sessionUpdate\":\"tool_call_update\""));
2465 assert!(json2.contains("\"toolCallId\":\"tc-12345\""));
2466 assert!(json2.contains("\"name\":\"shell\""));
2467 assert!(json2.contains("\"status\":\"completed\""));
2468 assert!(json2.contains("\"rawOutput\""));
2469 assert!(json2.contains("\"body\""));
2470 assert!(json2.contains("\"content\""));
2471 assert!(json2.contains("\"type\":\"content\""));
2472 assert!(json2.contains("file1.txt"));
2473 assert!(json1.contains("tc-12345") && json2.contains("tc-12345"));
2475 }
2476
2477 #[test]
2478 fn file_edit_raw_input_uses_acp_diff_field_names() {
2479 let call = notification_for_turn_event(
2480 "sid",
2481 &TurnEvent::ToolCall {
2482 id: "tc-1".to_string(),
2483 name: "file_edit".to_string(),
2484 args: serde_json::json!({
2485 "path": "src/foo.rs",
2486 "old_string": "let x = 1;",
2487 "new_string": "let x = 2;"
2488 }),
2489 },
2490 );
2491 let v = serde_json::to_value(call.unwrap()).unwrap();
2492 let raw = &v["params"]["update"]["rawInput"];
2493 assert_eq!(raw["path"], "src/foo.rs");
2494 assert_eq!(raw["oldText"], "let x = 1;");
2495 assert_eq!(raw["newText"], "let x = 2;");
2496 assert!(
2497 raw.get("old_string").is_none(),
2498 "old_string must not appear in rawInput"
2499 );
2500 assert!(
2501 raw.get("new_string").is_none(),
2502 "new_string must not appear in rawInput"
2503 );
2504
2505 let content = &v["params"]["update"]["content"];
2506 assert!(content.is_array(), "file_edit must emit a content array");
2507 let diff = &content[0];
2508 assert_eq!(diff["type"], "diff");
2509 assert_eq!(diff["path"], "src/foo.rs");
2510 assert_eq!(diff["oldText"], "let x = 1;");
2511 assert_eq!(diff["newText"], "let x = 2;");
2512 }
2513
2514 #[test]
2515 fn file_write_raw_input_uses_acp_diff_field_names() {
2516 let call = notification_for_turn_event(
2517 "sid",
2518 &TurnEvent::ToolCall {
2519 id: "tc-2".to_string(),
2520 name: "file_write".to_string(),
2521 args: serde_json::json!({
2522 "path": "src/new.rs",
2523 "content": "fn main() {}"
2524 }),
2525 },
2526 );
2527 let v = serde_json::to_value(call.unwrap()).unwrap();
2528 let raw = &v["params"]["update"]["rawInput"];
2529 assert_eq!(raw["path"], "src/new.rs");
2530 assert_eq!(raw["newText"], "fn main() {}");
2531 assert!(
2532 raw.get("oldText").is_none(),
2533 "oldText must not appear in file_write rawInput"
2534 );
2535 assert!(
2536 raw.get("content").is_none(),
2537 "content must not appear in rawInput"
2538 );
2539
2540 let content = &v["params"]["update"]["content"];
2541 assert!(content.is_array(), "file_write must emit a content array");
2542 let diff = &content[0];
2543 assert_eq!(diff["type"], "diff");
2544 assert_eq!(diff["path"], "src/new.rs");
2545 assert_eq!(diff["newText"], "fn main() {}");
2546 assert!(
2547 diff.get("oldText").is_none(),
2548 "oldText must be absent for file_write diff"
2549 );
2550 }
2551
2552 #[test]
2553 fn map_tool_kind_uses_explicit_tool_names() {
2554 assert_eq!(map_tool_kind("memory_forget"), "delete");
2555 assert_eq!(map_tool_kind("memory_purge"), "delete");
2556 assert_eq!(map_tool_kind("cron_run"), "execute");
2557 assert_eq!(map_tool_kind("file_read"), "other");
2558 assert_eq!(map_tool_kind("knowledge"), "other");
2559 assert_eq!(map_tool_kind("web_fetch"), "other");
2560 assert_eq!(map_tool_kind("file_write"), "edit");
2561 assert_eq!(map_tool_kind("unknown_tool"), "other");
2562 }
2563
2564 #[test]
2565 fn turn_tool_events_include_client_visible_tool_fields() {
2566 let call = notification_for_turn_event(
2567 "test-sid",
2568 &TurnEvent::ToolCall {
2569 id: "tc-12345".to_string(),
2570 name: "shell".to_string(),
2571 args: serde_json::json!({"command": "ls -la"}),
2572 },
2573 );
2574 let call_value =
2575 serde_json::to_value(call.expect("ToolCall maps to a notification")).unwrap();
2576 assert_eq!(call_value["method"], "session/update");
2577 assert_eq!(call_value["params"]["update"]["sessionUpdate"], "tool_call");
2578 assert_eq!(call_value["params"]["update"]["toolCallId"], "tc-12345");
2579 assert_eq!(call_value["params"]["update"]["name"], "shell");
2580 assert_eq!(call_value["params"]["update"]["title"], "shell");
2581 assert_eq!(call_value["params"]["update"]["kind"], "execute");
2582 assert_eq!(
2583 call_value["params"]["update"]["rawInput"],
2584 serde_json::json!({"command": "ls -la"})
2585 );
2586
2587 let result = notification_for_turn_event(
2588 "test-sid",
2589 &TurnEvent::ToolResult {
2590 id: "tc-12345".to_string(),
2591 name: "shell".to_string(),
2592 output: "file1.txt\nfile2.txt".to_string(),
2593 },
2594 );
2595 let result_value =
2596 serde_json::to_value(result.expect("ToolResult maps to a notification")).unwrap();
2597 assert_eq!(
2598 result_value["params"]["update"]["sessionUpdate"],
2599 "tool_call_update"
2600 );
2601 assert_eq!(result_value["params"]["update"]["toolCallId"], "tc-12345");
2602 assert_eq!(result_value["params"]["update"]["name"], "shell");
2603 assert_eq!(result_value["params"]["update"]["title"], "shell");
2604 assert_eq!(result_value["params"]["update"]["kind"], "execute");
2605 assert_eq!(result_value["params"]["update"]["status"], "completed");
2606 assert_eq!(
2607 result_value["params"]["update"]["rawOutput"],
2608 "file1.txt\nfile2.txt"
2609 );
2610 assert_eq!(
2611 result_value["params"]["update"]["body"],
2612 "file1.txt\nfile2.txt"
2613 );
2614 assert_eq!(
2615 result_value["params"]["update"]["content"][0]["content"]["text"],
2616 "file1.txt\nfile2.txt"
2617 );
2618 }
2619
2620 #[tokio::test]
2628 async fn session_stop_finds_session_during_active_prompt_turn() {
2629 let cwd = tempfile::tempdir().unwrap();
2630 let mut config = Config {
2631 data_dir: cwd.path().to_path_buf(),
2632 providers: {
2633 let mut p = zeroclaw_config::providers::Providers::default();
2634 p.models.anthropic.insert(
2635 "default".to_string(),
2636 zeroclaw_config::schema::AnthropicModelProviderConfig {
2637 base: zeroclaw_config::schema::ModelProviderConfig {
2638 model: Some("claude-haiku-4-5".to_string()),
2639 ..Default::default()
2640 },
2641 },
2642 );
2643 p
2644 },
2645 ..Default::default()
2646 };
2647 config.risk_profiles.insert(
2648 "default".to_string(),
2649 zeroclaw_config::schema::RiskProfileConfig::default(),
2650 );
2651 config.agents.insert(
2652 "test-agent".to_string(),
2653 zeroclaw_config::schema::AliasedAgentConfig {
2654 model_provider: "anthropic.default".into(),
2655 risk_profile: "default".to_string(),
2656 ..Default::default()
2657 },
2658 );
2659 let server = Arc::new(AcpServer::new(config, AcpServerConfig::default()));
2660
2661 let new_result = server
2663 .handle_session_new(&serde_json::json!({
2664 "cwd": cwd.path().to_string_lossy(),
2665 "agentAlias": "test-agent"
2666 }))
2667 .await
2668 .expect("session/new must succeed");
2669 let session_id = new_result["sessionId"].as_str().unwrap().to_string();
2670
2671 let session_arc = {
2673 let sessions = server.sessions.lock().await;
2674 sessions.get(&session_id).cloned().unwrap()
2675 };
2676 let _guard = session_arc.lock().await;
2677
2678 let server_clone = Arc::clone(&server);
2682 let sid_clone = session_id.clone();
2683 let stop_result = tokio::time::timeout(Duration::from_millis(100), async move {
2684 server_clone
2685 .handle_session_stop(&serde_json::json!({ "sessionId": sid_clone }))
2686 .await
2687 })
2688 .await;
2689
2690 match stop_result {
2691 Err(_timeout) => {} Ok(Ok(_)) => panic!("stop returned Ok without the lock being released"),
2693 Ok(Err(e)) => {
2694 assert_ne!(
2695 e.code, SESSION_NOT_FOUND,
2696 "session/stop must not return SESSION_NOT_FOUND while a turn is in flight"
2697 );
2698 }
2699 }
2700 }
2701
2702 #[tokio::test]
2703 async fn session_new_persists_to_store() {
2704 let cwd = tempfile::tempdir().unwrap();
2705 let store =
2706 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
2707 let server = Arc::new(AcpServer::new_with_store(
2708 make_test_config(cwd.path()),
2709 AcpServerConfig::default(),
2710 Arc::clone(&store),
2711 ));
2712
2713 let result = server
2714 .handle_session_new(&serde_json::json!({
2715 "cwd": cwd.path().to_string_lossy()
2716 }))
2717 .await
2718 .expect("session/new must succeed");
2719
2720 let session_id = result["sessionId"].as_str().unwrap();
2721
2722 let data = store.load_session(session_id).unwrap();
2724 assert!(
2725 data.is_some(),
2726 "session/new must persist to AcpSessionStore"
2727 );
2728 }
2729
2730 #[tokio::test]
2731 async fn session_new_without_store_still_works() {
2732 let cwd = tempfile::tempdir().unwrap();
2733 let server = Arc::new(AcpServer::new(
2734 make_test_config(cwd.path()),
2735 AcpServerConfig::default(),
2736 ));
2737
2738 let result = server
2739 .handle_session_new(&serde_json::json!({
2740 "cwd": cwd.path().to_string_lossy()
2741 }))
2742 .await
2743 .expect("session/new must succeed without a store");
2744
2745 let session_id = result["sessionId"].as_str().unwrap();
2746 assert!(server.sessions.lock().await.contains_key(session_id));
2747 }
2748
2749 fn make_test_config(cwd: &std::path::Path) -> Config {
2750 let mut cfg = Config {
2751 data_dir: cwd.to_path_buf(),
2752 ..Default::default()
2753 };
2754 cfg.providers.models.anthropic.insert(
2755 "default".to_string(),
2756 zeroclaw_config::schema::AnthropicModelProviderConfig {
2757 base: zeroclaw_config::schema::ModelProviderConfig {
2758 model: Some("claude-haiku-4-5".to_string()),
2759 ..Default::default()
2760 },
2761 },
2762 );
2763 cfg.risk_profiles.insert(
2764 "default".to_string(),
2765 zeroclaw_config::schema::RiskProfileConfig::default(),
2766 );
2767 cfg.agents.insert(
2768 "test-agent".to_string(),
2769 zeroclaw_config::schema::AliasedAgentConfig {
2770 model_provider: "anthropic.default".into(),
2771 risk_profile: "default".to_string(),
2772 ..Default::default()
2773 },
2774 );
2775 cfg
2776 }
2777
2778 #[tokio::test]
2780 async fn session_cancel_idle_session_is_noop() {
2781 let cwd = tempfile::tempdir().unwrap();
2782 let server = Arc::new(AcpServer::new(
2783 make_test_config(cwd.path()),
2784 AcpServerConfig::default(),
2785 ));
2786
2787 let new_result = server
2788 .handle_session_new(&serde_json::json!({
2789 "cwd": cwd.path().to_string_lossy(),
2790 "agentAlias": "test-agent"
2791 }))
2792 .await
2793 .expect("session/new must succeed");
2794 let session_id = new_result["sessionId"].as_str().unwrap().to_string();
2795
2796 let result = server
2798 .handle_session_cancel(&serde_json::json!({ "sessionId": session_id }))
2799 .await;
2800 assert!(result.is_ok(), "idle cancel must succeed: {result:?}");
2801 }
2802
2803 #[tokio::test]
2806 async fn session_cancel_unknown_session_is_noop() {
2807 let cwd = tempfile::tempdir().unwrap();
2808 let server = Arc::new(AcpServer::new(
2809 make_test_config(cwd.path()),
2810 AcpServerConfig::default(),
2811 ));
2812
2813 let result = server
2814 .handle_session_cancel(&serde_json::json!({ "sessionId": "sess_does_not_exist" }))
2815 .await;
2816 assert!(
2817 result.is_ok(),
2818 "unknown-session cancel must succeed: {result:?}"
2819 );
2820 }
2821
2822 #[tokio::test]
2823 async fn session_cancel_accepts_snake_case_session_id() {
2824 let cwd = tempfile::tempdir().unwrap();
2825 let server = Arc::new(AcpServer::new(
2826 make_test_config(cwd.path()),
2827 AcpServerConfig::default(),
2828 ));
2829
2830 let session_id = "sess_snake_case_cancel";
2831 let active_token = tokio_util::sync::CancellationToken::new();
2832 server
2833 .register_cancel_token(session_id, active_token.clone())
2834 .expect("active turn should register token");
2835
2836 server
2837 .handle_session_cancel(&serde_json::json!({ "session_id": session_id }))
2838 .await
2839 .expect("snake_case session_id should cancel the active turn");
2840
2841 assert!(active_token.is_cancelled());
2842 }
2843
2844 #[tokio::test]
2847 async fn register_cancel_token_rejects_concurrent_prompt_for_session() {
2848 let cwd = tempfile::tempdir().unwrap();
2849 let server = Arc::new(AcpServer::new(
2850 make_test_config(cwd.path()),
2851 AcpServerConfig::default(),
2852 ));
2853
2854 let session_id = "sess_active_turn";
2855 let active_token = tokio_util::sync::CancellationToken::new();
2856 let queued_token = tokio_util::sync::CancellationToken::new();
2857
2858 server
2859 .register_cancel_token(session_id, active_token.clone())
2860 .expect("first prompt should register its token");
2861 let err = server
2862 .register_cancel_token(session_id, queued_token.clone())
2863 .expect_err("second prompt must not overwrite active token");
2864
2865 assert_eq!(err.code, SESSION_BUSY);
2866 assert!(
2867 err.message.contains("active prompt turn"),
2868 "error should explain why prompt was rejected: {}",
2869 err.message
2870 );
2871
2872 server
2873 .handle_session_cancel(&serde_json::json!({ "sessionId": session_id }))
2874 .await
2875 .expect("cancel should still target active token");
2876
2877 assert!(active_token.is_cancelled());
2878 assert!(
2879 !queued_token.is_cancelled(),
2880 "rejected prompt's token must not become the active cancel target"
2881 );
2882 }
2883
2884 #[tokio::test]
2885 async fn session_prompt_rejects_concurrent_turn_before_agent_starts() {
2886 let cwd = tempfile::tempdir().unwrap();
2887 let server = Arc::new(AcpServer::new(
2888 make_test_config(cwd.path()),
2889 AcpServerConfig::default(),
2890 ));
2891
2892 let new_result = server
2893 .handle_session_new(&serde_json::json!({
2894 "cwd": cwd.path().to_string_lossy(),
2895 "agentAlias": "test-agent"
2896 }))
2897 .await
2898 .expect("session/new must succeed");
2899 let session_id = new_result["sessionId"].as_str().unwrap().to_string();
2900 let active_token = tokio_util::sync::CancellationToken::new();
2901 server
2902 .register_cancel_token(&session_id, active_token.clone())
2903 .expect("simulated active turn should register token");
2904
2905 let err = server
2906 .handle_session_prompt(
2907 &serde_json::json!({
2908 "sessionId": session_id.clone(),
2909 "prompt": "queued prompt"
2910 }),
2911 &serde_json::json!(2),
2912 )
2913 .await
2914 .expect_err("concurrent prompt must be rejected before model_provider work starts");
2915
2916 assert_eq!(err.code, SESSION_BUSY);
2917 server
2918 .handle_session_cancel(&serde_json::json!({ "sessionId": session_id }))
2919 .await
2920 .expect("cancel should still target the original active token");
2921 assert!(active_token.is_cancelled());
2922 }
2923
2924 #[tokio::test]
2929 async fn cancel_tokens_map_remove_works() {
2930 let cwd = tempfile::tempdir().unwrap();
2931 let config = Config {
2932 data_dir: cwd.path().to_path_buf(),
2933 ..Default::default()
2934 };
2935 let server = Arc::new(AcpServer::new(config, AcpServerConfig::default()));
2936
2937 let session_id = "sess_token_leak_test".to_string();
2939 let token = tokio_util::sync::CancellationToken::new();
2940 server
2941 .cancel_tokens
2942 .lock()
2943 .expect("cancel_tokens lock poisoned")
2944 .insert(session_id.clone(), token);
2945
2946 server
2948 .cancel_tokens
2949 .lock()
2950 .expect("cancel_tokens lock poisoned")
2951 .remove(&session_id);
2952
2953 let remaining = server
2954 .cancel_tokens
2955 .lock()
2956 .expect("cancel_tokens lock poisoned")
2957 .len();
2958 assert_eq!(remaining, 0, "cancel token must be removed after turn ends");
2959 }
2960
2961 #[tokio::test]
2962 async fn session_load_restores_history_and_streams_notifications() {
2963 use zeroclaw_api::model_provider::{ChatMessage, ConversationMessage};
2964 let cwd = tempfile::tempdir().unwrap();
2965 let store =
2966 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
2967
2968 let session_id = "sess-load-test";
2969 store
2970 .create_session(session_id, &cwd.path().to_string_lossy())
2971 .unwrap();
2972 store
2973 .append_turn(
2974 session_id,
2975 &[
2976 ConversationMessage::Chat(ChatMessage::user("hello")),
2977 ConversationMessage::Chat(ChatMessage::assistant("hi there")),
2978 ],
2979 )
2980 .unwrap();
2981
2982 let (writer_tx, mut writer_rx) = tokio::sync::mpsc::channel::<String>(64);
2983 let server = Arc::new(AcpServer::new_with_writer_and_store(
2984 make_test_config(cwd.path()),
2985 AcpServerConfig::default(),
2986 writer_tx,
2987 Arc::clone(&store),
2988 ));
2989
2990 let result = server
2991 .handle_session_load(&serde_json::json!({
2992 "sessionId": session_id,
2993 "cwd": cwd.path().to_string_lossy()
2994 }))
2995 .await
2996 .expect("session/load must succeed");
2997
2998 assert_eq!(result, serde_json::json!({}));
2999
3000 assert!(server.sessions.lock().await.contains_key(session_id));
3002
3003 let mut notifications = Vec::new();
3005 while let Ok(msg) = writer_rx.try_recv() {
3006 notifications.push(msg);
3007 }
3008
3009 assert_eq!(
3011 notifications.len(),
3012 2,
3013 "expected 2 notifications, got: {notifications:?}"
3014 );
3015 let n0: serde_json::Value = serde_json::from_str(¬ifications[0]).unwrap();
3016 assert_eq!(
3017 n0["params"]["update"]["sessionUpdate"],
3018 "user_message_chunk"
3019 );
3020 assert_eq!(n0["params"]["update"]["content"]["text"], "hello");
3021 let n1: serde_json::Value = serde_json::from_str(¬ifications[1]).unwrap();
3022 assert_eq!(
3023 n1["params"]["update"]["sessionUpdate"],
3024 "agent_message_chunk"
3025 );
3026 assert_eq!(n1["params"]["update"]["content"]["text"], "hi there");
3027 }
3028
3029 #[tokio::test]
3030 async fn session_load_returns_not_found_for_unknown_id() {
3031 let cwd = tempfile::tempdir().unwrap();
3032 let store =
3033 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3034 let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3035 let server = AcpServer::new_with_writer_and_store(
3036 make_test_config(cwd.path()),
3037 AcpServerConfig::default(),
3038 writer_tx,
3039 store,
3040 );
3041
3042 let err = server
3043 .handle_session_load(&serde_json::json!({ "sessionId": "ghost" }))
3044 .await
3045 .expect_err("unknown session must fail");
3046
3047 assert_eq!(err.code, SESSION_NOT_FOUND);
3048 }
3049
3050 #[tokio::test]
3051 async fn session_load_rejects_already_active_session() {
3052 let cwd = tempfile::tempdir().unwrap();
3053 let store =
3054 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3055 let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3056 let server = Arc::new(AcpServer::new_with_writer_and_store(
3057 make_test_config(cwd.path()),
3058 AcpServerConfig::default(),
3059 writer_tx,
3060 Arc::clone(&store),
3061 ));
3062
3063 let session_id = "sess-already-active";
3065 store
3066 .create_session(session_id, &cwd.path().to_string_lossy())
3067 .unwrap();
3068 server
3069 .handle_session_load(&serde_json::json!({
3070 "sessionId": session_id,
3071 "cwd": cwd.path().to_string_lossy()
3072 }))
3073 .await
3074 .unwrap();
3075
3076 let err = server
3078 .handle_session_load(&serde_json::json!({ "sessionId": session_id }))
3079 .await
3080 .expect_err("session/load for active session must fail");
3081
3082 assert_eq!(err.code, INVALID_PARAMS);
3083 }
3084
3085 #[tokio::test]
3086 async fn session_resume_restores_without_replay() {
3087 use zeroclaw_api::model_provider::{ChatMessage, ConversationMessage};
3088 let cwd = tempfile::tempdir().unwrap();
3089 let store =
3090 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3091
3092 let session_id = "sess-resume-test";
3093 store
3094 .create_session(session_id, &cwd.path().to_string_lossy())
3095 .unwrap();
3096 store
3097 .append_turn(
3098 session_id,
3099 &[ConversationMessage::Chat(ChatMessage::user("hello"))],
3100 )
3101 .unwrap();
3102
3103 let (writer_tx, mut writer_rx) = tokio::sync::mpsc::channel::<String>(64);
3104 let server = Arc::new(AcpServer::new_with_writer_and_store(
3105 make_test_config(cwd.path()),
3106 AcpServerConfig::default(),
3107 writer_tx,
3108 Arc::clone(&store),
3109 ));
3110
3111 let result = server
3112 .handle_session_resume(&serde_json::json!({
3113 "sessionId": session_id,
3114 "cwd": cwd.path().to_string_lossy()
3115 }))
3116 .await
3117 .expect("session/resume must succeed");
3118
3119 assert_eq!(result, serde_json::json!({}));
3121
3122 assert!(server.sessions.lock().await.contains_key(session_id));
3124
3125 assert!(
3127 writer_rx.try_recv().is_err(),
3128 "session/resume must not emit session/update notifications"
3129 );
3130 }
3131
3132 #[tokio::test]
3133 async fn session_close_releases_memory_but_keeps_store_record() {
3134 let cwd = tempfile::tempdir().unwrap();
3135 let store =
3136 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3137 let server = Arc::new(AcpServer::new_with_store(
3138 make_test_config(cwd.path()),
3139 AcpServerConfig::default(),
3140 Arc::clone(&store),
3141 ));
3142
3143 let new_result = server
3144 .handle_session_new(&serde_json::json!({
3145 "cwd": cwd.path().to_string_lossy()
3146 }))
3147 .await
3148 .expect("session/new must succeed");
3149 let session_id = new_result["sessionId"].as_str().unwrap().to_string();
3150
3151 assert!(server.sessions.lock().await.contains_key(&session_id));
3152
3153 let result = server
3154 .handle_session_close(&serde_json::json!({ "sessionId": &session_id }))
3155 .await
3156 .expect("session/close must succeed");
3157
3158 assert_eq!(result, serde_json::json!({}));
3159
3160 assert!(!server.sessions.lock().await.contains_key(&session_id));
3162
3163 let data = store.load_session(&session_id).unwrap();
3165 assert!(
3166 data.is_some(),
3167 "session/close must not delete the DB record"
3168 );
3169 }
3170
3171 #[tokio::test]
3172 async fn session_close_returns_not_found_for_unknown_session() {
3173 let cwd = tempfile::tempdir().unwrap();
3174 let server = AcpServer::new(make_test_config(cwd.path()), AcpServerConfig::default());
3175
3176 let err = server
3177 .handle_session_close(&serde_json::json!({ "sessionId": "ghost" }))
3178 .await
3179 .expect_err("unknown session must fail");
3180
3181 assert_eq!(err.code, SESSION_NOT_FOUND);
3182 }
3183
3184 #[tokio::test]
3187 async fn session_load_respects_max_sessions() {
3188 let cwd = tempfile::tempdir().unwrap();
3189 let store =
3190 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3191
3192 let stored_id = "sess-load-limit-test";
3194 store
3195 .create_session(stored_id, &cwd.path().to_string_lossy())
3196 .unwrap();
3197
3198 let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3199 let server = Arc::new(AcpServer::new_with_writer_and_store(
3200 make_test_config(cwd.path()),
3201 AcpServerConfig {
3202 max_sessions: 1,
3203 ..AcpServerConfig::default()
3204 },
3205 writer_tx,
3206 Arc::clone(&store),
3207 ));
3208
3209 server
3211 .handle_session_new(&serde_json::json!({
3212 "cwd": cwd.path().to_string_lossy()
3213 }))
3214 .await
3215 .expect("session/new must succeed when under limit");
3216
3217 let err = server
3219 .handle_session_load(&serde_json::json!({ "sessionId": stored_id }))
3220 .await
3221 .expect_err("session/load must fail when max_sessions reached");
3222
3223 assert_eq!(
3224 err.code, SESSION_LIMIT_REACHED,
3225 "expected SESSION_LIMIT_REACHED, got: {:?}",
3226 err
3227 );
3228 }
3229
3230 #[tokio::test]
3233 async fn session_resume_respects_max_sessions() {
3234 let cwd = tempfile::tempdir().unwrap();
3235 let store =
3236 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3237
3238 let stored_id = "sess-resume-limit-test";
3240 store
3241 .create_session(stored_id, &cwd.path().to_string_lossy())
3242 .unwrap();
3243
3244 let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3245 let server = Arc::new(AcpServer::new_with_writer_and_store(
3246 make_test_config(cwd.path()),
3247 AcpServerConfig {
3248 max_sessions: 1,
3249 ..AcpServerConfig::default()
3250 },
3251 writer_tx,
3252 Arc::clone(&store),
3253 ));
3254
3255 server
3257 .handle_session_new(&serde_json::json!({
3258 "cwd": cwd.path().to_string_lossy()
3259 }))
3260 .await
3261 .expect("session/new must succeed when under limit");
3262
3263 let err = server
3265 .handle_session_resume(&serde_json::json!({ "sessionId": stored_id }))
3266 .await
3267 .expect_err("session/resume must fail when max_sessions reached");
3268
3269 assert_eq!(
3270 err.code, SESSION_LIMIT_REACHED,
3271 "expected SESSION_LIMIT_REACHED, got: {:?}",
3272 err
3273 );
3274 }
3275
3276 #[tokio::test]
3279 async fn session_load_releases_reservation_on_store_error() {
3280 let cwd = tempfile::tempdir().unwrap();
3281 let store =
3282 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3283
3284 let session_id = "sess-load-store-err";
3285 store
3286 .create_session(session_id, &cwd.path().to_string_lossy())
3287 .unwrap();
3288
3289 let db_path = cwd.path().join("sessions/acp-sessions.db");
3292 {
3293 let second =
3294 rusqlite::Connection::open(&db_path).expect("second conn must open same db");
3295 second
3296 .execute_batch(
3297 "DROP TABLE IF EXISTS acp_messages; DROP TABLE IF EXISTS acp_sessions;",
3298 )
3299 .expect("schema drop must succeed on second conn");
3300 }
3301
3302 let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3303 let server = Arc::new(AcpServer::new_with_writer_and_store(
3304 make_test_config(cwd.path()),
3305 AcpServerConfig::default(),
3306 writer_tx,
3307 Arc::clone(&store),
3308 ));
3309
3310 let first_err = server
3312 .handle_session_load(&serde_json::json!({ "sessionId": session_id }))
3313 .await
3314 .expect_err("session/load must fail when store returns Err");
3315 assert_eq!(
3316 first_err.code, INTERNAL_ERROR,
3317 "expected INTERNAL_ERROR from store failure, got: {:?}",
3318 first_err
3319 );
3320
3321 let second_err = server
3325 .handle_session_load(&serde_json::json!({ "sessionId": session_id }))
3326 .await
3327 .expect_err("second session/load must also fail");
3328 assert_eq!(
3329 second_err.code, INTERNAL_ERROR,
3330 "second load must fail with INTERNAL_ERROR, not INVALID_PARAMS (leaked slot); got: {:?}",
3331 second_err
3332 );
3333 }
3334
3335 #[tokio::test]
3338 async fn session_resume_releases_reservation_on_store_error() {
3339 let cwd = tempfile::tempdir().unwrap();
3340 let store =
3341 Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3342
3343 let session_id = "sess-resume-store-err";
3344 store
3345 .create_session(session_id, &cwd.path().to_string_lossy())
3346 .unwrap();
3347
3348 let db_path = cwd.path().join("sessions/acp-sessions.db");
3349 {
3350 let second =
3351 rusqlite::Connection::open(&db_path).expect("second conn must open same db");
3352 second
3353 .execute_batch(
3354 "DROP TABLE IF EXISTS acp_messages; DROP TABLE IF EXISTS acp_sessions;",
3355 )
3356 .expect("schema drop must succeed on second conn");
3357 }
3358
3359 let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3360 let server = Arc::new(AcpServer::new_with_writer_and_store(
3361 make_test_config(cwd.path()),
3362 AcpServerConfig::default(),
3363 writer_tx,
3364 Arc::clone(&store),
3365 ));
3366
3367 let first_err = server
3368 .handle_session_resume(&serde_json::json!({ "sessionId": session_id }))
3369 .await
3370 .expect_err("session/resume must fail when store returns Err");
3371 assert_eq!(
3372 first_err.code, INTERNAL_ERROR,
3373 "expected INTERNAL_ERROR from store failure, got: {:?}",
3374 first_err
3375 );
3376
3377 let second_err = server
3378 .handle_session_resume(&serde_json::json!({ "sessionId": session_id }))
3379 .await
3380 .expect_err("second session/resume must also fail");
3381 assert_eq!(
3382 second_err.code, INTERNAL_ERROR,
3383 "second resume must fail with INTERNAL_ERROR, not INVALID_PARAMS (leaked slot); got: {:?}",
3384 second_err
3385 );
3386 }
3387}