Skip to main content

zeroclaw_infra/
acp_session_store.rs

1//! ACP session persistence.
2//!
3//! Storage shape:
4//!
5//! ```text
6//! acp_sessions
7//!   ├── acp_messages (FK by integer id)
8//!   │     └── acp_tool_calls (FK by integer id, two rows per call:
9//!   │                          one event_kind='in', one 'out')
10//!   └── acp_session_events
11//! ```
12//!
13//! Token tracking: `acp_sessions.token_count` holds the most recently
14//! provider-reported `input_tokens` (total prompt size). Replace-on-write
15//! after every turn. The TUI ctx bar reads this on resume.
16//!
17//! Enums (Rust-side): `ToolEventKind` is internal to this module — callers
18//! invoke `append_tool_call_in` / `append_tool_call_out` as distinct methods
19//! and never see the enum. `Action` and `EventOutcome` from `zeroclaw_log`
20//! are the canonical taxonomies for `acp_session_events`.
21
22use anyhow::{Context, Result};
23use chrono::{DateTime, Utc};
24use parking_lot::Mutex;
25use rusqlite::{Connection, params};
26use std::path::Path;
27use zeroclaw_api::model_provider::{ChatMessage, ConversationMessage, ToolCall, ToolResultMessage};
28use zeroclaw_log::{Action, EventOutcome};
29
30/// Internal discriminator for `acp_tool_calls.event_kind`. The 'in' row
31/// records the call args; the 'out' row records the result. Two append-only
32/// rows per call, correlated by the provider-issued `tool_call_id`.
33#[derive(Debug, Clone, Copy, PartialEq, Eq)]
34enum ToolEventKind {
35    In,
36    Out,
37}
38
39impl ToolEventKind {
40    fn as_str(self) -> &'static str {
41        match self {
42            Self::In => "in",
43            Self::Out => "out",
44        }
45    }
46}
47
48pub struct AcpSessionStore {
49    conn: Mutex<Connection>,
50}
51
52pub struct AcpSessionData {
53    pub session_uuid: String,
54    pub agent_alias: String,
55    pub workspace_dir: String,
56    pub token_count: u64,
57    pub created_at: DateTime<Utc>,
58    pub last_activity: DateTime<Utc>,
59    pub messages: Vec<ConversationMessage>,
60}
61
62pub enum AcpSessionRestore {
63    Missing,
64    Killed,
65    Restorable(AcpSessionData),
66}
67
68/// Lightweight summary for the ACP session picker. Avoids loading the full
69/// message history just to render a one-line label per session.
70pub struct AcpSessionSummary {
71    pub session_uuid: String,
72    pub agent_alias: String,
73    pub workspace_dir: String,
74    pub token_count: u64,
75    pub created_at: DateTime<Utc>,
76    pub last_activity: DateTime<Utc>,
77    pub message_count: usize,
78}
79
80impl AcpSessionStore {
81    pub fn new(workspace_dir: &Path) -> Result<Self> {
82        let sessions_dir = workspace_dir.join("sessions");
83        std::fs::create_dir_all(&sessions_dir).context("Failed to create sessions directory")?;
84        let db_path = sessions_dir.join("acp-sessions.db");
85
86        let conn = Connection::open(&db_path)
87            .with_context(|| format!("Failed to open ACP session DB: {}", db_path.display()))?;
88
89        conn.execute_batch(
90            "PRAGMA journal_mode = WAL;
91             PRAGMA synchronous = NORMAL;
92             PRAGMA busy_timeout = 5000;
93             PRAGMA foreign_keys = ON;
94             PRAGMA temp_store = MEMORY;",
95        )
96        .context("Failed to configure ACP session DB pragmas")?;
97
98        // Schema is create-if-missing: ACP sessions are long-lived user data
99        // and must survive daemon restarts. Never drop existing tables here.
100        conn.execute_batch(
101            "CREATE TABLE IF NOT EXISTS acp_sessions (
102                 id            INTEGER PRIMARY KEY AUTOINCREMENT,
103                 session_uuid  TEXT NOT NULL UNIQUE,
104                 agent_alias   TEXT NOT NULL,
105                 workspace_dir TEXT NOT NULL,
106                 token_count   INTEGER NOT NULL DEFAULT 0,
107                 killed_at     TEXT,
108                 created_at    TEXT NOT NULL,
109                 last_activity TEXT NOT NULL
110             );
111             CREATE INDEX IF NOT EXISTS idx_acp_sessions_uuid  ON acp_sessions(session_uuid);
112             CREATE INDEX IF NOT EXISTS idx_acp_sessions_alias ON acp_sessions(agent_alias);
113
114             CREATE TABLE IF NOT EXISTS acp_messages (
115                 id                INTEGER PRIMARY KEY AUTOINCREMENT,
116                 session_id        INTEGER NOT NULL REFERENCES acp_sessions(id) ON DELETE CASCADE,
117                 role              TEXT NOT NULL,
118                 content           TEXT NOT NULL,
119                 reasoning_content TEXT,
120                 created_at        TEXT NOT NULL
121             );
122             CREATE INDEX IF NOT EXISTS idx_acp_messages_session ON acp_messages(session_id, id);
123
124             CREATE TABLE IF NOT EXISTS acp_tool_calls (
125                 id           INTEGER PRIMARY KEY AUTOINCREMENT,
126                 message_id   INTEGER NOT NULL REFERENCES acp_messages(id) ON DELETE CASCADE,
127                 tool_call_id TEXT NOT NULL,
128                 tool_name    TEXT NOT NULL,
129                 event_kind   TEXT NOT NULL,
130                 payload      TEXT NOT NULL,
131                 outcome      TEXT,
132                 created_at   TEXT NOT NULL
133             );
134             CREATE INDEX IF NOT EXISTS idx_acp_tool_calls_message ON acp_tool_calls(message_id, id);
135             CREATE INDEX IF NOT EXISTS idx_acp_tool_calls_lookup  ON acp_tool_calls(tool_call_id);
136
137             CREATE TABLE IF NOT EXISTS acp_session_events (
138                 id         INTEGER PRIMARY KEY AUTOINCREMENT,
139                 session_id INTEGER NOT NULL REFERENCES acp_sessions(id) ON DELETE CASCADE,
140                 action     TEXT NOT NULL,
141                 outcome    TEXT NOT NULL,
142                 payload    TEXT,
143                 created_at TEXT NOT NULL
144             );
145             CREATE INDEX IF NOT EXISTS idx_acp_session_events_session ON acp_session_events(session_id, id);",
146        )
147        .context("Failed to create ACP session schema")?;
148
149        Self::ensure_killed_at_column(&conn)
150            .context("Failed to migrate ACP session killed marker")?;
151
152        Ok(Self {
153            conn: Mutex::new(conn),
154        })
155    }
156
157    fn ensure_killed_at_column(conn: &Connection) -> Result<()> {
158        let mut stmt = conn
159            .prepare("PRAGMA table_info(acp_sessions)")
160            .context("Failed to inspect ACP session schema")?;
161        let mut rows = stmt
162            .query([])
163            .context("Failed to read ACP session schema")?;
164        while let Some(row) = rows
165            .next()
166            .context("Failed to read ACP session schema row")?
167        {
168            let column: String = row
169                .get(1)
170                .context("Failed to read ACP session column name")?;
171            if column == "killed_at" {
172                return Ok(());
173            }
174        }
175        drop(rows);
176        drop(stmt);
177
178        match conn.execute("ALTER TABLE acp_sessions ADD COLUMN killed_at TEXT", []) {
179            Ok(_) => Ok(()),
180            Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
181                if msg.contains("duplicate column name") =>
182            {
183                Ok(())
184            }
185            Err(e) => Err(e).context("Failed to add ACP session killed marker"),
186        }
187    }
188
189    /// Record a new session. Returns the integer `id` assigned by SQLite.
190    pub fn create_session(
191        &self,
192        session_uuid: &str,
193        agent_alias: &str,
194        workspace_dir: &str,
195    ) -> Result<i64> {
196        let now = Utc::now().to_rfc3339();
197        let conn = self.conn.lock();
198        conn.execute(
199            "INSERT INTO acp_sessions
200               (session_uuid, agent_alias, workspace_dir, token_count, created_at, last_activity)
201             VALUES (?1, ?2, ?3, 0, ?4, ?4)",
202            params![session_uuid, agent_alias, workspace_dir, now],
203        )
204        .context("Failed to create ACP session")?;
205        Ok(conn.last_insert_rowid())
206    }
207
208    /// Load session metadata and full message history for restore.
209    /// Returns `None` if the session_uuid is not found.
210    pub fn load_session(&self, session_uuid: &str) -> Result<Option<AcpSessionData>> {
211        let conn = self.conn.lock();
212
213        let row = conn.query_row(
214            "SELECT id, agent_alias, workspace_dir, token_count, created_at, last_activity
215             FROM acp_sessions WHERE session_uuid = ?1",
216            params![session_uuid],
217            |row| {
218                Ok((
219                    row.get::<_, i64>(0)?,
220                    row.get::<_, String>(1)?,
221                    row.get::<_, String>(2)?,
222                    row.get::<_, i64>(3)?,
223                    row.get::<_, String>(4)?,
224                    row.get::<_, String>(5)?,
225                ))
226            },
227        );
228
229        let (session_id, agent_alias, workspace_dir, token_count, created_at_s, last_activity_s) =
230            match row {
231                Ok(r) => r,
232                Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
233                Err(e) => return Err(e).context("Failed to query ACP session"),
234            };
235
236        let created_at = parse_ts(&created_at_s, "created_at", session_uuid);
237        let last_activity = parse_ts(&last_activity_s, "last_activity", session_uuid);
238
239        // Reconstruct ConversationMessages by walking acp_messages in id order
240        // and, for each assistant row, joining its tool calls from acp_tool_calls
241        // (event_kind='in' for the call args).
242        //
243        // Tool results land as their own ConversationMessage::ToolResults at the
244        // position of the LAST 'out' row in the result-batch. The replay strategy
245        // is: every contiguous run of 'out' rows for a given message_id becomes
246        // one ToolResults message inserted between the assistant's
247        // AssistantToolCalls and the next message.
248        let messages = Self::load_messages(&conn, session_id)?;
249
250        Ok(Some(AcpSessionData {
251            session_uuid: session_uuid.to_string(),
252            agent_alias,
253            workspace_dir,
254            token_count: token_count.max(0) as u64,
255            created_at,
256            last_activity,
257            messages,
258        }))
259    }
260
261    /// Load only durable ACP rows that are allowed to become live sessions.
262    /// Killed rows keep their transcript for history/export but are terminal
263    /// for runtime restore paths.
264    pub fn load_session_for_restore(&self, session_uuid: &str) -> Result<AcpSessionRestore> {
265        if self.is_session_killed(session_uuid)? {
266            return Ok(AcpSessionRestore::Killed);
267        }
268
269        match self.load_session(session_uuid)? {
270            Some(data) => Ok(AcpSessionRestore::Restorable(data)),
271            None => Ok(AcpSessionRestore::Missing),
272        }
273    }
274
275    /// List all sessions as lightweight summaries, ordered by most recent
276    /// activity first. This is the picker-facing read: it avoids the full
277    /// message-history hydration that `load_session` performs.
278    pub fn list_sessions(&self) -> Result<Vec<AcpSessionSummary>> {
279        let conn = self.conn.lock();
280        let mut stmt = conn
281            .prepare(
282                "SELECT s.session_uuid,
283                        s.agent_alias,
284                        s.workspace_dir,
285                        s.token_count,
286                        s.created_at,
287                        s.last_activity,
288                        (SELECT COUNT(*) FROM acp_messages m WHERE m.session_id = s.id) AS message_count
289                 FROM acp_sessions s
290                 ORDER BY s.last_activity DESC",
291            )
292            .context("Failed to prepare ACP session list query")?;
293
294        let rows = stmt
295            .query_map([], |row| {
296                Ok((
297                    row.get::<_, String>(0)?,
298                    row.get::<_, String>(1)?,
299                    row.get::<_, String>(2)?,
300                    row.get::<_, i64>(3)?,
301                    row.get::<_, String>(4)?,
302                    row.get::<_, String>(5)?,
303                    row.get::<_, i64>(6)?,
304                ))
305            })
306            .context("Failed to query ACP sessions")?;
307
308        let mut out = Vec::new();
309        for row in rows {
310            let (
311                session_uuid,
312                agent_alias,
313                workspace_dir,
314                token_count,
315                created_s,
316                activity_s,
317                msg_count,
318            ) = row.context("Failed to read ACP session row")?;
319            out.push(AcpSessionSummary {
320                created_at: parse_ts(&created_s, "created_at", &session_uuid),
321                last_activity: parse_ts(&activity_s, "last_activity", &session_uuid),
322                session_uuid,
323                agent_alias,
324                workspace_dir,
325                token_count: token_count.max(0) as u64,
326                message_count: msg_count.max(0) as usize,
327            });
328        }
329        Ok(out)
330    }
331
332    fn load_messages(conn: &Connection, session_id: i64) -> Result<Vec<ConversationMessage>> {
333        // Pull all message rows.
334        let mut msg_stmt = conn
335            .prepare(
336                "SELECT id, role, content, reasoning_content
337                 FROM acp_messages WHERE session_id = ?1 ORDER BY id ASC",
338            )
339            .context("Failed to prepare message query")?;
340
341        let msg_rows: Vec<(i64, String, String, Option<String>)> = msg_stmt
342            .query_map(params![session_id], |row| {
343                Ok((
344                    row.get::<_, i64>(0)?,
345                    row.get::<_, String>(1)?,
346                    row.get::<_, String>(2)?,
347                    row.get::<_, Option<String>>(3)?,
348                ))
349            })?
350            .collect::<Result<Vec<_>, _>>()
351            .context("Failed to read message rows")?;
352
353        // For each message row, pull its tool_calls (event_kind='in') and
354        // tool_results (event_kind='out') in id order.
355        let mut tc_stmt = conn
356            .prepare(
357                "SELECT tool_call_id, tool_name, event_kind, payload
358                 FROM acp_tool_calls WHERE message_id = ?1 ORDER BY id ASC",
359            )
360            .context("Failed to prepare tool_call query")?;
361
362        let mut out = Vec::with_capacity(msg_rows.len());
363        for (msg_id, role, content, reasoning_content) in msg_rows {
364            // Split this message's tool_calls into ins and outs preserving order.
365            let mut ins: Vec<ToolCall> = Vec::new();
366            let mut outs: Vec<ToolResultMessage> = Vec::new();
367            let rows = tc_stmt
368                .query_map(params![msg_id], |row| {
369                    Ok((
370                        row.get::<_, String>(0)?,
371                        row.get::<_, String>(1)?,
372                        row.get::<_, String>(2)?,
373                        row.get::<_, String>(3)?,
374                    ))
375                })?
376                .collect::<Result<Vec<_>, _>>()
377                .context("Failed to read tool_call rows")?;
378            for (tool_call_id, tool_name, event_kind, payload) in rows {
379                match event_kind.as_str() {
380                    "in" => ins.push(ToolCall {
381                        id: tool_call_id,
382                        name: tool_name,
383                        arguments: payload,
384                        extra_content: None,
385                    }),
386                    "out" => outs.push(ToolResultMessage {
387                        tool_call_id,
388                        content: payload,
389                    }),
390                    other => {
391                        ::zeroclaw_log::record!(
392                            ERROR,
393                            ::zeroclaw_log::Event::new(
394                                module_path!(),
395                                ::zeroclaw_log::Action::Read,
396                            )
397                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
398                            .with_attrs(::serde_json::json!({
399                                "session_id": session_id,
400                                "message_id": msg_id,
401                                "event_kind": other,
402                            })),
403                            "unknown event_kind in acp_tool_calls"
404                        );
405                        return Err(anyhow::Error::msg(format!(
406                            "unknown event_kind '{other}' in acp_tool_calls for message_id {msg_id}"
407                        )));
408                    }
409                }
410            }
411
412            if ins.is_empty() && outs.is_empty() {
413                // Pure chat message.
414                out.push(ConversationMessage::Chat(ChatMessage { role, content }));
415            } else {
416                if !ins.is_empty() {
417                    // Assistant turn that issued tool calls. The text may be empty.
418                    out.push(ConversationMessage::AssistantToolCalls {
419                        text: if content.is_empty() {
420                            None
421                        } else {
422                            Some(content)
423                        },
424                        tool_calls: ins,
425                        reasoning_content,
426                    });
427                }
428                if !outs.is_empty() {
429                    out.push(ConversationMessage::ToolResults(outs));
430                }
431            }
432        }
433
434        Ok(out)
435    }
436
437    /// Append all ConversationMessages from one completed turn, decomposing
438    /// AssistantToolCalls / ToolResults variants into the appropriate tables.
439    /// Single transaction.
440    pub fn append_turn(&self, session_uuid: &str, messages: &[ConversationMessage]) -> Result<()> {
441        if messages.is_empty() {
442            return Ok(());
443        }
444
445        let now = Utc::now().to_rfc3339();
446        let mut conn = self.conn.lock();
447
448        // Resolve the integer session_id once. Fail loudly if the UUID is
449        // unknown — we want an error here, not orphaned inserts.
450        let session_id: i64 = conn
451            .query_row(
452                "SELECT id FROM acp_sessions WHERE session_uuid = ?1",
453                params![session_uuid],
454                |row| row.get(0),
455            )
456            .with_context(|| format!("unknown session_uuid: {session_uuid}"))?;
457
458        let tx = conn
459            .transaction()
460            .context("Failed to begin append_turn transaction")?;
461
462        // Track the most recent assistant message_id so a following
463        // ToolResults variant can attach its 'out' rows back to it.
464        let mut last_assistant_msg_id: Option<i64> = None;
465
466        for msg in messages {
467            match msg {
468                ConversationMessage::Chat(chat) => {
469                    tx.execute(
470                        "INSERT INTO acp_messages
471                           (session_id, role, content, reasoning_content, created_at)
472                         VALUES (?1, ?2, ?3, NULL, ?4)",
473                        params![session_id, chat.role, chat.content, now],
474                    )
475                    .context("Failed to insert chat message")?;
476                    if chat.role == "assistant" {
477                        last_assistant_msg_id = Some(tx.last_insert_rowid());
478                    }
479                }
480                ConversationMessage::AssistantToolCalls {
481                    text,
482                    tool_calls,
483                    reasoning_content,
484                } => {
485                    tx.execute(
486                        "INSERT INTO acp_messages
487                           (session_id, role, content, reasoning_content, created_at)
488                         VALUES (?1, 'assistant', ?2, ?3, ?4)",
489                        params![
490                            session_id,
491                            text.as_deref().unwrap_or(""),
492                            reasoning_content,
493                            now,
494                        ],
495                    )
496                    .context("Failed to insert assistant tool-call message")?;
497                    let msg_id = tx.last_insert_rowid();
498                    last_assistant_msg_id = Some(msg_id);
499
500                    for tc in tool_calls {
501                        tx.execute(
502                            "INSERT INTO acp_tool_calls
503                               (message_id, tool_call_id, tool_name, event_kind, payload, outcome, created_at)
504                             VALUES (?1, ?2, ?3, ?4, ?5, NULL, ?6)",
505                            params![
506                                msg_id,
507                                tc.id,
508                                tc.name,
509                                ToolEventKind::In.as_str(),
510                                tc.arguments,
511                                now,
512                            ],
513                        )
514                        .context("Failed to insert tool_call 'in' row")?;
515                    }
516                }
517                ConversationMessage::ToolResults(results) => {
518                    let msg_id = match last_assistant_msg_id {
519                        Some(id) => id,
520                        None => {
521                            ::zeroclaw_log::record!(
522                                ERROR,
523                                ::zeroclaw_log::Event::new(
524                                    module_path!(),
525                                    ::zeroclaw_log::Action::Write,
526                                )
527                                .with_outcome(::zeroclaw_log::EventOutcome::Failure)
528                                .with_attrs(::serde_json::json!({
529                                    "session_uuid": session_uuid,
530                                })),
531                                "ToolResults without preceding AssistantToolCalls"
532                            );
533                            return Err(anyhow::Error::msg(
534                                "ToolResults appeared without a preceding AssistantToolCalls \
535                                 message in this turn — cannot determine parent message_id",
536                            ));
537                        }
538                    };
539                    for result in results {
540                        // Look up tool_name from the matching 'in' row so the
541                        // 'out' row carries it too. Outcome is 'unknown' at
542                        // this layer — `ConversationMessage::ToolResults`
543                        // doesn't tell us whether the tool succeeded or
544                        // failed. Future wiring from the dispatcher will
545                        // call a dedicated method to record outcome.
546                        let tool_name: String = tx
547                            .query_row(
548                                "SELECT tool_name FROM acp_tool_calls
549                                 WHERE tool_call_id = ?1 AND event_kind = 'in'
550                                 ORDER BY id DESC LIMIT 1",
551                                params![result.tool_call_id],
552                                |row| row.get(0),
553                            )
554                            .unwrap_or_else(|_| String::from("unknown"));
555                        tx.execute(
556                            "INSERT INTO acp_tool_calls
557                               (message_id, tool_call_id, tool_name, event_kind, payload, outcome, created_at)
558                             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
559                            params![
560                                msg_id,
561                                result.tool_call_id,
562                                tool_name,
563                                ToolEventKind::Out.as_str(),
564                                result.content,
565                                EventOutcome::Unknown.as_str(),
566                                now,
567                            ],
568                        )
569                        .context("Failed to insert tool_call 'out' row")?;
570                    }
571                }
572            }
573        }
574
575        tx.execute(
576            "UPDATE acp_sessions SET last_activity = ?1 WHERE id = ?2",
577            params![now, session_id],
578        )
579        .context("Failed to update last_activity")?;
580
581        tx.commit().context("Failed to commit append_turn")?;
582        Ok(())
583    }
584
585    /// Overwrite the session's `token_count`. Called after every turn with
586    /// the latest provider-reported `input_tokens`.
587    ///
588    /// Returns an error if `session_uuid` does not exist — a silent zero-row
589    /// UPDATE would mask a race where the session was deleted out from
590    /// under us, which the caller almost certainly wants to log.
591    pub fn set_token_count(&self, session_uuid: &str, token_count: u64) -> Result<()> {
592        let conn = self.conn.lock();
593        let rows = conn
594            .execute(
595                "UPDATE acp_sessions SET token_count = ?1 WHERE session_uuid = ?2",
596                params![token_count as i64, session_uuid],
597            )
598            .context("Failed to set token_count")?;
599        if rows == 0 {
600            return Err(anyhow::Error::msg(format!(
601                "set_token_count: no session with uuid {session_uuid}"
602            )));
603        }
604        Ok(())
605    }
606
607    /// Record a session-lifecycle event. Caller passes typed enums; the SQLite
608    /// layer is the only place strings appear. Same `Action` / `EventOutcome`
609    /// values are used at the matching `zeroclaw_log::record!` call site.
610    pub fn append_event(
611        &self,
612        session_uuid: &str,
613        action: Action,
614        outcome: EventOutcome,
615        payload: Option<&str>,
616    ) -> Result<()> {
617        let now = Utc::now().to_rfc3339();
618        let conn = self.conn.lock();
619        let session_id: i64 = conn
620            .query_row(
621                "SELECT id FROM acp_sessions WHERE session_uuid = ?1",
622                params![session_uuid],
623                |row| row.get(0),
624            )
625            .with_context(|| format!("unknown session_uuid: {session_uuid}"))?;
626        conn.execute(
627            "INSERT INTO acp_session_events
628               (session_id, action, outcome, payload, created_at)
629             VALUES (?1, ?2, ?3, ?4, ?5)",
630            params![session_id, action.as_str(), outcome.as_str(), payload, now],
631        )
632        .context("Failed to insert session event")?;
633        Ok(())
634    }
635
636    /// Delete a session and all its child rows (messages, tool calls, events
637    /// cascade via FK). Returns `true` if the session existed.
638    pub fn delete_session(&self, session_uuid: &str) -> Result<bool> {
639        let conn = self.conn.lock();
640        let rows = conn
641            .execute(
642                "DELETE FROM acp_sessions WHERE session_uuid = ?1",
643                params![session_uuid],
644            )
645            .context("Failed to delete ACP session")?;
646        Ok(rows > 0)
647    }
648
649    /// Persist that an admin intentionally killed this ACP session. The
650    /// transcript stays durable, but runtime rehydration must not revive it.
651    pub fn mark_session_killed(&self, session_uuid: &str) -> Result<bool> {
652        let now = Utc::now().to_rfc3339();
653        let conn = self.conn.lock();
654        let rows = conn
655            .execute(
656                "UPDATE acp_sessions
657                    SET killed_at = COALESCE(killed_at, ?1),
658                        last_activity = ?1
659                  WHERE session_uuid = ?2",
660                params![now, session_uuid],
661            )
662            .context("Failed to mark ACP session killed")?;
663        Ok(rows > 0)
664    }
665
666    /// Return whether this durable ACP session has been intentionally killed.
667    /// Missing rows are not killed; callers can then use normal load handling
668    /// to distinguish SESSION_NOT_FOUND from a terminal killed session.
669    pub fn is_session_killed(&self, session_uuid: &str) -> Result<bool> {
670        let conn = self.conn.lock();
671        let row = conn.query_row(
672            "SELECT CASE WHEN killed_at IS NULL THEN 0 ELSE 1 END
673             FROM acp_sessions WHERE session_uuid = ?1",
674            params![session_uuid],
675            |row| row.get::<_, i64>(0),
676        );
677        match row {
678            Ok(killed) => Ok(killed != 0),
679            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(false),
680            Err(e) => Err(e).context("Failed to query ACP session killed marker"),
681        }
682    }
683
684    /// Update `last_activity` without appending messages.
685    pub fn touch_session(&self, session_uuid: &str) -> Result<()> {
686        let now = Utc::now().to_rfc3339();
687        let conn = self.conn.lock();
688        conn.execute(
689            "UPDATE acp_sessions SET last_activity = ?1 WHERE session_uuid = ?2",
690            params![now, session_uuid],
691        )
692        .context("Failed to touch ACP session")?;
693        Ok(())
694    }
695}
696
697fn parse_ts(s: &str, field: &'static str, session_uuid: &str) -> DateTime<Utc> {
698    s.parse::<DateTime<Utc>>().unwrap_or_else(|e| {
699        ::zeroclaw_log::record!(
700            WARN,
701            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(
702                ::serde_json::json!({
703                    "session_uuid": session_uuid,
704                    "field": field,
705                    "error": e.to_string(),
706                })
707            ),
708            "Failed to parse session timestamp"
709        );
710        Utc::now()
711    })
712}
713
714#[cfg(test)]
715mod tests {
716    use super::*;
717    use tempfile::TempDir;
718    use zeroclaw_api::model_provider::{ChatMessage, ToolCall, ToolResultMessage};
719
720    fn open_store() -> (TempDir, AcpSessionStore) {
721        let tmp = TempDir::new().unwrap();
722        let store = AcpSessionStore::new(tmp.path()).unwrap();
723        (tmp, store)
724    }
725
726    #[test]
727    fn new_creates_all_four_tables() {
728        let (_tmp, store) = open_store();
729        let conn = store.conn.lock();
730        for table in [
731            "acp_sessions",
732            "acp_messages",
733            "acp_tool_calls",
734            "acp_session_events",
735        ] {
736            let name: String = conn
737                .query_row(
738                    "SELECT name FROM sqlite_master WHERE type='table' AND name = ?1",
739                    [table],
740                    |r| r.get(0),
741                )
742                .unwrap_or_else(|_| panic!("table {table} should exist"));
743            assert_eq!(name, table);
744        }
745    }
746
747    #[test]
748    fn opens_in_wal_mode_to_avoid_blocking_runtime_threads() {
749        let (_tmp, store) = open_store();
750        let conn = store.conn.lock();
751        let mode: String = conn
752            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
753            .unwrap();
754        assert_eq!(mode.to_lowercase(), "wal", "ACP session DB must use WAL");
755    }
756
757    #[test]
758    fn create_and_load_session_metadata() {
759        let (_tmp, store) = open_store();
760        store
761            .create_session("sess-abc", "personal_code", "/home/user/project")
762            .unwrap();
763
764        let data = store.load_session("sess-abc").unwrap().unwrap();
765        assert_eq!(data.session_uuid, "sess-abc");
766        assert_eq!(data.agent_alias, "personal_code");
767        assert_eq!(data.workspace_dir, "/home/user/project");
768        assert_eq!(data.token_count, 0);
769        assert!(data.messages.is_empty());
770    }
771
772    #[test]
773    fn load_nonexistent_session_returns_none() {
774        let (_tmp, store) = open_store();
775        assert!(store.load_session("nonexistent").unwrap().is_none());
776    }
777
778    #[test]
779    fn append_turn_round_trips_chat_messages() {
780        let (_tmp, store) = open_store();
781        store
782            .create_session("sess-msgs", "alpha", "/tmp/proj")
783            .unwrap();
784
785        let msgs = vec![
786            ConversationMessage::Chat(ChatMessage::user("hello")),
787            ConversationMessage::Chat(ChatMessage::assistant("hi")),
788        ];
789        store.append_turn("sess-msgs", &msgs).unwrap();
790
791        let data = store.load_session("sess-msgs").unwrap().unwrap();
792        assert_eq!(data.messages.len(), 2);
793        assert!(matches!(
794            &data.messages[0],
795            ConversationMessage::Chat(m) if m.role == "user" && m.content == "hello"
796        ));
797        assert!(matches!(
798            &data.messages[1],
799            ConversationMessage::Chat(m) if m.role == "assistant" && m.content == "hi"
800        ));
801    }
802
803    #[test]
804    fn append_turn_decomposes_assistant_tool_calls_and_results() {
805        let (_tmp, store) = open_store();
806        store
807            .create_session("sess-variants", "alpha", "/tmp/proj")
808            .unwrap();
809
810        let msgs = vec![
811            ConversationMessage::Chat(ChatMessage::user("task")),
812            ConversationMessage::AssistantToolCalls {
813                text: Some("calling shell".into()),
814                tool_calls: vec![ToolCall {
815                    id: "tc-1".into(),
816                    name: "shell".into(),
817                    arguments: r#"{"command":"ls"}"#.into(),
818                    extra_content: None,
819                }],
820                reasoning_content: Some("think think".into()),
821            },
822            ConversationMessage::ToolResults(vec![ToolResultMessage {
823                tool_call_id: "tc-1".into(),
824                content: "file.txt\n".into(),
825            }]),
826            ConversationMessage::Chat(ChatMessage::assistant("done")),
827        ];
828        store.append_turn("sess-variants", &msgs).unwrap();
829
830        let data = store.load_session("sess-variants").unwrap().unwrap();
831        assert_eq!(data.messages.len(), 4);
832
833        // Round-trip: AssistantToolCalls preserves text + tool_calls + reasoning
834        match &data.messages[1] {
835            ConversationMessage::AssistantToolCalls {
836                text,
837                tool_calls,
838                reasoning_content,
839            } => {
840                assert_eq!(text.as_deref(), Some("calling shell"));
841                assert_eq!(tool_calls.len(), 1);
842                assert_eq!(tool_calls[0].id, "tc-1");
843                assert_eq!(tool_calls[0].name, "shell");
844                assert_eq!(tool_calls[0].arguments, r#"{"command":"ls"}"#);
845                assert_eq!(reasoning_content.as_deref(), Some("think think"));
846            }
847            other => panic!("expected AssistantToolCalls, got {other:?}"),
848        }
849
850        // Round-trip: ToolResults preserves tool_call_id + content
851        match &data.messages[2] {
852            ConversationMessage::ToolResults(results) => {
853                assert_eq!(results.len(), 1);
854                assert_eq!(results[0].tool_call_id, "tc-1");
855                assert_eq!(results[0].content, "file.txt\n");
856            }
857            other => panic!("expected ToolResults, got {other:?}"),
858        }
859    }
860
861    #[test]
862    fn no_data_duplication_tool_call_payload_only_in_tool_calls_table() {
863        // The schema contract: tool-call args and results live ONLY in
864        // acp_tool_calls. The assistant's message row carries only the text.
865        let (_tmp, store) = open_store();
866        store
867            .create_session("sess-dup", "alpha", "/tmp/proj")
868            .unwrap();
869
870        store
871            .append_turn(
872                "sess-dup",
873                &[ConversationMessage::AssistantToolCalls {
874                    text: Some("running".into()),
875                    tool_calls: vec![ToolCall {
876                        id: "tc-x".into(),
877                        name: "shell".into(),
878                        arguments: r#"{"command":"echo hi"}"#.into(),
879                        extra_content: None,
880                    }],
881                    reasoning_content: None,
882                }],
883            )
884            .unwrap();
885
886        let conn = store.conn.lock();
887        let msg_content: String = conn
888            .query_row(
889                "SELECT content FROM acp_messages WHERE role = 'assistant' LIMIT 1",
890                [],
891                |r| r.get(0),
892            )
893            .unwrap();
894        assert_eq!(msg_content, "running");
895        assert!(
896            !msg_content.contains("echo hi"),
897            "message content must not contain tool-call args"
898        );
899    }
900
901    #[test]
902    fn append_turn_empty_slice_is_noop() {
903        let (_tmp, store) = open_store();
904        store
905            .create_session("sess-empty", "alpha", "/tmp/proj")
906            .unwrap();
907        store.append_turn("sess-empty", &[]).unwrap();
908        let data = store.load_session("sess-empty").unwrap().unwrap();
909        assert!(data.messages.is_empty());
910    }
911
912    #[test]
913    fn last_activity_updated_on_append() {
914        let (_tmp, store) = open_store();
915        store
916            .create_session("sess-activity", "alpha", "/tmp/proj")
917            .unwrap();
918        let before = store
919            .load_session("sess-activity")
920            .unwrap()
921            .unwrap()
922            .last_activity;
923        std::thread::sleep(std::time::Duration::from_millis(10));
924        store
925            .append_turn(
926                "sess-activity",
927                &[ConversationMessage::Chat(ChatMessage::user("hi"))],
928            )
929            .unwrap();
930        let after = store
931            .load_session("sess-activity")
932            .unwrap()
933            .unwrap()
934            .last_activity;
935        assert!(after >= before);
936    }
937
938    #[test]
939    fn append_turn_unknown_session_errors_atomically() {
940        let (_tmp, store) = open_store();
941        let result = store.append_turn(
942            "does-not-exist",
943            &[ConversationMessage::Chat(ChatMessage::user("hello"))],
944        );
945        assert!(result.is_err());
946        let conn = store.conn.lock();
947        let count: i64 = conn
948            .query_row("SELECT COUNT(*) FROM acp_messages", [], |r| r.get(0))
949            .unwrap();
950        assert_eq!(count, 0, "no orphan rows after failed append_turn");
951    }
952
953    #[test]
954    fn delete_session_cascades_to_children() {
955        let (_tmp, store) = open_store();
956        store
957            .create_session("sess-del", "alpha", "/tmp/proj")
958            .unwrap();
959        store
960            .append_turn(
961                "sess-del",
962                &[
963                    ConversationMessage::AssistantToolCalls {
964                        text: Some("calling".into()),
965                        tool_calls: vec![ToolCall {
966                            id: "tc-1".into(),
967                            name: "shell".into(),
968                            arguments: "{}".into(),
969                            extra_content: None,
970                        }],
971                        reasoning_content: None,
972                    },
973                    ConversationMessage::ToolResults(vec![ToolResultMessage {
974                        tool_call_id: "tc-1".into(),
975                        content: "ok".into(),
976                    }]),
977                ],
978            )
979            .unwrap();
980        store
981            .append_event("sess-del", Action::Disconnect, EventOutcome::Success, None)
982            .unwrap();
983
984        assert!(store.delete_session("sess-del").unwrap());
985
986        let conn = store.conn.lock();
987        for table in ["acp_messages", "acp_tool_calls", "acp_session_events"] {
988            let count: i64 = conn
989                .query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |r| r.get(0))
990                .unwrap();
991            assert_eq!(count, 0, "cascade should empty {table}");
992        }
993    }
994
995    #[test]
996    fn delete_nonexistent_session_returns_false() {
997        let (_tmp, store) = open_store();
998        assert!(!store.delete_session("ghost").unwrap());
999    }
1000
1001    #[test]
1002    fn mark_session_killed_persists_without_deleting_history() {
1003        let (tmp, store) = open_store();
1004        store
1005            .create_session("sess-kill", "alpha", "/tmp/proj")
1006            .unwrap();
1007        store
1008            .append_turn(
1009                "sess-kill",
1010                &[ConversationMessage::Chat(ChatMessage::user("keep this"))],
1011            )
1012            .unwrap();
1013
1014        assert!(!store.is_session_killed("sess-kill").unwrap());
1015        assert!(store.mark_session_killed("sess-kill").unwrap());
1016        assert!(store.is_session_killed("sess-kill").unwrap());
1017
1018        let data = store.load_session("sess-kill").unwrap().unwrap();
1019        assert_eq!(
1020            data.messages.len(),
1021            1,
1022            "kill marker must not delete durable transcript history"
1023        );
1024
1025        drop(store);
1026        let reopened = AcpSessionStore::new(tmp.path()).unwrap();
1027        assert!(
1028            reopened.is_session_killed("sess-kill").unwrap(),
1029            "kill marker must survive store reopen"
1030        );
1031        assert!(
1032            reopened.load_session("sess-kill").unwrap().is_some(),
1033            "durable history remains loadable after reopen"
1034        );
1035    }
1036
1037    #[test]
1038    fn mark_nonexistent_session_killed_returns_false() {
1039        let (_tmp, store) = open_store();
1040        assert!(!store.mark_session_killed("ghost").unwrap());
1041        assert!(!store.is_session_killed("ghost").unwrap());
1042    }
1043
1044    #[test]
1045    fn touch_session_updates_last_activity() {
1046        let (_tmp, store) = open_store();
1047        store
1048            .create_session("sess-touch", "alpha", "/tmp/proj")
1049            .unwrap();
1050        let before = store
1051            .load_session("sess-touch")
1052            .unwrap()
1053            .unwrap()
1054            .last_activity;
1055        std::thread::sleep(std::time::Duration::from_millis(10));
1056        store.touch_session("sess-touch").unwrap();
1057        let after = store
1058            .load_session("sess-touch")
1059            .unwrap()
1060            .unwrap()
1061            .last_activity;
1062        assert!(after >= before);
1063    }
1064
1065    #[test]
1066    fn set_token_count_persists_and_load_reads_it() {
1067        let (_tmp, store) = open_store();
1068        store
1069            .create_session("sess-tok", "alpha", "/tmp/proj")
1070            .unwrap();
1071        assert_eq!(
1072            store.load_session("sess-tok").unwrap().unwrap().token_count,
1073            0
1074        );
1075
1076        store.set_token_count("sess-tok", 152_306).unwrap();
1077        assert_eq!(
1078            store.load_session("sess-tok").unwrap().unwrap().token_count,
1079            152_306,
1080            "ctx-bar value must round-trip through the store"
1081        );
1082
1083        // Overwrite semantics (not cumulative).
1084        store.set_token_count("sess-tok", 42).unwrap();
1085        assert_eq!(
1086            store.load_session("sess-tok").unwrap().unwrap().token_count,
1087            42
1088        );
1089    }
1090
1091    #[test]
1092    fn set_token_count_errors_on_unknown_session() {
1093        // Defensive: a silent zero-row UPDATE would mask a race where the
1094        // session was deleted while a Usage event was in flight. The caller
1095        // needs the error so the failure is loggable.
1096        let (_tmp, store) = open_store();
1097        let err = store.set_token_count("nonexistent", 100).unwrap_err();
1098        assert!(
1099            err.to_string().contains("nonexistent"),
1100            "error must name the missing session_uuid; got: {err}"
1101        );
1102    }
1103
1104    #[test]
1105    fn append_event_writes_action_outcome_payload() {
1106        let (_tmp, store) = open_store();
1107        store
1108            .create_session("sess-evt", "alpha", "/tmp/proj")
1109            .unwrap();
1110
1111        store
1112            .append_event(
1113                "sess-evt",
1114                Action::Cancel,
1115                EventOutcome::Failure,
1116                Some("turn cancelled by user"),
1117            )
1118            .unwrap();
1119
1120        let conn = store.conn.lock();
1121        let (action, outcome, payload): (String, String, Option<String>) = conn
1122            .query_row(
1123                "SELECT action, outcome, payload FROM acp_session_events LIMIT 1",
1124                [],
1125                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1126            )
1127            .unwrap();
1128        assert_eq!(action, "cancel");
1129        assert_eq!(outcome, "failure");
1130        assert_eq!(payload.as_deref(), Some("turn cancelled by user"));
1131    }
1132
1133    #[test]
1134    fn list_sessions_returns_summaries_ordered_by_recent_activity() {
1135        let (_tmp, store) = open_store();
1136        store.create_session("sess-old", "alpha", "/tmp/a").unwrap();
1137        std::thread::sleep(std::time::Duration::from_millis(10));
1138        store.create_session("sess-new", "beta", "/tmp/b").unwrap();
1139        store
1140            .append_turn(
1141                "sess-new",
1142                &[ConversationMessage::Chat(ChatMessage::user("hi"))],
1143            )
1144            .unwrap();
1145        store.set_token_count("sess-new", 1234).unwrap();
1146
1147        let list = store.list_sessions().unwrap();
1148        assert_eq!(list.len(), 2);
1149        // Most recent activity first.
1150        assert_eq!(list[0].session_uuid, "sess-new");
1151        assert_eq!(list[0].agent_alias, "beta");
1152        assert_eq!(list[0].workspace_dir, "/tmp/b");
1153        assert_eq!(list[0].message_count, 1);
1154        assert_eq!(list[0].token_count, 1234);
1155        assert_eq!(list[1].session_uuid, "sess-old");
1156        assert_eq!(list[1].message_count, 0);
1157    }
1158
1159    #[test]
1160    fn list_sessions_empty_when_no_sessions() {
1161        let (_tmp, store) = open_store();
1162        assert!(store.list_sessions().unwrap().is_empty());
1163    }
1164}