Skip to main content

zeroclaw_channels/orchestrator/
acp_server.rs

1//! ACP (Agent Control Protocol) Server — JSON-RPC 2.0 over stdio.
2//!
3//! Provides an IDE-friendly interface for spawning and managing isolated agent
4//! sessions. Each session wraps an [`Agent`] built from the global config with
5//! streaming support via JSON-RPC notifications.
6//!
7//! ## Protocol
8//!
9//! Requests and responses are newline-delimited JSON objects on stdin/stdout.
10//!
11//! | Method            | Description                              |
12//! |-------------------|------------------------------------------|
13//! | `initialize`      | Handshake — returns server capabilities (incl. defaultModel when configured) |
14//! | `session/new`     | Create an isolated agent session          |
15//! | `session/prompt`  | Send a prompt, stream back `session/update` events |
16//! | `session/stop`    | Gracefully terminate a session            |
17//! | `session/cancel`  | Abort an in-flight `session/prompt` turn  |
18//! | `session/update`  | Streaming events and bidirectional events |
19
20use anyhow::Result;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::collections::{HashMap, HashSet};
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::time::{Duration, Instant};
28use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
29use tokio::sync::{Mutex, mpsc, oneshot};
30use uuid::Uuid;
31use zeroclaw_api::model_provider::ConversationMessage;
32use zeroclaw_config::schema::Config;
33use zeroclaw_infra::acp_session_store::AcpSessionStore;
34use zeroclaw_runtime::agent::agent::{Agent, TurnEvent};
35use zeroclaw_runtime::tools::CanvasStore;
36
37use crate::acp_channel::AcpChannel;
38
39// ── Configuration ────────────────────────────────────────────────
40
41/// ACP server configuration (optional `[acp]` section in config.toml).
42#[derive(Debug, Clone, Serialize, Deserialize)]
43#[serde(default)]
44pub struct AcpServerConfig {
45    /// Maximum number of concurrent sessions. Default: 10.
46    pub max_sessions: usize,
47    /// Session inactivity timeout in seconds. Default: 3600 (1 hour).
48    pub session_timeout_secs: u64,
49}
50
51impl Default for AcpServerConfig {
52    fn default() -> Self {
53        Self {
54            max_sessions: 10,
55            session_timeout_secs: 3600,
56        }
57    }
58}
59
60// ── JSON-RPC types ───────────────────────────────────────────────
61
62#[derive(Debug, Deserialize)]
63struct JsonRpcRequest {
64    jsonrpc: String,
65    method: String,
66    #[serde(default)]
67    params: Value,
68    id: Option<Value>,
69}
70
71#[derive(Debug, Serialize)]
72struct JsonRpcResponse {
73    jsonrpc: &'static str,
74    #[serde(skip_serializing_if = "Option::is_none")]
75    result: Option<Value>,
76    #[serde(skip_serializing_if = "Option::is_none")]
77    error: Option<JsonRpcError>,
78    id: Value,
79}
80
81#[derive(Debug, Serialize)]
82struct JsonRpcNotification {
83    jsonrpc: &'static str,
84    method: &'static str,
85    params: Value,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct JsonRpcError {
90    pub code: i32,
91    pub message: String,
92    #[serde(skip_serializing_if = "Option::is_none")]
93    pub data: Option<Value>,
94}
95
96// Standard JSON-RPC error codes
97const PARSE_ERROR: i32 = -32700;
98const INVALID_REQUEST: i32 = -32600;
99const METHOD_NOT_FOUND: i32 = -32601;
100const INVALID_PARAMS: i32 = -32602;
101const INTERNAL_ERROR: i32 = -32603;
102
103// Custom error codes
104const SESSION_NOT_FOUND: i32 = -32000;
105const SESSION_LIMIT_REACHED: i32 = -32001;
106const SESSION_BUSY: i32 = -32002;
107const ACP_PROTOCOL_VERSION: u64 = 1;
108
109// ── Outbound JSON-RPC plumbing ───────────────────────────────────
110
111/// A pending outbound JSON-RPC call, awaiting a response from the client.
112type PendingResponder = oneshot::Sender<std::result::Result<Value, JsonRpcError>>;
113
114/// Writer + outbound-call tracker shared between the server loop and
115/// per-session bridges (e.g. [`AcpChannel`]).
116///
117/// All stdout writes go through `writer_tx` so concurrent notifications and
118/// outbound requests can't interleave bytes. Outbound requests get string ids
119/// (`zc-out-<n>`) that are disjoint from any client-issued id space.
120pub struct RpcOutbound {
121    writer_tx: mpsc::Sender<String>,
122    pending: std::sync::Mutex<HashMap<String, PendingResponder>>,
123    next_id: AtomicU64,
124}
125
126struct PendingRequestGuard<'a> {
127    pending: &'a std::sync::Mutex<HashMap<String, PendingResponder>>,
128    id: String,
129}
130
131impl Drop for PendingRequestGuard<'_> {
132    fn drop(&mut self) {
133        self.pending
134            .lock()
135            .unwrap_or_else(|e| e.into_inner())
136            .remove(&self.id);
137    }
138}
139
140impl RpcOutbound {
141    fn new(writer_tx: mpsc::Sender<String>) -> Self {
142        Self {
143            writer_tx,
144            pending: std::sync::Mutex::new(HashMap::new()),
145            next_id: AtomicU64::new(0),
146        }
147    }
148
149    /// Send a JSON-RPC notification (no `id`, no response expected).
150    pub async fn notify(&self, method: &str, params: Value) {
151        let n = serde_json::json!({
152            "jsonrpc": "2.0",
153            "method": method,
154            "params": params,
155        });
156        if let Ok(s) = serde_json::to_string(&n)
157            && self.writer_tx.send(s).await.is_err()
158        {
159            ::zeroclaw_log::record!(
160                WARN,
161                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
162                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
163                "ACP writer task closed; dropping outbound notification"
164            );
165        }
166    }
167
168    /// Send a JSON-RPC request and await the response.
169    pub async fn request(
170        &self,
171        method: &str,
172        params: Value,
173    ) -> std::result::Result<Value, JsonRpcError> {
174        let n = self.next_id.fetch_add(1, Ordering::Relaxed);
175        let id = format!("zc-out-{n}");
176        let (tx, rx) = oneshot::channel();
177        {
178            let mut pending = self.pending.lock().unwrap_or_else(|e| e.into_inner());
179            pending.insert(id.clone(), tx);
180        }
181        let _pending_guard = PendingRequestGuard {
182            pending: &self.pending,
183            id: id.clone(),
184        };
185        let req = serde_json::json!({
186            "jsonrpc": "2.0",
187            "method": method,
188            "params": params,
189            "id": id,
190        });
191        let body = match serde_json::to_string(&req) {
192            Ok(s) => s,
193            Err(e) => {
194                return Err(JsonRpcError {
195                    code: INTERNAL_ERROR,
196                    message: format!("Failed to encode request: {e}"),
197                    data: None,
198                });
199            }
200        };
201        if self.writer_tx.send(body).await.is_err() {
202            return Err(JsonRpcError {
203                code: INTERNAL_ERROR,
204                message: "ACP writer task closed".to_string(),
205                data: None,
206            });
207        }
208        rx.await.unwrap_or_else(|_| {
209            Err(JsonRpcError {
210                code: INTERNAL_ERROR,
211                message: "Outbound RPC dropped".to_string(),
212                data: None,
213            })
214        })
215    }
216
217    /// Route an inbound JSON-RPC response (matched by `id`) to the
218    /// corresponding pending caller.
219    pub(crate) fn dispatch_response(
220        &self,
221        id_str: &str,
222        result: Option<Value>,
223        error: Option<JsonRpcError>,
224    ) {
225        let responder = self
226            .pending
227            .lock()
228            .unwrap_or_else(|e| e.into_inner())
229            .remove(id_str);
230        if let Some(tx) = responder {
231            let payload = if let Some(err) = error {
232                Err(err)
233            } else {
234                Ok(result.unwrap_or(Value::Null))
235            };
236            let _ = tx.send(payload);
237        } else {
238            ::zeroclaw_log::record!(
239                DEBUG,
240                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
241                    .with_attrs(::serde_json::json!({"id_str": id_str})),
242                "No pending outbound RPC matched response id="
243            );
244        }
245    }
246}
247
248#[cfg(test)]
249impl RpcOutbound {
250    /// Test-only: build an `RpcOutbound` whose writer channel is provided by
251    /// the test (so outbound frames can be inspected without touching stdout).
252    pub fn for_testing(writer_tx: mpsc::Sender<String>) -> Self {
253        Self::new(writer_tx)
254    }
255
256    /// Test-only wrapper around `dispatch_response` so cross-module tests
257    /// (e.g. in `acp_channel`) can simulate inbound JSON-RPC responses.
258    pub fn dispatch_response_for_test(
259        &self,
260        id_str: &str,
261        result: Option<Value>,
262        error: Option<JsonRpcError>,
263    ) {
264        self.dispatch_response(id_str, result, error);
265    }
266
267    pub fn pending_count_for_test(&self) -> usize {
268        self.pending.lock().unwrap_or_else(|e| e.into_inner()).len()
269    }
270}
271
272// ── Session state ────────────────────────────────────────────────
273
274struct Session {
275    agent: Agent,
276    #[allow(dead_code)] // WIP: intended for session expiry logic
277    created_at: Instant,
278    last_active: Instant,
279}
280
281// ── ACP Server ───────────────────────────────────────────────────
282
283pub struct AcpServer {
284    config: Config,
285    acp_config: AcpServerConfig,
286    sessions: Arc<Mutex<HashMap<String, Arc<Mutex<Session>>>>>,
287    rpc: Arc<RpcOutbound>,
288    /// Receiver for the writer task. Pulled out (replaced with `None`) the
289    /// first time `run()` starts the writer loop.
290    writer_rx: std::sync::Mutex<Option<mpsc::Receiver<String>>>,
291    /// Per-session cancellation tokens for aborting in-flight `session/prompt`
292    /// turns. Lives outside `Session`'s inner `Mutex` so `session/cancel` can
293    /// fire the token without waiting for the turn to release the inner lock.
294    ///
295    /// **Single-turn-per-session invariant:** this map holds at most one token
296    /// per `session_id` because the ACP protocol does not pipeline multiple
297    /// `session/prompt` calls on the same session — each prompt must complete
298    /// (or be cancelled) before the next one is sent. A second prompt is
299    /// rejected before it can overwrite the active turn's token. If pipelining
300    /// is needed in the future, the key should become `(session_id, turn_id)`.
301    cancel_tokens: Arc<std::sync::Mutex<HashMap<String, tokio_util::sync::CancellationToken>>>,
302    /// Tracks session IDs currently being loaded/resumed (between the initial
303    /// check and the final insert into `sessions`). Used to prevent duplicate
304    /// concurrent restores of the same session and to count in-flight slots
305    /// against `max_sessions`.
306    loading_sessions: Arc<tokio::sync::Mutex<HashSet<String>>>,
307    store: Option<Arc<AcpSessionStore>>,
308    /// Shared canvas store from the gateway / daemon supervisor.  When set,
309    /// agents created by this server write canvas frames to the same store
310    /// that `/ws/canvas/:id` WebSocket subscribers read from.  `None` in
311    /// standalone `zeroclaw acp` mode where no gateway is running.
312    canvas_store: Option<CanvasStore>,
313}
314
315impl AcpServer {
316    pub fn new(config: Config, acp_config: AcpServerConfig) -> Self {
317        let (writer_tx, writer_rx) = mpsc::channel::<String>(256);
318        Self::with_writer(config, acp_config, writer_tx, Some(writer_rx), None)
319    }
320
321    pub fn new_with_writer(
322        config: Config,
323        acp_config: AcpServerConfig,
324        writer_tx: mpsc::Sender<String>,
325    ) -> Self {
326        Self::with_writer(config, acp_config, writer_tx, None, None)
327    }
328
329    pub fn new_with_store(
330        config: Config,
331        acp_config: AcpServerConfig,
332        store: Arc<AcpSessionStore>,
333    ) -> Self {
334        let (writer_tx, writer_rx) = mpsc::channel::<String>(256);
335        Self::with_writer(config, acp_config, writer_tx, Some(writer_rx), Some(store))
336    }
337
338    pub fn new_with_writer_and_store(
339        config: Config,
340        acp_config: AcpServerConfig,
341        writer_tx: mpsc::Sender<String>,
342        store: Arc<AcpSessionStore>,
343    ) -> Self {
344        Self::with_writer(config, acp_config, writer_tx, None, Some(store))
345    }
346
347    fn with_writer(
348        config: Config,
349        acp_config: AcpServerConfig,
350        writer_tx: mpsc::Sender<String>,
351        writer_rx: Option<mpsc::Receiver<String>>,
352        store: Option<Arc<AcpSessionStore>>,
353    ) -> Self {
354        Self {
355            config,
356            acp_config,
357            sessions: Arc::new(Mutex::new(HashMap::new())),
358            rpc: Arc::new(RpcOutbound::new(writer_tx)),
359            writer_rx: std::sync::Mutex::new(writer_rx),
360            cancel_tokens: Arc::new(std::sync::Mutex::new(HashMap::new())),
361            loading_sessions: Arc::new(tokio::sync::Mutex::new(HashSet::new())),
362            store,
363            canvas_store: None,
364        }
365    }
366
367    /// Attach the shared gateway [`CanvasStore`] so that agents created by
368    /// this server write canvas frames to the same store that the
369    /// `/ws/canvas/:id` WebSocket endpoint serves.
370    pub fn with_canvas_store(mut self, canvas_store: CanvasStore) -> Self {
371        self.canvas_store = Some(canvas_store);
372        self
373    }
374
375    /// Run the ACP server, reading JSON-RPC requests from stdin and writing
376    /// responses/notifications to stdout.
377    pub async fn run(self: Arc<Self>) -> Result<()> {
378        ::zeroclaw_log::record!(
379            DEBUG,
380            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
381            &format!(
382                "ACP server starting (max_sessions={}, timeout={}s)",
383                self.acp_config.max_sessions, self.acp_config.session_timeout_secs
384            )
385        );
386
387        // Pull the writer-rx out of self so we can move it into the writer
388        // task. Subsequent `run()` calls would have nothing to drive — but
389        // `run()` is normally invoked once per process.
390        let writer_rx = self
391            .writer_rx
392            .lock()
393            .unwrap_or_else(|e| e.into_inner())
394            .take()
395            .ok_or_else(|| {
396                ::zeroclaw_log::record!(
397                    ERROR,
398                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
399                        .with_outcome(::zeroclaw_log::EventOutcome::Failure),
400                    "ACP server writer already started"
401                );
402                anyhow::Error::msg("ACP server writer already started")
403            })?;
404        tokio::spawn(writer_task(writer_rx));
405
406        let stdin = tokio::io::stdin();
407        let mut reader = BufReader::new(stdin);
408        let mut line = String::new();
409
410        // Spawn session reaper
411        let sessions = Arc::clone(&self.sessions);
412        let timeout = Duration::from_secs(self.acp_config.session_timeout_secs);
413        tokio::spawn(async move {
414            let mut interval = tokio::time::interval(Duration::from_secs(60));
415            loop {
416                interval.tick().await;
417                let mut sessions = sessions.lock().await;
418                let before = sessions.len();
419                sessions.retain(|id, session_arc| {
420                    // Never reap a session whose inner lock is held — it has an
421                    // active prompt turn in flight and is by definition not idle.
422                    match session_arc.try_lock() {
423                        Ok(session) => {
424                            let expired = session.last_active.elapsed() > timeout;
425                            if expired {
426                                ::zeroclaw_log::record!(
427                                    DEBUG,
428                                    ::zeroclaw_log::Event::new(
429                                        module_path!(),
430                                        ::zeroclaw_log::Action::Note
431                                    )
432                                    .with_attrs(::serde_json::json!({"id": id})),
433                                    "Session expired after inactivity"
434                                );
435                            }
436                            !expired
437                        }
438                        Err(_) => true,
439                    }
440                });
441                let reaped = before - sessions.len();
442                if reaped > 0 {
443                    ::zeroclaw_log::record!(
444                        DEBUG,
445                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
446                            .with_attrs(::serde_json::json!({"reaped": reaped})),
447                        "Reaped expired session(s)"
448                    );
449                }
450            }
451        });
452
453        loop {
454            line.clear();
455            let bytes_read = reader.read_line(&mut line).await?;
456            if bytes_read == 0 {
457                ::zeroclaw_log::record!(
458                    DEBUG,
459                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
460                    "ACP server: stdin closed, shutting down"
461                );
462                break;
463            }
464
465            let trimmed = line.trim();
466            if trimmed.is_empty() {
467                continue;
468            }
469
470            self.process_line(trimmed).await;
471        }
472
473        Ok(())
474    }
475
476    /// Run the ACP server against an already-framed line source.
477    ///
478    /// This is used by the gateway WebSocket bridge, where inbound WebSocket
479    /// text messages are already complete JSON-RPC frames and outbound frames
480    /// are supplied by the writer channel passed to [`Self::new_with_writer`]
481    /// or [`Self::new_with_writer_and_store`].
482    pub async fn run_messages(self: Arc<Self>, mut input_rx: mpsc::Receiver<String>) -> Result<()> {
483        while let Some(line) = input_rx.recv().await {
484            let trimmed = line.trim();
485            if trimmed.is_empty() {
486                continue;
487            }
488            self.process_line(trimmed).await;
489        }
490
491        Ok(())
492    }
493
494    async fn process_line(self: &Arc<Self>, trimmed: &str) {
495        // First, peek at whether this is a response (has `result` or
496        // `error`) to a request *we* sent. Inbound requests/notifications
497        // fall through to the JsonRpcRequest path.
498        if let Ok(value) = serde_json::from_str::<Value>(trimmed)
499            && value.is_object()
500            && (value.get("result").is_some() || value.get("error").is_some())
501            && let Some(id) = value.get("id")
502        {
503            let id_str = id
504                .as_str()
505                .map(String::from)
506                .unwrap_or_else(|| id.to_string());
507            let result = value.get("result").cloned();
508            let error: Option<JsonRpcError> = value
509                .get("error")
510                .and_then(|e| serde_json::from_value(e.clone()).ok());
511            self.rpc.dispatch_response(&id_str, result, error);
512            return;
513        }
514
515        match serde_json::from_str::<JsonRpcRequest>(trimmed) {
516            Ok(request) => {
517                if request.jsonrpc != "2.0" {
518                    if let Some(id) = request.id {
519                        self.write_error(id, INVALID_REQUEST, "Invalid JSON-RPC version")
520                            .await;
521                    }
522                    return;
523                }
524                // Spawn so a long-running session/prompt doesn't block the
525                // read loop — outbound RPC responses (e.g. for
526                // session/request_permission) need to be processable
527                // while a prompt turn is in flight.
528                let server = Arc::clone(self);
529                tokio::spawn(async move {
530                    server.handle_request(request).await;
531                });
532            }
533            Err(e) => {
534                ::zeroclaw_log::record!(
535                    WARN,
536                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
537                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
538                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
539                    "Failed to parse JSON-RPC request"
540                );
541                self.write_error(Value::Null, PARSE_ERROR, &format!("Parse error: {e}"))
542                    .await;
543            }
544        }
545    }
546
547    async fn handle_request(&self, request: JsonRpcRequest) {
548        let id = request.id.clone().unwrap_or(Value::Null);
549        let is_notification = request.id.is_none();
550
551        let result = match request.method.as_str() {
552            "initialize" => self.handle_initialize(&request.params),
553            "session/new" => self.handle_session_new(&request.params).await,
554            "session/load" => self.handle_session_load(&request.params).await,
555            "session/resume" => self.handle_session_resume(&request.params).await,
556            "session/close" => self.handle_session_close(&request.params).await,
557            "session/prompt" => self.handle_session_prompt(&request.params, &id).await,
558            "session/stop" => self.handle_session_stop(&request.params).await,
559            "session/cancel" => self.handle_session_cancel(&request.params).await,
560            "session/event" | "session/update" => self.handle_session_event(&request.params).await,
561            _ => Err(RpcError {
562                code: METHOD_NOT_FOUND,
563                message: format!("Method not found: {}", request.method),
564                data: None,
565            }),
566        };
567
568        // Only send response for requests (with id), not notifications
569        if !is_notification {
570            match result {
571                Ok(value) => self.write_result(id, value).await,
572                Err(e) => self.write_error(id, e.code, &e.message).await,
573            }
574        }
575    }
576
577    // ── Method handlers ──────────────────────────────────────────
578
579    fn handle_initialize(&self, _params: &Value) -> RpcResult {
580        let default_model = self
581            .config
582            .first_model_provider()
583            .and_then(|e| e.model.clone());
584
585        let mut zeroclaw_meta = serde_json::json!({
586            "maxSessions": self.acp_config.max_sessions,
587            "sessionTimeoutSecs": self.acp_config.session_timeout_secs,
588        });
589        if let Some(model) = default_model {
590            zeroclaw_meta["defaultModel"] = serde_json::json!(model);
591        }
592
593        let session_capabilities = if self.store.is_some() {
594            serde_json::json!({ "resume": {}, "close": {} })
595        } else {
596            serde_json::json!({})
597        };
598
599        Ok(serde_json::json!({
600            "protocolVersion": ACP_PROTOCOL_VERSION,
601            "agentCapabilities": {
602                "loadSession": self.store.is_some(),
603                "promptCapabilities": {
604                    "image": false,
605                    "audio": false,
606                    "embeddedContext": false,
607                },
608                "mcpCapabilities": {
609                    "http": false,
610                    "sse": false,
611                },
612                "sessionCapabilities": session_capabilities,
613            },
614            "agentInfo": {
615                "name": "zeroclaw-acp",
616                "title": "ZeroClaw ACP",
617                "version": env!("CARGO_PKG_VERSION"),
618            },
619            "authMethods": [],
620            "_meta": {
621                "zeroclaw": zeroclaw_meta,
622            }
623        }))
624    }
625
626    async fn handle_session_new(&self, params: &Value) -> RpcResult {
627        let mut sessions = self.sessions.lock().await;
628
629        let loading_count = self.loading_sessions.lock().await.len();
630        if sessions.len() + loading_count >= self.acp_config.max_sessions {
631            return Err(RpcError {
632                code: SESSION_LIMIT_REACHED,
633                message: format!(
634                    "Maximum session limit reached ({})",
635                    self.acp_config.max_sessions
636                ),
637                data: None,
638            });
639        }
640
641        let requested_cwd = self.requested_session_cwd(params);
642
643        let workspace_dir = std::fs::canonicalize(&requested_cwd)
644            .map_err(|e| RpcError {
645                code: INVALID_PARAMS,
646                message: format!(
647                    "cwd is not a usable directory ({}): {e}",
648                    requested_cwd.display()
649                ),
650                data: None,
651            })?
652            .to_string_lossy()
653            .into_owned();
654
655        // Every ACP session is bound to an explicit agent alias.
656        // Accept `agentAlias` (camelCase) or `agent_alias` / `agent`.
657        // When the client omits the alias and exactly one agent is configured,
658        // auto-select it so single-agent setups work without extra config.
659        let agent_alias = params
660            .get("agentAlias")
661            .or_else(|| params.get("agent_alias"))
662            .or_else(|| params.get("agent"))
663            .and_then(Value::as_str)
664            .map(str::trim)
665            .filter(|s| !s.is_empty())
666            .map(str::to_string)
667            .or_else(|| self.config.acp.default_agent.clone())
668            .or_else(|| {
669                let mut keys = self.config.agents.keys();
670                if self.config.agents.len() == 1 {
671                    keys.next().cloned()
672                } else {
673                    None
674                }
675            })
676            .ok_or_else(|| RpcError {
677                code: INVALID_PARAMS,
678                message: "session/new requires `agentAlias` (alias of a configured \
679                          [agents.<alias>] entry)"
680                    .to_string(),
681                data: None,
682            })?;
683        if self.config.agent(&agent_alias).is_none() {
684            return Err(RpcError {
685                code: INVALID_PARAMS,
686                message: format!(
687                    "Unknown agent `{agent_alias}` — no [agents.{agent_alias}] entry configured"
688                ),
689                data: None,
690            });
691        }
692
693        let session_id = Uuid::new_v4().to_string();
694
695        // Build agent from global config, with the session's cwd pinned as
696        // the file/shell sandbox boundary. The agent's data directory
697        // (memory DB, identity, scheduled tasks) still lives under
698        // `config.data_dir`.
699        let agent = Agent::from_config_with_session_cwd_and_mcp_backchannel(
700            &self.config,
701            &agent_alias,
702            Some(std::path::Path::new(&workspace_dir)),
703            false,
704            self.canvas_store.clone(),
705        )
706        .await
707        .map_err(|e| RpcError {
708            code: INTERNAL_ERROR,
709            message: format!("Failed to create agent: {e}"),
710            data: None,
711        })?;
712
713        // Wire an ACP back-channel so tools like `ask_user`,
714        // `escalate_to_human`, and `reaction` can talk to the IDE/CLI client
715        // for this session. Registered as `"acp"`; resolved by name when the
716        // agent picks a channel.
717        let acp_channel = Arc::new(AcpChannel::new(
718            "acp",
719            session_id.clone(),
720            Arc::clone(&self.rpc),
721            Duration::from_secs(self.acp_config.session_timeout_secs),
722        ));
723        agent.channel_handles().register_channel("acp", acp_channel);
724
725        let now = Instant::now();
726        sessions.insert(
727            session_id.clone(),
728            Arc::new(Mutex::new(Session {
729                agent,
730                created_at: now,
731                last_active: now,
732            })),
733        );
734
735        if let Some(store) = &self.store
736            && let Err(e) = store.create_session(&session_id, &workspace_dir)
737        {
738            // Roll back: remove the session we just inserted and surface the error.
739            sessions.remove(&session_id);
740            return Err(RpcError {
741                code: INTERNAL_ERROR,
742                message: format!("Failed to persist session: {e}"),
743                data: None,
744            });
745        }
746
747        ::zeroclaw_log::record!(
748            DEBUG,
749            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(
750                ::serde_json::json!({"session_id": session_id, "workspace_dir": workspace_dir})
751            ),
752            "Created session (workspace: )"
753        );
754
755        Ok(serde_json::json!({
756            "sessionId": session_id,
757            "workspaceDir": workspace_dir,
758        }))
759    }
760
761    async fn handle_session_load(&self, params: &Value) -> RpcResult {
762        let session_id = params
763            .get("sessionId")
764            .or_else(|| params.get("session_id"))
765            .and_then(|v| v.as_str())
766            .ok_or_else(|| RpcError {
767                code: INVALID_PARAMS,
768                message: "Missing required parameter: sessionId".to_string(),
769                data: None,
770            })?
771            .to_string();
772
773        let store = self.store.as_ref().ok_or_else(|| RpcError {
774            code: SESSION_NOT_FOUND,
775            message: format!("Session not found: {session_id}"),
776            data: None,
777        })?;
778
779        // Atomically check and reserve the session slot
780        {
781            let sessions = self.sessions.lock().await;
782            let mut loading = self.loading_sessions.lock().await;
783            if sessions.len() + loading.len() >= self.acp_config.max_sessions {
784                return Err(RpcError {
785                    code: SESSION_LIMIT_REACHED,
786                    message: format!(
787                        "Maximum session limit reached ({})",
788                        self.acp_config.max_sessions
789                    ),
790                    data: None,
791                });
792            }
793            if sessions.contains_key(&session_id) || loading.contains(&session_id) {
794                return Err(RpcError {
795                    code: INVALID_PARAMS,
796                    message: format!(
797                        "Session already active: {session_id}. Call session/close first."
798                    ),
799                    data: None,
800                });
801            }
802            loading.insert(session_id.clone());
803        }
804
805        // Flatten both the SQLite error and the not-found case into a single
806        // Result so the cleanup match below runs for every failure after the
807        // reservation was inserted.
808        let data = store
809            .load_session(&session_id)
810            .map_err(|e| RpcError {
811                code: INTERNAL_ERROR,
812                message: format!("Failed to load session: {e}"),
813                data: None,
814            })
815            .and_then(|opt| {
816                opt.ok_or_else(|| RpcError {
817                    code: SESSION_NOT_FOUND,
818                    message: format!("Session not found: {session_id}"),
819                    data: None,
820                })
821            });
822
823        // On error (SQLite failure or not-found), release the reservation.
824        let data = match data {
825            Ok(d) => d,
826            Err(e) => {
827                self.loading_sessions.lock().await.remove(&session_id);
828                return Err(e);
829            }
830        };
831
832        let workspace_dir = std::path::PathBuf::from(&data.workspace_dir);
833
834        let restore_alias = self
835            .config
836            .acp
837            .default_agent
838            .clone()
839            .or_else(|| {
840                let mut keys = self.config.agents.keys();
841                if self.config.agents.len() == 1 {
842                    keys.next().cloned()
843                } else {
844                    None
845                }
846            })
847            .unwrap_or_else(|| "default".to_string());
848
849        let agent_result = Agent::from_config_with_session_cwd_and_mcp_backchannel(
850            &self.config,
851            &restore_alias,
852            Some(&workspace_dir),
853            false,
854            self.canvas_store.clone(),
855        )
856        .await
857        .map_err(|e| RpcError {
858            code: INTERNAL_ERROR,
859            message: format!("Failed to create agent: {e}"),
860            data: None,
861        });
862
863        let mut agent = match agent_result {
864            Ok(a) => a,
865            Err(e) => {
866                self.loading_sessions.lock().await.remove(&session_id);
867                return Err(e);
868            }
869        };
870
871        agent.seed_conversation_history(data.messages.clone());
872
873        let acp_channel = Arc::new(AcpChannel::new(
874            "acp",
875            session_id.clone(),
876            Arc::clone(&self.rpc),
877            Duration::from_secs(self.acp_config.session_timeout_secs),
878        ));
879        agent.channel_handles().register_channel("acp", acp_channel);
880
881        let now = Instant::now();
882        // Atomically insert and release reservation
883        {
884            let mut sessions = self.sessions.lock().await;
885            let mut loading = self.loading_sessions.lock().await;
886            loading.remove(&session_id);
887            sessions.insert(
888                session_id.clone(),
889                Arc::new(Mutex::new(Session {
890                    agent,
891                    created_at: now,
892                    last_active: now,
893                })),
894            );
895        }
896
897        // Stream conversation history to client as session/update notifications
898        for msg in &data.messages {
899            for notification in history_notifications_for_message(&session_id, msg) {
900                self.write_notification(&notification).await;
901            }
902        }
903
904        ::zeroclaw_log::record!(
905            DEBUG,
906            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
907                .with_attrs(::serde_json::json!({"session_id": session_id, "message_count": data.messages.len()})),
908            "Loaded session"
909        );
910        Ok(serde_json::json!({}))
911    }
912
913    async fn handle_session_resume(&self, params: &Value) -> RpcResult {
914        let session_id = params
915            .get("sessionId")
916            .or_else(|| params.get("session_id"))
917            .and_then(|v| v.as_str())
918            .ok_or_else(|| RpcError {
919                code: INVALID_PARAMS,
920                message: "Missing required parameter: sessionId".to_string(),
921                data: None,
922            })?
923            .to_string();
924
925        let store = self.store.as_ref().ok_or_else(|| RpcError {
926            code: SESSION_NOT_FOUND,
927            message: format!("Session not found: {session_id}"),
928            data: None,
929        })?;
930
931        // Atomically check and reserve the session slot
932        {
933            let sessions = self.sessions.lock().await;
934            let mut loading = self.loading_sessions.lock().await;
935            if sessions.len() + loading.len() >= self.acp_config.max_sessions {
936                return Err(RpcError {
937                    code: SESSION_LIMIT_REACHED,
938                    message: format!(
939                        "Maximum session limit reached ({})",
940                        self.acp_config.max_sessions
941                    ),
942                    data: None,
943                });
944            }
945            if sessions.contains_key(&session_id) || loading.contains(&session_id) {
946                return Err(RpcError {
947                    code: INVALID_PARAMS,
948                    message: format!(
949                        "Session already active: {session_id}. Call session/close first."
950                    ),
951                    data: None,
952                });
953            }
954            loading.insert(session_id.clone());
955        }
956
957        let data = store
958            .load_session(&session_id)
959            .map_err(|e| RpcError {
960                code: INTERNAL_ERROR,
961                message: format!("Failed to load session: {e}"),
962                data: None,
963            })
964            .and_then(|opt| {
965                opt.ok_or_else(|| RpcError {
966                    code: SESSION_NOT_FOUND,
967                    message: format!("Session not found: {session_id}"),
968                    data: None,
969                })
970            });
971
972        // On error (SQLite failure or not-found), release the reservation.
973        let data = match data {
974            Ok(d) => d,
975            Err(e) => {
976                self.loading_sessions.lock().await.remove(&session_id);
977                return Err(e);
978            }
979        };
980
981        let workspace_dir = std::path::PathBuf::from(&data.workspace_dir);
982
983        let restore_alias = self
984            .config
985            .acp
986            .default_agent
987            .clone()
988            .or_else(|| {
989                let mut keys = self.config.agents.keys();
990                if self.config.agents.len() == 1 {
991                    keys.next().cloned()
992                } else {
993                    None
994                }
995            })
996            .unwrap_or_else(|| "default".to_string());
997
998        let agent_result = Agent::from_config_with_session_cwd_and_mcp_backchannel(
999            &self.config,
1000            &restore_alias,
1001            Some(&workspace_dir),
1002            false,
1003            self.canvas_store.clone(),
1004        )
1005        .await
1006        .map_err(|e| RpcError {
1007            code: INTERNAL_ERROR,
1008            message: format!("Failed to create agent: {e}"),
1009            data: None,
1010        });
1011
1012        let mut agent = match agent_result {
1013            Ok(a) => a,
1014            Err(e) => {
1015                self.loading_sessions.lock().await.remove(&session_id);
1016                return Err(e);
1017            }
1018        };
1019
1020        agent.seed_conversation_history(data.messages);
1021
1022        let acp_channel = Arc::new(AcpChannel::new(
1023            "acp",
1024            session_id.clone(),
1025            Arc::clone(&self.rpc),
1026            Duration::from_secs(self.acp_config.session_timeout_secs),
1027        ));
1028        agent.channel_handles().register_channel("acp", acp_channel);
1029
1030        let now = Instant::now();
1031        // Atomically insert and release reservation
1032        {
1033            let mut sessions = self.sessions.lock().await;
1034            let mut loading = self.loading_sessions.lock().await;
1035            loading.remove(&session_id);
1036            sessions.insert(
1037                session_id.clone(),
1038                Arc::new(Mutex::new(Session {
1039                    agent,
1040                    created_at: now,
1041                    last_active: now,
1042                })),
1043            );
1044        }
1045
1046        ::zeroclaw_log::record!(
1047            DEBUG,
1048            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1049                .with_attrs(::serde_json::json!({"session_id": session_id})),
1050            "Resumed session"
1051        );
1052        Ok(serde_json::json!({}))
1053    }
1054
1055    /// Handle `session/close` requests (ACP spec §Session Management).
1056    ///
1057    /// Closes a session: fires the cancel token to interrupt any in-flight turn,
1058    /// removes the session from the in-memory map, and unregisters the ACP channel.
1059    /// The session record in the persistent store is NOT deleted.
1060    ///
1061    /// Returns an empty object on success, or SESSION_NOT_FOUND if the session
1062    /// is not in the in-memory map (it may still exist in the store).
1063    async fn handle_session_close(&self, params: &Value) -> RpcResult {
1064        let session_id = params
1065            .get("sessionId")
1066            .or_else(|| params.get("session_id"))
1067            .and_then(|v| v.as_str())
1068            .ok_or_else(|| RpcError {
1069                code: INVALID_PARAMS,
1070                message: "Missing required parameter: sessionId".to_string(),
1071                data: None,
1072            })?;
1073
1074        // Fire the cancel token for any in-flight turn before acquiring the session lock.
1075        let token = self
1076            .cancel_tokens
1077            .lock()
1078            .expect("cancel_tokens lock poisoned — invariant: all guarded critical sections are short, infallible HashMap ops")
1079            .get(session_id)
1080            .cloned();
1081        if let Some(token) = token {
1082            token.cancel();
1083            ::zeroclaw_log::record!(
1084                DEBUG,
1085                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1086                    .with_attrs(::serde_json::json!({"session_id": session_id})),
1087                "Cancelled active turn for closing session"
1088            );
1089        }
1090
1091        let session_arc = {
1092            let mut sessions = self.sessions.lock().await;
1093            sessions.remove(session_id).ok_or_else(|| RpcError {
1094                code: SESSION_NOT_FOUND,
1095                message: format!("Session not found: {session_id}"),
1096                data: None,
1097            })?
1098        };
1099
1100        // Wait for any in-flight turn to finish (the cancel token may have already stopped it).
1101        let session = session_arc.lock().await;
1102        session.agent.channel_handles().unregister_channel("acp");
1103        ::zeroclaw_log::record!(
1104            DEBUG,
1105            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1106                .with_attrs(::serde_json::json!({"session_id": session_id})),
1107            "Closed session"
1108        );
1109
1110        Ok(serde_json::json!({}))
1111    }
1112
1113    fn requested_session_cwd(&self, params: &Value) -> PathBuf {
1114        params
1115            .get("cwd")
1116            .or_else(|| params.get("workspaceDir"))
1117            .or_else(|| params.get("workspace_dir"))
1118            .and_then(|v| v.as_str())
1119            .map(PathBuf::from)
1120            .unwrap_or_else(|| {
1121                std::env::current_dir().unwrap_or_else(|_| self.config.data_dir.clone())
1122            })
1123    }
1124
1125    async fn handle_session_prompt(&self, params: &Value, _request_id: &Value) -> RpcResult {
1126        let session_id = params
1127            .get("sessionId")
1128            .or_else(|| params.get("session_id"))
1129            .and_then(|v| v.as_str())
1130            .ok_or_else(|| RpcError {
1131                code: INVALID_PARAMS,
1132                message: "Missing required parameter: sessionId".to_string(),
1133                data: None,
1134            })?
1135            .to_string();
1136
1137        let prompt = Self::parse_prompt(params)?;
1138
1139        // Clone the Arc so the session stays visible in the map throughout the
1140        // turn. `session/stop` and the reaper can still find it; they will
1141        // block on the inner Mutex until the turn completes.
1142        let session_arc = {
1143            let sessions = self.sessions.lock().await;
1144            sessions.get(&session_id).cloned().ok_or_else(|| RpcError {
1145                code: SESSION_NOT_FOUND,
1146                message: format!("Session not found: {session_id}"),
1147                data: None,
1148            })?
1149        };
1150
1151        // Create a cancellation token for this turn and register it so that a
1152        // concurrent `session/cancel` notification can fire it without waiting
1153        // for the inner session lock (which is held for the full turn duration).
1154        // The lock can never be poisoned — all critical sections guarded by this
1155        // mutex are short, infallible HashMap operations (insert/remove/get)
1156        // that never call user code, panic, or block on I/O.
1157        let cancel_token = tokio_util::sync::CancellationToken::new();
1158        self.register_cancel_token(&session_id, cancel_token.clone())?;
1159        let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(100);
1160
1161        // Move the Arc into the spawned task and lock inside it.  The inner
1162        // Mutex stays locked for the duration of the turn, preventing
1163        // concurrent stop/reap from touching the agent mid-turn. The outer
1164        // map entry remains in place.
1165        let turn_handle = tokio::spawn(async move {
1166            let mut session = session_arc.lock().await;
1167            let result = session
1168                .agent
1169                .turn_streamed(&prompt, event_tx, Some(cancel_token))
1170                .await;
1171            session.last_active = Instant::now();
1172            result
1173            // guard drops here, releasing the inner lock
1174        });
1175
1176        // Forward events as they arrive. Use standard ACP `session/update`
1177        // notifications: `tool_call` for initial (pending + title/kind for UI/icons),
1178        // `tool_call_update` for completion (status + rawOutput/content). This enables
1179        // proper pending→completed flow in ACP clients.
1180        // Track streamed text so partial content survives cancellation.
1181        let mut accumulated_text = String::new();
1182        while let Some(event) = event_rx.recv().await {
1183            // ACP has no `session/update` shape for token-usage events; the
1184            // task-local cost tracker records them out-of-band. Skip before
1185            // dispatching to the notification builder so the helper match
1186            // can stay exhaustive on the four UI-relevant variants.
1187            if matches!(event, TurnEvent::Usage { .. }) {
1188                continue;
1189            }
1190            // Track streamed text so partial content survives cancellation.
1191            if let TurnEvent::Chunk { ref delta } = event {
1192                accumulated_text.push_str(delta);
1193            }
1194            if let Some(notification) = notification_for_turn_event(&session_id, &event) {
1195                self.write_notification(&notification).await;
1196            }
1197        }
1198
1199        // Remove the cancel token regardless of outcome — the turn is over.
1200        // Lock poisoned invariant: same as the insert site above.
1201        self.remove_cancel_token(&session_id);
1202
1203        let turn_result = turn_handle.await.map_err(|e| RpcError {
1204            code: INTERNAL_ERROR,
1205            message: format!("Agent task panicked: {e}"),
1206            data: None,
1207        })?;
1208
1209        // Per ACP spec: a cancelled turn must respond with stopReason "cancelled",
1210        // not an error. Detect via ToolLoopCancelled propagated through anyhow.
1211        let was_cancelled = match &turn_result {
1212            Err(e) => zeroclaw_runtime::agent::loop_::is_tool_loop_cancelled(e),
1213            Ok(_) => false,
1214        };
1215
1216        if was_cancelled {
1217            return Ok(Self::cancelled_prompt_result(session_id, &accumulated_text));
1218        }
1219
1220        let (result_text, new_turn_msgs) = turn_result.map_err(|e| RpcError {
1221            code: INTERNAL_ERROR,
1222            message: format!("Agent turn failed: {e}"),
1223            data: None,
1224        })?;
1225
1226        // Persist new messages on successful, non-cancelled turns.
1227        if let Some(store) = &self.store
1228            && !new_turn_msgs.is_empty()
1229            && let Err(e) = store.append_turn(&session_id, &new_turn_msgs)
1230        {
1231            ::zeroclaw_log::record!(
1232                WARN,
1233                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1234                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1235                    .with_attrs(
1236                        ::serde_json::json!({"session_id": session_id, "error": e.to_string()})
1237                    ),
1238                "Failed to persist turn; session continues in memory"
1239            );
1240        }
1241
1242        Ok(Self::prompt_result(session_id, "end_turn", result_text))
1243    }
1244
1245    fn register_cancel_token(
1246        &self,
1247        session_id: &str,
1248        cancel_token: tokio_util::sync::CancellationToken,
1249    ) -> std::result::Result<(), RpcError> {
1250        let mut tokens = self
1251            .cancel_tokens
1252            .lock()
1253            .expect("cancel_tokens lock poisoned — invariant: all guarded critical sections are short, infallible HashMap ops");
1254        if tokens.contains_key(session_id) {
1255            return Err(RpcError {
1256                code: SESSION_BUSY,
1257                message: format!("Session already has an active prompt turn: {session_id}"),
1258                data: None,
1259            });
1260        }
1261        tokens.insert(session_id.to_string(), cancel_token);
1262        Ok(())
1263    }
1264
1265    fn remove_cancel_token(&self, session_id: &str) {
1266        self.cancel_tokens
1267            .lock()
1268            .expect("cancel_tokens lock poisoned — invariant: all guarded critical sections are short, infallible HashMap ops")
1269            .remove(session_id);
1270    }
1271
1272    fn prompt_result(session_id: String, stop_reason: &'static str, text: String) -> Value {
1273        serde_json::json!({
1274            "sessionId": session_id,
1275            "stopReason": stop_reason,
1276            "content": text,
1277        })
1278    }
1279
1280    fn cancelled_prompt_result(session_id: String, accumulated_text: &str) -> Value {
1281        let content = if accumulated_text.is_empty() {
1282            "[interrupted by user]".to_string()
1283        } else {
1284            format!("{accumulated_text}\n\n[interrupted by user]")
1285        };
1286        Self::prompt_result(session_id, "cancelled", content)
1287    }
1288
1289    fn parse_prompt(params: &Value) -> std::result::Result<String, RpcError> {
1290        match params.get("prompt") {
1291            Some(Value::String(s)) => Ok(s.clone()),
1292            Some(Value::Array(arr)) => {
1293                let mut joined = String::new();
1294                for part in arr {
1295                    let mut added = false;
1296                    if let Some(text) = part.get("text").and_then(|v| v.as_str()) {
1297                        if !joined.is_empty() {
1298                            joined.push_str("\n\n");
1299                        }
1300                        joined.push_str(text);
1301                        added = true;
1302                    }
1303                    // Support ACP resource blocks for @-notation file attachments
1304                    // (clients send {"type":"resource","resource":{"uri":"...","text":"..."}})
1305                    if let Some(res) = part.get("resource")
1306                        && let Some(text) = res.get("text").and_then(|v| v.as_str())
1307                    {
1308                        if added || !joined.is_empty() {
1309                            joined.push_str("\n\n");
1310                        }
1311                        joined.push_str(text);
1312                    }
1313                }
1314                if joined.is_empty() {
1315                    return Err(RpcError {
1316                        code: INVALID_PARAMS,
1317                        message: "Parameter 'prompt' array must contain at least one text part"
1318                            .to_string(),
1319                        data: None,
1320                    });
1321                }
1322                Ok(joined)
1323            }
1324            _ => Err(RpcError {
1325                code: INVALID_PARAMS,
1326                message: "Missing required parameter: prompt (must be string or array of parts)"
1327                    .to_string(),
1328                data: None,
1329            }),
1330        }
1331    }
1332
1333    async fn handle_session_stop(&self, params: &Value) -> RpcResult {
1334        let session_id = params
1335            .get("sessionId")
1336            .or_else(|| params.get("session_id"))
1337            .and_then(|v| v.as_str())
1338            .ok_or_else(|| RpcError {
1339                code: INVALID_PARAMS,
1340                message: "Missing required parameter: sessionId".to_string(),
1341                data: None,
1342            })?;
1343
1344        let session_arc = {
1345            let mut sessions = self.sessions.lock().await;
1346            sessions.remove(session_id).ok_or_else(|| RpcError {
1347                code: SESSION_NOT_FOUND,
1348                message: format!("Session not found: {session_id}"),
1349                data: None,
1350            })?
1351        };
1352
1353        // Wait for any in-flight prompt turn to finish before cleaning up.
1354        // The inner lock is held by the turn task; this blocks until it drops.
1355        let session = session_arc.lock().await;
1356        // Drop the ACP back-channel from each tool's channel map so the
1357        // session's RpcOutbound clone isn't kept alive by stale entries.
1358        session.agent.channel_handles().unregister_channel("acp");
1359        ::zeroclaw_log::record!(
1360            DEBUG,
1361            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1362                .with_attrs(::serde_json::json!({"session_id": session_id})),
1363            "Stopped session"
1364        );
1365        Ok(serde_json::json!({
1366            "sessionId": session_id,
1367            "stopped": true,
1368        }))
1369    }
1370
1371    /// Handle `session/cancel` notifications (ACP spec §Cancellation).
1372    ///
1373    /// Fires the cancellation token for the named session's active turn, if
1374    /// one is running. Idempotent — silently succeeds when there is no active
1375    /// turn. The return value is ignored for notifications.
1376    ///
1377    /// Cancel-vs-stop interaction: if `session/cancel` and `session/stop` fire
1378    /// nearly simultaneously, both handlers race — cancel fires the token
1379    /// (which may or may not interrupt the turn), and stop sets
1380    /// `session.stopped = true` and awaits the turn handle. The net effect is
1381    /// harmless: either the turn sees the cancellation token or it doesn't, and
1382    /// stop always waits for the turn to finish.
1383    async fn handle_session_cancel(&self, params: &Value) -> RpcResult {
1384        let session_id = params
1385            .get("sessionId")
1386            .or_else(|| params.get("session_id"))
1387            .and_then(|v| v.as_str())
1388            .ok_or_else(|| RpcError {
1389                code: INVALID_PARAMS,
1390                message: "Missing required parameter: sessionId".to_string(),
1391                data: None,
1392            })?;
1393
1394        let token = self
1395            .cancel_tokens
1396            .lock()
1397            .expect("cancel_tokens lock poisoned — invariant: all guarded critical sections are short, infallible HashMap ops")
1398            .get(session_id)
1399            .cloned();
1400
1401        if let Some(token) = token {
1402            token.cancel();
1403            ::zeroclaw_log::record!(
1404                DEBUG,
1405                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1406                    .with_attrs(::serde_json::json!({"session_id": session_id})),
1407                "Cancelled active turn for session"
1408            );
1409        }
1410
1411        Ok(serde_json::json!({}))
1412    }
1413
1414    /// Handle incoming `session/update` (or legacy `session/event`) notifications.
1415    ///
1416    /// This processes bidirectional events for an active session (e.g. tool results,
1417    /// status updates, or client-side events). Currently updates session activity
1418    /// to prevent premature reaping; future extensions can route specific event
1419    /// types into the Agent.
1420    async fn handle_session_event(&self, params: &Value) -> RpcResult {
1421        let session_id = params
1422            .get("sessionId")
1423            .or_else(|| params.get("session_id"))
1424            .and_then(|v| v.as_str())
1425            .ok_or_else(|| RpcError {
1426                code: INVALID_PARAMS,
1427                message: "Missing required parameter: sessionId".to_string(),
1428                data: None,
1429            })?
1430            .to_string();
1431
1432        let event_type = params
1433            .get("type")
1434            .or_else(|| params.get("update").and_then(|u| u.get("sessionUpdate")))
1435            .and_then(|v| v.as_str())
1436            .unwrap_or("unknown")
1437            .to_string();
1438
1439        ::zeroclaw_log::record!(
1440            DEBUG,
1441            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(
1442                ::serde_json::json!({"event_type": event_type, "session_id": session_id})
1443            ),
1444            "Received session update (type=) for session"
1445        );
1446
1447        let session_arc = {
1448            let sessions = self.sessions.lock().await;
1449            sessions.get(&session_id).cloned()
1450        };
1451
1452        if let Some(session_arc) = session_arc {
1453            // Best-effort last_active update. If the inner lock is held by an
1454            // active turn, skip it — the turn itself updates last_active on completion.
1455            if let Ok(mut session) = session_arc.try_lock() {
1456                session.last_active = Instant::now();
1457            }
1458            Ok(serde_json::json!({
1459                "sessionId": session_id,
1460                "type": event_type,
1461                "status": "processed"
1462            }))
1463        } else {
1464            Err(RpcError {
1465                code: SESSION_NOT_FOUND,
1466                message: format!("Session not found: {session_id}"),
1467                data: None,
1468            })
1469        }
1470    }
1471
1472    // ── I/O helpers ──────────────────────────────────────────────
1473
1474    async fn write_result(&self, id: Value, result: Value) {
1475        let response = JsonRpcResponse {
1476            jsonrpc: "2.0",
1477            result: Some(result),
1478            error: None,
1479            id,
1480        };
1481        self.write_json(&response).await;
1482    }
1483
1484    async fn write_error(&self, id: Value, code: i32, message: &str) {
1485        let response = JsonRpcResponse {
1486            jsonrpc: "2.0",
1487            result: None,
1488            error: Some(JsonRpcError {
1489                code,
1490                message: message.to_string(),
1491                data: None,
1492            }),
1493            id,
1494        };
1495        self.write_json(&response).await;
1496    }
1497
1498    async fn write_notification(&self, notification: &JsonRpcNotification) {
1499        self.write_json(notification).await;
1500    }
1501
1502    async fn write_json<T: Serialize>(&self, value: &T) {
1503        match serde_json::to_string(value) {
1504            Ok(json) => {
1505                if self.rpc.writer_tx.send(json).await.is_err() {
1506                    ::zeroclaw_log::record!(
1507                        ERROR,
1508                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1509                            .with_outcome(::zeroclaw_log::EventOutcome::Failure),
1510                        "ACP writer task closed; dropping outbound message"
1511                    );
1512                }
1513            }
1514            Err(e) => {
1515                ::zeroclaw_log::record!(
1516                    ERROR,
1517                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1518                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1519                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1520                    "Failed to serialize JSON-RPC message"
1521                );
1522            }
1523        }
1524    }
1525}
1526
1527/// Single writer task that owns stdout. All outbound JSON-RPC messages flow
1528/// through here, so concurrent notifications and outbound requests don't
1529/// interleave bytes.
1530async fn writer_task(mut rx: mpsc::Receiver<String>) {
1531    let mut stdout = tokio::io::stdout();
1532    while let Some(line) = rx.recv().await {
1533        if let Err(e) = stdout.write_all(line.as_bytes()).await {
1534            ::zeroclaw_log::record!(
1535                ERROR,
1536                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1537                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1538                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1539                "Failed to write to stdout"
1540            );
1541            continue;
1542        }
1543        if let Err(e) = stdout.write_all(b"\n").await {
1544            ::zeroclaw_log::record!(
1545                ERROR,
1546                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1547                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1548                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1549                "Failed to write newline to stdout"
1550            );
1551            continue;
1552        }
1553        if let Err(e) = stdout.flush().await {
1554            ::zeroclaw_log::record!(
1555                ERROR,
1556                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1557                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1558                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1559                "Failed to flush stdout"
1560            );
1561        }
1562    }
1563}
1564
1565/// Translate tool args into the ACP `rawInput` shape.
1566///
1567/// For file-editing tools, the ACP Diff schema uses `oldText`/`newText` (camelCase).
1568/// ZeroClaw's internal tool args use `old_string`/`new_string` (snake_case) for
1569/// `file_edit` and `content` for `file_write`. Without this translation, ACP clients
1570/// (Toad, Zed) cannot recognise the Diff shape and fall back to rendering the raw JSON
1571/// fields as giant strings.
1572fn to_acp_raw_input(name: &str, args: &Value) -> Value {
1573    match name {
1574        "file_edit" => {
1575            let path = args.get("path").cloned().unwrap_or(Value::Null);
1576            let old_text = args.get("old_string").cloned().unwrap_or(Value::Null);
1577            let new_text = args.get("new_string").cloned().unwrap_or(Value::Null);
1578            serde_json::json!({ "path": path, "oldText": old_text, "newText": new_text })
1579        }
1580        "file_write" => {
1581            let path = args.get("path").cloned().unwrap_or(Value::Null);
1582            let new_text = args.get("content").cloned().unwrap_or(Value::Null);
1583            serde_json::json!({ "path": path, "newText": new_text })
1584        }
1585        _ => args.clone(),
1586    }
1587}
1588
1589/// Build the ACP `content` array for a tool call notification.
1590///
1591/// Zed and Toad render tool call content from the `content` array. For
1592/// file-editing tools, emit an ACP Diff content item (`{ "type": "diff", ... }`)
1593/// so clients show a side-by-side diff editor. Non-edit tools return an empty
1594/// array — their `rawInput` is displayed via the standard `raw_input` fallback.
1595fn to_acp_content(name: &str, args: &Value) -> Value {
1596    match name {
1597        "file_edit" => {
1598            let path = args.get("path").cloned().unwrap_or(Value::Null);
1599            let old_text = args.get("old_string").cloned().unwrap_or(Value::Null);
1600            let new_text = args.get("new_string").cloned().unwrap_or(Value::Null);
1601            serde_json::json!([{ "type": "diff", "path": path, "oldText": old_text, "newText": new_text }])
1602        }
1603        "file_write" => {
1604            let path = args.get("path").cloned().unwrap_or(Value::Null);
1605            let new_text = args.get("content").cloned().unwrap_or(Value::Null);
1606            serde_json::json!([{ "type": "diff", "path": path, "newText": new_text }])
1607        }
1608        _ => serde_json::json!([]),
1609    }
1610}
1611
1612fn map_tool_kind(name: &str) -> &'static str {
1613    match name {
1614        "ask_user" | "calculator" | "claude_code" | "claude_code_runner" | "codex_cli"
1615        | "composio" | "delegate" | "escalate_to_human" | "execute_pipeline" | "gemini_cli"
1616        | "jira" | "llm_task" | "opencode_cli" | "schedule" | "security_ops" | "shell"
1617        | "sop_advance" | "sop_approve" | "sop_execute" | "vi_verify" => "execute",
1618        "backup" | "browser_open" | "canvas" | "cloud_ops" | "file_edit" | "file_write"
1619        | "memory_export" | "memory_store" | "report_template" => "edit",
1620        "cron_add" | "poll" | "reaction" => "edit",
1621        "memory_forget" | "memory_purge" => "delete",
1622        // ACP clients often treat `read`/`search`/`fetch` calls as noisy
1623        // background context gathering and keep their content collapsed. These
1624        // ZeroClaw tools return user-visible text, so use `other` to keep the
1625        // result content surfaced consistently across clients.
1626        "content_search" | "discord_search" | "glob_search" | "knowledge" | "search"
1627        | "tool_search" | "web_search_tool" => "other",
1628        "browser"
1629        | "browser_delegate"
1630        | "cloud_patterns"
1631        | "data_management"
1632        | "file_read"
1633        | "git_operations"
1634        | "google_workspace"
1635        | "hardware_board_info"
1636        | "hardware_memory_map"
1637        | "hardware_memory_read"
1638        | "image_info"
1639        | "linkedin"
1640        | "microsoft365"
1641        | "model_routing_config"
1642        | "model_switch"
1643        | "pdf_read"
1644        | "project_intel"
1645        | "proxy_config"
1646        | "read_skill"
1647        | "sessions_history"
1648        | "sessions_list"
1649        | "sop_list"
1650        | "sop_status"
1651        | "text_browser"
1652        | "weather"
1653        | "workspace" => "other",
1654        "cron_list" | "cron_runs" | "memory_recall" => "other",
1655        "http_request" | "web_fetch" => "other",
1656        "image_gen" => "other",
1657        "cron_remove" => "delete",
1658        "cron_run" => "execute",
1659        "sessions_send" => "execute",
1660        _ => "other",
1661    }
1662}
1663
1664fn notification_for_turn_event(session_id: &str, event: &TurnEvent) -> Option<JsonRpcNotification> {
1665    Some(match event {
1666        TurnEvent::Chunk { delta } => JsonRpcNotification {
1667            jsonrpc: "2.0",
1668            method: "session/update",
1669            params: serde_json::json!({
1670                "sessionId": session_id,
1671                "update": {
1672                    "sessionUpdate": "agent_message_chunk",
1673                    "content": {
1674                        "type": "text",
1675                        "text": delta
1676                    }
1677                }
1678            }),
1679        },
1680        TurnEvent::ToolCall { id, name, args } => {
1681            let acp_content = to_acp_content(name, args);
1682            let mut update = serde_json::json!({
1683                "sessionUpdate": "tool_call",
1684                "toolCallId": id,
1685                "name": name,
1686                "title": name,
1687                "kind": map_tool_kind(name),
1688                "rawInput": to_acp_raw_input(name, args),
1689                "status": "pending"
1690            });
1691            if acp_content
1692                .as_array()
1693                .is_some_and(|items| !items.is_empty())
1694            {
1695                update["content"] = acp_content;
1696            }
1697            JsonRpcNotification {
1698                jsonrpc: "2.0",
1699                method: "session/update",
1700                params: serde_json::json!({
1701                    "sessionId": session_id,
1702                    "update": update
1703                }),
1704            }
1705        }
1706        TurnEvent::ToolResult { id, name, output } => JsonRpcNotification {
1707            jsonrpc: "2.0",
1708            method: "session/update",
1709            params: serde_json::json!({
1710                "sessionId": session_id,
1711                "update": {
1712                    "sessionUpdate": "tool_call_update",
1713                    "toolCallId": id,
1714                    "name": name,
1715                    "title": name,
1716                    "kind": map_tool_kind(name),
1717                    "status": "completed",
1718                    "rawOutput": output,
1719                    "body": output,
1720                    "content": [{
1721                        "type": "content",
1722                        "content": {
1723                            "type": "text",
1724                            "text": output
1725                        }
1726                    }]
1727                }
1728            }),
1729        },
1730        TurnEvent::Thinking { delta } => JsonRpcNotification {
1731            jsonrpc: "2.0",
1732            method: "session/update",
1733            params: serde_json::json!({
1734                "sessionId": session_id,
1735                "update": {
1736                    "sessionUpdate": "agent_thought_chunk",
1737                    "content": {
1738                        "type": "text",
1739                        "text": delta
1740                    }
1741                }
1742            }),
1743        },
1744        // ACP has its own approval mechanism via `session/request_permission`
1745        // routed through the channel's `request_choice` impl. The agent only
1746        // emits ApprovalRequest events when a back-channel like the gateway
1747        // WS is registered to handle them; on ACP-only sessions they should
1748        // not arrive here.
1749        TurnEvent::ApprovalRequest { .. } => return None,
1750        // Usage events are filtered out at every call site (ACP has no
1751        // `session/update` shape for them; the cost tracker records them
1752        // out-of-band). Reaching this arm means a caller forgot the filter.
1753        TurnEvent::Usage { .. } => unreachable!(
1754            "TurnEvent::Usage must be filtered before notification_for_turn_event; \
1755             ACP has no session/update notification for token usage"
1756        ),
1757    })
1758}
1759
1760fn history_notifications_for_message(
1761    session_id: &str,
1762    msg: &ConversationMessage,
1763) -> Vec<JsonRpcNotification> {
1764    match msg {
1765        ConversationMessage::Chat(chat) => {
1766            let update_type = match chat.role.as_str() {
1767                "user" => "user_message_chunk",
1768                "assistant" => "agent_message_chunk",
1769                _ => return vec![],
1770            };
1771            vec![JsonRpcNotification {
1772                jsonrpc: "2.0",
1773                method: "session/update",
1774                params: serde_json::json!({
1775                    "sessionId": session_id,
1776                    "update": {
1777                        "sessionUpdate": update_type,
1778                        "content": { "type": "text", "text": &chat.content }
1779                    }
1780                }),
1781            }]
1782        }
1783        ConversationMessage::AssistantToolCalls {
1784            text, tool_calls, ..
1785        } => {
1786            let mut notifications = Vec::new();
1787            if let Some(t) = text
1788                && !t.is_empty()
1789            {
1790                notifications.push(JsonRpcNotification {
1791                    jsonrpc: "2.0",
1792                    method: "session/update",
1793                    params: serde_json::json!({
1794                        "sessionId": session_id,
1795                        "update": {
1796                            "sessionUpdate": "agent_message_chunk",
1797                            "content": { "type": "text", "text": t }
1798                        }
1799                    }),
1800                });
1801            }
1802            for tc in tool_calls {
1803                let args: serde_json::Value =
1804                    serde_json::from_str(&tc.arguments).unwrap_or(serde_json::Value::Null);
1805                let acp_content = to_acp_content(&tc.name, &args);
1806                let mut update = serde_json::json!({
1807                    "sessionUpdate": "tool_call",
1808                    "toolCallId": &tc.id,
1809                    "name": &tc.name,
1810                    "title": &tc.name,
1811                    "kind": map_tool_kind(&tc.name),
1812                    "rawInput": to_acp_raw_input(&tc.name, &args),
1813                    "status": "completed"
1814                });
1815                if acp_content
1816                    .as_array()
1817                    .is_some_and(|items| !items.is_empty())
1818                {
1819                    update["content"] = acp_content;
1820                }
1821                notifications.push(JsonRpcNotification {
1822                    jsonrpc: "2.0",
1823                    method: "session/update",
1824                    params: serde_json::json!({
1825                        "sessionId": session_id,
1826                        "update": update
1827                    }),
1828                });
1829            }
1830            notifications
1831        }
1832        ConversationMessage::ToolResults(results) => results
1833            .iter()
1834            .map(|r| JsonRpcNotification {
1835                jsonrpc: "2.0",
1836                method: "session/update",
1837                params: serde_json::json!({
1838                    "sessionId": session_id,
1839                    "update": {
1840                        "sessionUpdate": "tool_call_update",
1841                        "toolCallId": &r.tool_call_id,
1842                        "status": "completed",
1843                        "rawOutput": &r.content,
1844                        "body": &r.content,
1845                        "content": [{
1846                            "type": "content",
1847                            "content": { "type": "text", "text": &r.content }
1848                        }]
1849                    }
1850                }),
1851            })
1852            .collect(),
1853    }
1854}
1855
1856// ── Error helper ─────────────────────────────────────────────────
1857
1858#[derive(Debug)]
1859struct RpcError {
1860    code: i32,
1861    message: String,
1862    #[allow(dead_code)] // JSON-RPC spec field, used for structured error data
1863    data: Option<Value>,
1864}
1865
1866type RpcResult = std::result::Result<Value, RpcError>;
1867
1868#[cfg(test)]
1869mod tests {
1870    use super::*;
1871
1872    #[test]
1873    fn acp_server_config_defaults() {
1874        let cfg = AcpServerConfig::default();
1875        assert_eq!(cfg.max_sessions, 10);
1876        assert_eq!(cfg.session_timeout_secs, 3600);
1877    }
1878
1879    #[test]
1880    fn acp_server_config_deserialize() {
1881        let json = r#"{"max_sessions": 5, "session_timeout_secs": 1800}"#;
1882        let cfg: AcpServerConfig = serde_json::from_str(json).unwrap();
1883        assert_eq!(cfg.max_sessions, 5);
1884        assert_eq!(cfg.session_timeout_secs, 1800);
1885    }
1886
1887    #[test]
1888    fn acp_server_config_deserialize_partial() {
1889        let json = r#"{"max_sessions": 3}"#;
1890        let cfg: AcpServerConfig = serde_json::from_str(json).unwrap();
1891        assert_eq!(cfg.max_sessions, 3);
1892        assert_eq!(cfg.session_timeout_secs, 3600);
1893    }
1894
1895    #[test]
1896    fn json_rpc_request_parse() {
1897        let json = r#"{"jsonrpc":"2.0","method":"initialize","params":{},"id":1}"#;
1898        let req: JsonRpcRequest = serde_json::from_str(json).unwrap();
1899        assert_eq!(req.method, "initialize");
1900        assert_eq!(req.id, Some(Value::Number(1.into())));
1901    }
1902
1903    #[test]
1904    fn json_rpc_request_parse_notification() {
1905        let json = r#"{"jsonrpc":"2.0","method":"session/update","params":{}}"#;
1906        let req: JsonRpcRequest = serde_json::from_str(json).unwrap();
1907        assert_eq!(req.method, "session/update");
1908        assert!(req.id.is_none());
1909    }
1910
1911    #[test]
1912    fn json_rpc_response_serialize() {
1913        let resp = JsonRpcResponse {
1914            jsonrpc: "2.0",
1915            result: Some(serde_json::json!({"status": "ok"})),
1916            error: None,
1917            id: Value::Number(1.into()),
1918        };
1919        let json = serde_json::to_string(&resp).unwrap();
1920        let parsed: Value = serde_json::from_str(&json).unwrap();
1921        assert_eq!(parsed["jsonrpc"], "2.0");
1922        assert!(parsed.get("result").is_some());
1923        assert!(parsed.get("error").is_none());
1924        assert_eq!(parsed["id"], 1);
1925    }
1926
1927    #[tokio::test]
1928    async fn rpc_request_timeout_drop_removes_pending_responder() {
1929        let (tx, mut rx) = mpsc::channel::<String>(16);
1930        let rpc = RpcOutbound::for_testing(tx);
1931
1932        let result = tokio::time::timeout(
1933            Duration::from_millis(10),
1934            rpc.request("session/request_permission", serde_json::json!({})),
1935        )
1936        .await;
1937
1938        assert!(result.is_err());
1939        assert!(rx.recv().await.is_some());
1940        assert_eq!(rpc.pending_count_for_test(), 0);
1941    }
1942
1943    #[test]
1944    fn initialize_response_uses_acp_v1_shape() {
1945        let server = AcpServer::new(Config::default(), AcpServerConfig::default());
1946        let result = server
1947            .handle_initialize(&serde_json::json!({
1948                "protocolVersion": 1,
1949                "clientCapabilities": {},
1950                "clientInfo": {
1951                    "name": "test-client",
1952                    "version": "1.0.0"
1953                }
1954            }))
1955            .unwrap();
1956
1957        assert_eq!(result["protocolVersion"], 1);
1958        assert_eq!(result["agentInfo"]["name"], "zeroclaw-acp");
1959        assert_eq!(result["agentInfo"]["title"], "ZeroClaw ACP");
1960        assert_eq!(result["agentInfo"]["version"], env!("CARGO_PKG_VERSION"));
1961        assert_eq!(result["authMethods"], serde_json::json!([]));
1962        assert_eq!(result["agentCapabilities"]["loadSession"], false);
1963        assert_eq!(
1964            result["agentCapabilities"]["promptCapabilities"]["image"],
1965            false
1966        );
1967        assert_eq!(
1968            result["agentCapabilities"]["mcpCapabilities"]["http"],
1969            false
1970        );
1971        assert!(result.get("serverInfo").is_none());
1972        assert!(result.get("capabilities").is_none());
1973    }
1974
1975    #[test]
1976    fn initialize_advertises_load_session_when_store_present() {
1977        let cwd = tempfile::tempdir().unwrap();
1978        let store =
1979            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
1980        let server = AcpServer::new_with_store(
1981            make_test_config(cwd.path()),
1982            AcpServerConfig::default(),
1983            store,
1984        );
1985        let result = server.handle_initialize(&serde_json::json!({})).unwrap();
1986        assert_eq!(result["agentCapabilities"]["loadSession"], true);
1987        assert_eq!(
1988            result["agentCapabilities"]["sessionCapabilities"]["resume"],
1989            serde_json::json!({})
1990        );
1991        assert_eq!(
1992            result["agentCapabilities"]["sessionCapabilities"]["close"],
1993            serde_json::json!({})
1994        );
1995    }
1996
1997    #[test]
1998    fn session_new_defaults_to_launch_cwd_when_client_omits_cwd() {
1999        let config = Config {
2000            data_dir: PathBuf::from("/not/the/project"),
2001            ..Default::default()
2002        };
2003        let server = AcpServer::new(config, AcpServerConfig::default());
2004        let expected = std::env::current_dir().unwrap();
2005
2006        assert_eq!(
2007            server.requested_session_cwd(&serde_json::json!({})),
2008            expected
2009        );
2010    }
2011
2012    #[test]
2013    fn session_new_respects_client_cwd_when_present() {
2014        let server = AcpServer::new(Config::default(), AcpServerConfig::default());
2015        let cwd = std::env::current_dir().unwrap();
2016
2017        assert_eq!(
2018            server.requested_session_cwd(&serde_json::json!({"cwd": cwd})),
2019            cwd
2020        );
2021    }
2022
2023    #[tokio::test]
2024    async fn session_new_does_not_wait_for_configured_mcp_servers() {
2025        let cwd = tempfile::tempdir().unwrap();
2026        let mut config = Config {
2027            data_dir: cwd.path().to_path_buf(),
2028            providers: {
2029                let mut p = zeroclaw_config::providers::Providers::default();
2030                p.models.openrouter.insert(
2031                    "default".to_string(),
2032                    zeroclaw_config::schema::OpenRouterModelProviderConfig {
2033                        base: zeroclaw_config::schema::ModelProviderConfig {
2034                            model: Some("test-model".to_string()),
2035                            ..Default::default()
2036                        },
2037                    },
2038                );
2039                p
2040            },
2041            mcp: zeroclaw_config::schema::McpConfig {
2042                enabled: true,
2043                servers: vec![zeroclaw_config::schema::McpServerConfig {
2044                    name: "slow".to_string(),
2045                    transport: zeroclaw_config::schema::McpTransport::Stdio,
2046                    command: "/bin/sh".to_string(),
2047                    args: vec!["-c".to_string(), "sleep 60".to_string()],
2048                    ..Default::default()
2049                }],
2050                ..Default::default()
2051            },
2052            ..Default::default()
2053        };
2054        config.risk_profiles.insert(
2055            "default".to_string(),
2056            zeroclaw_config::schema::RiskProfileConfig::default(),
2057        );
2058        config.agents.insert(
2059            "test-agent".to_string(),
2060            zeroclaw_config::schema::AliasedAgentConfig {
2061                model_provider: "openrouter.default".into(),
2062                risk_profile: "default".to_string(),
2063                ..Default::default()
2064            },
2065        );
2066        let server = AcpServer::new(config, AcpServerConfig::default());
2067
2068        let result = tokio::time::timeout(
2069            Duration::from_secs(2),
2070            server.handle_session_new(&serde_json::json!({
2071                "cwd": cwd.path().to_string_lossy(),
2072                "agentAlias": "test-agent",
2073                "mcpServers": []
2074            })),
2075        )
2076        .await
2077        .expect("session/new should not block on configured MCP startup")
2078        .expect("session/new should create a session");
2079
2080        assert!(result["sessionId"].as_str().is_some());
2081    }
2082
2083    #[tokio::test]
2084    async fn session_new_auto_selects_sole_configured_agent_when_alias_omitted() {
2085        let cwd = tempfile::tempdir().unwrap();
2086        let mut config = Config {
2087            data_dir: cwd.path().to_path_buf(),
2088            providers: {
2089                let mut p = zeroclaw_config::providers::Providers::default();
2090                p.models.openrouter.insert(
2091                    "default".to_string(),
2092                    zeroclaw_config::schema::OpenRouterModelProviderConfig {
2093                        base: zeroclaw_config::schema::ModelProviderConfig {
2094                            api_key: Some("test-key".to_string()),
2095                            model: Some("test-model".to_string()),
2096                            ..Default::default()
2097                        },
2098                    },
2099                );
2100                p
2101            },
2102            ..Default::default()
2103        };
2104        config.risk_profiles.insert(
2105            "default".to_string(),
2106            zeroclaw_config::schema::RiskProfileConfig::default(),
2107        );
2108        config.agents.insert(
2109            "only-agent".to_string(),
2110            zeroclaw_config::schema::AliasedAgentConfig {
2111                model_provider: "openrouter.default".into(),
2112                risk_profile: "default".to_string(),
2113                ..Default::default()
2114            },
2115        );
2116        let server = AcpServer::new(config, AcpServerConfig::default());
2117
2118        let result = tokio::time::timeout(
2119            Duration::from_secs(2),
2120            server.handle_session_new(&serde_json::json!({
2121                "cwd": cwd.path().to_string_lossy(),
2122                "mcpServers": []
2123            })),
2124        )
2125        .await
2126        .expect("session/new should not block")
2127        .expect("session/new should auto-select the sole configured agent");
2128
2129        assert!(result["sessionId"].as_str().is_some());
2130    }
2131
2132    #[tokio::test]
2133    async fn session_new_requires_alias_when_multiple_agents_configured() {
2134        let mut config = Config::default();
2135        config.agents.insert(
2136            "agent-one".to_string(),
2137            zeroclaw_config::schema::AliasedAgentConfig::default(),
2138        );
2139        config.agents.insert(
2140            "agent-two".to_string(),
2141            zeroclaw_config::schema::AliasedAgentConfig::default(),
2142        );
2143        let server = AcpServer::new(config, AcpServerConfig::default());
2144
2145        let err = server
2146            .handle_session_new(&serde_json::json!({"mcpServers": []}))
2147            .await
2148            .expect_err("session/new without agentAlias should fail when multiple agents exist");
2149
2150        assert_eq!(err.code, INVALID_PARAMS);
2151        assert!(
2152            err.message.contains("agentAlias"),
2153            "error should mention agentAlias, got: {}",
2154            err.message
2155        );
2156    }
2157
2158    #[tokio::test]
2159    async fn session_new_uses_config_default_agent_when_alias_omitted_and_multiple_agents() {
2160        let cwd = tempfile::tempdir().unwrap();
2161        let mut config = Config {
2162            data_dir: cwd.path().to_path_buf(),
2163            providers: {
2164                let mut p = zeroclaw_config::providers::Providers::default();
2165                p.models.openrouter.insert(
2166                    "default".to_string(),
2167                    zeroclaw_config::schema::OpenRouterModelProviderConfig {
2168                        base: zeroclaw_config::schema::ModelProviderConfig {
2169                            api_key: Some("test-key".to_string()),
2170                            model: Some("test-model".to_string()),
2171                            ..Default::default()
2172                        },
2173                    },
2174                );
2175                p
2176            },
2177            ..Default::default()
2178        };
2179        config.risk_profiles.insert(
2180            "default".to_string(),
2181            zeroclaw_config::schema::RiskProfileConfig::default(),
2182        );
2183        config.agents.insert(
2184            "agent-alpha".to_string(),
2185            zeroclaw_config::schema::AliasedAgentConfig {
2186                model_provider: "openrouter.default".into(),
2187                risk_profile: "default".to_string(),
2188                ..Default::default()
2189            },
2190        );
2191        config.agents.insert(
2192            "agent-beta".to_string(),
2193            zeroclaw_config::schema::AliasedAgentConfig {
2194                model_provider: "openrouter.default".into(),
2195                risk_profile: "default".to_string(),
2196                ..Default::default()
2197            },
2198        );
2199        config.acp.default_agent = Some("agent-alpha".to_string());
2200        let server = AcpServer::new(config, AcpServerConfig::default());
2201
2202        let result = tokio::time::timeout(
2203            Duration::from_secs(2),
2204            server.handle_session_new(&serde_json::json!({
2205                "cwd": cwd.path().to_string_lossy(),
2206                "mcpServers": []
2207            })),
2208        )
2209        .await
2210        .expect("should not block")
2211        .expect("should select agent-alpha from config.acp.default_agent");
2212
2213        assert!(result["sessionId"].as_str().is_some());
2214    }
2215
2216    #[tokio::test]
2217    async fn session_new_explicit_alias_overrides_config_default_agent() {
2218        let cwd = tempfile::tempdir().unwrap();
2219        let mut config = Config {
2220            data_dir: cwd.path().to_path_buf(),
2221            providers: {
2222                let mut p = zeroclaw_config::providers::Providers::default();
2223                p.models.openrouter.insert(
2224                    "default".to_string(),
2225                    zeroclaw_config::schema::OpenRouterModelProviderConfig {
2226                        base: zeroclaw_config::schema::ModelProviderConfig {
2227                            api_key: Some("test-key".to_string()),
2228                            model: Some("test-model".to_string()),
2229                            ..Default::default()
2230                        },
2231                    },
2232                );
2233                p
2234            },
2235            ..Default::default()
2236        };
2237        config.risk_profiles.insert(
2238            "default".to_string(),
2239            zeroclaw_config::schema::RiskProfileConfig::default(),
2240        );
2241        config.agents.insert(
2242            "agent-alpha".to_string(),
2243            zeroclaw_config::schema::AliasedAgentConfig {
2244                model_provider: "openrouter.default".into(),
2245                risk_profile: "default".to_string(),
2246                ..Default::default()
2247            },
2248        );
2249        config.agents.insert(
2250            "agent-beta".to_string(),
2251            zeroclaw_config::schema::AliasedAgentConfig {
2252                model_provider: "openrouter.default".into(),
2253                risk_profile: "default".to_string(),
2254                ..Default::default()
2255            },
2256        );
2257        config.acp.default_agent = Some("agent-alpha".to_string());
2258        let server = AcpServer::new(config, AcpServerConfig::default());
2259
2260        // Explicit alias should win over config default
2261        let result = tokio::time::timeout(
2262            Duration::from_secs(2),
2263            server.handle_session_new(&serde_json::json!({
2264                "agentAlias": "agent-beta",
2265                "cwd": cwd.path().to_string_lossy(),
2266                "mcpServers": []
2267            })),
2268        )
2269        .await
2270        .expect("should not block")
2271        .expect("should use agent-beta despite default_agent = agent-alpha");
2272
2273        assert!(result["sessionId"].as_str().is_some());
2274    }
2275
2276    #[test]
2277    fn json_rpc_error_response_serialize() {
2278        let resp = JsonRpcResponse {
2279            jsonrpc: "2.0",
2280            result: None,
2281            error: Some(JsonRpcError {
2282                code: METHOD_NOT_FOUND,
2283                message: "Method not found".to_string(),
2284                data: None,
2285            }),
2286            id: Value::Number(1.into()),
2287        };
2288        let json = serde_json::to_string(&resp).unwrap();
2289        let parsed: Value = serde_json::from_str(&json).unwrap();
2290        assert!(parsed.get("error").is_some());
2291        assert_eq!(parsed["error"]["code"], -32601);
2292        assert!(parsed.get("result").is_none());
2293    }
2294
2295    #[test]
2296    fn json_rpc_notification_serialize() {
2297        let notif = JsonRpcNotification {
2298            jsonrpc: "2.0",
2299            method: "session/update",
2300            params: serde_json::json!({
2301                "sessionId": "test-sid",
2302                "update": {
2303                    "sessionUpdate": "agent_message_chunk",
2304                    "content": { "type": "text", "text": "hello" }
2305                }
2306            }),
2307        };
2308        let json = serde_json::to_string(&notif).unwrap();
2309        assert!(json.contains(r#""method":"session/update""#));
2310        assert!(json.contains(r#""sessionUpdate":"agent_message_chunk""#));
2311        assert!(json.contains(r#""text":"hello""#));
2312    }
2313
2314    #[test]
2315    fn test_prompt_parsing() {
2316        // String prompt
2317        let string_params = serde_json::json!({"prompt": "hello world"});
2318        let result = AcpServer::parse_prompt(&string_params).unwrap();
2319        assert_eq!(result, "hello world");
2320
2321        // Array prompt (valid)
2322        let array_params = serde_json::json!({
2323            "prompt": [
2324                {"type": "text", "text": "part 1"},
2325                {"type": "text", "text": "part 2"}
2326            ]
2327        });
2328        let result = AcpServer::parse_prompt(&array_params).unwrap();
2329        assert_eq!(result, "part 1\n\npart 2");
2330
2331        // Array prompt (empty or no text)
2332        let empty_array_params = serde_json::json!({"prompt": []});
2333        let result = AcpServer::parse_prompt(&empty_array_params);
2334        assert!(result.is_err());
2335        assert_eq!(result.unwrap_err().code, INVALID_PARAMS);
2336
2337        let no_text_params = serde_json::json!({
2338            "prompt": [
2339                {"type": "image", "data": "..."}
2340            ]
2341        });
2342        let result = AcpServer::parse_prompt(&no_text_params);
2343        assert!(result.is_err());
2344
2345        // Array prompt with resource (file @-notation from ACP client)
2346        let resource_params = serde_json::json!({
2347            "prompt": [
2348                {"type": "text", "text": "analyze this file:"},
2349                {"type": "resource", "resource": {"uri": "file:///tmp/example.rs", "text": "fn main() { println!(\"hi\"); }", "mimeType": "text/rust"}}
2350            ]
2351        });
2352        let result = AcpServer::parse_prompt(&resource_params).unwrap();
2353        assert!(result.contains("analyze this file:"));
2354        assert!(result.contains("fn main() { println!(\"hi\"); }"));
2355    }
2356
2357    #[test]
2358    fn handle_initialize_default_model_absent_when_unconfigured() {
2359        let server = AcpServer::new(Config::default(), AcpServerConfig::default());
2360        let result = server.handle_initialize(&serde_json::json!({})).unwrap();
2361        assert!(
2362            result["_meta"]["zeroclaw"].get("defaultModel").is_none(),
2363            "defaultModel must be absent when no model_provider is configured, got: {}",
2364            result["_meta"]["zeroclaw"]["defaultModel"]
2365        );
2366    }
2367
2368    #[test]
2369    fn handle_initialize_default_model_reflects_configured_provider() {
2370        use zeroclaw_config::schema::{ModelProviderConfig, OllamaModelProviderConfig};
2371        let mut config = Config::default();
2372        config.providers.models.ollama.insert(
2373            "default".to_string(),
2374            OllamaModelProviderConfig {
2375                base: ModelProviderConfig {
2376                    model: Some("llama3.2".to_string()),
2377                    ..Default::default()
2378                },
2379                ..OllamaModelProviderConfig::default()
2380            },
2381        );
2382        let server = AcpServer::new(config, AcpServerConfig::default());
2383        let result = server.handle_initialize(&serde_json::json!({})).unwrap();
2384        assert_eq!(result["_meta"]["zeroclaw"]["defaultModel"], "llama3.2");
2385    }
2386
2387    #[test]
2388    fn prompt_result_preserves_content_string_shape() {
2389        let result = AcpServer::prompt_result("test-sid".to_string(), "end_turn", "hello".into());
2390        assert_eq!(result["sessionId"], "test-sid");
2391        assert_eq!(result["stopReason"], "end_turn");
2392        assert_eq!(result["content"], "hello");
2393    }
2394
2395    #[test]
2396    fn cancelled_prompt_result_preserves_content_string_shape() {
2397        let with_partial =
2398            AcpServer::cancelled_prompt_result("test-sid".to_string(), "partial text");
2399        assert_eq!(with_partial["sessionId"], "test-sid");
2400        assert_eq!(with_partial["stopReason"], "cancelled");
2401        assert_eq!(
2402            with_partial["content"],
2403            "partial text\n\n[interrupted by user]"
2404        );
2405
2406        let marker_only = AcpServer::cancelled_prompt_result("test-sid".to_string(), "");
2407        assert_eq!(marker_only["content"], "[interrupted by user]");
2408    }
2409
2410    #[test]
2411    fn test_tool_call_and_update_serialization() {
2412        // Test tool_call (initial pending event)
2413        let tool_call_notif = JsonRpcNotification {
2414            jsonrpc: "2.0",
2415            method: "session/update",
2416            params: serde_json::json!({
2417                "sessionId": "test-sid",
2418                "update": {
2419                    "sessionUpdate": "tool_call",
2420                    "toolCallId": "tc-12345",
2421                    "name": "shell",
2422                    "title": "shell",
2423                    "kind": "execute",
2424                    "rawInput": {"command": "ls -la"},
2425                    "status": "pending"
2426                }
2427            }),
2428        };
2429        let json1 = serde_json::to_string(&tool_call_notif).unwrap();
2430        assert!(json1.contains("\"sessionUpdate\":\"tool_call\""));
2431        assert!(json1.contains("\"toolCallId\":\"tc-12345\""));
2432        assert!(json1.contains("\"name\":\"shell\""));
2433        assert!(json1.contains("\"title\":\"shell\""));
2434        assert!(json1.contains("\"kind\":\"execute\""));
2435        assert!(json1.contains("\"status\":\"pending\""));
2436        assert!(json1.contains("\"rawInput\""));
2437
2438        // Test tool_call_update completion payload
2439        let tool_update_notif = JsonRpcNotification {
2440            jsonrpc: "2.0",
2441            method: "session/update",
2442            params: serde_json::json!({
2443                "sessionId": "test-sid",
2444                "update": {
2445                    "sessionUpdate": "tool_call_update",
2446                    "toolCallId": "tc-12345",
2447                    "name": "shell",
2448                    "title": "shell",
2449                    "kind": "execute",
2450                    "status": "completed",
2451                    "rawOutput": "file1.txt\nfile2.txt",
2452                    "body": "file1.txt\nfile2.txt",
2453                    "content": [{
2454                        "type": "content",
2455                        "content": {
2456                            "type": "text",
2457                            "text": "file1.txt\nfile2.txt"
2458                        }
2459                    }]
2460                }
2461            }),
2462        };
2463        let json2 = serde_json::to_string(&tool_update_notif).unwrap();
2464        assert!(json2.contains("\"sessionUpdate\":\"tool_call_update\""));
2465        assert!(json2.contains("\"toolCallId\":\"tc-12345\""));
2466        assert!(json2.contains("\"name\":\"shell\""));
2467        assert!(json2.contains("\"status\":\"completed\""));
2468        assert!(json2.contains("\"rawOutput\""));
2469        assert!(json2.contains("\"body\""));
2470        assert!(json2.contains("\"content\""));
2471        assert!(json2.contains("\"type\":\"content\""));
2472        assert!(json2.contains("file1.txt"));
2473        // Verify matching toolCallId across events
2474        assert!(json1.contains("tc-12345") && json2.contains("tc-12345"));
2475    }
2476
2477    #[test]
2478    fn file_edit_raw_input_uses_acp_diff_field_names() {
2479        let call = notification_for_turn_event(
2480            "sid",
2481            &TurnEvent::ToolCall {
2482                id: "tc-1".to_string(),
2483                name: "file_edit".to_string(),
2484                args: serde_json::json!({
2485                    "path": "src/foo.rs",
2486                    "old_string": "let x = 1;",
2487                    "new_string": "let x = 2;"
2488                }),
2489            },
2490        );
2491        let v = serde_json::to_value(call.unwrap()).unwrap();
2492        let raw = &v["params"]["update"]["rawInput"];
2493        assert_eq!(raw["path"], "src/foo.rs");
2494        assert_eq!(raw["oldText"], "let x = 1;");
2495        assert_eq!(raw["newText"], "let x = 2;");
2496        assert!(
2497            raw.get("old_string").is_none(),
2498            "old_string must not appear in rawInput"
2499        );
2500        assert!(
2501            raw.get("new_string").is_none(),
2502            "new_string must not appear in rawInput"
2503        );
2504
2505        let content = &v["params"]["update"]["content"];
2506        assert!(content.is_array(), "file_edit must emit a content array");
2507        let diff = &content[0];
2508        assert_eq!(diff["type"], "diff");
2509        assert_eq!(diff["path"], "src/foo.rs");
2510        assert_eq!(diff["oldText"], "let x = 1;");
2511        assert_eq!(diff["newText"], "let x = 2;");
2512    }
2513
2514    #[test]
2515    fn file_write_raw_input_uses_acp_diff_field_names() {
2516        let call = notification_for_turn_event(
2517            "sid",
2518            &TurnEvent::ToolCall {
2519                id: "tc-2".to_string(),
2520                name: "file_write".to_string(),
2521                args: serde_json::json!({
2522                    "path": "src/new.rs",
2523                    "content": "fn main() {}"
2524                }),
2525            },
2526        );
2527        let v = serde_json::to_value(call.unwrap()).unwrap();
2528        let raw = &v["params"]["update"]["rawInput"];
2529        assert_eq!(raw["path"], "src/new.rs");
2530        assert_eq!(raw["newText"], "fn main() {}");
2531        assert!(
2532            raw.get("oldText").is_none(),
2533            "oldText must not appear in file_write rawInput"
2534        );
2535        assert!(
2536            raw.get("content").is_none(),
2537            "content must not appear in rawInput"
2538        );
2539
2540        let content = &v["params"]["update"]["content"];
2541        assert!(content.is_array(), "file_write must emit a content array");
2542        let diff = &content[0];
2543        assert_eq!(diff["type"], "diff");
2544        assert_eq!(diff["path"], "src/new.rs");
2545        assert_eq!(diff["newText"], "fn main() {}");
2546        assert!(
2547            diff.get("oldText").is_none(),
2548            "oldText must be absent for file_write diff"
2549        );
2550    }
2551
2552    #[test]
2553    fn map_tool_kind_uses_explicit_tool_names() {
2554        assert_eq!(map_tool_kind("memory_forget"), "delete");
2555        assert_eq!(map_tool_kind("memory_purge"), "delete");
2556        assert_eq!(map_tool_kind("cron_run"), "execute");
2557        assert_eq!(map_tool_kind("file_read"), "other");
2558        assert_eq!(map_tool_kind("knowledge"), "other");
2559        assert_eq!(map_tool_kind("web_fetch"), "other");
2560        assert_eq!(map_tool_kind("file_write"), "edit");
2561        assert_eq!(map_tool_kind("unknown_tool"), "other");
2562    }
2563
2564    #[test]
2565    fn turn_tool_events_include_client_visible_tool_fields() {
2566        let call = notification_for_turn_event(
2567            "test-sid",
2568            &TurnEvent::ToolCall {
2569                id: "tc-12345".to_string(),
2570                name: "shell".to_string(),
2571                args: serde_json::json!({"command": "ls -la"}),
2572            },
2573        );
2574        let call_value =
2575            serde_json::to_value(call.expect("ToolCall maps to a notification")).unwrap();
2576        assert_eq!(call_value["method"], "session/update");
2577        assert_eq!(call_value["params"]["update"]["sessionUpdate"], "tool_call");
2578        assert_eq!(call_value["params"]["update"]["toolCallId"], "tc-12345");
2579        assert_eq!(call_value["params"]["update"]["name"], "shell");
2580        assert_eq!(call_value["params"]["update"]["title"], "shell");
2581        assert_eq!(call_value["params"]["update"]["kind"], "execute");
2582        assert_eq!(
2583            call_value["params"]["update"]["rawInput"],
2584            serde_json::json!({"command": "ls -la"})
2585        );
2586
2587        let result = notification_for_turn_event(
2588            "test-sid",
2589            &TurnEvent::ToolResult {
2590                id: "tc-12345".to_string(),
2591                name: "shell".to_string(),
2592                output: "file1.txt\nfile2.txt".to_string(),
2593            },
2594        );
2595        let result_value =
2596            serde_json::to_value(result.expect("ToolResult maps to a notification")).unwrap();
2597        assert_eq!(
2598            result_value["params"]["update"]["sessionUpdate"],
2599            "tool_call_update"
2600        );
2601        assert_eq!(result_value["params"]["update"]["toolCallId"], "tc-12345");
2602        assert_eq!(result_value["params"]["update"]["name"], "shell");
2603        assert_eq!(result_value["params"]["update"]["title"], "shell");
2604        assert_eq!(result_value["params"]["update"]["kind"], "execute");
2605        assert_eq!(result_value["params"]["update"]["status"], "completed");
2606        assert_eq!(
2607            result_value["params"]["update"]["rawOutput"],
2608            "file1.txt\nfile2.txt"
2609        );
2610        assert_eq!(
2611            result_value["params"]["update"]["body"],
2612            "file1.txt\nfile2.txt"
2613        );
2614        assert_eq!(
2615            result_value["params"]["update"]["content"][0]["content"]["text"],
2616            "file1.txt\nfile2.txt"
2617        );
2618    }
2619
2620    /// `session/stop` must succeed while a `session/prompt` turn is in flight.
2621    ///
2622    /// The session entry lives in the outer map for its entire lifetime.
2623    /// The inner `Arc<Mutex<Session>>` serialises access: the prompt turn holds
2624    /// the inner lock while running; `session/stop` removes the outer entry
2625    /// then waits for the inner lock before cleaning up.  It must never see
2626    /// SESSION_NOT_FOUND just because a turn happens to be running.
2627    #[tokio::test]
2628    async fn session_stop_finds_session_during_active_prompt_turn() {
2629        let cwd = tempfile::tempdir().unwrap();
2630        let mut config = Config {
2631            data_dir: cwd.path().to_path_buf(),
2632            providers: {
2633                let mut p = zeroclaw_config::providers::Providers::default();
2634                p.models.anthropic.insert(
2635                    "default".to_string(),
2636                    zeroclaw_config::schema::AnthropicModelProviderConfig {
2637                        base: zeroclaw_config::schema::ModelProviderConfig {
2638                            model: Some("claude-haiku-4-5".to_string()),
2639                            ..Default::default()
2640                        },
2641                    },
2642                );
2643                p
2644            },
2645            ..Default::default()
2646        };
2647        config.risk_profiles.insert(
2648            "default".to_string(),
2649            zeroclaw_config::schema::RiskProfileConfig::default(),
2650        );
2651        config.agents.insert(
2652            "test-agent".to_string(),
2653            zeroclaw_config::schema::AliasedAgentConfig {
2654                model_provider: "anthropic.default".into(),
2655                risk_profile: "default".to_string(),
2656                ..Default::default()
2657            },
2658        );
2659        let server = Arc::new(AcpServer::new(config, AcpServerConfig::default()));
2660
2661        // Create a real session via the normal path.
2662        let new_result = server
2663            .handle_session_new(&serde_json::json!({
2664                "cwd": cwd.path().to_string_lossy(),
2665                "agentAlias": "test-agent"
2666            }))
2667            .await
2668            .expect("session/new must succeed");
2669        let session_id = new_result["sessionId"].as_str().unwrap().to_string();
2670
2671        // Grab the inner lock to simulate an in-flight prompt turn.
2672        let session_arc = {
2673            let sessions = server.sessions.lock().await;
2674            sessions.get(&session_id).cloned().unwrap()
2675        };
2676        let _guard = session_arc.lock().await;
2677
2678        // session/stop should find the session in the outer map.  With the
2679        // inner lock held it blocks — confirm it does NOT immediately return
2680        // SESSION_NOT_FOUND.
2681        let server_clone = Arc::clone(&server);
2682        let sid_clone = session_id.clone();
2683        let stop_result = tokio::time::timeout(Duration::from_millis(100), async move {
2684            server_clone
2685                .handle_session_stop(&serde_json::json!({ "sessionId": sid_clone }))
2686                .await
2687        })
2688        .await;
2689
2690        match stop_result {
2691            Err(_timeout) => {} // expected — blocked waiting for the inner lock
2692            Ok(Ok(_)) => panic!("stop returned Ok without the lock being released"),
2693            Ok(Err(e)) => {
2694                assert_ne!(
2695                    e.code, SESSION_NOT_FOUND,
2696                    "session/stop must not return SESSION_NOT_FOUND while a turn is in flight"
2697                );
2698            }
2699        }
2700    }
2701
2702    #[tokio::test]
2703    async fn session_new_persists_to_store() {
2704        let cwd = tempfile::tempdir().unwrap();
2705        let store =
2706            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
2707        let server = Arc::new(AcpServer::new_with_store(
2708            make_test_config(cwd.path()),
2709            AcpServerConfig::default(),
2710            Arc::clone(&store),
2711        ));
2712
2713        let result = server
2714            .handle_session_new(&serde_json::json!({
2715                "cwd": cwd.path().to_string_lossy()
2716            }))
2717            .await
2718            .expect("session/new must succeed");
2719
2720        let session_id = result["sessionId"].as_str().unwrap();
2721
2722        // Session must appear in the store
2723        let data = store.load_session(session_id).unwrap();
2724        assert!(
2725            data.is_some(),
2726            "session/new must persist to AcpSessionStore"
2727        );
2728    }
2729
2730    #[tokio::test]
2731    async fn session_new_without_store_still_works() {
2732        let cwd = tempfile::tempdir().unwrap();
2733        let server = Arc::new(AcpServer::new(
2734            make_test_config(cwd.path()),
2735            AcpServerConfig::default(),
2736        ));
2737
2738        let result = server
2739            .handle_session_new(&serde_json::json!({
2740                "cwd": cwd.path().to_string_lossy()
2741            }))
2742            .await
2743            .expect("session/new must succeed without a store");
2744
2745        let session_id = result["sessionId"].as_str().unwrap();
2746        assert!(server.sessions.lock().await.contains_key(session_id));
2747    }
2748
2749    fn make_test_config(cwd: &std::path::Path) -> Config {
2750        let mut cfg = Config {
2751            data_dir: cwd.to_path_buf(),
2752            ..Default::default()
2753        };
2754        cfg.providers.models.anthropic.insert(
2755            "default".to_string(),
2756            zeroclaw_config::schema::AnthropicModelProviderConfig {
2757                base: zeroclaw_config::schema::ModelProviderConfig {
2758                    model: Some("claude-haiku-4-5".to_string()),
2759                    ..Default::default()
2760                },
2761            },
2762        );
2763        cfg.risk_profiles.insert(
2764            "default".to_string(),
2765            zeroclaw_config::schema::RiskProfileConfig::default(),
2766        );
2767        cfg.agents.insert(
2768            "test-agent".to_string(),
2769            zeroclaw_config::schema::AliasedAgentConfig {
2770                model_provider: "anthropic.default".into(),
2771                risk_profile: "default".to_string(),
2772                ..Default::default()
2773            },
2774        );
2775        cfg
2776    }
2777
2778    /// `session/cancel` on an idle session (no active turn) must succeed silently.
2779    #[tokio::test]
2780    async fn session_cancel_idle_session_is_noop() {
2781        let cwd = tempfile::tempdir().unwrap();
2782        let server = Arc::new(AcpServer::new(
2783            make_test_config(cwd.path()),
2784            AcpServerConfig::default(),
2785        ));
2786
2787        let new_result = server
2788            .handle_session_new(&serde_json::json!({
2789                "cwd": cwd.path().to_string_lossy(),
2790                "agentAlias": "test-agent"
2791            }))
2792            .await
2793            .expect("session/new must succeed");
2794        let session_id = new_result["sessionId"].as_str().unwrap().to_string();
2795
2796        // No active turn — cancel must not error.
2797        let result = server
2798            .handle_session_cancel(&serde_json::json!({ "sessionId": session_id }))
2799            .await;
2800        assert!(result.is_ok(), "idle cancel must succeed: {result:?}");
2801    }
2802
2803    /// `session/cancel` for an unknown session ID must succeed silently (notification
2804    /// semantics: no response, no error propagation).
2805    #[tokio::test]
2806    async fn session_cancel_unknown_session_is_noop() {
2807        let cwd = tempfile::tempdir().unwrap();
2808        let server = Arc::new(AcpServer::new(
2809            make_test_config(cwd.path()),
2810            AcpServerConfig::default(),
2811        ));
2812
2813        let result = server
2814            .handle_session_cancel(&serde_json::json!({ "sessionId": "sess_does_not_exist" }))
2815            .await;
2816        assert!(
2817            result.is_ok(),
2818            "unknown-session cancel must succeed: {result:?}"
2819        );
2820    }
2821
2822    #[tokio::test]
2823    async fn session_cancel_accepts_snake_case_session_id() {
2824        let cwd = tempfile::tempdir().unwrap();
2825        let server = Arc::new(AcpServer::new(
2826            make_test_config(cwd.path()),
2827            AcpServerConfig::default(),
2828        ));
2829
2830        let session_id = "sess_snake_case_cancel";
2831        let active_token = tokio_util::sync::CancellationToken::new();
2832        server
2833            .register_cancel_token(session_id, active_token.clone())
2834            .expect("active turn should register token");
2835
2836        server
2837            .handle_session_cancel(&serde_json::json!({ "session_id": session_id }))
2838            .await
2839            .expect("snake_case session_id should cancel the active turn");
2840
2841        assert!(active_token.is_cancelled());
2842    }
2843
2844    /// A second prompt for the same session must fail before it can overwrite
2845    /// the active turn's cancellation token.
2846    #[tokio::test]
2847    async fn register_cancel_token_rejects_concurrent_prompt_for_session() {
2848        let cwd = tempfile::tempdir().unwrap();
2849        let server = Arc::new(AcpServer::new(
2850            make_test_config(cwd.path()),
2851            AcpServerConfig::default(),
2852        ));
2853
2854        let session_id = "sess_active_turn";
2855        let active_token = tokio_util::sync::CancellationToken::new();
2856        let queued_token = tokio_util::sync::CancellationToken::new();
2857
2858        server
2859            .register_cancel_token(session_id, active_token.clone())
2860            .expect("first prompt should register its token");
2861        let err = server
2862            .register_cancel_token(session_id, queued_token.clone())
2863            .expect_err("second prompt must not overwrite active token");
2864
2865        assert_eq!(err.code, SESSION_BUSY);
2866        assert!(
2867            err.message.contains("active prompt turn"),
2868            "error should explain why prompt was rejected: {}",
2869            err.message
2870        );
2871
2872        server
2873            .handle_session_cancel(&serde_json::json!({ "sessionId": session_id }))
2874            .await
2875            .expect("cancel should still target active token");
2876
2877        assert!(active_token.is_cancelled());
2878        assert!(
2879            !queued_token.is_cancelled(),
2880            "rejected prompt's token must not become the active cancel target"
2881        );
2882    }
2883
2884    #[tokio::test]
2885    async fn session_prompt_rejects_concurrent_turn_before_agent_starts() {
2886        let cwd = tempfile::tempdir().unwrap();
2887        let server = Arc::new(AcpServer::new(
2888            make_test_config(cwd.path()),
2889            AcpServerConfig::default(),
2890        ));
2891
2892        let new_result = server
2893            .handle_session_new(&serde_json::json!({
2894                "cwd": cwd.path().to_string_lossy(),
2895                "agentAlias": "test-agent"
2896            }))
2897            .await
2898            .expect("session/new must succeed");
2899        let session_id = new_result["sessionId"].as_str().unwrap().to_string();
2900        let active_token = tokio_util::sync::CancellationToken::new();
2901        server
2902            .register_cancel_token(&session_id, active_token.clone())
2903            .expect("simulated active turn should register token");
2904
2905        let err = server
2906            .handle_session_prompt(
2907                &serde_json::json!({
2908                    "sessionId": session_id.clone(),
2909                    "prompt": "queued prompt"
2910                }),
2911                &serde_json::json!(2),
2912            )
2913            .await
2914            .expect_err("concurrent prompt must be rejected before model_provider work starts");
2915
2916        assert_eq!(err.code, SESSION_BUSY);
2917        server
2918            .handle_session_cancel(&serde_json::json!({ "sessionId": session_id }))
2919            .await
2920            .expect("cancel should still target the original active token");
2921        assert!(active_token.is_cancelled());
2922    }
2923
2924    /// Verify that inserting and removing a cancel token from the map works
2925    /// correctly. This tests map mechanics directly rather than the
2926    /// `handle_session_prompt` lifecycle, so a regression in the production
2927    /// path's cleanup wouldn't be caught by this test.
2928    #[tokio::test]
2929    async fn cancel_tokens_map_remove_works() {
2930        let cwd = tempfile::tempdir().unwrap();
2931        let config = Config {
2932            data_dir: cwd.path().to_path_buf(),
2933            ..Default::default()
2934        };
2935        let server = Arc::new(AcpServer::new(config, AcpServerConfig::default()));
2936
2937        // Insert and remove a token directly.
2938        let session_id = "sess_token_leak_test".to_string();
2939        let token = tokio_util::sync::CancellationToken::new();
2940        server
2941            .cancel_tokens
2942            .lock()
2943            .expect("cancel_tokens lock poisoned")
2944            .insert(session_id.clone(), token);
2945
2946        // Remove the token.
2947        server
2948            .cancel_tokens
2949            .lock()
2950            .expect("cancel_tokens lock poisoned")
2951            .remove(&session_id);
2952
2953        let remaining = server
2954            .cancel_tokens
2955            .lock()
2956            .expect("cancel_tokens lock poisoned")
2957            .len();
2958        assert_eq!(remaining, 0, "cancel token must be removed after turn ends");
2959    }
2960
2961    #[tokio::test]
2962    async fn session_load_restores_history_and_streams_notifications() {
2963        use zeroclaw_api::model_provider::{ChatMessage, ConversationMessage};
2964        let cwd = tempfile::tempdir().unwrap();
2965        let store =
2966            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
2967
2968        let session_id = "sess-load-test";
2969        store
2970            .create_session(session_id, &cwd.path().to_string_lossy())
2971            .unwrap();
2972        store
2973            .append_turn(
2974                session_id,
2975                &[
2976                    ConversationMessage::Chat(ChatMessage::user("hello")),
2977                    ConversationMessage::Chat(ChatMessage::assistant("hi there")),
2978                ],
2979            )
2980            .unwrap();
2981
2982        let (writer_tx, mut writer_rx) = tokio::sync::mpsc::channel::<String>(64);
2983        let server = Arc::new(AcpServer::new_with_writer_and_store(
2984            make_test_config(cwd.path()),
2985            AcpServerConfig::default(),
2986            writer_tx,
2987            Arc::clone(&store),
2988        ));
2989
2990        let result = server
2991            .handle_session_load(&serde_json::json!({
2992                "sessionId": session_id,
2993                "cwd": cwd.path().to_string_lossy()
2994            }))
2995            .await
2996            .expect("session/load must succeed");
2997
2998        assert_eq!(result, serde_json::json!({}));
2999
3000        // Session must now be in the in-memory map
3001        assert!(server.sessions.lock().await.contains_key(session_id));
3002
3003        // Collect notifications (non-blocking drain)
3004        let mut notifications = Vec::new();
3005        while let Ok(msg) = writer_rx.try_recv() {
3006            notifications.push(msg);
3007        }
3008
3009        // Expect two session/update notifications: user then assistant
3010        assert_eq!(
3011            notifications.len(),
3012            2,
3013            "expected 2 notifications, got: {notifications:?}"
3014        );
3015        let n0: serde_json::Value = serde_json::from_str(&notifications[0]).unwrap();
3016        assert_eq!(
3017            n0["params"]["update"]["sessionUpdate"],
3018            "user_message_chunk"
3019        );
3020        assert_eq!(n0["params"]["update"]["content"]["text"], "hello");
3021        let n1: serde_json::Value = serde_json::from_str(&notifications[1]).unwrap();
3022        assert_eq!(
3023            n1["params"]["update"]["sessionUpdate"],
3024            "agent_message_chunk"
3025        );
3026        assert_eq!(n1["params"]["update"]["content"]["text"], "hi there");
3027    }
3028
3029    #[tokio::test]
3030    async fn session_load_returns_not_found_for_unknown_id() {
3031        let cwd = tempfile::tempdir().unwrap();
3032        let store =
3033            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3034        let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3035        let server = AcpServer::new_with_writer_and_store(
3036            make_test_config(cwd.path()),
3037            AcpServerConfig::default(),
3038            writer_tx,
3039            store,
3040        );
3041
3042        let err = server
3043            .handle_session_load(&serde_json::json!({ "sessionId": "ghost" }))
3044            .await
3045            .expect_err("unknown session must fail");
3046
3047        assert_eq!(err.code, SESSION_NOT_FOUND);
3048    }
3049
3050    #[tokio::test]
3051    async fn session_load_rejects_already_active_session() {
3052        let cwd = tempfile::tempdir().unwrap();
3053        let store =
3054            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3055        let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3056        let server = Arc::new(AcpServer::new_with_writer_and_store(
3057            make_test_config(cwd.path()),
3058            AcpServerConfig::default(),
3059            writer_tx,
3060            Arc::clone(&store),
3061        ));
3062
3063        // Create and load the session once to put it in memory
3064        let session_id = "sess-already-active";
3065        store
3066            .create_session(session_id, &cwd.path().to_string_lossy())
3067            .unwrap();
3068        server
3069            .handle_session_load(&serde_json::json!({
3070                "sessionId": session_id,
3071                "cwd": cwd.path().to_string_lossy()
3072            }))
3073            .await
3074            .unwrap();
3075
3076        // Second load must be rejected
3077        let err = server
3078            .handle_session_load(&serde_json::json!({ "sessionId": session_id }))
3079            .await
3080            .expect_err("session/load for active session must fail");
3081
3082        assert_eq!(err.code, INVALID_PARAMS);
3083    }
3084
3085    #[tokio::test]
3086    async fn session_resume_restores_without_replay() {
3087        use zeroclaw_api::model_provider::{ChatMessage, ConversationMessage};
3088        let cwd = tempfile::tempdir().unwrap();
3089        let store =
3090            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3091
3092        let session_id = "sess-resume-test";
3093        store
3094            .create_session(session_id, &cwd.path().to_string_lossy())
3095            .unwrap();
3096        store
3097            .append_turn(
3098                session_id,
3099                &[ConversationMessage::Chat(ChatMessage::user("hello"))],
3100            )
3101            .unwrap();
3102
3103        let (writer_tx, mut writer_rx) = tokio::sync::mpsc::channel::<String>(64);
3104        let server = Arc::new(AcpServer::new_with_writer_and_store(
3105            make_test_config(cwd.path()),
3106            AcpServerConfig::default(),
3107            writer_tx,
3108            Arc::clone(&store),
3109        ));
3110
3111        let result = server
3112            .handle_session_resume(&serde_json::json!({
3113                "sessionId": session_id,
3114                "cwd": cwd.path().to_string_lossy()
3115            }))
3116            .await
3117            .expect("session/resume must succeed");
3118
3119        // Result is empty object
3120        assert_eq!(result, serde_json::json!({}));
3121
3122        // Session must be in memory
3123        assert!(server.sessions.lock().await.contains_key(session_id));
3124
3125        // No notifications must have been emitted
3126        assert!(
3127            writer_rx.try_recv().is_err(),
3128            "session/resume must not emit session/update notifications"
3129        );
3130    }
3131
3132    #[tokio::test]
3133    async fn session_close_releases_memory_but_keeps_store_record() {
3134        let cwd = tempfile::tempdir().unwrap();
3135        let store =
3136            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3137        let server = Arc::new(AcpServer::new_with_store(
3138            make_test_config(cwd.path()),
3139            AcpServerConfig::default(),
3140            Arc::clone(&store),
3141        ));
3142
3143        let new_result = server
3144            .handle_session_new(&serde_json::json!({
3145                "cwd": cwd.path().to_string_lossy()
3146            }))
3147            .await
3148            .expect("session/new must succeed");
3149        let session_id = new_result["sessionId"].as_str().unwrap().to_string();
3150
3151        assert!(server.sessions.lock().await.contains_key(&session_id));
3152
3153        let result = server
3154            .handle_session_close(&serde_json::json!({ "sessionId": &session_id }))
3155            .await
3156            .expect("session/close must succeed");
3157
3158        assert_eq!(result, serde_json::json!({}));
3159
3160        // Session gone from in-memory map
3161        assert!(!server.sessions.lock().await.contains_key(&session_id));
3162
3163        // Session record still on disk
3164        let data = store.load_session(&session_id).unwrap();
3165        assert!(
3166            data.is_some(),
3167            "session/close must not delete the DB record"
3168        );
3169    }
3170
3171    #[tokio::test]
3172    async fn session_close_returns_not_found_for_unknown_session() {
3173        let cwd = tempfile::tempdir().unwrap();
3174        let server = AcpServer::new(make_test_config(cwd.path()), AcpServerConfig::default());
3175
3176        let err = server
3177            .handle_session_close(&serde_json::json!({ "sessionId": "ghost" }))
3178            .await
3179            .expect_err("unknown session must fail");
3180
3181        assert_eq!(err.code, SESSION_NOT_FOUND);
3182    }
3183
3184    /// `session/load` must return SESSION_LIMIT_REACHED when `max_sessions` is
3185    /// already reached by an active session created via `session/new`.
3186    #[tokio::test]
3187    async fn session_load_respects_max_sessions() {
3188        let cwd = tempfile::tempdir().unwrap();
3189        let store =
3190            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3191
3192        // Pre-create a stored session that we'll attempt to load
3193        let stored_id = "sess-load-limit-test";
3194        store
3195            .create_session(stored_id, &cwd.path().to_string_lossy())
3196            .unwrap();
3197
3198        let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3199        let server = Arc::new(AcpServer::new_with_writer_and_store(
3200            make_test_config(cwd.path()),
3201            AcpServerConfig {
3202                max_sessions: 1,
3203                ..AcpServerConfig::default()
3204            },
3205            writer_tx,
3206            Arc::clone(&store),
3207        ));
3208
3209        // Fill the one available slot via session/new
3210        server
3211            .handle_session_new(&serde_json::json!({
3212                "cwd": cwd.path().to_string_lossy()
3213            }))
3214            .await
3215            .expect("session/new must succeed when under limit");
3216
3217        // Now session/load for the stored session must fail with SESSION_LIMIT_REACHED
3218        let err = server
3219            .handle_session_load(&serde_json::json!({ "sessionId": stored_id }))
3220            .await
3221            .expect_err("session/load must fail when max_sessions reached");
3222
3223        assert_eq!(
3224            err.code, SESSION_LIMIT_REACHED,
3225            "expected SESSION_LIMIT_REACHED, got: {:?}",
3226            err
3227        );
3228    }
3229
3230    /// `session/resume` must return SESSION_LIMIT_REACHED when `max_sessions` is
3231    /// already reached by an active session created via `session/new`.
3232    #[tokio::test]
3233    async fn session_resume_respects_max_sessions() {
3234        let cwd = tempfile::tempdir().unwrap();
3235        let store =
3236            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3237
3238        // Pre-create a stored session that we'll attempt to resume
3239        let stored_id = "sess-resume-limit-test";
3240        store
3241            .create_session(stored_id, &cwd.path().to_string_lossy())
3242            .unwrap();
3243
3244        let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3245        let server = Arc::new(AcpServer::new_with_writer_and_store(
3246            make_test_config(cwd.path()),
3247            AcpServerConfig {
3248                max_sessions: 1,
3249                ..AcpServerConfig::default()
3250            },
3251            writer_tx,
3252            Arc::clone(&store),
3253        ));
3254
3255        // Fill the one available slot via session/new
3256        server
3257            .handle_session_new(&serde_json::json!({
3258                "cwd": cwd.path().to_string_lossy()
3259            }))
3260            .await
3261            .expect("session/new must succeed when under limit");
3262
3263        // Now session/resume for the stored session must fail with SESSION_LIMIT_REACHED
3264        let err = server
3265            .handle_session_resume(&serde_json::json!({ "sessionId": stored_id }))
3266            .await
3267            .expect_err("session/resume must fail when max_sessions reached");
3268
3269        assert_eq!(
3270            err.code, SESSION_LIMIT_REACHED,
3271            "expected SESSION_LIMIT_REACHED, got: {:?}",
3272            err
3273        );
3274    }
3275
3276    /// A SQLite error during `store.load_session` must release the `loading_sessions`
3277    /// reservation so a subsequent restore attempt is not permanently blocked.
3278    #[tokio::test]
3279    async fn session_load_releases_reservation_on_store_error() {
3280        let cwd = tempfile::tempdir().unwrap();
3281        let store =
3282            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3283
3284        let session_id = "sess-load-store-err";
3285        store
3286            .create_session(session_id, &cwd.path().to_string_lossy())
3287            .unwrap();
3288
3289        // Drop the schema via a second connection to force a "no such table"
3290        // error on the store's next query_row call.
3291        let db_path = cwd.path().join("sessions/acp-sessions.db");
3292        {
3293            let second =
3294                rusqlite::Connection::open(&db_path).expect("second conn must open same db");
3295            second
3296                .execute_batch(
3297                    "DROP TABLE IF EXISTS acp_messages; DROP TABLE IF EXISTS acp_sessions;",
3298                )
3299                .expect("schema drop must succeed on second conn");
3300        }
3301
3302        let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3303        let server = Arc::new(AcpServer::new_with_writer_and_store(
3304            make_test_config(cwd.path()),
3305            AcpServerConfig::default(),
3306            writer_tx,
3307            Arc::clone(&store),
3308        ));
3309
3310        // First call: must fail with INTERNAL_ERROR (SQLite "no such table").
3311        let first_err = server
3312            .handle_session_load(&serde_json::json!({ "sessionId": session_id }))
3313            .await
3314            .expect_err("session/load must fail when store returns Err");
3315        assert_eq!(
3316            first_err.code, INTERNAL_ERROR,
3317            "expected INTERNAL_ERROR from store failure, got: {:?}",
3318            first_err
3319        );
3320
3321        // Second call for the same session: must also fail with INTERNAL_ERROR,
3322        // NOT with INVALID_PARAMS ("already active"). A leaked reservation would
3323        // cause INVALID_PARAMS, proving the slot was never released.
3324        let second_err = server
3325            .handle_session_load(&serde_json::json!({ "sessionId": session_id }))
3326            .await
3327            .expect_err("second session/load must also fail");
3328        assert_eq!(
3329            second_err.code, INTERNAL_ERROR,
3330            "second load must fail with INTERNAL_ERROR, not INVALID_PARAMS (leaked slot); got: {:?}",
3331            second_err
3332        );
3333    }
3334
3335    /// Same coverage as `session_load_releases_reservation_on_store_error` but
3336    /// for the `session/resume` path.
3337    #[tokio::test]
3338    async fn session_resume_releases_reservation_on_store_error() {
3339        let cwd = tempfile::tempdir().unwrap();
3340        let store =
3341            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3342
3343        let session_id = "sess-resume-store-err";
3344        store
3345            .create_session(session_id, &cwd.path().to_string_lossy())
3346            .unwrap();
3347
3348        let db_path = cwd.path().join("sessions/acp-sessions.db");
3349        {
3350            let second =
3351                rusqlite::Connection::open(&db_path).expect("second conn must open same db");
3352            second
3353                .execute_batch(
3354                    "DROP TABLE IF EXISTS acp_messages; DROP TABLE IF EXISTS acp_sessions;",
3355                )
3356                .expect("schema drop must succeed on second conn");
3357        }
3358
3359        let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3360        let server = Arc::new(AcpServer::new_with_writer_and_store(
3361            make_test_config(cwd.path()),
3362            AcpServerConfig::default(),
3363            writer_tx,
3364            Arc::clone(&store),
3365        ));
3366
3367        let first_err = server
3368            .handle_session_resume(&serde_json::json!({ "sessionId": session_id }))
3369            .await
3370            .expect_err("session/resume must fail when store returns Err");
3371        assert_eq!(
3372            first_err.code, INTERNAL_ERROR,
3373            "expected INTERNAL_ERROR from store failure, got: {:?}",
3374            first_err
3375        );
3376
3377        let second_err = server
3378            .handle_session_resume(&serde_json::json!({ "sessionId": session_id }))
3379            .await
3380            .expect_err("second session/resume must also fail");
3381        assert_eq!(
3382            second_err.code, INTERNAL_ERROR,
3383            "second resume must fail with INTERNAL_ERROR, not INVALID_PARAMS (leaked slot); got: {:?}",
3384            second_err
3385        );
3386    }
3387}