Skip to main content

zeroclaw_channels/orchestrator/
acp_server.rs

1//! ACP (Agent Control Protocol) Server — JSON-RPC 2.0 over stdio.
2//!
3//! Provides an IDE-friendly interface for spawning and managing isolated agent
4//! sessions. Each session wraps an [`Agent`] built from the global config with
5//! streaming support via JSON-RPC notifications.
6//!
7//! ## Protocol
8//!
9//! Requests and responses are newline-delimited JSON objects on stdin/stdout.
10//!
11//! | Method            | Description                              |
12//! |-------------------|------------------------------------------|
13//! | `initialize`      | Handshake — returns server capabilities (incl. defaultModel when configured) |
14//! | `session/new`     | Create an isolated agent session          |
15//! | `session/prompt`  | Send a prompt, stream back `session/update` events |
16//! | `session/stop`    | Gracefully terminate a session            |
17//! | `session/cancel`  | Abort an in-flight `session/prompt` turn  |
18//! | `session/update`  | Streaming events and bidirectional events |
19
20use anyhow::Result;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::collections::{HashMap, HashSet};
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
28use tokio::sync::{Mutex, mpsc};
29use uuid::Uuid;
30pub use zeroclaw_api::jsonrpc::RpcOutbound;
31use zeroclaw_api::jsonrpc::error_codes::*;
32use zeroclaw_api::jsonrpc::{
33    ACP_PROTOCOL_VERSION, JsonRpcError, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse,
34};
35use zeroclaw_api::model_provider::ConversationMessage;
36use zeroclaw_config::schema::Config;
37use zeroclaw_infra::acp_session_store::AcpSessionStore;
38use zeroclaw_runtime::agent::agent::{Agent, TurnEvent};
39use zeroclaw_runtime::tools::CanvasStore;
40
41use crate::acp_channel::AcpChannel;
42
43// ── Configuration ────────────────────────────────────────────────
44
45/// ACP server configuration (optional `[acp]` section in config.toml).
46#[derive(Debug, Clone, Serialize, Deserialize)]
47#[serde(default)]
48pub struct AcpServerConfig {
49    /// Maximum number of concurrent sessions. Default: 10.
50    pub max_sessions: usize,
51    /// Session inactivity timeout in seconds. Default: 3600 (1 hour).
52    pub session_timeout_secs: u64,
53}
54
55impl Default for AcpServerConfig {
56    fn default() -> Self {
57        Self {
58            max_sessions: 10,
59            session_timeout_secs: 3600,
60        }
61    }
62}
63
64// ── Session state ────────────────────────────────────────────────
65
66struct Session {
67    agent: Agent,
68    #[allow(dead_code)] // WIP: intended for session expiry logic
69    created_at: Instant,
70    last_active: Instant,
71    /// Agent alias (e.g. `"clamps"`) for attributable span logs.
72    agent_alias: String,
73    /// Model-provider ref (e.g. `"anthropic.default"`) for attributable span logs.
74    model_provider: String,
75    /// Model identifier (e.g. `"claude-sonnet-4-6"`) for attributable span logs.
76    model: String,
77}
78
79// ── ACP Server ───────────────────────────────────────────────────
80
81pub struct AcpServer {
82    config: Config,
83    acp_config: AcpServerConfig,
84    sessions: Arc<Mutex<HashMap<String, Arc<Mutex<Session>>>>>,
85    rpc: Arc<RpcOutbound>,
86    /// Receiver for the writer task. Pulled out (replaced with `None`) the
87    /// first time `run()` starts the writer loop.
88    writer_rx: std::sync::Mutex<Option<mpsc::Receiver<String>>>,
89    /// Per-session cancellation tokens for aborting in-flight `session/prompt`
90    /// turns. Lives outside `Session`'s inner `Mutex` so `session/cancel` can
91    /// fire the token without waiting for the turn to release the inner lock.
92    ///
93    /// **Single-turn-per-session invariant:** this map holds at most one token
94    /// per `session_id` because the ACP protocol does not pipeline multiple
95    /// `session/prompt` calls on the same session — each prompt must complete
96    /// (or be cancelled) before the next one is sent. A second prompt is
97    /// rejected before it can overwrite the active turn's token. If pipelining
98    /// is needed in the future, the key should become `(session_id, turn_id)`.
99    cancel_tokens: Arc<std::sync::Mutex<HashMap<String, tokio_util::sync::CancellationToken>>>,
100    /// Tracks session IDs currently being loaded/resumed (between the initial
101    /// check and the final insert into `sessions`). Used to prevent duplicate
102    /// concurrent restores of the same session and to count in-flight slots
103    /// against `max_sessions`.
104    loading_sessions: Arc<tokio::sync::Mutex<HashSet<String>>>,
105    store: Option<Arc<AcpSessionStore>>,
106    /// Shared canvas store from the gateway / daemon supervisor.  When set,
107    /// agents created by this server write canvas frames to the same store
108    /// that `/ws/canvas/:id` WebSocket subscribers read from.  `None` in
109    /// standalone `zeroclaw acp` mode where no gateway is running.
110    canvas_store: Option<CanvasStore>,
111}
112
113impl AcpServer {
114    pub fn new(config: Config, acp_config: AcpServerConfig) -> Self {
115        let (writer_tx, writer_rx) = mpsc::channel::<String>(256);
116        Self::with_writer(config, acp_config, writer_tx, Some(writer_rx), None)
117    }
118
119    pub fn new_with_writer(
120        config: Config,
121        acp_config: AcpServerConfig,
122        writer_tx: mpsc::Sender<String>,
123    ) -> Self {
124        Self::with_writer(config, acp_config, writer_tx, None, None)
125    }
126
127    pub fn new_with_store(
128        config: Config,
129        acp_config: AcpServerConfig,
130        store: Arc<AcpSessionStore>,
131    ) -> Self {
132        let (writer_tx, writer_rx) = mpsc::channel::<String>(256);
133        Self::with_writer(config, acp_config, writer_tx, Some(writer_rx), Some(store))
134    }
135
136    pub fn new_with_writer_and_store(
137        config: Config,
138        acp_config: AcpServerConfig,
139        writer_tx: mpsc::Sender<String>,
140        store: Arc<AcpSessionStore>,
141    ) -> Self {
142        Self::with_writer(config, acp_config, writer_tx, None, Some(store))
143    }
144
145    fn with_writer(
146        config: Config,
147        acp_config: AcpServerConfig,
148        writer_tx: mpsc::Sender<String>,
149        writer_rx: Option<mpsc::Receiver<String>>,
150        store: Option<Arc<AcpSessionStore>>,
151    ) -> Self {
152        Self {
153            config,
154            acp_config,
155            sessions: Arc::new(Mutex::new(HashMap::new())),
156            rpc: Arc::new(RpcOutbound::new(writer_tx)),
157            writer_rx: std::sync::Mutex::new(writer_rx),
158            cancel_tokens: Arc::new(std::sync::Mutex::new(HashMap::new())),
159            loading_sessions: Arc::new(tokio::sync::Mutex::new(HashSet::new())),
160            store,
161            canvas_store: None,
162        }
163    }
164
165    /// Attach the shared gateway [`CanvasStore`] so that agents created by
166    /// this server write canvas frames to the same store that the
167    /// `/ws/canvas/:id` WebSocket endpoint serves.
168    pub fn with_canvas_store(mut self, canvas_store: CanvasStore) -> Self {
169        self.canvas_store = Some(canvas_store);
170        self
171    }
172
173    /// Run the ACP server, reading JSON-RPC requests from stdin and writing
174    /// responses/notifications to stdout.
175    pub async fn run(self: Arc<Self>) -> Result<()> {
176        ::zeroclaw_log::record!(
177            DEBUG,
178            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
179                .with_category(::zeroclaw_log::EventCategory::Channel),
180            &format!(
181                "ACP server starting (max_sessions={}, timeout={}s)",
182                self.acp_config.max_sessions, self.acp_config.session_timeout_secs
183            )
184        );
185
186        // Pull the writer-rx out of self so we can move it into the writer
187        // task. Subsequent `run()` calls would have nothing to drive — but
188        // `run()` is normally invoked once per process.
189        let writer_rx = self
190            .writer_rx
191            .lock()
192            .unwrap_or_else(|e| e.into_inner())
193            .take()
194            .ok_or_else(|| {
195                ::zeroclaw_log::record!(
196                    ERROR,
197                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
198                        .with_category(::zeroclaw_log::EventCategory::Channel)
199                        .with_outcome(::zeroclaw_log::EventOutcome::Failure),
200                    "ACP server writer already started"
201                );
202                anyhow::Error::msg("ACP server writer already started")
203            })?;
204        zeroclaw_spawn::spawn!(writer_task(writer_rx));
205
206        let stdin = tokio::io::stdin();
207        let mut reader = BufReader::new(stdin);
208        let mut line = String::new();
209
210        // Spawn session reaper
211        let sessions = Arc::clone(&self.sessions);
212        let timeout = Duration::from_secs(self.acp_config.session_timeout_secs);
213        zeroclaw_spawn::spawn!(async move {
214            let mut interval = tokio::time::interval(Duration::from_secs(60));
215            loop {
216                interval.tick().await;
217                let mut sessions = sessions.lock().await;
218                let before = sessions.len();
219                sessions.retain(|id, session_arc| {
220                    // Never reap a session whose inner lock is held — it has an
221                    // active prompt turn in flight and is by definition not idle.
222                    match session_arc.try_lock() {
223                        Ok(session) => {
224                            let expired = session.last_active.elapsed() > timeout;
225                            if expired {
226                                ::zeroclaw_log::record!(
227                                    DEBUG,
228                                    ::zeroclaw_log::Event::new(
229                                        module_path!(),
230                                        ::zeroclaw_log::Action::Note
231                                    )
232                                    .with_category(::zeroclaw_log::EventCategory::Channel)
233                                    .with_attrs(
234                                        ::serde_json::json!({
235                                            "id": id,
236                                            "agent_alias": session.agent_alias,
237                                            "model_provider": session.model_provider,
238                                            "model": session.model,
239                                        })
240                                    ),
241                                    "Session expired after inactivity"
242                                );
243                            }
244                            !expired
245                        }
246                        Err(_) => true,
247                    }
248                });
249                let reaped = before - sessions.len();
250                if reaped > 0 {
251                    ::zeroclaw_log::record!(
252                        DEBUG,
253                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
254                            .with_category(::zeroclaw_log::EventCategory::Channel)
255                            .with_attrs(::serde_json::json!({"reaped": reaped})),
256                        "Reaped expired session(s)"
257                    );
258                }
259            }
260        });
261
262        loop {
263            line.clear();
264            let bytes_read = reader.read_line(&mut line).await?;
265            if bytes_read == 0 {
266                ::zeroclaw_log::record!(
267                    DEBUG,
268                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
269                        .with_category(::zeroclaw_log::EventCategory::Channel),
270                    "ACP server: stdin closed, shutting down"
271                );
272                break;
273            }
274
275            let trimmed = line.trim();
276            if trimmed.is_empty() {
277                continue;
278            }
279
280            self.process_line(trimmed).await;
281        }
282
283        Ok(())
284    }
285
286    /// Run the ACP server against an already-framed line source.
287    ///
288    /// This is used by the gateway WebSocket bridge, where inbound WebSocket
289    /// text messages are already complete JSON-RPC frames and outbound frames
290    /// are supplied by the writer channel passed to [`Self::new_with_writer`]
291    /// or [`Self::new_with_writer_and_store`].
292    pub async fn run_messages(self: Arc<Self>, mut input_rx: mpsc::Receiver<String>) -> Result<()> {
293        ::zeroclaw_log::record!(
294            DEBUG,
295            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
296                .with_category(::zeroclaw_log::EventCategory::Channel),
297            "ACP server starting (WebSocket/framed mode)"
298        );
299        while let Some(line) = input_rx.recv().await {
300            let trimmed = line.trim();
301            if trimmed.is_empty() {
302                continue;
303            }
304            self.process_line(trimmed).await;
305        }
306
307        Ok(())
308    }
309
310    async fn process_line(self: &Arc<Self>, trimmed: &str) {
311        // First, peek at whether this is a response (has `result` or
312        // `error`) to a request *we* sent. Inbound requests/notifications
313        // fall through to the JsonRpcRequest path.
314        if let Ok(value) = serde_json::from_str::<Value>(trimmed)
315            && value.is_object()
316            && (value.get("result").is_some() || value.get("error").is_some())
317            && let Some(id) = value.get("id")
318        {
319            let id_str = id
320                .as_str()
321                .map(String::from)
322                .unwrap_or_else(|| id.to_string());
323            let result = value.get("result").cloned();
324            let error: Option<JsonRpcError> = value
325                .get("error")
326                .and_then(|e| serde_json::from_value(e.clone()).ok());
327            self.rpc.dispatch_response(&id_str, result, error);
328            return;
329        }
330
331        match serde_json::from_str::<JsonRpcRequest>(trimmed) {
332            Ok(request) => {
333                if request.jsonrpc != "2.0" {
334                    if let Some(id) = request.id {
335                        self.write_error(id, INVALID_REQUEST, "Invalid JSON-RPC version")
336                            .await;
337                    }
338                    return;
339                }
340                // Spawn so a long-running session/prompt doesn't block the
341                // read loop — outbound RPC responses (e.g. for
342                // session/request_permission) need to be processable
343                // while a prompt turn is in flight. Once `handle_request`
344                // resolves session/agent context and attaches an
345                // attribution scope, every log record emitted from this
346                // task lands attributed in the TUI instead of orphaning.
347                let server = Arc::clone(self);
348                ::zeroclaw_spawn::spawn!(async move {
349                    server.handle_request(request).await;
350                });
351            }
352            Err(e) => {
353                ::zeroclaw_log::record!(
354                    WARN,
355                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
356                        .with_category(::zeroclaw_log::EventCategory::Channel)
357                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
358                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
359                    "Failed to parse JSON-RPC request"
360                );
361                self.write_error(Value::Null, PARSE_ERROR, &format!("Parse error: {e}"))
362                    .await;
363            }
364        }
365    }
366
367    async fn handle_request(&self, request: JsonRpcRequest) {
368        let id = request.id.clone().unwrap_or(Value::Null);
369        let is_notification = request.id.is_none();
370
371        let result = match request.method.as_str() {
372            "initialize" => self.handle_initialize(&request.params),
373            "session/new" => self.handle_session_new(&request.params).await,
374            "session/load" => self.handle_session_load(&request.params).await,
375            "session/resume" => self.handle_session_resume(&request.params).await,
376            "session/close" => self.handle_session_close(&request.params).await,
377            "session/prompt" => self.handle_session_prompt(&request.params, &id).await,
378            "session/stop" => self.handle_session_stop(&request.params).await,
379            "session/cancel" => self.handle_session_cancel(&request.params).await,
380            "session/event" | "session/update" => self.handle_session_event(&request.params).await,
381            _ => {
382                ::zeroclaw_log::record!(
383                    WARN,
384                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
385                        .with_category(::zeroclaw_log::EventCategory::Channel)
386                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
387                        .with_attrs(::serde_json::json!({"method": request.method})),
388                    "ACP method not found"
389                );
390                Err(RpcError {
391                    code: METHOD_NOT_FOUND,
392                    message: format!("Method not found: {}", request.method),
393                    data: None,
394                })
395            }
396        };
397
398        // Only send response for requests (with id), not notifications
399        if !is_notification {
400            match result {
401                Ok(value) => self.write_result(id, value).await,
402                Err(e) => {
403                    ::zeroclaw_log::record!(
404                        WARN,
405                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
406                            .with_category(::zeroclaw_log::EventCategory::Channel)
407                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
408                            .with_attrs(::serde_json::json!({
409                                "method": request.method,
410                                "error_code": e.code,
411                                "error": e.message,
412                            })),
413                        "ACP request failed"
414                    );
415                    self.write_error(id, e.code, &e.message).await;
416                }
417            }
418        }
419    }
420
421    // ── Method handlers ──────────────────────────────────────────
422
423    fn handle_initialize(&self, _params: &Value) -> RpcResult {
424        let default_model = self
425            .config
426            .providers
427            .models
428            .iter_entries()
429            .find_map(|(_, _, e)| e.model.clone());
430
431        let mut zeroclaw_meta = serde_json::json!({
432            "maxSessions": self.acp_config.max_sessions,
433            "sessionTimeoutSecs": self.acp_config.session_timeout_secs,
434        });
435        if let Some(model) = default_model {
436            zeroclaw_meta["defaultModel"] = serde_json::json!(model);
437        }
438
439        let session_capabilities = if self.store.is_some() {
440            serde_json::json!({ "resume": {}, "close": {} })
441        } else {
442            serde_json::json!({})
443        };
444
445        Ok(serde_json::json!({
446            "protocolVersion": ACP_PROTOCOL_VERSION,
447            "agentCapabilities": {
448                "loadSession": self.store.is_some(),
449                "promptCapabilities": {
450                    "image": false,
451                    "audio": false,
452                    "embeddedContext": false,
453                },
454                "mcpCapabilities": {
455                    "http": false,
456                    "sse": false,
457                },
458                "sessionCapabilities": session_capabilities,
459            },
460            "agentInfo": {
461                "name": "zeroclaw-acp",
462                "title": "ZeroClaw ACP",
463                "version": env!("CARGO_PKG_VERSION"),
464            },
465            "authMethods": [],
466            "_meta": {
467                "zeroclaw": zeroclaw_meta,
468            }
469        }))
470    }
471
472    async fn handle_session_new(&self, params: &Value) -> RpcResult {
473        let mut sessions = self.sessions.lock().await;
474
475        let loading_count = self.loading_sessions.lock().await.len();
476        if sessions.len() + loading_count >= self.acp_config.max_sessions {
477            ::zeroclaw_log::record!(
478                WARN,
479                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
480                    .with_category(::zeroclaw_log::EventCategory::Channel)
481                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
482                    .with_attrs(::serde_json::json!({
483                        "active": sessions.len(),
484                        "loading": loading_count,
485                        "max": self.acp_config.max_sessions,
486                    })),
487                "ACP session/new rejected: session limit reached"
488            );
489            return Err(RpcError {
490                code: SESSION_LIMIT_REACHED,
491                message: format!(
492                    "Maximum session limit reached ({})",
493                    self.acp_config.max_sessions
494                ),
495                data: None,
496            });
497        }
498
499        let requested_cwd = self.requested_session_cwd(params);
500
501        let workspace_dir = std::fs::canonicalize(&requested_cwd)
502            .map_err(|e| RpcError {
503                code: INVALID_PARAMS,
504                message: format!(
505                    "cwd is not a usable directory ({}): {e}",
506                    requested_cwd.display()
507                ),
508                data: None,
509            })?
510            .to_string_lossy()
511            .into_owned();
512
513        // Every ACP session is bound to an explicit agent alias.
514        // Accept `agentAlias` (camelCase) or `agent_alias` / `agent`.
515        // When the client omits the alias and exactly one agent is configured,
516        // auto-select it so single-agent setups work without extra config.
517        let agent_alias = params
518            .get("agentAlias")
519            .or_else(|| params.get("agent_alias"))
520            .or_else(|| params.get("agent"))
521            .and_then(Value::as_str)
522            .map(str::trim)
523            .filter(|s| !s.is_empty())
524            .map(str::to_string)
525            .or_else(|| self.config.acp.default_agent.clone())
526            .or_else(|| {
527                let mut keys = self.config.agents.keys();
528                if self.config.agents.len() == 1 {
529                    keys.next().cloned()
530                } else {
531                    None
532                }
533            })
534            .ok_or_else(|| RpcError {
535                code: INVALID_PARAMS,
536                message: "session/new requires `agentAlias` (alias of a configured \
537                          [agents.<alias>] entry)"
538                    .to_string(),
539                data: None,
540            })?;
541        if self.config.agent(&agent_alias).is_none() {
542            return Err(RpcError {
543                code: INVALID_PARAMS,
544                message: format!(
545                    "Unknown agent `{agent_alias}` — no [agents.{agent_alias}] entry configured"
546                ),
547                data: None,
548            });
549        }
550
551        let session_id = Uuid::new_v4().to_string();
552
553        // Build agent from global config, with the session's cwd pinned as
554        // the file/shell sandbox boundary. The agent's data directory
555        // (identity, scheduled tasks) still lives under `config.data_dir`.
556        // ACP sessions exclude persistent memory — context comes from the
557        // persisted session history, not the agent's long-term memory store.
558        let agent = Agent::from_config_with_session_cwd_and_mcp_backchannel(
559            &self.config,
560            &agent_alias,
561            Some(std::path::Path::new(&workspace_dir)),
562            false,
563            true,
564        )
565        .await
566        .map_err(|e| RpcError {
567            code: INTERNAL_ERROR,
568            message: format!("Failed to create agent: {e}"),
569            data: None,
570        })?;
571
572        // Wire an ACP back-channel so tools like `ask_user`,
573        // `escalate_to_human`, and `reaction` can talk to the IDE/CLI client
574        // for this session. Registered as `"acp"`; resolved by name when the
575        // agent picks a channel.
576        let acp_channel = Arc::new(AcpChannel::new(
577            "acp",
578            session_id.clone(),
579            Arc::clone(&self.rpc),
580            Duration::from_secs(self.acp_config.session_timeout_secs),
581        ));
582        agent.channel_handles().register_channel("acp", acp_channel);
583
584        let now = Instant::now();
585        sessions.insert(
586            session_id.clone(),
587            Arc::new(Mutex::new(Session {
588                agent,
589                created_at: now,
590                last_active: now,
591                agent_alias: agent_alias.clone(),
592                model_provider: self
593                    .config
594                    .agent(&agent_alias)
595                    .map(|a| a.model_provider.to_string())
596                    .unwrap_or_default(),
597                model: self
598                    .config
599                    .model_provider_for_agent(&agent_alias)
600                    .and_then(|mp| mp.model.clone())
601                    .unwrap_or_default(),
602            })),
603        );
604
605        if let Some(store) = &self.store {
606            let store = store.clone();
607            let sid = session_id.clone();
608            let alias = agent_alias.clone();
609            let wsd = workspace_dir.clone();
610            let created =
611                tokio::task::spawn_blocking(move || store.create_session(&sid, &alias, &wsd)).await;
612            let error = match created {
613                Ok(Ok(_)) => None,
614                Ok(Err(e)) => Some(e.to_string()),
615                Err(join) => Some(join.to_string()),
616            };
617            if let Some(detail) = error {
618                // Roll back: remove the session we just inserted and surface the error.
619                sessions.remove(&session_id);
620                return Err(RpcError {
621                    code: INTERNAL_ERROR,
622                    message: format!("Failed to persist session: {detail}"),
623                    data: None,
624                });
625            }
626        }
627
628        let mp = self
629            .config
630            .agent(&agent_alias)
631            .map(|a| a.model_provider.to_string())
632            .unwrap_or_default();
633        let model_name = self
634            .config
635            .model_provider_for_agent(&agent_alias)
636            .and_then(|mp| mp.model.clone())
637            .unwrap_or_default();
638        ::zeroclaw_log::record!(
639            INFO,
640            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Start)
641                .with_category(::zeroclaw_log::EventCategory::Channel)
642                .with_outcome(::zeroclaw_log::EventOutcome::Success)
643                .with_attrs(::serde_json::json!({
644                    "session_id": session_id,
645                    "workspace_dir": workspace_dir,
646                    "agent_alias": agent_alias,
647                    "model_provider": mp,
648                    "model": model_name,
649                })),
650            "ACP session created"
651        );
652
653        Ok(serde_json::json!({
654            "sessionId": session_id,
655            "workspaceDir": workspace_dir,
656        }))
657    }
658
659    async fn handle_session_load(&self, params: &Value) -> RpcResult {
660        let session_id = params
661            .get("sessionId")
662            .or_else(|| params.get("session_id"))
663            .and_then(|v| v.as_str())
664            .ok_or_else(|| RpcError {
665                code: INVALID_PARAMS,
666                message: "Missing required parameter: sessionId".to_string(),
667                data: None,
668            })?
669            .to_string();
670
671        let store = self.store.as_ref().ok_or_else(|| RpcError {
672            code: SESSION_NOT_FOUND,
673            message: format!("Session not found: {session_id}"),
674            data: None,
675        })?;
676
677        // Atomically check and reserve the session slot
678        {
679            let sessions = self.sessions.lock().await;
680            let mut loading = self.loading_sessions.lock().await;
681            if sessions.len() + loading.len() >= self.acp_config.max_sessions {
682                ::zeroclaw_log::record!(
683                    WARN,
684                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
685                        .with_category(::zeroclaw_log::EventCategory::Channel)
686                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
687                        .with_attrs(::serde_json::json!({
688                            "session_id": session_id,
689                            "active": sessions.len(),
690                            "loading": loading.len(),
691                            "max": self.acp_config.max_sessions,
692                        })),
693                    "ACP session/load rejected: session limit reached"
694                );
695                return Err(RpcError {
696                    code: SESSION_LIMIT_REACHED,
697                    message: format!(
698                        "Maximum session limit reached ({})",
699                        self.acp_config.max_sessions
700                    ),
701                    data: None,
702                });
703            }
704            if sessions.contains_key(&session_id) || loading.contains(&session_id) {
705                ::zeroclaw_log::record!(
706                    WARN,
707                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
708                        .with_category(::zeroclaw_log::EventCategory::Channel)
709                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
710                        .with_attrs(::serde_json::json!({"session_id": session_id})),
711                    "ACP session/load rejected: session already active"
712                );
713                return Err(RpcError {
714                    code: INVALID_PARAMS,
715                    message: format!(
716                        "Session already active: {session_id}. Call session/close first."
717                    ),
718                    data: None,
719                });
720            }
721            loading.insert(session_id.clone());
722        }
723
724        // Flatten both the SQLite error and the not-found case into a single
725        // Result so the cleanup match below runs for every failure after the
726        // reservation was inserted.
727        let data = store
728            .load_session(&session_id)
729            .map_err(|e| RpcError {
730                code: INTERNAL_ERROR,
731                message: format!("Failed to load session: {e}"),
732                data: None,
733            })
734            .and_then(|opt| {
735                opt.ok_or_else(|| RpcError {
736                    code: SESSION_NOT_FOUND,
737                    message: format!("Session not found: {session_id}"),
738                    data: None,
739                })
740            });
741
742        // On error (SQLite failure or not-found), release the reservation.
743        let data = match data {
744            Ok(d) => d,
745            Err(e) => {
746                self.loading_sessions.lock().await.remove(&session_id);
747                return Err(e);
748            }
749        };
750
751        let workspace_dir = std::path::PathBuf::from(&data.workspace_dir);
752
753        let restore_alias = self
754            .config
755            .acp
756            .default_agent
757            .clone()
758            .or_else(|| {
759                let mut keys = self.config.agents.keys();
760                if self.config.agents.len() == 1 {
761                    keys.next().cloned()
762                } else {
763                    None
764                }
765            })
766            .unwrap_or_else(|| "default".to_string());
767
768        let agent_result = Agent::from_config_with_session_cwd_and_mcp_backchannel(
769            &self.config,
770            &restore_alias,
771            Some(&workspace_dir),
772            false,
773            true,
774        )
775        .await
776        .map_err(|e| RpcError {
777            code: INTERNAL_ERROR,
778            message: format!("Failed to create agent: {e}"),
779            data: None,
780        });
781
782        let mut agent = match agent_result {
783            Ok(a) => a,
784            Err(e) => {
785                self.loading_sessions.lock().await.remove(&session_id);
786                return Err(e);
787            }
788        };
789
790        agent.seed_conversation_history(data.messages.clone());
791
792        let acp_channel = Arc::new(AcpChannel::new(
793            "acp",
794            session_id.clone(),
795            Arc::clone(&self.rpc),
796            Duration::from_secs(self.acp_config.session_timeout_secs),
797        ));
798        agent.channel_handles().register_channel("acp", acp_channel);
799
800        let now = Instant::now();
801        // Atomically insert and release reservation
802        {
803            let mut sessions = self.sessions.lock().await;
804            let mut loading = self.loading_sessions.lock().await;
805            loading.remove(&session_id);
806            sessions.insert(
807                session_id.clone(),
808                Arc::new(Mutex::new(Session {
809                    agent,
810                    created_at: now,
811                    last_active: now,
812                    agent_alias: restore_alias.clone(),
813                    model_provider: self
814                        .config
815                        .agent(&restore_alias)
816                        .map(|a| a.model_provider.to_string())
817                        .unwrap_or_default(),
818                    model: self
819                        .config
820                        .model_provider_for_agent(&restore_alias)
821                        .and_then(|mp| mp.model.clone())
822                        .unwrap_or_default(),
823                })),
824            );
825        }
826
827        // Stream conversation history to client as session/update notifications
828        for msg in &data.messages {
829            for notification in history_notifications_for_message(&session_id, msg) {
830                self.write_notification(&notification).await;
831            }
832        }
833
834        let mp = self
835            .config
836            .agent(&restore_alias)
837            .map(|a| a.model_provider.to_string())
838            .unwrap_or_default();
839        let model_name = self
840            .config
841            .model_provider_for_agent(&restore_alias)
842            .and_then(|mp| mp.model.clone())
843            .unwrap_or_default();
844        ::zeroclaw_log::record!(
845            INFO,
846            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Start)
847                .with_category(::zeroclaw_log::EventCategory::Channel)
848                .with_outcome(::zeroclaw_log::EventOutcome::Success)
849                .with_attrs(::serde_json::json!({
850                    "session_id": session_id,
851                    "message_count": data.messages.len(),
852                    "agent_alias": restore_alias,
853                    "model_provider": mp,
854                    "model": model_name,
855                })),
856            "ACP session loaded"
857        );
858        Ok(serde_json::json!({}))
859    }
860
861    async fn handle_session_resume(&self, params: &Value) -> RpcResult {
862        let session_id = params
863            .get("sessionId")
864            .or_else(|| params.get("session_id"))
865            .and_then(|v| v.as_str())
866            .ok_or_else(|| RpcError {
867                code: INVALID_PARAMS,
868                message: "Missing required parameter: sessionId".to_string(),
869                data: None,
870            })?
871            .to_string();
872
873        let store = self.store.as_ref().ok_or_else(|| RpcError {
874            code: SESSION_NOT_FOUND,
875            message: format!("Session not found: {session_id}"),
876            data: None,
877        })?;
878
879        // Atomically check and reserve the session slot
880        {
881            let sessions = self.sessions.lock().await;
882            let mut loading = self.loading_sessions.lock().await;
883            if sessions.len() + loading.len() >= self.acp_config.max_sessions {
884                ::zeroclaw_log::record!(
885                    WARN,
886                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
887                        .with_category(::zeroclaw_log::EventCategory::Channel)
888                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
889                        .with_attrs(::serde_json::json!({
890                            "session_id": session_id,
891                            "active": sessions.len(),
892                            "loading": loading.len(),
893                            "max": self.acp_config.max_sessions,
894                        })),
895                    "ACP session/resume rejected: session limit reached"
896                );
897                return Err(RpcError {
898                    code: SESSION_LIMIT_REACHED,
899                    message: format!(
900                        "Maximum session limit reached ({})",
901                        self.acp_config.max_sessions
902                    ),
903                    data: None,
904                });
905            }
906            if sessions.contains_key(&session_id) || loading.contains(&session_id) {
907                ::zeroclaw_log::record!(
908                    WARN,
909                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
910                        .with_category(::zeroclaw_log::EventCategory::Channel)
911                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
912                        .with_attrs(::serde_json::json!({"session_id": session_id})),
913                    "ACP session/resume rejected: session already active"
914                );
915                return Err(RpcError {
916                    code: INVALID_PARAMS,
917                    message: format!(
918                        "Session already active: {session_id}. Call session/close first."
919                    ),
920                    data: None,
921                });
922            }
923            loading.insert(session_id.clone());
924        }
925
926        let data = store
927            .load_session(&session_id)
928            .map_err(|e| RpcError {
929                code: INTERNAL_ERROR,
930                message: format!("Failed to load session: {e}"),
931                data: None,
932            })
933            .and_then(|opt| {
934                opt.ok_or_else(|| RpcError {
935                    code: SESSION_NOT_FOUND,
936                    message: format!("Session not found: {session_id}"),
937                    data: None,
938                })
939            });
940
941        // On error (SQLite failure or not-found), release the reservation.
942        let data = match data {
943            Ok(d) => d,
944            Err(e) => {
945                self.loading_sessions.lock().await.remove(&session_id);
946                return Err(e);
947            }
948        };
949
950        let workspace_dir = std::path::PathBuf::from(&data.workspace_dir);
951
952        let restore_alias = self
953            .config
954            .acp
955            .default_agent
956            .clone()
957            .or_else(|| {
958                let mut keys = self.config.agents.keys();
959                if self.config.agents.len() == 1 {
960                    keys.next().cloned()
961                } else {
962                    None
963                }
964            })
965            .unwrap_or_else(|| "default".to_string());
966
967        let agent_result = Agent::from_config_with_session_cwd_and_mcp_backchannel(
968            &self.config,
969            &restore_alias,
970            Some(&workspace_dir),
971            false,
972            true,
973        )
974        .await
975        .map_err(|e| RpcError {
976            code: INTERNAL_ERROR,
977            message: format!("Failed to create agent: {e}"),
978            data: None,
979        });
980
981        let mut agent = match agent_result {
982            Ok(a) => a,
983            Err(e) => {
984                self.loading_sessions.lock().await.remove(&session_id);
985                return Err(e);
986            }
987        };
988
989        agent.seed_conversation_history(data.messages);
990
991        let acp_channel = Arc::new(AcpChannel::new(
992            "acp",
993            session_id.clone(),
994            Arc::clone(&self.rpc),
995            Duration::from_secs(self.acp_config.session_timeout_secs),
996        ));
997        agent.channel_handles().register_channel("acp", acp_channel);
998
999        let now = Instant::now();
1000        // Atomically insert and release reservation
1001        {
1002            let mut sessions = self.sessions.lock().await;
1003            let mut loading = self.loading_sessions.lock().await;
1004            loading.remove(&session_id);
1005            sessions.insert(
1006                session_id.clone(),
1007                Arc::new(Mutex::new(Session {
1008                    agent,
1009                    created_at: now,
1010                    last_active: now,
1011                    agent_alias: restore_alias.clone(),
1012                    model_provider: self
1013                        .config
1014                        .agent(&restore_alias)
1015                        .map(|a| a.model_provider.to_string())
1016                        .unwrap_or_default(),
1017                    model: self
1018                        .config
1019                        .model_provider_for_agent(&restore_alias)
1020                        .and_then(|mp| mp.model.clone())
1021                        .unwrap_or_default(),
1022                })),
1023            );
1024        }
1025
1026        let mp = self
1027            .config
1028            .agent(&restore_alias)
1029            .map(|a| a.model_provider.to_string())
1030            .unwrap_or_default();
1031        let model_name = self
1032            .config
1033            .model_provider_for_agent(&restore_alias)
1034            .and_then(|mp| mp.model.clone())
1035            .unwrap_or_default();
1036        ::zeroclaw_log::record!(
1037            INFO,
1038            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Start)
1039                .with_category(::zeroclaw_log::EventCategory::Channel)
1040                .with_outcome(::zeroclaw_log::EventOutcome::Success)
1041                .with_attrs(::serde_json::json!({
1042                    "session_id": session_id,
1043                    "agent_alias": restore_alias,
1044                    "model_provider": mp,
1045                    "model": model_name,
1046                })),
1047            "ACP session resumed"
1048        );
1049        Ok(serde_json::json!({}))
1050    }
1051
1052    /// Handle `session/close` requests (ACP spec §Session Management).
1053    ///
1054    /// Closes a session: fires the cancel token to interrupt any in-flight turn,
1055    /// removes the session from the in-memory map, and unregisters the ACP channel.
1056    /// The session record in the persistent store is NOT deleted.
1057    ///
1058    /// Returns an empty object on success, or SESSION_NOT_FOUND if the session
1059    /// is not in the in-memory map (it may still exist in the store).
1060    async fn handle_session_close(&self, params: &Value) -> RpcResult {
1061        let session_id = params
1062            .get("sessionId")
1063            .or_else(|| params.get("session_id"))
1064            .and_then(|v| v.as_str())
1065            .ok_or_else(|| RpcError {
1066                code: INVALID_PARAMS,
1067                message: "Missing required parameter: sessionId".to_string(),
1068                data: None,
1069            })?;
1070
1071        // Fire the cancel token for any in-flight turn before acquiring the session lock.
1072        let token = self
1073            .cancel_tokens
1074            .lock()
1075            .expect("cancel_tokens lock poisoned — invariant: all guarded critical sections are short, infallible HashMap ops")
1076            .get(session_id)
1077            .cloned();
1078        if let Some(token) = token {
1079            token.cancel();
1080            ::zeroclaw_log::record!(
1081                INFO,
1082                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1083                    .with_category(::zeroclaw_log::EventCategory::Channel)
1084                    .with_attrs(::serde_json::json!({"session_id": session_id})),
1085                "ACP session/close: cancelled active turn"
1086            );
1087        }
1088
1089        let session_arc = {
1090            let mut sessions = self.sessions.lock().await;
1091            sessions.remove(session_id).ok_or_else(|| RpcError {
1092                code: SESSION_NOT_FOUND,
1093                message: format!("Session not found: {session_id}"),
1094                data: None,
1095            })?
1096        };
1097
1098        // Wait for any in-flight turn to finish (the cancel token may have already stopped it).
1099        let session = session_arc.lock().await;
1100        let agent_alias = session.agent_alias.clone();
1101        let model_provider = session.model_provider.clone();
1102        let model = session.model.clone();
1103        session.agent.channel_handles().unregister_channel("acp");
1104        ::zeroclaw_log::record!(
1105            INFO,
1106            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete)
1107                .with_category(::zeroclaw_log::EventCategory::Channel)
1108                .with_outcome(::zeroclaw_log::EventOutcome::Success)
1109                .with_attrs(::serde_json::json!({
1110                    "session_id": session_id,
1111                    "agent_alias": agent_alias,
1112                    "model_provider": model_provider,
1113                    "model": model,
1114                })),
1115            "ACP session closed"
1116        );
1117
1118        Ok(serde_json::json!({}))
1119    }
1120
1121    fn requested_session_cwd(&self, params: &Value) -> PathBuf {
1122        params
1123            .get("cwd")
1124            .or_else(|| params.get("workspaceDir"))
1125            .or_else(|| params.get("workspace_dir"))
1126            .and_then(|v| v.as_str())
1127            .map(PathBuf::from)
1128            .unwrap_or_else(|| {
1129                std::env::current_dir().unwrap_or_else(|_| self.config.data_dir.clone())
1130            })
1131    }
1132
1133    async fn handle_session_prompt(&self, params: &Value, _request_id: &Value) -> RpcResult {
1134        let session_id = params
1135            .get("sessionId")
1136            .or_else(|| params.get("session_id"))
1137            .and_then(|v| v.as_str())
1138            .ok_or_else(|| RpcError {
1139                code: INVALID_PARAMS,
1140                message: "Missing required parameter: sessionId".to_string(),
1141                data: None,
1142            })?
1143            .to_string();
1144
1145        let prompt = Self::parse_prompt(params)?;
1146
1147        // Clone the Arc so the session stays visible in the map throughout the
1148        // turn. `session/stop` and the reaper can still find it; they will
1149        // block on the inner Mutex until the turn completes.
1150        let session_arc = {
1151            let sessions = self.sessions.lock().await;
1152            sessions.get(&session_id).cloned().ok_or_else(|| RpcError {
1153                code: SESSION_NOT_FOUND,
1154                message: format!("Session not found: {session_id}"),
1155                data: None,
1156            })?
1157        };
1158
1159        // Snapshot attribution fields before releasing the outer lock.
1160        let (agent_alias, model_provider, model) = {
1161            // Try-lock: if the inner lock is held by an active turn, we'll
1162            // reject below via register_cancel_token anyway. Use a brief
1163            // non-blocking peek so we can log the alias even on the error path.
1164            if let Ok(s) = session_arc.try_lock() {
1165                (
1166                    s.agent_alias.clone(),
1167                    s.model_provider.clone(),
1168                    s.model.clone(),
1169                )
1170            } else {
1171                (String::new(), String::new(), String::new())
1172            }
1173        };
1174
1175        // Instrument the rest of the turn so every record! inside lands in
1176        // the Attribution section of the log viewer with agent_alias,
1177        // model_provider, and session_key populated.
1178        // scope! wraps the body with .instrument() internally — no EnteredSpan
1179        // held across .await points, so the future stays Send.
1180        // Clone before the macro so the owned values remain available inside
1181        // the async move block.
1182        let session_id_s = session_id.clone();
1183        let agent_alias_s = agent_alias.clone();
1184        let model_provider_s = model_provider.clone();
1185        let model_s = model.clone();
1186        ::zeroclaw_log::scope!(
1187            agent_alias: agent_alias_s.as_str(),
1188            model_provider: model_provider_s.as_str(),
1189            model: model_s.as_str(),
1190            session_key: session_id_s.as_str(),
1191            channel: "acp",
1192        => async move {
1193
1194        ::zeroclaw_log::record!(
1195            INFO,
1196            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Start).with_category(::zeroclaw_log::EventCategory::Channel)
1197                .with_attrs(::serde_json::json!({
1198                    "prompt_len": prompt.len(),
1199                })),
1200            "ACP session/prompt turn starting"
1201        );
1202
1203        // Create a cancellation token for this turn and register it so that a
1204        // concurrent `session/cancel` notification can fire it without waiting
1205        // for the inner session lock (which is held for the full turn duration).
1206        // The lock can never be poisoned — all critical sections guarded by this
1207        // mutex are short, infallible HashMap operations (insert/remove/get)
1208        // that never call user code, panic, or block on I/O.
1209        let cancel_token = tokio_util::sync::CancellationToken::new();
1210        self.register_cancel_token(&session_id, cancel_token.clone())?;
1211        let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(100);
1212
1213        // Move the Arc into the spawned task and lock inside it.  The inner
1214        // Mutex stays locked for the duration of the turn, preventing
1215        // concurrent stop/reap from touching the agent mid-turn. The outer
1216        // map entry remains in place.
1217        let session_id_for_task = session_id.clone();
1218        let turn_handle = zeroclaw_spawn::spawn!(async move {
1219            let mut session = session_arc.lock().await;
1220            let (turn_alias, turn_provider, turn_model) = session.agent.attribution_fields();
1221            let span_session = session_id_for_task.clone();
1222            let result = {
1223                use ::zeroclaw_log::Instrument as _;
1224                let span = ::zeroclaw_log::info_span!(
1225                    target: "zeroclaw_log_internal_scope",
1226                    "zeroclaw_scope",
1227                    session_key = %span_session,
1228                    agent_alias = %turn_alias,
1229                    model_provider = %turn_provider,
1230                    model = %turn_model,
1231                    channel = "acp",
1232                );
1233                zeroclaw_runtime::agent::loop_::scope_session_key(
1234                    Some(session_id_for_task),
1235                    session
1236                        .agent
1237                        .turn_streamed(&prompt, event_tx, Some(cancel_token))
1238                        .instrument(span),
1239                )
1240                .await
1241            };
1242            session.last_active = Instant::now();
1243            result
1244            // guard drops here, releasing the inner lock
1245        });
1246
1247        // Forward events as they arrive. Use standard ACP `session/update`
1248        // notifications: `tool_call` for initial (pending + title/kind for UI/icons),
1249        // `tool_call_update` for completion (status + rawOutput/content). This enables
1250        // proper pending→completed flow in ACP clients.
1251        // Track streamed text so partial content survives cancellation.
1252        let mut accumulated_text = String::new();
1253        let mut tool_call_count: u32 = 0;
1254        while let Some(event) = event_rx.recv().await {
1255            // ACP has no `session/update` shape for token-usage events; the
1256            // task-local cost tracker records them out-of-band. We DO use the
1257            // event to update the per-session `token_count` so the TUI ctx
1258            // bar resumes accurately. Then skip before dispatching to the
1259            // notification builder so the helper match can stay exhaustive
1260            // on the four UI-relevant variants.
1261            if let TurnEvent::Usage { input_tokens, .. } = &event {
1262                // Token-count persistence is best-effort UI bookkeeping (it
1263                // restores the TUI ctx bar on resume). It must never gate the
1264                // draining of `event_rx`: this loop is the sole consumer of the
1265                // turn's bounded `event_tx` (capacity 100). The session store
1266                // wraps a single SQLite connection behind one process-wide
1267                // mutex, so a concurrent session mid-`append_turn` transaction
1268                // can stall this write. Awaiting it here would stop draining,
1269                // fill `event_tx`, and block the agent's unguarded
1270                // `event_tx.send(...).await` — wedging the turn on "working"
1271                // with no cancel path. Fire-and-forget keeps the consumer live.
1272                if let (Some(store), Some(it)) = (&self.store, input_tokens) {
1273                    let store = store.clone();
1274                    let sid = session_id.clone();
1275                    let it = *it;
1276                    zeroclaw_spawn::spawn!(async move {
1277                        let persisted =
1278                            tokio::task::spawn_blocking(move || store.set_token_count(&sid, it))
1279                                .await;
1280                        let error = match persisted {
1281                            Ok(Ok(())) => return,
1282                            Ok(Err(e)) => e.to_string(),
1283                            Err(join) => join.to_string(),
1284                        };
1285                        ::zeroclaw_log::record!(
1286                            WARN,
1287                            ::zeroclaw_log::Event::new(
1288                                module_path!(),
1289                                ::zeroclaw_log::Action::Write,
1290                            )
1291                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1292                            .with_attrs(::serde_json::json!({
1293                                "input_tokens": it,
1294                                "error": error,
1295                            })),
1296                            "Failed to persist ACP session token_count"
1297                        );
1298                    });
1299                }
1300                continue;
1301            }
1302            // Emit attributable span logs for every tool call and result.
1303            // Attribution (agent_alias, model_provider, session_key) flows
1304            // from the enclosing spans — not repeated here in attrs.
1305            match &event {
1306                TurnEvent::ToolCall { id, name, args } => {
1307                    tool_call_count += 1;
1308                    ::zeroclaw_log::record!(
1309                        DEBUG,
1310                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Start).with_category(::zeroclaw_log::EventCategory::Channel)
1311                            .with_attrs(::serde_json::json!({
1312                                "tool_call_id": id,
1313                                "tool": name,
1314                                "args_len": args.to_string().len(),
1315                            })),
1316                        "ACP tool call dispatched"
1317                    );
1318                }
1319                TurnEvent::ToolResult { id, name, output } => {
1320                    ::zeroclaw_log::record!(
1321                        DEBUG,
1322                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete).with_category(::zeroclaw_log::EventCategory::Channel)
1323                            .with_outcome(::zeroclaw_log::EventOutcome::Success)
1324                            .with_attrs(::serde_json::json!({
1325                                "tool_call_id": id,
1326                                "tool": name,
1327                                "output_len": output.len(),
1328                            })),
1329                        "ACP tool call completed"
1330                    );
1331                }
1332                TurnEvent::Chunk { delta } => {
1333                    accumulated_text.push_str(delta);
1334                }
1335                _ => {}
1336            }
1337            if let Some(notification) = notification_for_turn_event(&session_id, &event) {
1338                self.write_notification(&notification).await;
1339            }
1340        }
1341
1342        // Remove the cancel token regardless of outcome — the turn is over.
1343        // Lock poisoned invariant: same as the insert site above.
1344        self.remove_cancel_token(&session_id);
1345
1346        let turn_result = turn_handle.await.map_err(|e| RpcError {
1347            code: INTERNAL_ERROR,
1348            message: format!("Agent task panicked: {e}"),
1349            data: None,
1350        })?;
1351
1352        // Per ACP spec: a cancelled turn must respond with stopReason "cancelled",
1353        // not an error. Detect via ToolLoopCancelled propagated through anyhow.
1354        let was_cancelled = match &turn_result {
1355            Err(e) => zeroclaw_runtime::agent::loop_::is_tool_loop_cancelled(e),
1356            Ok(_) => false,
1357        };
1358
1359        if was_cancelled {
1360            ::zeroclaw_log::record!(
1361                INFO,
1362                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete).with_category(::zeroclaw_log::EventCategory::Channel)
1363                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1364                    .with_attrs(::serde_json::json!({
1365                        "tool_calls": tool_call_count,
1366                        "stop_reason": "cancelled",
1367                    })),
1368                "ACP session/prompt turn cancelled"
1369            );
1370            return Ok(Self::cancelled_prompt_result(session_id, &accumulated_text));
1371        }
1372
1373        let (result_text, new_turn_msgs) = turn_result.map_err(|e| {
1374            ::zeroclaw_log::record!(
1375                ERROR,
1376                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail).with_category(::zeroclaw_log::EventCategory::Channel)
1377                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1378                    .with_attrs(::serde_json::json!({
1379                        "error": e.to_string(),
1380                    })),
1381                "ACP session/prompt turn failed"
1382            );
1383            RpcError {
1384                code: INTERNAL_ERROR,
1385                message: format!("Agent turn failed: {e}"),
1386                data: None,
1387            }
1388        })?;
1389
1390        // Persist new messages on successful, non-cancelled turns.
1391        if let Some(store) = &self.store
1392            && !new_turn_msgs.is_empty()
1393        {
1394            let store = store.clone();
1395            let sid = session_id.clone();
1396            let msgs = new_turn_msgs;
1397            let persisted =
1398                tokio::task::spawn_blocking(move || store.append_turn(&sid, &msgs)).await;
1399            let error = match persisted {
1400                Ok(Ok(())) => None,
1401                Ok(Err(e)) => Some(e.to_string()),
1402                Err(join) => Some(join.to_string()),
1403            };
1404            if let Some(detail) = error {
1405                ::zeroclaw_log::record!(
1406                    WARN,
1407                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_category(::zeroclaw_log::EventCategory::Channel)
1408                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1409                        .with_attrs(::serde_json::json!({
1410                            "error": detail,
1411                        })),
1412                    "Failed to persist turn; session continues in memory"
1413                );
1414            }
1415        }
1416
1417        ::zeroclaw_log::record!(
1418            INFO,
1419            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete).with_category(::zeroclaw_log::EventCategory::Channel)
1420                .with_outcome(::zeroclaw_log::EventOutcome::Success)
1421                .with_attrs(::serde_json::json!({
1422                    "tool_calls": tool_call_count,
1423                    "response_len": result_text.len(),
1424                    "stop_reason": "end_turn",
1425                })),
1426            "ACP session/prompt turn complete"
1427        );
1428
1429        Ok(Self::prompt_result(session_id, "end_turn", result_text))
1430
1431        }).await
1432    }
1433
1434    fn register_cancel_token(
1435        &self,
1436        session_id: &str,
1437        cancel_token: tokio_util::sync::CancellationToken,
1438    ) -> std::result::Result<(), RpcError> {
1439        let mut tokens = self
1440            .cancel_tokens
1441            .lock()
1442            .expect("cancel_tokens lock poisoned — invariant: all guarded critical sections are short, infallible HashMap ops");
1443        if tokens.contains_key(session_id) {
1444            ::zeroclaw_log::record!(
1445                WARN,
1446                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1447                    .with_category(::zeroclaw_log::EventCategory::Channel)
1448                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1449                    .with_attrs(::serde_json::json!({"session_id": session_id})),
1450                "ACP session/prompt rejected: session already has an active turn"
1451            );
1452            return Err(RpcError {
1453                code: SESSION_BUSY,
1454                message: format!("Session already has an active prompt turn: {session_id}"),
1455                data: None,
1456            });
1457        }
1458        tokens.insert(session_id.to_string(), cancel_token);
1459        Ok(())
1460    }
1461
1462    fn remove_cancel_token(&self, session_id: &str) {
1463        self.cancel_tokens
1464            .lock()
1465            .expect("cancel_tokens lock poisoned — invariant: all guarded critical sections are short, infallible HashMap ops")
1466            .remove(session_id);
1467    }
1468
1469    fn prompt_result(session_id: String, stop_reason: &'static str, text: String) -> Value {
1470        serde_json::json!({
1471            "sessionId": session_id,
1472            "stopReason": stop_reason,
1473            "content": text,
1474        })
1475    }
1476
1477    fn cancelled_prompt_result(session_id: String, accumulated_text: &str) -> Value {
1478        let content = if accumulated_text.is_empty() {
1479            "[interrupted by user]".to_string()
1480        } else {
1481            format!("{accumulated_text}\n\n[interrupted by user]")
1482        };
1483        Self::prompt_result(session_id, "cancelled", content)
1484    }
1485
1486    fn parse_prompt(params: &Value) -> std::result::Result<String, RpcError> {
1487        match params.get("prompt") {
1488            Some(Value::String(s)) => Ok(s.clone()),
1489            Some(Value::Array(arr)) => {
1490                let mut joined = String::new();
1491                for part in arr {
1492                    let mut added = false;
1493                    if let Some(text) = part.get("text").and_then(|v| v.as_str()) {
1494                        if !joined.is_empty() {
1495                            joined.push_str("\n\n");
1496                        }
1497                        joined.push_str(text);
1498                        added = true;
1499                    }
1500                    // Support ACP resource blocks for @-notation file attachments
1501                    // (clients send {"type":"resource","resource":{"uri":"...","text":"..."}})
1502                    if let Some(res) = part.get("resource")
1503                        && let Some(text) = res.get("text").and_then(|v| v.as_str())
1504                    {
1505                        if added || !joined.is_empty() {
1506                            joined.push_str("\n\n");
1507                        }
1508                        joined.push_str(text);
1509                    }
1510                }
1511                if joined.is_empty() {
1512                    return Err(RpcError {
1513                        code: INVALID_PARAMS,
1514                        message: "Parameter 'prompt' array must contain at least one text part"
1515                            .to_string(),
1516                        data: None,
1517                    });
1518                }
1519                Ok(joined)
1520            }
1521            _ => Err(RpcError {
1522                code: INVALID_PARAMS,
1523                message: "Missing required parameter: prompt (must be string or array of parts)"
1524                    .to_string(),
1525                data: None,
1526            }),
1527        }
1528    }
1529
1530    async fn handle_session_stop(&self, params: &Value) -> RpcResult {
1531        let session_id = params
1532            .get("sessionId")
1533            .or_else(|| params.get("session_id"))
1534            .and_then(|v| v.as_str())
1535            .ok_or_else(|| RpcError {
1536                code: INVALID_PARAMS,
1537                message: "Missing required parameter: sessionId".to_string(),
1538                data: None,
1539            })?;
1540
1541        let session_arc = {
1542            let mut sessions = self.sessions.lock().await;
1543            sessions.remove(session_id).ok_or_else(|| RpcError {
1544                code: SESSION_NOT_FOUND,
1545                message: format!("Session not found: {session_id}"),
1546                data: None,
1547            })?
1548        };
1549
1550        // Wait for any in-flight prompt turn to finish before cleaning up.
1551        // The inner lock is held by the turn task; this blocks until it drops.
1552        let session = session_arc.lock().await;
1553        let agent_alias = session.agent_alias.clone();
1554        let model_provider = session.model_provider.clone();
1555        let model = session.model.clone();
1556        // Drop the ACP back-channel from each tool's channel map so the
1557        // session's RpcOutbound clone isn't kept alive by stale entries.
1558        session.agent.channel_handles().unregister_channel("acp");
1559        ::zeroclaw_log::record!(
1560            INFO,
1561            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete)
1562                .with_category(::zeroclaw_log::EventCategory::Channel)
1563                .with_outcome(::zeroclaw_log::EventOutcome::Success)
1564                .with_attrs(::serde_json::json!({
1565                    "session_id": session_id,
1566                    "agent_alias": agent_alias,
1567                    "model_provider": model_provider,
1568                    "model": model,
1569                })),
1570            "ACP session stopped"
1571        );
1572        Ok(serde_json::json!({
1573            "sessionId": session_id,
1574            "stopped": true,
1575        }))
1576    }
1577
1578    /// Handle `session/cancel` notifications (ACP spec §Cancellation).
1579    ///
1580    /// Fires the cancellation token for the named session's active turn, if
1581    /// one is running. Idempotent — silently succeeds when there is no active
1582    /// turn. The return value is ignored for notifications.
1583    ///
1584    /// Cancel-vs-stop interaction: if `session/cancel` and `session/stop` fire
1585    /// nearly simultaneously, both handlers race — cancel fires the token
1586    /// (which may or may not interrupt the turn), and stop sets
1587    /// `session.stopped = true` and awaits the turn handle. The net effect is
1588    /// harmless: either the turn sees the cancellation token or it doesn't, and
1589    /// stop always waits for the turn to finish.
1590    async fn handle_session_cancel(&self, params: &Value) -> RpcResult {
1591        let session_id = params
1592            .get("sessionId")
1593            .or_else(|| params.get("session_id"))
1594            .and_then(|v| v.as_str())
1595            .ok_or_else(|| RpcError {
1596                code: INVALID_PARAMS,
1597                message: "Missing required parameter: sessionId".to_string(),
1598                data: None,
1599            })?;
1600
1601        let token = self
1602            .cancel_tokens
1603            .lock()
1604            .expect("cancel_tokens lock poisoned — invariant: all guarded critical sections are short, infallible HashMap ops")
1605            .get(session_id)
1606            .cloned();
1607
1608        if let Some(token) = token {
1609            token.cancel();
1610            ::zeroclaw_log::record!(
1611                INFO,
1612                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1613                    .with_category(::zeroclaw_log::EventCategory::Channel)
1614                    .with_attrs(::serde_json::json!({"session_id": session_id})),
1615                "ACP session/cancel: fired cancel token for active turn"
1616            );
1617        }
1618
1619        Ok(serde_json::json!({}))
1620    }
1621
1622    /// Handle incoming `session/update` (or legacy `session/event`) notifications.
1623    ///
1624    /// This processes bidirectional events for an active session (e.g. tool results,
1625    /// status updates, or client-side events). Currently updates session activity
1626    /// to prevent premature reaping; future extensions can route specific event
1627    /// types into the Agent.
1628    async fn handle_session_event(&self, params: &Value) -> RpcResult {
1629        let session_id = params
1630            .get("sessionId")
1631            .or_else(|| params.get("session_id"))
1632            .and_then(|v| v.as_str())
1633            .ok_or_else(|| RpcError {
1634                code: INVALID_PARAMS,
1635                message: "Missing required parameter: sessionId".to_string(),
1636                data: None,
1637            })?
1638            .to_string();
1639
1640        let event_type = params
1641            .get("type")
1642            .or_else(|| params.get("update").and_then(|u| u.get("sessionUpdate")))
1643            .and_then(|v| v.as_str())
1644            .unwrap_or("unknown")
1645            .to_string();
1646
1647        ::zeroclaw_log::record!(
1648            DEBUG,
1649            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1650                .with_category(::zeroclaw_log::EventCategory::Channel)
1651                .with_attrs(
1652                    ::serde_json::json!({"event_type": event_type, "session_id": session_id})
1653                ),
1654            "Received session update (type=) for session"
1655        );
1656
1657        let session_arc = {
1658            let sessions = self.sessions.lock().await;
1659            sessions.get(&session_id).cloned()
1660        };
1661
1662        if let Some(session_arc) = session_arc {
1663            // Best-effort last_active update. If the inner lock is held by an
1664            // active turn, skip it — the turn itself updates last_active on completion.
1665            if let Ok(mut session) = session_arc.try_lock() {
1666                session.last_active = Instant::now();
1667            }
1668            Ok(serde_json::json!({
1669                "sessionId": session_id,
1670                "type": event_type,
1671                "status": "processed"
1672            }))
1673        } else {
1674            Err(RpcError {
1675                code: SESSION_NOT_FOUND,
1676                message: format!("Session not found: {session_id}"),
1677                data: None,
1678            })
1679        }
1680    }
1681
1682    // ── I/O helpers ──────────────────────────────────────────────
1683
1684    async fn write_result(&self, id: Value, result: Value) {
1685        let response = JsonRpcResponse {
1686            jsonrpc: "2.0",
1687            result: Some(result),
1688            error: None,
1689            id,
1690        };
1691        self.write_json(&response).await;
1692    }
1693
1694    async fn write_error(&self, id: Value, code: i32, message: &str) {
1695        let response = JsonRpcResponse {
1696            jsonrpc: "2.0",
1697            result: None,
1698            error: Some(JsonRpcError {
1699                code,
1700                message: message.to_string(),
1701                data: None,
1702            }),
1703            id,
1704        };
1705        self.write_json(&response).await;
1706    }
1707
1708    async fn write_notification(&self, notification: &JsonRpcNotification) {
1709        self.write_json(notification).await;
1710    }
1711
1712    async fn write_json<T: Serialize>(&self, value: &T) {
1713        match serde_json::to_string(value) {
1714            Ok(json) => {
1715                if !self.rpc.send_raw(json).await {
1716                    ::zeroclaw_log::record!(
1717                        ERROR,
1718                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1719                            .with_category(::zeroclaw_log::EventCategory::Channel)
1720                            .with_outcome(::zeroclaw_log::EventOutcome::Failure),
1721                        "ACP writer task closed; dropping outbound message"
1722                    );
1723                }
1724            }
1725            Err(e) => {
1726                ::zeroclaw_log::record!(
1727                    ERROR,
1728                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1729                        .with_category(::zeroclaw_log::EventCategory::Channel)
1730                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1731                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1732                    "Failed to serialize JSON-RPC message"
1733                );
1734            }
1735        }
1736    }
1737}
1738
1739/// Single writer task that owns stdout. All outbound JSON-RPC messages flow
1740/// through here, so concurrent notifications and outbound requests don't
1741/// interleave bytes.
1742async fn writer_task(mut rx: mpsc::Receiver<String>) {
1743    let mut stdout = tokio::io::stdout();
1744    while let Some(line) = rx.recv().await {
1745        if let Err(e) = stdout.write_all(line.as_bytes()).await {
1746            ::zeroclaw_log::record!(
1747                ERROR,
1748                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1749                    .with_category(::zeroclaw_log::EventCategory::Channel)
1750                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1751                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1752                "Failed to write to stdout"
1753            );
1754            continue;
1755        }
1756        if let Err(e) = stdout.write_all(b"\n").await {
1757            ::zeroclaw_log::record!(
1758                ERROR,
1759                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1760                    .with_category(::zeroclaw_log::EventCategory::Channel)
1761                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1762                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1763                "Failed to write newline to stdout"
1764            );
1765            continue;
1766        }
1767        if let Err(e) = stdout.flush().await {
1768            ::zeroclaw_log::record!(
1769                ERROR,
1770                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1771                    .with_category(::zeroclaw_log::EventCategory::Channel)
1772                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1773                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1774                "Failed to flush stdout"
1775            );
1776        }
1777    }
1778}
1779
1780/// Translate tool args into the ACP `rawInput` shape.
1781///
1782/// For file-editing tools, the ACP Diff schema uses `oldText`/`newText` (camelCase).
1783/// ZeroClaw's internal tool args use `old_string`/`new_string` (snake_case) for
1784/// `file_edit` and `content` for `file_write`. Without this translation, ACP clients
1785/// (Toad, Zed) cannot recognise the Diff shape and fall back to rendering the raw JSON
1786/// fields as giant strings.
1787fn to_acp_raw_input(name: &str, args: &Value) -> Value {
1788    match name {
1789        "file_edit" => {
1790            let path = args.get("path").cloned().unwrap_or(Value::Null);
1791            let old_text = args.get("old_string").cloned().unwrap_or(Value::Null);
1792            let new_text = args.get("new_string").cloned().unwrap_or(Value::Null);
1793            serde_json::json!({ "path": path, "oldText": old_text, "newText": new_text })
1794        }
1795        "file_write" => {
1796            let path = args.get("path").cloned().unwrap_or(Value::Null);
1797            let new_text = args.get("content").cloned().unwrap_or(Value::Null);
1798            serde_json::json!({ "path": path, "newText": new_text })
1799        }
1800        _ => args.clone(),
1801    }
1802}
1803
1804/// Build the ACP `content` array for a tool call notification.
1805///
1806/// Zed and Toad render tool call content from the `content` array. For
1807/// file-editing tools, emit an ACP Diff content item (`{ "type": "diff", ... }`)
1808/// so clients show a side-by-side diff editor. Non-edit tools return an empty
1809/// array — their `rawInput` is displayed via the standard `raw_input` fallback.
1810fn to_acp_content(name: &str, args: &Value) -> Value {
1811    match name {
1812        "file_edit" => {
1813            let path = args.get("path").cloned().unwrap_or(Value::Null);
1814            let old_text = args.get("old_string").cloned().unwrap_or(Value::Null);
1815            let new_text = args.get("new_string").cloned().unwrap_or(Value::Null);
1816            serde_json::json!([{ "type": "diff", "path": path, "oldText": old_text, "newText": new_text }])
1817        }
1818        "file_write" => {
1819            let path = args.get("path").cloned().unwrap_or(Value::Null);
1820            let new_text = args.get("content").cloned().unwrap_or(Value::Null);
1821            serde_json::json!([{ "type": "diff", "path": path, "newText": new_text }])
1822        }
1823        _ => serde_json::json!([]),
1824    }
1825}
1826
1827fn map_tool_kind(name: &str) -> &'static str {
1828    match name {
1829        "ask_user" | "calculator" | "claude_code" | "claude_code_runner" | "codex_cli"
1830        | "composio" | "delegate" | "escalate_to_human" | "execute_pipeline" | "gemini_cli"
1831        | "jira" | "llm_task" | "opencode_cli" | "schedule" | "security_ops" | "shell"
1832        | "sop_advance" | "sop_approve" | "sop_execute" | "vi_verify" => "execute",
1833        "backup" | "browser_open" | "canvas" | "cloud_ops" | "file_edit" | "file_write"
1834        | "memory_export" | "memory_store" | "report_template" => "edit",
1835        "cron_add" | "poll" | "reaction" => "edit",
1836        "memory_forget" | "memory_purge" => "delete",
1837        // ACP clients often treat `read`/`search`/`fetch` calls as noisy
1838        // background context gathering and keep their content collapsed. These
1839        // ZeroClaw tools return user-visible text, so use `other` to keep the
1840        // result content surfaced consistently across clients.
1841        "content_search" | "discord_search" | "glob_search" | "knowledge" | "search"
1842        | "tool_search" | "web_search_tool" => "other",
1843        "browser"
1844        | "browser_delegate"
1845        | "cloud_patterns"
1846        | "data_management"
1847        | "file_read"
1848        | "git_operations"
1849        | "google_workspace"
1850        | "hardware_board_info"
1851        | "hardware_memory_map"
1852        | "hardware_memory_read"
1853        | "image_info"
1854        | "linkedin"
1855        | "microsoft365"
1856        | "model_routing_config"
1857        | "model_switch"
1858        | "pdf_read"
1859        | "project_intel"
1860        | "proxy_config"
1861        | "read_skill"
1862        | "sessions_history"
1863        | "sessions_list"
1864        | "sop_list"
1865        | "sop_status"
1866        | "text_browser"
1867        | "weather"
1868        | "workspace" => "other",
1869        "cron_list" | "cron_runs" | "memory_recall" => "other",
1870        "http_request" | "web_fetch" => "other",
1871        "image_gen" => "other",
1872        "cron_remove" => "delete",
1873        "cron_run" => "execute",
1874        "sessions_send" => "execute",
1875        _ => "other",
1876    }
1877}
1878
1879fn notification_for_turn_event(session_id: &str, event: &TurnEvent) -> Option<JsonRpcNotification> {
1880    Some(match event {
1881        TurnEvent::Chunk { delta } => JsonRpcNotification {
1882            jsonrpc: "2.0",
1883            method: "session/update",
1884            params: serde_json::json!({
1885                "sessionId": session_id,
1886                "update": {
1887                    "sessionUpdate": "agent_message_chunk",
1888                    "content": {
1889                        "type": "text",
1890                        "text": delta
1891                    }
1892                }
1893            }),
1894        },
1895        TurnEvent::ToolCall { id, name, args } => {
1896            let acp_content = to_acp_content(name, args);
1897            let mut update = serde_json::json!({
1898                "sessionUpdate": "tool_call",
1899                "toolCallId": id,
1900                "name": name,
1901                "title": name,
1902                "kind": map_tool_kind(name),
1903                "rawInput": to_acp_raw_input(name, args),
1904                "status": "pending"
1905            });
1906            if acp_content
1907                .as_array()
1908                .is_some_and(|items| !items.is_empty())
1909            {
1910                update["content"] = acp_content;
1911            }
1912            JsonRpcNotification {
1913                jsonrpc: "2.0",
1914                method: "session/update",
1915                params: serde_json::json!({
1916                    "sessionId": session_id,
1917                    "update": update
1918                }),
1919            }
1920        }
1921        TurnEvent::ToolResult { id, name, output } => JsonRpcNotification {
1922            jsonrpc: "2.0",
1923            method: "session/update",
1924            params: serde_json::json!({
1925                "sessionId": session_id,
1926                "update": {
1927                    "sessionUpdate": "tool_call_update",
1928                    "toolCallId": id,
1929                    "name": name,
1930                    "title": name,
1931                    "kind": map_tool_kind(name),
1932                    "status": "completed",
1933                    "rawOutput": output,
1934                    "body": output,
1935                    "content": [{
1936                        "type": "content",
1937                        "content": {
1938                            "type": "text",
1939                            "text": output
1940                        }
1941                    }]
1942                }
1943            }),
1944        },
1945        TurnEvent::Thinking { delta } => JsonRpcNotification {
1946            jsonrpc: "2.0",
1947            method: "session/update",
1948            params: serde_json::json!({
1949                "sessionId": session_id,
1950                "update": {
1951                    "sessionUpdate": "agent_thought_chunk",
1952                    "content": {
1953                        "type": "text",
1954                        "text": delta
1955                    }
1956                }
1957            }),
1958        },
1959        // ACP has its own approval mechanism via `session/request_permission`
1960        // routed through the channel's `request_choice` impl. The agent only
1961        // emits ApprovalRequest events when a back-channel like the gateway
1962        // WS is registered to handle them; on ACP-only sessions they should
1963        // not arrive here.
1964        TurnEvent::ApprovalRequest { .. } => return None,
1965        // Usage events are filtered out at every call site (ACP has no
1966        // `session/update` shape for them; the cost tracker records them
1967        // out-of-band). Reaching this arm means a caller forgot the filter.
1968        TurnEvent::Usage { .. } => unreachable!(
1969            "TurnEvent::Usage must be filtered before notification_for_turn_event; \
1970             ACP has no session/update notification for token usage"
1971        ),
1972    })
1973}
1974
1975fn history_notifications_for_message(
1976    session_id: &str,
1977    msg: &ConversationMessage,
1978) -> Vec<JsonRpcNotification> {
1979    match msg {
1980        ConversationMessage::Chat(chat) => {
1981            let update_type = match chat.role.as_str() {
1982                "user" => "user_message_chunk",
1983                "assistant" => "agent_message_chunk",
1984                _ => return vec![],
1985            };
1986            vec![JsonRpcNotification {
1987                jsonrpc: "2.0",
1988                method: "session/update",
1989                params: serde_json::json!({
1990                    "sessionId": session_id,
1991                    "update": {
1992                        "sessionUpdate": update_type,
1993                        "content": { "type": "text", "text": &chat.content }
1994                    }
1995                }),
1996            }]
1997        }
1998        ConversationMessage::AssistantToolCalls {
1999            text, tool_calls, ..
2000        } => {
2001            let mut notifications = Vec::new();
2002            if let Some(t) = text
2003                && !t.is_empty()
2004            {
2005                notifications.push(JsonRpcNotification {
2006                    jsonrpc: "2.0",
2007                    method: "session/update",
2008                    params: serde_json::json!({
2009                        "sessionId": session_id,
2010                        "update": {
2011                            "sessionUpdate": "agent_message_chunk",
2012                            "content": { "type": "text", "text": t }
2013                        }
2014                    }),
2015                });
2016            }
2017            for tc in tool_calls {
2018                let args: serde_json::Value =
2019                    serde_json::from_str(&tc.arguments).unwrap_or(serde_json::Value::Null);
2020                let acp_content = to_acp_content(&tc.name, &args);
2021                let mut update = serde_json::json!({
2022                    "sessionUpdate": "tool_call",
2023                    "toolCallId": &tc.id,
2024                    "name": &tc.name,
2025                    "title": &tc.name,
2026                    "kind": map_tool_kind(&tc.name),
2027                    "rawInput": to_acp_raw_input(&tc.name, &args),
2028                    "status": "completed"
2029                });
2030                if acp_content
2031                    .as_array()
2032                    .is_some_and(|items| !items.is_empty())
2033                {
2034                    update["content"] = acp_content;
2035                }
2036                notifications.push(JsonRpcNotification {
2037                    jsonrpc: "2.0",
2038                    method: "session/update",
2039                    params: serde_json::json!({
2040                        "sessionId": session_id,
2041                        "update": update
2042                    }),
2043                });
2044            }
2045            notifications
2046        }
2047        ConversationMessage::ToolResults(results) => results
2048            .iter()
2049            .map(|r| JsonRpcNotification {
2050                jsonrpc: "2.0",
2051                method: "session/update",
2052                params: serde_json::json!({
2053                    "sessionId": session_id,
2054                    "update": {
2055                        "sessionUpdate": "tool_call_update",
2056                        "toolCallId": &r.tool_call_id,
2057                        "status": "completed",
2058                        "rawOutput": &r.content,
2059                        "body": &r.content,
2060                        "content": [{
2061                            "type": "content",
2062                            "content": { "type": "text", "text": &r.content }
2063                        }]
2064                    }
2065                }),
2066            })
2067            .collect(),
2068    }
2069}
2070
2071// ── Error helper ─────────────────────────────────────────────────
2072
2073#[derive(Debug)]
2074struct RpcError {
2075    code: i32,
2076    message: String,
2077    #[allow(dead_code)] // JSON-RPC spec field, used for structured error data
2078    data: Option<Value>,
2079}
2080
2081type RpcResult = std::result::Result<Value, RpcError>;
2082
2083#[cfg(test)]
2084mod tests {
2085    use super::*;
2086
2087    #[test]
2088    fn acp_server_config_defaults() {
2089        let cfg = AcpServerConfig::default();
2090        assert_eq!(cfg.max_sessions, 10);
2091        assert_eq!(cfg.session_timeout_secs, 3600);
2092    }
2093
2094    #[test]
2095    fn acp_server_config_deserialize() {
2096        let json = r#"{"max_sessions": 5, "session_timeout_secs": 1800}"#;
2097        let cfg: AcpServerConfig = serde_json::from_str(json).unwrap();
2098        assert_eq!(cfg.max_sessions, 5);
2099        assert_eq!(cfg.session_timeout_secs, 1800);
2100    }
2101
2102    #[test]
2103    fn acp_server_config_deserialize_partial() {
2104        let json = r#"{"max_sessions": 3}"#;
2105        let cfg: AcpServerConfig = serde_json::from_str(json).unwrap();
2106        assert_eq!(cfg.max_sessions, 3);
2107        assert_eq!(cfg.session_timeout_secs, 3600);
2108    }
2109
2110    #[test]
2111    fn json_rpc_request_parse() {
2112        let json = r#"{"jsonrpc":"2.0","method":"initialize","params":{},"id":1}"#;
2113        let req: JsonRpcRequest = serde_json::from_str(json).unwrap();
2114        assert_eq!(req.method, "initialize");
2115        assert_eq!(req.id, Some(Value::Number(1.into())));
2116    }
2117
2118    #[test]
2119    fn json_rpc_request_parse_notification() {
2120        let json = r#"{"jsonrpc":"2.0","method":"session/update","params":{}}"#;
2121        let req: JsonRpcRequest = serde_json::from_str(json).unwrap();
2122        assert_eq!(req.method, "session/update");
2123        assert!(req.id.is_none());
2124    }
2125
2126    #[test]
2127    fn json_rpc_response_serialize() {
2128        let resp = JsonRpcResponse {
2129            jsonrpc: "2.0",
2130            result: Some(serde_json::json!({"status": "ok"})),
2131            error: None,
2132            id: Value::Number(1.into()),
2133        };
2134        let json = serde_json::to_string(&resp).unwrap();
2135        let parsed: Value = serde_json::from_str(&json).unwrap();
2136        assert_eq!(parsed["jsonrpc"], "2.0");
2137        assert!(parsed.get("result").is_some());
2138        assert!(parsed.get("error").is_none());
2139        assert_eq!(parsed["id"], 1);
2140    }
2141
2142    #[tokio::test]
2143    async fn rpc_request_timeout_drop_removes_pending_responder() {
2144        let (tx, mut rx) = mpsc::channel::<String>(16);
2145        let rpc = RpcOutbound::new(tx);
2146
2147        let result = tokio::time::timeout(
2148            Duration::from_millis(10),
2149            rpc.request("session/request_permission", serde_json::json!({})),
2150        )
2151        .await;
2152
2153        assert!(result.is_err());
2154        assert!(rx.recv().await.is_some());
2155        assert_eq!(rpc.pending_count(), 0);
2156    }
2157
2158    #[test]
2159    fn initialize_response_uses_acp_v1_shape() {
2160        let server = AcpServer::new(Config::default(), AcpServerConfig::default());
2161        let result = server
2162            .handle_initialize(&serde_json::json!({
2163                "protocolVersion": 1,
2164                "clientCapabilities": {},
2165                "clientInfo": {
2166                    "name": "test-client",
2167                    "version": "1.0.0"
2168                }
2169            }))
2170            .unwrap();
2171
2172        assert_eq!(result["protocolVersion"], 1);
2173        assert_eq!(result["agentInfo"]["name"], "zeroclaw-acp");
2174        assert_eq!(result["agentInfo"]["title"], "ZeroClaw ACP");
2175        assert_eq!(result["agentInfo"]["version"], env!("CARGO_PKG_VERSION"));
2176        assert_eq!(result["authMethods"], serde_json::json!([]));
2177        assert_eq!(result["agentCapabilities"]["loadSession"], false);
2178        assert_eq!(
2179            result["agentCapabilities"]["promptCapabilities"]["image"],
2180            false
2181        );
2182        assert_eq!(
2183            result["agentCapabilities"]["mcpCapabilities"]["http"],
2184            false
2185        );
2186        assert!(result.get("serverInfo").is_none());
2187        assert!(result.get("capabilities").is_none());
2188    }
2189
2190    #[test]
2191    fn initialize_advertises_load_session_when_store_present() {
2192        let cwd = tempfile::tempdir().unwrap();
2193        let store =
2194            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
2195        let server = AcpServer::new_with_store(
2196            make_test_config(cwd.path()),
2197            AcpServerConfig::default(),
2198            store,
2199        );
2200        let result = server.handle_initialize(&serde_json::json!({})).unwrap();
2201        assert_eq!(result["agentCapabilities"]["loadSession"], true);
2202        assert_eq!(
2203            result["agentCapabilities"]["sessionCapabilities"]["resume"],
2204            serde_json::json!({})
2205        );
2206        assert_eq!(
2207            result["agentCapabilities"]["sessionCapabilities"]["close"],
2208            serde_json::json!({})
2209        );
2210    }
2211
2212    #[test]
2213    fn session_new_defaults_to_launch_cwd_when_client_omits_cwd() {
2214        let config = Config {
2215            data_dir: PathBuf::from("/not/the/project"),
2216            ..Default::default()
2217        };
2218        let server = AcpServer::new(config, AcpServerConfig::default());
2219        let expected = std::env::current_dir().unwrap();
2220
2221        assert_eq!(
2222            server.requested_session_cwd(&serde_json::json!({})),
2223            expected
2224        );
2225    }
2226
2227    #[test]
2228    fn session_new_respects_client_cwd_when_present() {
2229        let server = AcpServer::new(Config::default(), AcpServerConfig::default());
2230        let cwd = std::env::current_dir().unwrap();
2231
2232        assert_eq!(
2233            server.requested_session_cwd(&serde_json::json!({"cwd": cwd})),
2234            cwd
2235        );
2236    }
2237
2238    #[tokio::test]
2239    async fn session_new_does_not_wait_for_configured_mcp_servers() {
2240        let cwd = tempfile::tempdir().unwrap();
2241        let mut config = Config {
2242            data_dir: cwd.path().to_path_buf(),
2243            providers: {
2244                let mut p = zeroclaw_config::providers::Providers::default();
2245                p.models.openrouter.insert(
2246                    "default".to_string(),
2247                    zeroclaw_config::schema::OpenRouterModelProviderConfig {
2248                        base: zeroclaw_config::schema::ModelProviderConfig {
2249                            model: Some("test-model".to_string()),
2250                            ..Default::default()
2251                        },
2252                    },
2253                );
2254                p
2255            },
2256            mcp: zeroclaw_config::schema::McpConfig {
2257                enabled: true,
2258                servers: vec![zeroclaw_config::schema::McpServerConfig {
2259                    name: "slow".to_string(),
2260                    transport: zeroclaw_config::schema::McpTransport::Stdio,
2261                    command: "/bin/sh".to_string(),
2262                    args: vec!["-c".to_string(), "sleep 60".to_string()],
2263                    ..Default::default()
2264                }],
2265                ..Default::default()
2266            },
2267            ..Default::default()
2268        };
2269        config.risk_profiles.insert(
2270            "default".to_string(),
2271            zeroclaw_config::schema::RiskProfileConfig::default(),
2272        );
2273        config.agents.insert(
2274            "test-agent".to_string(),
2275            zeroclaw_config::schema::AliasedAgentConfig {
2276                model_provider: "openrouter.default".into(),
2277                risk_profile: "default".to_string(),
2278                ..Default::default()
2279            },
2280        );
2281        let server = AcpServer::new(config, AcpServerConfig::default());
2282
2283        let result = tokio::time::timeout(
2284            Duration::from_secs(2),
2285            server.handle_session_new(&serde_json::json!({
2286                "cwd": cwd.path().to_string_lossy(),
2287                "agentAlias": "test-agent",
2288                "mcpServers": []
2289            })),
2290        )
2291        .await
2292        .expect("session/new should not block on configured MCP startup")
2293        .expect("session/new should create a session");
2294
2295        assert!(result["sessionId"].as_str().is_some());
2296    }
2297
2298    #[tokio::test]
2299    async fn session_new_auto_selects_sole_configured_agent_when_alias_omitted() {
2300        let cwd = tempfile::tempdir().unwrap();
2301        let mut config = Config {
2302            data_dir: cwd.path().to_path_buf(),
2303            providers: {
2304                let mut p = zeroclaw_config::providers::Providers::default();
2305                p.models.openrouter.insert(
2306                    "default".to_string(),
2307                    zeroclaw_config::schema::OpenRouterModelProviderConfig {
2308                        base: zeroclaw_config::schema::ModelProviderConfig {
2309                            api_key: Some("test-key".to_string()),
2310                            model: Some("test-model".to_string()),
2311                            ..Default::default()
2312                        },
2313                    },
2314                );
2315                p
2316            },
2317            ..Default::default()
2318        };
2319        config.risk_profiles.insert(
2320            "default".to_string(),
2321            zeroclaw_config::schema::RiskProfileConfig::default(),
2322        );
2323        config.agents.insert(
2324            "only-agent".to_string(),
2325            zeroclaw_config::schema::AliasedAgentConfig {
2326                model_provider: "openrouter.default".into(),
2327                risk_profile: "default".to_string(),
2328                ..Default::default()
2329            },
2330        );
2331        let server = AcpServer::new(config, AcpServerConfig::default());
2332
2333        let result = tokio::time::timeout(
2334            Duration::from_secs(2),
2335            server.handle_session_new(&serde_json::json!({
2336                "cwd": cwd.path().to_string_lossy(),
2337                "mcpServers": []
2338            })),
2339        )
2340        .await
2341        .expect("session/new should not block")
2342        .expect("session/new should auto-select the sole configured agent");
2343
2344        assert!(result["sessionId"].as_str().is_some());
2345    }
2346
2347    #[tokio::test]
2348    async fn session_new_requires_alias_when_multiple_agents_configured() {
2349        let mut config = Config::default();
2350        config.agents.insert(
2351            "agent-one".to_string(),
2352            zeroclaw_config::schema::AliasedAgentConfig::default(),
2353        );
2354        config.agents.insert(
2355            "agent-two".to_string(),
2356            zeroclaw_config::schema::AliasedAgentConfig::default(),
2357        );
2358        let server = AcpServer::new(config, AcpServerConfig::default());
2359
2360        let err = server
2361            .handle_session_new(&serde_json::json!({"mcpServers": []}))
2362            .await
2363            .expect_err("session/new without agentAlias should fail when multiple agents exist");
2364
2365        assert_eq!(err.code, INVALID_PARAMS);
2366        assert!(
2367            err.message.contains("agentAlias"),
2368            "error should mention agentAlias, got: {}",
2369            err.message
2370        );
2371    }
2372
2373    #[tokio::test]
2374    async fn session_new_uses_config_default_agent_when_alias_omitted_and_multiple_agents() {
2375        let cwd = tempfile::tempdir().unwrap();
2376        let mut config = Config {
2377            data_dir: cwd.path().to_path_buf(),
2378            providers: {
2379                let mut p = zeroclaw_config::providers::Providers::default();
2380                p.models.openrouter.insert(
2381                    "default".to_string(),
2382                    zeroclaw_config::schema::OpenRouterModelProviderConfig {
2383                        base: zeroclaw_config::schema::ModelProviderConfig {
2384                            api_key: Some("test-key".to_string()),
2385                            model: Some("test-model".to_string()),
2386                            ..Default::default()
2387                        },
2388                    },
2389                );
2390                p
2391            },
2392            ..Default::default()
2393        };
2394        config.risk_profiles.insert(
2395            "default".to_string(),
2396            zeroclaw_config::schema::RiskProfileConfig::default(),
2397        );
2398        config.agents.insert(
2399            "agent-alpha".to_string(),
2400            zeroclaw_config::schema::AliasedAgentConfig {
2401                model_provider: "openrouter.default".into(),
2402                risk_profile: "default".to_string(),
2403                ..Default::default()
2404            },
2405        );
2406        config.agents.insert(
2407            "agent-beta".to_string(),
2408            zeroclaw_config::schema::AliasedAgentConfig {
2409                model_provider: "openrouter.default".into(),
2410                risk_profile: "default".to_string(),
2411                ..Default::default()
2412            },
2413        );
2414        config.acp.default_agent = Some("agent-alpha".to_string());
2415        let server = AcpServer::new(config, AcpServerConfig::default());
2416
2417        let result = tokio::time::timeout(
2418            Duration::from_secs(2),
2419            server.handle_session_new(&serde_json::json!({
2420                "cwd": cwd.path().to_string_lossy(),
2421                "mcpServers": []
2422            })),
2423        )
2424        .await
2425        .expect("should not block")
2426        .expect("should select agent-alpha from config.acp.default_agent");
2427
2428        assert!(result["sessionId"].as_str().is_some());
2429    }
2430
2431    #[tokio::test]
2432    async fn session_new_explicit_alias_overrides_config_default_agent() {
2433        let cwd = tempfile::tempdir().unwrap();
2434        let mut config = Config {
2435            data_dir: cwd.path().to_path_buf(),
2436            providers: {
2437                let mut p = zeroclaw_config::providers::Providers::default();
2438                p.models.openrouter.insert(
2439                    "default".to_string(),
2440                    zeroclaw_config::schema::OpenRouterModelProviderConfig {
2441                        base: zeroclaw_config::schema::ModelProviderConfig {
2442                            api_key: Some("test-key".to_string()),
2443                            model: Some("test-model".to_string()),
2444                            ..Default::default()
2445                        },
2446                    },
2447                );
2448                p
2449            },
2450            ..Default::default()
2451        };
2452        config.risk_profiles.insert(
2453            "default".to_string(),
2454            zeroclaw_config::schema::RiskProfileConfig::default(),
2455        );
2456        config.agents.insert(
2457            "agent-alpha".to_string(),
2458            zeroclaw_config::schema::AliasedAgentConfig {
2459                model_provider: "openrouter.default".into(),
2460                risk_profile: "default".to_string(),
2461                ..Default::default()
2462            },
2463        );
2464        config.agents.insert(
2465            "agent-beta".to_string(),
2466            zeroclaw_config::schema::AliasedAgentConfig {
2467                model_provider: "openrouter.default".into(),
2468                risk_profile: "default".to_string(),
2469                ..Default::default()
2470            },
2471        );
2472        config.acp.default_agent = Some("agent-alpha".to_string());
2473        let server = AcpServer::new(config, AcpServerConfig::default());
2474
2475        // Explicit alias should win over config default
2476        let result = tokio::time::timeout(
2477            Duration::from_secs(2),
2478            server.handle_session_new(&serde_json::json!({
2479                "agentAlias": "agent-beta",
2480                "cwd": cwd.path().to_string_lossy(),
2481                "mcpServers": []
2482            })),
2483        )
2484        .await
2485        .expect("should not block")
2486        .expect("should use agent-beta despite default_agent = agent-alpha");
2487
2488        assert!(result["sessionId"].as_str().is_some());
2489    }
2490
2491    #[test]
2492    fn json_rpc_error_response_serialize() {
2493        let resp = JsonRpcResponse {
2494            jsonrpc: "2.0",
2495            result: None,
2496            error: Some(JsonRpcError {
2497                code: METHOD_NOT_FOUND,
2498                message: "Method not found".to_string(),
2499                data: None,
2500            }),
2501            id: Value::Number(1.into()),
2502        };
2503        let json = serde_json::to_string(&resp).unwrap();
2504        let parsed: Value = serde_json::from_str(&json).unwrap();
2505        assert!(parsed.get("error").is_some());
2506        assert_eq!(parsed["error"]["code"], -32601);
2507        assert!(parsed.get("result").is_none());
2508    }
2509
2510    #[test]
2511    fn json_rpc_notification_serialize() {
2512        let notif = JsonRpcNotification {
2513            jsonrpc: "2.0",
2514            method: "session/update",
2515            params: serde_json::json!({
2516                "sessionId": "test-sid",
2517                "update": {
2518                    "sessionUpdate": "agent_message_chunk",
2519                    "content": { "type": "text", "text": "hello" }
2520                }
2521            }),
2522        };
2523        let json = serde_json::to_string(&notif).unwrap();
2524        assert!(json.contains(r#""method":"session/update""#));
2525        assert!(json.contains(r#""sessionUpdate":"agent_message_chunk""#));
2526        assert!(json.contains(r#""text":"hello""#));
2527    }
2528
2529    #[test]
2530    fn test_prompt_parsing() {
2531        // String prompt
2532        let string_params = serde_json::json!({"prompt": "hello world"});
2533        let result = AcpServer::parse_prompt(&string_params).unwrap();
2534        assert_eq!(result, "hello world");
2535
2536        // Array prompt (valid)
2537        let array_params = serde_json::json!({
2538            "prompt": [
2539                {"type": "text", "text": "part 1"},
2540                {"type": "text", "text": "part 2"}
2541            ]
2542        });
2543        let result = AcpServer::parse_prompt(&array_params).unwrap();
2544        assert_eq!(result, "part 1\n\npart 2");
2545
2546        // Array prompt (empty or no text)
2547        let empty_array_params = serde_json::json!({"prompt": []});
2548        let result = AcpServer::parse_prompt(&empty_array_params);
2549        assert!(result.is_err());
2550        assert_eq!(result.unwrap_err().code, INVALID_PARAMS);
2551
2552        let no_text_params = serde_json::json!({
2553            "prompt": [
2554                {"type": "image", "data": "..."}
2555            ]
2556        });
2557        let result = AcpServer::parse_prompt(&no_text_params);
2558        assert!(result.is_err());
2559
2560        // Array prompt with resource (file @-notation from ACP client)
2561        let resource_params = serde_json::json!({
2562            "prompt": [
2563                {"type": "text", "text": "analyze this file:"},
2564                {"type": "resource", "resource": {"uri": "file:///tmp/example.rs", "text": "fn main() { println!(\"hi\"); }", "mimeType": "text/rust"}}
2565            ]
2566        });
2567        let result = AcpServer::parse_prompt(&resource_params).unwrap();
2568        assert!(result.contains("analyze this file:"));
2569        assert!(result.contains("fn main() { println!(\"hi\"); }"));
2570    }
2571
2572    #[test]
2573    fn handle_initialize_default_model_absent_when_unconfigured() {
2574        let server = AcpServer::new(Config::default(), AcpServerConfig::default());
2575        let result = server.handle_initialize(&serde_json::json!({})).unwrap();
2576        assert!(
2577            result["_meta"]["zeroclaw"].get("defaultModel").is_none(),
2578            "defaultModel must be absent when no model_provider is configured, got: {}",
2579            result["_meta"]["zeroclaw"]["defaultModel"]
2580        );
2581    }
2582
2583    #[test]
2584    fn handle_initialize_default_model_reflects_configured_provider() {
2585        use zeroclaw_config::schema::{ModelProviderConfig, OllamaModelProviderConfig};
2586        let mut config = Config::default();
2587        config.providers.models.ollama.insert(
2588            "default".to_string(),
2589            OllamaModelProviderConfig {
2590                base: ModelProviderConfig {
2591                    model: Some("llama3.2".to_string()),
2592                    ..Default::default()
2593                },
2594                ..OllamaModelProviderConfig::default()
2595            },
2596        );
2597        let server = AcpServer::new(config, AcpServerConfig::default());
2598        let result = server.handle_initialize(&serde_json::json!({})).unwrap();
2599        assert_eq!(result["_meta"]["zeroclaw"]["defaultModel"], "llama3.2");
2600    }
2601
2602    #[test]
2603    fn prompt_result_preserves_content_string_shape() {
2604        let result = AcpServer::prompt_result("test-sid".to_string(), "end_turn", "hello".into());
2605        assert_eq!(result["sessionId"], "test-sid");
2606        assert_eq!(result["stopReason"], "end_turn");
2607        assert_eq!(result["content"], "hello");
2608    }
2609
2610    #[test]
2611    fn cancelled_prompt_result_preserves_content_string_shape() {
2612        let with_partial =
2613            AcpServer::cancelled_prompt_result("test-sid".to_string(), "partial text");
2614        assert_eq!(with_partial["sessionId"], "test-sid");
2615        assert_eq!(with_partial["stopReason"], "cancelled");
2616        assert_eq!(
2617            with_partial["content"],
2618            "partial text\n\n[interrupted by user]"
2619        );
2620
2621        let marker_only = AcpServer::cancelled_prompt_result("test-sid".to_string(), "");
2622        assert_eq!(marker_only["content"], "[interrupted by user]");
2623    }
2624
2625    #[test]
2626    fn test_tool_call_and_update_serialization() {
2627        // Test tool_call (initial pending event)
2628        let tool_call_notif = JsonRpcNotification {
2629            jsonrpc: "2.0",
2630            method: "session/update",
2631            params: serde_json::json!({
2632                "sessionId": "test-sid",
2633                "update": {
2634                    "sessionUpdate": "tool_call",
2635                    "toolCallId": "tc-12345",
2636                    "name": "shell",
2637                    "title": "shell",
2638                    "kind": "execute",
2639                    "rawInput": {"command": "ls -la"},
2640                    "status": "pending"
2641                }
2642            }),
2643        };
2644        let json1 = serde_json::to_string(&tool_call_notif).unwrap();
2645        assert!(json1.contains("\"sessionUpdate\":\"tool_call\""));
2646        assert!(json1.contains("\"toolCallId\":\"tc-12345\""));
2647        assert!(json1.contains("\"name\":\"shell\""));
2648        assert!(json1.contains("\"title\":\"shell\""));
2649        assert!(json1.contains("\"kind\":\"execute\""));
2650        assert!(json1.contains("\"status\":\"pending\""));
2651        assert!(json1.contains("\"rawInput\""));
2652
2653        // Test tool_call_update completion payload
2654        let tool_update_notif = JsonRpcNotification {
2655            jsonrpc: "2.0",
2656            method: "session/update",
2657            params: serde_json::json!({
2658                "sessionId": "test-sid",
2659                "update": {
2660                    "sessionUpdate": "tool_call_update",
2661                    "toolCallId": "tc-12345",
2662                    "name": "shell",
2663                    "title": "shell",
2664                    "kind": "execute",
2665                    "status": "completed",
2666                    "rawOutput": "file1.txt\nfile2.txt",
2667                    "body": "file1.txt\nfile2.txt",
2668                    "content": [{
2669                        "type": "content",
2670                        "content": {
2671                            "type": "text",
2672                            "text": "file1.txt\nfile2.txt"
2673                        }
2674                    }]
2675                }
2676            }),
2677        };
2678        let json2 = serde_json::to_string(&tool_update_notif).unwrap();
2679        assert!(json2.contains("\"sessionUpdate\":\"tool_call_update\""));
2680        assert!(json2.contains("\"toolCallId\":\"tc-12345\""));
2681        assert!(json2.contains("\"name\":\"shell\""));
2682        assert!(json2.contains("\"status\":\"completed\""));
2683        assert!(json2.contains("\"rawOutput\""));
2684        assert!(json2.contains("\"body\""));
2685        assert!(json2.contains("\"content\""));
2686        assert!(json2.contains("\"type\":\"content\""));
2687        assert!(json2.contains("file1.txt"));
2688        // Verify matching toolCallId across events
2689        assert!(json1.contains("tc-12345") && json2.contains("tc-12345"));
2690    }
2691
2692    #[test]
2693    fn file_edit_raw_input_uses_acp_diff_field_names() {
2694        let call = notification_for_turn_event(
2695            "sid",
2696            &TurnEvent::ToolCall {
2697                id: "tc-1".to_string(),
2698                name: "file_edit".to_string(),
2699                args: serde_json::json!({
2700                    "path": "src/foo.rs",
2701                    "old_string": "let x = 1;",
2702                    "new_string": "let x = 2;"
2703                }),
2704            },
2705        );
2706        let v = serde_json::to_value(call.unwrap()).unwrap();
2707        let raw = &v["params"]["update"]["rawInput"];
2708        assert_eq!(raw["path"], "src/foo.rs");
2709        assert_eq!(raw["oldText"], "let x = 1;");
2710        assert_eq!(raw["newText"], "let x = 2;");
2711        assert!(
2712            raw.get("old_string").is_none(),
2713            "old_string must not appear in rawInput"
2714        );
2715        assert!(
2716            raw.get("new_string").is_none(),
2717            "new_string must not appear in rawInput"
2718        );
2719
2720        let content = &v["params"]["update"]["content"];
2721        assert!(content.is_array(), "file_edit must emit a content array");
2722        let diff = &content[0];
2723        assert_eq!(diff["type"], "diff");
2724        assert_eq!(diff["path"], "src/foo.rs");
2725        assert_eq!(diff["oldText"], "let x = 1;");
2726        assert_eq!(diff["newText"], "let x = 2;");
2727    }
2728
2729    #[test]
2730    fn file_write_raw_input_uses_acp_diff_field_names() {
2731        let call = notification_for_turn_event(
2732            "sid",
2733            &TurnEvent::ToolCall {
2734                id: "tc-2".to_string(),
2735                name: "file_write".to_string(),
2736                args: serde_json::json!({
2737                    "path": "src/new.rs",
2738                    "content": "fn main() {}"
2739                }),
2740            },
2741        );
2742        let v = serde_json::to_value(call.unwrap()).unwrap();
2743        let raw = &v["params"]["update"]["rawInput"];
2744        assert_eq!(raw["path"], "src/new.rs");
2745        assert_eq!(raw["newText"], "fn main() {}");
2746        assert!(
2747            raw.get("oldText").is_none(),
2748            "oldText must not appear in file_write rawInput"
2749        );
2750        assert!(
2751            raw.get("content").is_none(),
2752            "content must not appear in rawInput"
2753        );
2754
2755        let content = &v["params"]["update"]["content"];
2756        assert!(content.is_array(), "file_write must emit a content array");
2757        let diff = &content[0];
2758        assert_eq!(diff["type"], "diff");
2759        assert_eq!(diff["path"], "src/new.rs");
2760        assert_eq!(diff["newText"], "fn main() {}");
2761        assert!(
2762            diff.get("oldText").is_none(),
2763            "oldText must be absent for file_write diff"
2764        );
2765    }
2766
2767    #[test]
2768    fn map_tool_kind_uses_explicit_tool_names() {
2769        assert_eq!(map_tool_kind("memory_forget"), "delete");
2770        assert_eq!(map_tool_kind("memory_purge"), "delete");
2771        assert_eq!(map_tool_kind("cron_run"), "execute");
2772        assert_eq!(map_tool_kind("file_read"), "other");
2773        assert_eq!(map_tool_kind("knowledge"), "other");
2774        assert_eq!(map_tool_kind("web_fetch"), "other");
2775        assert_eq!(map_tool_kind("file_write"), "edit");
2776        assert_eq!(map_tool_kind("unknown_tool"), "other");
2777    }
2778
2779    #[test]
2780    fn turn_tool_events_include_client_visible_tool_fields() {
2781        let call = notification_for_turn_event(
2782            "test-sid",
2783            &TurnEvent::ToolCall {
2784                id: "tc-12345".to_string(),
2785                name: "shell".to_string(),
2786                args: serde_json::json!({"command": "ls -la"}),
2787            },
2788        );
2789        let call_value =
2790            serde_json::to_value(call.expect("ToolCall maps to a notification")).unwrap();
2791        assert_eq!(call_value["method"], "session/update");
2792        assert_eq!(call_value["params"]["update"]["sessionUpdate"], "tool_call");
2793        assert_eq!(call_value["params"]["update"]["toolCallId"], "tc-12345");
2794        assert_eq!(call_value["params"]["update"]["name"], "shell");
2795        assert_eq!(call_value["params"]["update"]["title"], "shell");
2796        assert_eq!(call_value["params"]["update"]["kind"], "execute");
2797        assert_eq!(
2798            call_value["params"]["update"]["rawInput"],
2799            serde_json::json!({"command": "ls -la"})
2800        );
2801
2802        let result = notification_for_turn_event(
2803            "test-sid",
2804            &TurnEvent::ToolResult {
2805                id: "tc-12345".to_string(),
2806                name: "shell".to_string(),
2807                output: "file1.txt\nfile2.txt".to_string(),
2808            },
2809        );
2810        let result_value =
2811            serde_json::to_value(result.expect("ToolResult maps to a notification")).unwrap();
2812        assert_eq!(
2813            result_value["params"]["update"]["sessionUpdate"],
2814            "tool_call_update"
2815        );
2816        assert_eq!(result_value["params"]["update"]["toolCallId"], "tc-12345");
2817        assert_eq!(result_value["params"]["update"]["name"], "shell");
2818        assert_eq!(result_value["params"]["update"]["title"], "shell");
2819        assert_eq!(result_value["params"]["update"]["kind"], "execute");
2820        assert_eq!(result_value["params"]["update"]["status"], "completed");
2821        assert_eq!(
2822            result_value["params"]["update"]["rawOutput"],
2823            "file1.txt\nfile2.txt"
2824        );
2825        assert_eq!(
2826            result_value["params"]["update"]["body"],
2827            "file1.txt\nfile2.txt"
2828        );
2829        assert_eq!(
2830            result_value["params"]["update"]["content"][0]["content"]["text"],
2831            "file1.txt\nfile2.txt"
2832        );
2833    }
2834
2835    /// `session/stop` must succeed while a `session/prompt` turn is in flight.
2836    ///
2837    /// The session entry lives in the outer map for its entire lifetime.
2838    /// The inner `Arc<Mutex<Session>>` serialises access: the prompt turn holds
2839    /// the inner lock while running; `session/stop` removes the outer entry
2840    /// then waits for the inner lock before cleaning up.  It must never see
2841    /// SESSION_NOT_FOUND just because a turn happens to be running.
2842    #[tokio::test]
2843    async fn session_stop_finds_session_during_active_prompt_turn() {
2844        let cwd = tempfile::tempdir().unwrap();
2845        let mut config = Config {
2846            data_dir: cwd.path().to_path_buf(),
2847            providers: {
2848                let mut p = zeroclaw_config::providers::Providers::default();
2849                p.models.anthropic.insert(
2850                    "default".to_string(),
2851                    zeroclaw_config::schema::AnthropicModelProviderConfig {
2852                        base: zeroclaw_config::schema::ModelProviderConfig {
2853                            model: Some("claude-haiku-4-5".to_string()),
2854                            ..Default::default()
2855                        },
2856                    },
2857                );
2858                p
2859            },
2860            ..Default::default()
2861        };
2862        config.risk_profiles.insert(
2863            "default".to_string(),
2864            zeroclaw_config::schema::RiskProfileConfig::default(),
2865        );
2866        config.agents.insert(
2867            "test-agent".to_string(),
2868            zeroclaw_config::schema::AliasedAgentConfig {
2869                model_provider: "anthropic.default".into(),
2870                risk_profile: "default".to_string(),
2871                ..Default::default()
2872            },
2873        );
2874        let server = Arc::new(AcpServer::new(config, AcpServerConfig::default()));
2875
2876        // Create a real session via the normal path.
2877        let new_result = server
2878            .handle_session_new(&serde_json::json!({
2879                "cwd": cwd.path().to_string_lossy(),
2880                "agentAlias": "test-agent"
2881            }))
2882            .await
2883            .expect("session/new must succeed");
2884        let session_id = new_result["sessionId"].as_str().unwrap().to_string();
2885
2886        // Grab the inner lock to simulate an in-flight prompt turn.
2887        let session_arc = {
2888            let sessions = server.sessions.lock().await;
2889            sessions.get(&session_id).cloned().unwrap()
2890        };
2891        let _guard = session_arc.lock().await;
2892
2893        // session/stop should find the session in the outer map.  With the
2894        // inner lock held it blocks — confirm it does NOT immediately return
2895        // SESSION_NOT_FOUND.
2896        let server_clone = Arc::clone(&server);
2897        let sid_clone = session_id.clone();
2898        let stop_result = tokio::time::timeout(Duration::from_millis(100), async move {
2899            server_clone
2900                .handle_session_stop(&serde_json::json!({ "sessionId": sid_clone }))
2901                .await
2902        })
2903        .await;
2904
2905        match stop_result {
2906            Err(_timeout) => {} // expected — blocked waiting for the inner lock
2907            Ok(Ok(_)) => panic!("stop returned Ok without the lock being released"),
2908            Ok(Err(e)) => {
2909                assert_ne!(
2910                    e.code, SESSION_NOT_FOUND,
2911                    "session/stop must not return SESSION_NOT_FOUND while a turn is in flight"
2912                );
2913            }
2914        }
2915    }
2916
2917    #[tokio::test]
2918    async fn session_new_persists_to_store() {
2919        let cwd = tempfile::tempdir().unwrap();
2920        let store =
2921            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
2922        let server = Arc::new(AcpServer::new_with_store(
2923            make_test_config(cwd.path()),
2924            AcpServerConfig::default(),
2925            Arc::clone(&store),
2926        ));
2927
2928        let result = server
2929            .handle_session_new(&serde_json::json!({
2930                "cwd": cwd.path().to_string_lossy()
2931            }))
2932            .await
2933            .expect("session/new must succeed");
2934
2935        let session_id = result["sessionId"].as_str().unwrap();
2936
2937        // Session must appear in the store
2938        let data = store.load_session(session_id).unwrap();
2939        assert!(
2940            data.is_some(),
2941            "session/new must persist to AcpSessionStore"
2942        );
2943    }
2944
2945    #[tokio::test]
2946    async fn session_new_without_store_still_works() {
2947        let cwd = tempfile::tempdir().unwrap();
2948        let server = Arc::new(AcpServer::new(
2949            make_test_config(cwd.path()),
2950            AcpServerConfig::default(),
2951        ));
2952
2953        let result = server
2954            .handle_session_new(&serde_json::json!({
2955                "cwd": cwd.path().to_string_lossy()
2956            }))
2957            .await
2958            .expect("session/new must succeed without a store");
2959
2960        let session_id = result["sessionId"].as_str().unwrap();
2961        assert!(server.sessions.lock().await.contains_key(session_id));
2962    }
2963
2964    fn make_test_config(cwd: &std::path::Path) -> Config {
2965        let mut cfg = Config {
2966            data_dir: cwd.to_path_buf(),
2967            ..Default::default()
2968        };
2969        cfg.providers.models.anthropic.insert(
2970            "default".to_string(),
2971            zeroclaw_config::schema::AnthropicModelProviderConfig {
2972                base: zeroclaw_config::schema::ModelProviderConfig {
2973                    model: Some("claude-haiku-4-5".to_string()),
2974                    ..Default::default()
2975                },
2976            },
2977        );
2978        cfg.risk_profiles.insert(
2979            "default".to_string(),
2980            zeroclaw_config::schema::RiskProfileConfig::default(),
2981        );
2982        cfg.agents.insert(
2983            "test-agent".to_string(),
2984            zeroclaw_config::schema::AliasedAgentConfig {
2985                model_provider: "anthropic.default".into(),
2986                risk_profile: "default".to_string(),
2987                ..Default::default()
2988            },
2989        );
2990        cfg
2991    }
2992
2993    /// `session/cancel` on an idle session (no active turn) must succeed silently.
2994    #[tokio::test]
2995    async fn session_cancel_idle_session_is_noop() {
2996        let cwd = tempfile::tempdir().unwrap();
2997        let server = Arc::new(AcpServer::new(
2998            make_test_config(cwd.path()),
2999            AcpServerConfig::default(),
3000        ));
3001
3002        let new_result = server
3003            .handle_session_new(&serde_json::json!({
3004                "cwd": cwd.path().to_string_lossy(),
3005                "agentAlias": "test-agent"
3006            }))
3007            .await
3008            .expect("session/new must succeed");
3009        let session_id = new_result["sessionId"].as_str().unwrap().to_string();
3010
3011        // No active turn — cancel must not error.
3012        let result = server
3013            .handle_session_cancel(&serde_json::json!({ "sessionId": session_id }))
3014            .await;
3015        assert!(result.is_ok(), "idle cancel must succeed: {result:?}");
3016    }
3017
3018    /// `session/cancel` for an unknown session ID must succeed silently (notification
3019    /// semantics: no response, no error propagation).
3020    #[tokio::test]
3021    async fn session_cancel_unknown_session_is_noop() {
3022        let cwd = tempfile::tempdir().unwrap();
3023        let server = Arc::new(AcpServer::new(
3024            make_test_config(cwd.path()),
3025            AcpServerConfig::default(),
3026        ));
3027
3028        let result = server
3029            .handle_session_cancel(&serde_json::json!({ "sessionId": "sess_does_not_exist" }))
3030            .await;
3031        assert!(
3032            result.is_ok(),
3033            "unknown-session cancel must succeed: {result:?}"
3034        );
3035    }
3036
3037    #[tokio::test]
3038    async fn session_cancel_accepts_snake_case_session_id() {
3039        let cwd = tempfile::tempdir().unwrap();
3040        let server = Arc::new(AcpServer::new(
3041            make_test_config(cwd.path()),
3042            AcpServerConfig::default(),
3043        ));
3044
3045        let session_id = "sess_snake_case_cancel";
3046        let active_token = tokio_util::sync::CancellationToken::new();
3047        server
3048            .register_cancel_token(session_id, active_token.clone())
3049            .expect("active turn should register token");
3050
3051        server
3052            .handle_session_cancel(&serde_json::json!({ "session_id": session_id }))
3053            .await
3054            .expect("snake_case session_id should cancel the active turn");
3055
3056        assert!(active_token.is_cancelled());
3057    }
3058
3059    /// A second prompt for the same session must fail before it can overwrite
3060    /// the active turn's cancellation token.
3061    #[tokio::test]
3062    async fn register_cancel_token_rejects_concurrent_prompt_for_session() {
3063        let cwd = tempfile::tempdir().unwrap();
3064        let server = Arc::new(AcpServer::new(
3065            make_test_config(cwd.path()),
3066            AcpServerConfig::default(),
3067        ));
3068
3069        let session_id = "sess_active_turn";
3070        let active_token = tokio_util::sync::CancellationToken::new();
3071        let queued_token = tokio_util::sync::CancellationToken::new();
3072
3073        server
3074            .register_cancel_token(session_id, active_token.clone())
3075            .expect("first prompt should register its token");
3076        let err = server
3077            .register_cancel_token(session_id, queued_token.clone())
3078            .expect_err("second prompt must not overwrite active token");
3079
3080        assert_eq!(err.code, SESSION_BUSY);
3081        assert!(
3082            err.message.contains("active prompt turn"),
3083            "error should explain why prompt was rejected: {}",
3084            err.message
3085        );
3086
3087        server
3088            .handle_session_cancel(&serde_json::json!({ "sessionId": session_id }))
3089            .await
3090            .expect("cancel should still target active token");
3091
3092        assert!(active_token.is_cancelled());
3093        assert!(
3094            !queued_token.is_cancelled(),
3095            "rejected prompt's token must not become the active cancel target"
3096        );
3097    }
3098
3099    #[tokio::test]
3100    async fn session_prompt_rejects_concurrent_turn_before_agent_starts() {
3101        let cwd = tempfile::tempdir().unwrap();
3102        let server = Arc::new(AcpServer::new(
3103            make_test_config(cwd.path()),
3104            AcpServerConfig::default(),
3105        ));
3106
3107        let new_result = server
3108            .handle_session_new(&serde_json::json!({
3109                "cwd": cwd.path().to_string_lossy(),
3110                "agentAlias": "test-agent"
3111            }))
3112            .await
3113            .expect("session/new must succeed");
3114        let session_id = new_result["sessionId"].as_str().unwrap().to_string();
3115        let active_token = tokio_util::sync::CancellationToken::new();
3116        server
3117            .register_cancel_token(&session_id, active_token.clone())
3118            .expect("simulated active turn should register token");
3119
3120        let err = server
3121            .handle_session_prompt(
3122                &serde_json::json!({
3123                    "sessionId": session_id.clone(),
3124                    "prompt": "queued prompt"
3125                }),
3126                &serde_json::json!(2),
3127            )
3128            .await
3129            .expect_err("concurrent prompt must be rejected before model_provider work starts");
3130
3131        assert_eq!(err.code, SESSION_BUSY);
3132        server
3133            .handle_session_cancel(&serde_json::json!({ "sessionId": session_id }))
3134            .await
3135            .expect("cancel should still target the original active token");
3136        assert!(active_token.is_cancelled());
3137    }
3138
3139    /// Verify that inserting and removing a cancel token from the map works
3140    /// correctly. This tests map mechanics directly rather than the
3141    /// `handle_session_prompt` lifecycle, so a regression in the production
3142    /// path's cleanup wouldn't be caught by this test.
3143    #[tokio::test]
3144    async fn cancel_tokens_map_remove_works() {
3145        let cwd = tempfile::tempdir().unwrap();
3146        let config = Config {
3147            data_dir: cwd.path().to_path_buf(),
3148            ..Default::default()
3149        };
3150        let server = Arc::new(AcpServer::new(config, AcpServerConfig::default()));
3151
3152        // Insert and remove a token directly.
3153        let session_id = "sess_token_leak_test".to_string();
3154        let token = tokio_util::sync::CancellationToken::new();
3155        server
3156            .cancel_tokens
3157            .lock()
3158            .expect("cancel_tokens lock poisoned")
3159            .insert(session_id.clone(), token);
3160
3161        // Remove the token.
3162        server
3163            .cancel_tokens
3164            .lock()
3165            .expect("cancel_tokens lock poisoned")
3166            .remove(&session_id);
3167
3168        let remaining = server
3169            .cancel_tokens
3170            .lock()
3171            .expect("cancel_tokens lock poisoned")
3172            .len();
3173        assert_eq!(remaining, 0, "cancel token must be removed after turn ends");
3174    }
3175
3176    #[tokio::test]
3177    async fn session_load_restores_history_and_streams_notifications() {
3178        use zeroclaw_api::model_provider::{ChatMessage, ConversationMessage};
3179        let cwd = tempfile::tempdir().unwrap();
3180        let store =
3181            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3182
3183        let session_id = "sess-load-test";
3184        store
3185            .create_session(session_id, "test-agent", &cwd.path().to_string_lossy())
3186            .unwrap();
3187        store
3188            .append_turn(
3189                session_id,
3190                &[
3191                    ConversationMessage::Chat(ChatMessage::user("hello")),
3192                    ConversationMessage::Chat(ChatMessage::assistant("hi there")),
3193                ],
3194            )
3195            .unwrap();
3196
3197        let (writer_tx, mut writer_rx) = tokio::sync::mpsc::channel::<String>(64);
3198        let server = Arc::new(AcpServer::new_with_writer_and_store(
3199            make_test_config(cwd.path()),
3200            AcpServerConfig::default(),
3201            writer_tx,
3202            Arc::clone(&store),
3203        ));
3204
3205        let result = server
3206            .handle_session_load(&serde_json::json!({
3207                "sessionId": session_id,
3208                "cwd": cwd.path().to_string_lossy()
3209            }))
3210            .await
3211            .expect("session/load must succeed");
3212
3213        assert_eq!(result, serde_json::json!({}));
3214
3215        // Session must now be in the in-memory map
3216        assert!(server.sessions.lock().await.contains_key(session_id));
3217
3218        // Collect notifications (non-blocking drain)
3219        let mut notifications = Vec::new();
3220        while let Ok(msg) = writer_rx.try_recv() {
3221            notifications.push(msg);
3222        }
3223
3224        // Expect two session/update notifications: user then assistant
3225        assert_eq!(
3226            notifications.len(),
3227            2,
3228            "expected 2 notifications, got: {notifications:?}"
3229        );
3230        let n0: serde_json::Value = serde_json::from_str(&notifications[0]).unwrap();
3231        assert_eq!(
3232            n0["params"]["update"]["sessionUpdate"],
3233            "user_message_chunk"
3234        );
3235        assert_eq!(n0["params"]["update"]["content"]["text"], "hello");
3236        let n1: serde_json::Value = serde_json::from_str(&notifications[1]).unwrap();
3237        assert_eq!(
3238            n1["params"]["update"]["sessionUpdate"],
3239            "agent_message_chunk"
3240        );
3241        assert_eq!(n1["params"]["update"]["content"]["text"], "hi there");
3242    }
3243
3244    #[tokio::test]
3245    async fn session_load_returns_not_found_for_unknown_id() {
3246        let cwd = tempfile::tempdir().unwrap();
3247        let store =
3248            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3249        let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3250        let server = AcpServer::new_with_writer_and_store(
3251            make_test_config(cwd.path()),
3252            AcpServerConfig::default(),
3253            writer_tx,
3254            store,
3255        );
3256
3257        let err = server
3258            .handle_session_load(&serde_json::json!({ "sessionId": "ghost" }))
3259            .await
3260            .expect_err("unknown session must fail");
3261
3262        assert_eq!(err.code, SESSION_NOT_FOUND);
3263    }
3264
3265    #[tokio::test]
3266    async fn session_load_rejects_already_active_session() {
3267        let cwd = tempfile::tempdir().unwrap();
3268        let store =
3269            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3270        let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3271        let server = Arc::new(AcpServer::new_with_writer_and_store(
3272            make_test_config(cwd.path()),
3273            AcpServerConfig::default(),
3274            writer_tx,
3275            Arc::clone(&store),
3276        ));
3277
3278        // Create and load the session once to put it in memory
3279        let session_id = "sess-already-active";
3280        store
3281            .create_session(session_id, "test-agent", &cwd.path().to_string_lossy())
3282            .unwrap();
3283        server
3284            .handle_session_load(&serde_json::json!({
3285                "sessionId": session_id,
3286                "cwd": cwd.path().to_string_lossy()
3287            }))
3288            .await
3289            .unwrap();
3290
3291        // Second load must be rejected
3292        let err = server
3293            .handle_session_load(&serde_json::json!({ "sessionId": session_id }))
3294            .await
3295            .expect_err("session/load for active session must fail");
3296
3297        assert_eq!(err.code, INVALID_PARAMS);
3298    }
3299
3300    #[tokio::test]
3301    async fn session_resume_restores_without_replay() {
3302        use zeroclaw_api::model_provider::{ChatMessage, ConversationMessage};
3303        let cwd = tempfile::tempdir().unwrap();
3304        let store =
3305            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3306
3307        let session_id = "sess-resume-test";
3308        store
3309            .create_session(session_id, "test-agent", &cwd.path().to_string_lossy())
3310            .unwrap();
3311        store
3312            .append_turn(
3313                session_id,
3314                &[ConversationMessage::Chat(ChatMessage::user("hello"))],
3315            )
3316            .unwrap();
3317
3318        let (writer_tx, mut writer_rx) = tokio::sync::mpsc::channel::<String>(64);
3319        let server = Arc::new(AcpServer::new_with_writer_and_store(
3320            make_test_config(cwd.path()),
3321            AcpServerConfig::default(),
3322            writer_tx,
3323            Arc::clone(&store),
3324        ));
3325
3326        let result = server
3327            .handle_session_resume(&serde_json::json!({
3328                "sessionId": session_id,
3329                "cwd": cwd.path().to_string_lossy()
3330            }))
3331            .await
3332            .expect("session/resume must succeed");
3333
3334        // Result is empty object
3335        assert_eq!(result, serde_json::json!({}));
3336
3337        // Session must be in memory
3338        assert!(server.sessions.lock().await.contains_key(session_id));
3339
3340        // No notifications must have been emitted
3341        assert!(
3342            writer_rx.try_recv().is_err(),
3343            "session/resume must not emit session/update notifications"
3344        );
3345    }
3346
3347    #[tokio::test]
3348    async fn session_close_releases_memory_but_keeps_store_record() {
3349        let cwd = tempfile::tempdir().unwrap();
3350        let store =
3351            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3352        let server = Arc::new(AcpServer::new_with_store(
3353            make_test_config(cwd.path()),
3354            AcpServerConfig::default(),
3355            Arc::clone(&store),
3356        ));
3357
3358        let new_result = server
3359            .handle_session_new(&serde_json::json!({
3360                "cwd": cwd.path().to_string_lossy()
3361            }))
3362            .await
3363            .expect("session/new must succeed");
3364        let session_id = new_result["sessionId"].as_str().unwrap().to_string();
3365
3366        assert!(server.sessions.lock().await.contains_key(&session_id));
3367
3368        let result = server
3369            .handle_session_close(&serde_json::json!({ "sessionId": &session_id }))
3370            .await
3371            .expect("session/close must succeed");
3372
3373        assert_eq!(result, serde_json::json!({}));
3374
3375        // Session gone from in-memory map
3376        assert!(!server.sessions.lock().await.contains_key(&session_id));
3377
3378        // Session record still on disk
3379        let data = store.load_session(&session_id).unwrap();
3380        assert!(
3381            data.is_some(),
3382            "session/close must not delete the DB record"
3383        );
3384    }
3385
3386    #[tokio::test]
3387    async fn session_close_returns_not_found_for_unknown_session() {
3388        let cwd = tempfile::tempdir().unwrap();
3389        let server = AcpServer::new(make_test_config(cwd.path()), AcpServerConfig::default());
3390
3391        let err = server
3392            .handle_session_close(&serde_json::json!({ "sessionId": "ghost" }))
3393            .await
3394            .expect_err("unknown session must fail");
3395
3396        assert_eq!(err.code, SESSION_NOT_FOUND);
3397    }
3398
3399    /// `session/load` must return SESSION_LIMIT_REACHED when `max_sessions` is
3400    /// already reached by an active session created via `session/new`.
3401    #[tokio::test]
3402    async fn session_load_respects_max_sessions() {
3403        let cwd = tempfile::tempdir().unwrap();
3404        let store =
3405            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3406
3407        // Pre-create a stored session that we'll attempt to load
3408        let stored_id = "sess-load-limit-test";
3409        store
3410            .create_session(stored_id, "test-agent", &cwd.path().to_string_lossy())
3411            .unwrap();
3412
3413        let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3414        let server = Arc::new(AcpServer::new_with_writer_and_store(
3415            make_test_config(cwd.path()),
3416            AcpServerConfig {
3417                max_sessions: 1,
3418                ..AcpServerConfig::default()
3419            },
3420            writer_tx,
3421            Arc::clone(&store),
3422        ));
3423
3424        // Fill the one available slot via session/new
3425        server
3426            .handle_session_new(&serde_json::json!({
3427                "cwd": cwd.path().to_string_lossy()
3428            }))
3429            .await
3430            .expect("session/new must succeed when under limit");
3431
3432        // Now session/load for the stored session must fail with SESSION_LIMIT_REACHED
3433        let err = server
3434            .handle_session_load(&serde_json::json!({ "sessionId": stored_id }))
3435            .await
3436            .expect_err("session/load must fail when max_sessions reached");
3437
3438        assert_eq!(
3439            err.code, SESSION_LIMIT_REACHED,
3440            "expected SESSION_LIMIT_REACHED, got: {:?}",
3441            err
3442        );
3443    }
3444
3445    /// `session/resume` must return SESSION_LIMIT_REACHED when `max_sessions` is
3446    /// already reached by an active session created via `session/new`.
3447    #[tokio::test]
3448    async fn session_resume_respects_max_sessions() {
3449        let cwd = tempfile::tempdir().unwrap();
3450        let store =
3451            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3452
3453        // Pre-create a stored session that we'll attempt to resume
3454        let stored_id = "sess-resume-limit-test";
3455        store
3456            .create_session(stored_id, "test-agent", &cwd.path().to_string_lossy())
3457            .unwrap();
3458
3459        let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3460        let server = Arc::new(AcpServer::new_with_writer_and_store(
3461            make_test_config(cwd.path()),
3462            AcpServerConfig {
3463                max_sessions: 1,
3464                ..AcpServerConfig::default()
3465            },
3466            writer_tx,
3467            Arc::clone(&store),
3468        ));
3469
3470        // Fill the one available slot via session/new
3471        server
3472            .handle_session_new(&serde_json::json!({
3473                "cwd": cwd.path().to_string_lossy()
3474            }))
3475            .await
3476            .expect("session/new must succeed when under limit");
3477
3478        // Now session/resume for the stored session must fail with SESSION_LIMIT_REACHED
3479        let err = server
3480            .handle_session_resume(&serde_json::json!({ "sessionId": stored_id }))
3481            .await
3482            .expect_err("session/resume must fail when max_sessions reached");
3483
3484        assert_eq!(
3485            err.code, SESSION_LIMIT_REACHED,
3486            "expected SESSION_LIMIT_REACHED, got: {:?}",
3487            err
3488        );
3489    }
3490
3491    /// A SQLite error during `store.load_session` must release the `loading_sessions`
3492    /// reservation so a subsequent restore attempt is not permanently blocked.
3493    #[tokio::test]
3494    async fn session_load_releases_reservation_on_store_error() {
3495        let cwd = tempfile::tempdir().unwrap();
3496        let store =
3497            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3498
3499        let session_id = "sess-load-store-err";
3500        store
3501            .create_session(session_id, "test-agent", &cwd.path().to_string_lossy())
3502            .unwrap();
3503
3504        // Drop the schema via a second connection to force a "no such table"
3505        // error on the store's next query_row call.
3506        let db_path = cwd.path().join("sessions/acp-sessions.db");
3507        {
3508            let second =
3509                rusqlite::Connection::open(&db_path).expect("second conn must open same db");
3510            second
3511                .execute_batch(
3512                    "DROP TABLE IF EXISTS acp_messages; DROP TABLE IF EXISTS acp_sessions;",
3513                )
3514                .expect("schema drop must succeed on second conn");
3515        }
3516
3517        let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3518        let server = Arc::new(AcpServer::new_with_writer_and_store(
3519            make_test_config(cwd.path()),
3520            AcpServerConfig::default(),
3521            writer_tx,
3522            Arc::clone(&store),
3523        ));
3524
3525        // First call: must fail with INTERNAL_ERROR (SQLite "no such table").
3526        let first_err = server
3527            .handle_session_load(&serde_json::json!({ "sessionId": session_id }))
3528            .await
3529            .expect_err("session/load must fail when store returns Err");
3530        assert_eq!(
3531            first_err.code, INTERNAL_ERROR,
3532            "expected INTERNAL_ERROR from store failure, got: {:?}",
3533            first_err
3534        );
3535
3536        // Second call for the same session: must also fail with INTERNAL_ERROR,
3537        // NOT with INVALID_PARAMS ("already active"). A leaked reservation would
3538        // cause INVALID_PARAMS, proving the slot was never released.
3539        let second_err = server
3540            .handle_session_load(&serde_json::json!({ "sessionId": session_id }))
3541            .await
3542            .expect_err("second session/load must also fail");
3543        assert_eq!(
3544            second_err.code, INTERNAL_ERROR,
3545            "second load must fail with INTERNAL_ERROR, not INVALID_PARAMS (leaked slot); got: {:?}",
3546            second_err
3547        );
3548    }
3549
3550    /// Same coverage as `session_load_releases_reservation_on_store_error` but
3551    /// for the `session/resume` path.
3552    #[tokio::test]
3553    async fn session_resume_releases_reservation_on_store_error() {
3554        let cwd = tempfile::tempdir().unwrap();
3555        let store =
3556            Arc::new(zeroclaw_infra::acp_session_store::AcpSessionStore::new(cwd.path()).unwrap());
3557
3558        let session_id = "sess-resume-store-err";
3559        store
3560            .create_session(session_id, "test-agent", &cwd.path().to_string_lossy())
3561            .unwrap();
3562
3563        let db_path = cwd.path().join("sessions/acp-sessions.db");
3564        {
3565            let second =
3566                rusqlite::Connection::open(&db_path).expect("second conn must open same db");
3567            second
3568                .execute_batch(
3569                    "DROP TABLE IF EXISTS acp_messages; DROP TABLE IF EXISTS acp_sessions;",
3570                )
3571                .expect("schema drop must succeed on second conn");
3572        }
3573
3574        let (writer_tx, _rx) = tokio::sync::mpsc::channel::<String>(8);
3575        let server = Arc::new(AcpServer::new_with_writer_and_store(
3576            make_test_config(cwd.path()),
3577            AcpServerConfig::default(),
3578            writer_tx,
3579            Arc::clone(&store),
3580        ));
3581
3582        let first_err = server
3583            .handle_session_resume(&serde_json::json!({ "sessionId": session_id }))
3584            .await
3585            .expect_err("session/resume must fail when store returns Err");
3586        assert_eq!(
3587            first_err.code, INTERNAL_ERROR,
3588            "expected INTERNAL_ERROR from store failure, got: {:?}",
3589            first_err
3590        );
3591
3592        let second_err = server
3593            .handle_session_resume(&serde_json::json!({ "sessionId": session_id }))
3594            .await
3595            .expect_err("second session/resume must also fail");
3596        assert_eq!(
3597            second_err.code, INTERNAL_ERROR,
3598            "second resume must fail with INTERNAL_ERROR, not INVALID_PARAMS (leaked slot); got: {:?}",
3599            second_err
3600        );
3601    }
3602}