1use anyhow::{Context, Result};
23use chrono::{DateTime, Utc};
24use parking_lot::Mutex;
25use rusqlite::{Connection, params};
26use std::path::Path;
27use zeroclaw_api::model_provider::{ChatMessage, ConversationMessage, ToolCall, ToolResultMessage};
28use zeroclaw_log::{Action, EventOutcome};
29
/// Direction of a row in `acp_tool_calls`.
///
/// `In` rows are written for the tool calls of an assistant message and `Out`
/// rows for tool results.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ToolEventKind {
    /// A tool call issued by the assistant.
    In,
    /// A result returned for a tool call.
    Out,
}

impl ToolEventKind {
    /// Returns the text tag stored in the `event_kind` column.
    fn as_str(self) -> &'static str {
        let tag = match self {
            ToolEventKind::Out => "out",
            ToolEventKind::In => "in",
        };
        tag
    }
}
47
48pub struct AcpSessionStore {
49 conn: Mutex<Connection>,
50}
51
52pub struct AcpSessionData {
53 pub session_uuid: String,
54 pub agent_alias: String,
55 pub workspace_dir: String,
56 pub token_count: u64,
57 pub created_at: DateTime<Utc>,
58 pub last_activity: DateTime<Utc>,
59 pub messages: Vec<ConversationMessage>,
60}
61
62pub enum AcpSessionRestore {
63 Missing,
64 Killed,
65 Restorable(AcpSessionData),
66}
67
68pub struct AcpSessionSummary {
71 pub session_uuid: String,
72 pub agent_alias: String,
73 pub workspace_dir: String,
74 pub token_count: u64,
75 pub created_at: DateTime<Utc>,
76 pub last_activity: DateTime<Utc>,
77 pub message_count: usize,
78}
79
80impl AcpSessionStore {
81 pub fn new(workspace_dir: &Path) -> Result<Self> {
82 let sessions_dir = workspace_dir.join("sessions");
83 std::fs::create_dir_all(&sessions_dir).context("Failed to create sessions directory")?;
84 let db_path = sessions_dir.join("acp-sessions.db");
85
86 let conn = Connection::open(&db_path)
87 .with_context(|| format!("Failed to open ACP session DB: {}", db_path.display()))?;
88
89 conn.execute_batch(
90 "PRAGMA journal_mode = WAL;
91 PRAGMA synchronous = NORMAL;
92 PRAGMA busy_timeout = 5000;
93 PRAGMA foreign_keys = ON;
94 PRAGMA temp_store = MEMORY;",
95 )
96 .context("Failed to configure ACP session DB pragmas")?;
97
98 conn.execute_batch(
101 "CREATE TABLE IF NOT EXISTS acp_sessions (
102 id INTEGER PRIMARY KEY AUTOINCREMENT,
103 session_uuid TEXT NOT NULL UNIQUE,
104 agent_alias TEXT NOT NULL,
105 workspace_dir TEXT NOT NULL,
106 token_count INTEGER NOT NULL DEFAULT 0,
107 killed_at TEXT,
108 created_at TEXT NOT NULL,
109 last_activity TEXT NOT NULL
110 );
111 CREATE INDEX IF NOT EXISTS idx_acp_sessions_uuid ON acp_sessions(session_uuid);
112 CREATE INDEX IF NOT EXISTS idx_acp_sessions_alias ON acp_sessions(agent_alias);
113
114 CREATE TABLE IF NOT EXISTS acp_messages (
115 id INTEGER PRIMARY KEY AUTOINCREMENT,
116 session_id INTEGER NOT NULL REFERENCES acp_sessions(id) ON DELETE CASCADE,
117 role TEXT NOT NULL,
118 content TEXT NOT NULL,
119 reasoning_content TEXT,
120 created_at TEXT NOT NULL
121 );
122 CREATE INDEX IF NOT EXISTS idx_acp_messages_session ON acp_messages(session_id, id);
123
124 CREATE TABLE IF NOT EXISTS acp_tool_calls (
125 id INTEGER PRIMARY KEY AUTOINCREMENT,
126 message_id INTEGER NOT NULL REFERENCES acp_messages(id) ON DELETE CASCADE,
127 tool_call_id TEXT NOT NULL,
128 tool_name TEXT NOT NULL,
129 event_kind TEXT NOT NULL,
130 payload TEXT NOT NULL,
131 outcome TEXT,
132 created_at TEXT NOT NULL
133 );
134 CREATE INDEX IF NOT EXISTS idx_acp_tool_calls_message ON acp_tool_calls(message_id, id);
135 CREATE INDEX IF NOT EXISTS idx_acp_tool_calls_lookup ON acp_tool_calls(tool_call_id);
136
137 CREATE TABLE IF NOT EXISTS acp_session_events (
138 id INTEGER PRIMARY KEY AUTOINCREMENT,
139 session_id INTEGER NOT NULL REFERENCES acp_sessions(id) ON DELETE CASCADE,
140 action TEXT NOT NULL,
141 outcome TEXT NOT NULL,
142 payload TEXT,
143 created_at TEXT NOT NULL
144 );
145 CREATE INDEX IF NOT EXISTS idx_acp_session_events_session ON acp_session_events(session_id, id);",
146 )
147 .context("Failed to create ACP session schema")?;
148
149 Self::ensure_killed_at_column(&conn)
150 .context("Failed to migrate ACP session killed marker")?;
151
152 Ok(Self {
153 conn: Mutex::new(conn),
154 })
155 }
156
157 fn ensure_killed_at_column(conn: &Connection) -> Result<()> {
158 let mut stmt = conn
159 .prepare("PRAGMA table_info(acp_sessions)")
160 .context("Failed to inspect ACP session schema")?;
161 let mut rows = stmt
162 .query([])
163 .context("Failed to read ACP session schema")?;
164 while let Some(row) = rows
165 .next()
166 .context("Failed to read ACP session schema row")?
167 {
168 let column: String = row
169 .get(1)
170 .context("Failed to read ACP session column name")?;
171 if column == "killed_at" {
172 return Ok(());
173 }
174 }
175 drop(rows);
176 drop(stmt);
177
178 match conn.execute("ALTER TABLE acp_sessions ADD COLUMN killed_at TEXT", []) {
179 Ok(_) => Ok(()),
180 Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
181 if msg.contains("duplicate column name") =>
182 {
183 Ok(())
184 }
185 Err(e) => Err(e).context("Failed to add ACP session killed marker"),
186 }
187 }
188
189 pub fn create_session(
191 &self,
192 session_uuid: &str,
193 agent_alias: &str,
194 workspace_dir: &str,
195 ) -> Result<i64> {
196 let now = Utc::now().to_rfc3339();
197 let conn = self.conn.lock();
198 conn.execute(
199 "INSERT INTO acp_sessions
200 (session_uuid, agent_alias, workspace_dir, token_count, created_at, last_activity)
201 VALUES (?1, ?2, ?3, 0, ?4, ?4)",
202 params![session_uuid, agent_alias, workspace_dir, now],
203 )
204 .context("Failed to create ACP session")?;
205 Ok(conn.last_insert_rowid())
206 }
207
208 pub fn load_session(&self, session_uuid: &str) -> Result<Option<AcpSessionData>> {
211 let conn = self.conn.lock();
212
213 let row = conn.query_row(
214 "SELECT id, agent_alias, workspace_dir, token_count, created_at, last_activity
215 FROM acp_sessions WHERE session_uuid = ?1",
216 params![session_uuid],
217 |row| {
218 Ok((
219 row.get::<_, i64>(0)?,
220 row.get::<_, String>(1)?,
221 row.get::<_, String>(2)?,
222 row.get::<_, i64>(3)?,
223 row.get::<_, String>(4)?,
224 row.get::<_, String>(5)?,
225 ))
226 },
227 );
228
229 let (session_id, agent_alias, workspace_dir, token_count, created_at_s, last_activity_s) =
230 match row {
231 Ok(r) => r,
232 Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
233 Err(e) => return Err(e).context("Failed to query ACP session"),
234 };
235
236 let created_at = parse_ts(&created_at_s, "created_at", session_uuid);
237 let last_activity = parse_ts(&last_activity_s, "last_activity", session_uuid);
238
239 let messages = Self::load_messages(&conn, session_id)?;
249
250 Ok(Some(AcpSessionData {
251 session_uuid: session_uuid.to_string(),
252 agent_alias,
253 workspace_dir,
254 token_count: token_count.max(0) as u64,
255 created_at,
256 last_activity,
257 messages,
258 }))
259 }
260
261 pub fn load_session_for_restore(&self, session_uuid: &str) -> Result<AcpSessionRestore> {
265 if self.is_session_killed(session_uuid)? {
266 return Ok(AcpSessionRestore::Killed);
267 }
268
269 match self.load_session(session_uuid)? {
270 Some(data) => Ok(AcpSessionRestore::Restorable(data)),
271 None => Ok(AcpSessionRestore::Missing),
272 }
273 }
274
275 pub fn list_sessions(&self) -> Result<Vec<AcpSessionSummary>> {
279 let conn = self.conn.lock();
280 let mut stmt = conn
281 .prepare(
282 "SELECT s.session_uuid,
283 s.agent_alias,
284 s.workspace_dir,
285 s.token_count,
286 s.created_at,
287 s.last_activity,
288 (SELECT COUNT(*) FROM acp_messages m WHERE m.session_id = s.id) AS message_count
289 FROM acp_sessions s
290 ORDER BY s.last_activity DESC",
291 )
292 .context("Failed to prepare ACP session list query")?;
293
294 let rows = stmt
295 .query_map([], |row| {
296 Ok((
297 row.get::<_, String>(0)?,
298 row.get::<_, String>(1)?,
299 row.get::<_, String>(2)?,
300 row.get::<_, i64>(3)?,
301 row.get::<_, String>(4)?,
302 row.get::<_, String>(5)?,
303 row.get::<_, i64>(6)?,
304 ))
305 })
306 .context("Failed to query ACP sessions")?;
307
308 let mut out = Vec::new();
309 for row in rows {
310 let (
311 session_uuid,
312 agent_alias,
313 workspace_dir,
314 token_count,
315 created_s,
316 activity_s,
317 msg_count,
318 ) = row.context("Failed to read ACP session row")?;
319 out.push(AcpSessionSummary {
320 created_at: parse_ts(&created_s, "created_at", &session_uuid),
321 last_activity: parse_ts(&activity_s, "last_activity", &session_uuid),
322 session_uuid,
323 agent_alias,
324 workspace_dir,
325 token_count: token_count.max(0) as u64,
326 message_count: msg_count.max(0) as usize,
327 });
328 }
329 Ok(out)
330 }
331
332 fn load_messages(conn: &Connection, session_id: i64) -> Result<Vec<ConversationMessage>> {
333 let mut msg_stmt = conn
335 .prepare(
336 "SELECT id, role, content, reasoning_content
337 FROM acp_messages WHERE session_id = ?1 ORDER BY id ASC",
338 )
339 .context("Failed to prepare message query")?;
340
341 let msg_rows: Vec<(i64, String, String, Option<String>)> = msg_stmt
342 .query_map(params![session_id], |row| {
343 Ok((
344 row.get::<_, i64>(0)?,
345 row.get::<_, String>(1)?,
346 row.get::<_, String>(2)?,
347 row.get::<_, Option<String>>(3)?,
348 ))
349 })?
350 .collect::<Result<Vec<_>, _>>()
351 .context("Failed to read message rows")?;
352
353 let mut tc_stmt = conn
356 .prepare(
357 "SELECT tool_call_id, tool_name, event_kind, payload
358 FROM acp_tool_calls WHERE message_id = ?1 ORDER BY id ASC",
359 )
360 .context("Failed to prepare tool_call query")?;
361
362 let mut out = Vec::with_capacity(msg_rows.len());
363 for (msg_id, role, content, reasoning_content) in msg_rows {
364 let mut ins: Vec<ToolCall> = Vec::new();
366 let mut outs: Vec<ToolResultMessage> = Vec::new();
367 let rows = tc_stmt
368 .query_map(params![msg_id], |row| {
369 Ok((
370 row.get::<_, String>(0)?,
371 row.get::<_, String>(1)?,
372 row.get::<_, String>(2)?,
373 row.get::<_, String>(3)?,
374 ))
375 })?
376 .collect::<Result<Vec<_>, _>>()
377 .context("Failed to read tool_call rows")?;
378 for (tool_call_id, tool_name, event_kind, payload) in rows {
379 match event_kind.as_str() {
380 "in" => ins.push(ToolCall {
381 id: tool_call_id,
382 name: tool_name,
383 arguments: payload,
384 extra_content: None,
385 }),
386 "out" => outs.push(ToolResultMessage {
387 tool_call_id,
388 content: payload,
389 }),
390 other => {
391 ::zeroclaw_log::record!(
392 ERROR,
393 ::zeroclaw_log::Event::new(
394 module_path!(),
395 ::zeroclaw_log::Action::Read,
396 )
397 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
398 .with_attrs(::serde_json::json!({
399 "session_id": session_id,
400 "message_id": msg_id,
401 "event_kind": other,
402 })),
403 "unknown event_kind in acp_tool_calls"
404 );
405 return Err(anyhow::Error::msg(format!(
406 "unknown event_kind '{other}' in acp_tool_calls for message_id {msg_id}"
407 )));
408 }
409 }
410 }
411
412 if ins.is_empty() && outs.is_empty() {
413 out.push(ConversationMessage::Chat(ChatMessage { role, content }));
415 } else {
416 if !ins.is_empty() {
417 out.push(ConversationMessage::AssistantToolCalls {
419 text: if content.is_empty() {
420 None
421 } else {
422 Some(content)
423 },
424 tool_calls: ins,
425 reasoning_content,
426 });
427 }
428 if !outs.is_empty() {
429 out.push(ConversationMessage::ToolResults(outs));
430 }
431 }
432 }
433
434 Ok(out)
435 }
436
437 pub fn append_turn(&self, session_uuid: &str, messages: &[ConversationMessage]) -> Result<()> {
441 if messages.is_empty() {
442 return Ok(());
443 }
444
445 let now = Utc::now().to_rfc3339();
446 let mut conn = self.conn.lock();
447
448 let session_id: i64 = conn
451 .query_row(
452 "SELECT id FROM acp_sessions WHERE session_uuid = ?1",
453 params![session_uuid],
454 |row| row.get(0),
455 )
456 .with_context(|| format!("unknown session_uuid: {session_uuid}"))?;
457
458 let tx = conn
459 .transaction()
460 .context("Failed to begin append_turn transaction")?;
461
462 let mut last_assistant_msg_id: Option<i64> = None;
465
466 for msg in messages {
467 match msg {
468 ConversationMessage::Chat(chat) => {
469 tx.execute(
470 "INSERT INTO acp_messages
471 (session_id, role, content, reasoning_content, created_at)
472 VALUES (?1, ?2, ?3, NULL, ?4)",
473 params![session_id, chat.role, chat.content, now],
474 )
475 .context("Failed to insert chat message")?;
476 if chat.role == "assistant" {
477 last_assistant_msg_id = Some(tx.last_insert_rowid());
478 }
479 }
480 ConversationMessage::AssistantToolCalls {
481 text,
482 tool_calls,
483 reasoning_content,
484 } => {
485 tx.execute(
486 "INSERT INTO acp_messages
487 (session_id, role, content, reasoning_content, created_at)
488 VALUES (?1, 'assistant', ?2, ?3, ?4)",
489 params![
490 session_id,
491 text.as_deref().unwrap_or(""),
492 reasoning_content,
493 now,
494 ],
495 )
496 .context("Failed to insert assistant tool-call message")?;
497 let msg_id = tx.last_insert_rowid();
498 last_assistant_msg_id = Some(msg_id);
499
500 for tc in tool_calls {
501 tx.execute(
502 "INSERT INTO acp_tool_calls
503 (message_id, tool_call_id, tool_name, event_kind, payload, outcome, created_at)
504 VALUES (?1, ?2, ?3, ?4, ?5, NULL, ?6)",
505 params![
506 msg_id,
507 tc.id,
508 tc.name,
509 ToolEventKind::In.as_str(),
510 tc.arguments,
511 now,
512 ],
513 )
514 .context("Failed to insert tool_call 'in' row")?;
515 }
516 }
517 ConversationMessage::ToolResults(results) => {
518 let msg_id = match last_assistant_msg_id {
519 Some(id) => id,
520 None => {
521 ::zeroclaw_log::record!(
522 ERROR,
523 ::zeroclaw_log::Event::new(
524 module_path!(),
525 ::zeroclaw_log::Action::Write,
526 )
527 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
528 .with_attrs(::serde_json::json!({
529 "session_uuid": session_uuid,
530 })),
531 "ToolResults without preceding AssistantToolCalls"
532 );
533 return Err(anyhow::Error::msg(
534 "ToolResults appeared without a preceding AssistantToolCalls \
535 message in this turn — cannot determine parent message_id",
536 ));
537 }
538 };
539 for result in results {
540 let tool_name: String = tx
547 .query_row(
548 "SELECT tool_name FROM acp_tool_calls
549 WHERE tool_call_id = ?1 AND event_kind = 'in'
550 ORDER BY id DESC LIMIT 1",
551 params![result.tool_call_id],
552 |row| row.get(0),
553 )
554 .unwrap_or_else(|_| String::from("unknown"));
555 tx.execute(
556 "INSERT INTO acp_tool_calls
557 (message_id, tool_call_id, tool_name, event_kind, payload, outcome, created_at)
558 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
559 params![
560 msg_id,
561 result.tool_call_id,
562 tool_name,
563 ToolEventKind::Out.as_str(),
564 result.content,
565 EventOutcome::Unknown.as_str(),
566 now,
567 ],
568 )
569 .context("Failed to insert tool_call 'out' row")?;
570 }
571 }
572 }
573 }
574
575 tx.execute(
576 "UPDATE acp_sessions SET last_activity = ?1 WHERE id = ?2",
577 params![now, session_id],
578 )
579 .context("Failed to update last_activity")?;
580
581 tx.commit().context("Failed to commit append_turn")?;
582 Ok(())
583 }
584
585 pub fn set_token_count(&self, session_uuid: &str, token_count: u64) -> Result<()> {
592 let conn = self.conn.lock();
593 let rows = conn
594 .execute(
595 "UPDATE acp_sessions SET token_count = ?1 WHERE session_uuid = ?2",
596 params![token_count as i64, session_uuid],
597 )
598 .context("Failed to set token_count")?;
599 if rows == 0 {
600 return Err(anyhow::Error::msg(format!(
601 "set_token_count: no session with uuid {session_uuid}"
602 )));
603 }
604 Ok(())
605 }
606
607 pub fn append_event(
611 &self,
612 session_uuid: &str,
613 action: Action,
614 outcome: EventOutcome,
615 payload: Option<&str>,
616 ) -> Result<()> {
617 let now = Utc::now().to_rfc3339();
618 let conn = self.conn.lock();
619 let session_id: i64 = conn
620 .query_row(
621 "SELECT id FROM acp_sessions WHERE session_uuid = ?1",
622 params![session_uuid],
623 |row| row.get(0),
624 )
625 .with_context(|| format!("unknown session_uuid: {session_uuid}"))?;
626 conn.execute(
627 "INSERT INTO acp_session_events
628 (session_id, action, outcome, payload, created_at)
629 VALUES (?1, ?2, ?3, ?4, ?5)",
630 params![session_id, action.as_str(), outcome.as_str(), payload, now],
631 )
632 .context("Failed to insert session event")?;
633 Ok(())
634 }
635
636 pub fn delete_session(&self, session_uuid: &str) -> Result<bool> {
639 let conn = self.conn.lock();
640 let rows = conn
641 .execute(
642 "DELETE FROM acp_sessions WHERE session_uuid = ?1",
643 params![session_uuid],
644 )
645 .context("Failed to delete ACP session")?;
646 Ok(rows > 0)
647 }
648
649 pub fn mark_session_killed(&self, session_uuid: &str) -> Result<bool> {
652 let now = Utc::now().to_rfc3339();
653 let conn = self.conn.lock();
654 let rows = conn
655 .execute(
656 "UPDATE acp_sessions
657 SET killed_at = COALESCE(killed_at, ?1),
658 last_activity = ?1
659 WHERE session_uuid = ?2",
660 params![now, session_uuid],
661 )
662 .context("Failed to mark ACP session killed")?;
663 Ok(rows > 0)
664 }
665
666 pub fn is_session_killed(&self, session_uuid: &str) -> Result<bool> {
670 let conn = self.conn.lock();
671 let row = conn.query_row(
672 "SELECT CASE WHEN killed_at IS NULL THEN 0 ELSE 1 END
673 FROM acp_sessions WHERE session_uuid = ?1",
674 params![session_uuid],
675 |row| row.get::<_, i64>(0),
676 );
677 match row {
678 Ok(killed) => Ok(killed != 0),
679 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(false),
680 Err(e) => Err(e).context("Failed to query ACP session killed marker"),
681 }
682 }
683
684 pub fn touch_session(&self, session_uuid: &str) -> Result<()> {
686 let now = Utc::now().to_rfc3339();
687 let conn = self.conn.lock();
688 conn.execute(
689 "UPDATE acp_sessions SET last_activity = ?1 WHERE session_uuid = ?2",
690 params![now, session_uuid],
691 )
692 .context("Failed to touch ACP session")?;
693 Ok(())
694 }
695}
696
697fn parse_ts(s: &str, field: &'static str, session_uuid: &str) -> DateTime<Utc> {
698 s.parse::<DateTime<Utc>>().unwrap_or_else(|e| {
699 ::zeroclaw_log::record!(
700 WARN,
701 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(
702 ::serde_json::json!({
703 "session_uuid": session_uuid,
704 "field": field,
705 "error": e.to_string(),
706 })
707 ),
708 "Failed to parse session timestamp"
709 );
710 Utc::now()
711 })
712}
713
714#[cfg(test)]
715mod tests {
716 use super::*;
717 use tempfile::TempDir;
718 use zeroclaw_api::model_provider::{ChatMessage, ToolCall, ToolResultMessage};
719
720 fn open_store() -> (TempDir, AcpSessionStore) {
721 let tmp = TempDir::new().unwrap();
722 let store = AcpSessionStore::new(tmp.path()).unwrap();
723 (tmp, store)
724 }
725
726 #[test]
727 fn new_creates_all_four_tables() {
728 let (_tmp, store) = open_store();
729 let conn = store.conn.lock();
730 for table in [
731 "acp_sessions",
732 "acp_messages",
733 "acp_tool_calls",
734 "acp_session_events",
735 ] {
736 let name: String = conn
737 .query_row(
738 "SELECT name FROM sqlite_master WHERE type='table' AND name = ?1",
739 [table],
740 |r| r.get(0),
741 )
742 .unwrap_or_else(|_| panic!("table {table} should exist"));
743 assert_eq!(name, table);
744 }
745 }
746
747 #[test]
748 fn opens_in_wal_mode_to_avoid_blocking_runtime_threads() {
749 let (_tmp, store) = open_store();
750 let conn = store.conn.lock();
751 let mode: String = conn
752 .query_row("PRAGMA journal_mode", [], |r| r.get(0))
753 .unwrap();
754 assert_eq!(mode.to_lowercase(), "wal", "ACP session DB must use WAL");
755 }
756
757 #[test]
758 fn create_and_load_session_metadata() {
759 let (_tmp, store) = open_store();
760 store
761 .create_session("sess-abc", "personal_code", "/home/user/project")
762 .unwrap();
763
764 let data = store.load_session("sess-abc").unwrap().unwrap();
765 assert_eq!(data.session_uuid, "sess-abc");
766 assert_eq!(data.agent_alias, "personal_code");
767 assert_eq!(data.workspace_dir, "/home/user/project");
768 assert_eq!(data.token_count, 0);
769 assert!(data.messages.is_empty());
770 }
771
772 #[test]
773 fn load_nonexistent_session_returns_none() {
774 let (_tmp, store) = open_store();
775 assert!(store.load_session("nonexistent").unwrap().is_none());
776 }
777
778 #[test]
779 fn append_turn_round_trips_chat_messages() {
780 let (_tmp, store) = open_store();
781 store
782 .create_session("sess-msgs", "alpha", "/tmp/proj")
783 .unwrap();
784
785 let msgs = vec![
786 ConversationMessage::Chat(ChatMessage::user("hello")),
787 ConversationMessage::Chat(ChatMessage::assistant("hi")),
788 ];
789 store.append_turn("sess-msgs", &msgs).unwrap();
790
791 let data = store.load_session("sess-msgs").unwrap().unwrap();
792 assert_eq!(data.messages.len(), 2);
793 assert!(matches!(
794 &data.messages[0],
795 ConversationMessage::Chat(m) if m.role == "user" && m.content == "hello"
796 ));
797 assert!(matches!(
798 &data.messages[1],
799 ConversationMessage::Chat(m) if m.role == "assistant" && m.content == "hi"
800 ));
801 }
802
803 #[test]
804 fn append_turn_decomposes_assistant_tool_calls_and_results() {
805 let (_tmp, store) = open_store();
806 store
807 .create_session("sess-variants", "alpha", "/tmp/proj")
808 .unwrap();
809
810 let msgs = vec![
811 ConversationMessage::Chat(ChatMessage::user("task")),
812 ConversationMessage::AssistantToolCalls {
813 text: Some("calling shell".into()),
814 tool_calls: vec![ToolCall {
815 id: "tc-1".into(),
816 name: "shell".into(),
817 arguments: r#"{"command":"ls"}"#.into(),
818 extra_content: None,
819 }],
820 reasoning_content: Some("think think".into()),
821 },
822 ConversationMessage::ToolResults(vec![ToolResultMessage {
823 tool_call_id: "tc-1".into(),
824 content: "file.txt\n".into(),
825 }]),
826 ConversationMessage::Chat(ChatMessage::assistant("done")),
827 ];
828 store.append_turn("sess-variants", &msgs).unwrap();
829
830 let data = store.load_session("sess-variants").unwrap().unwrap();
831 assert_eq!(data.messages.len(), 4);
832
833 match &data.messages[1] {
835 ConversationMessage::AssistantToolCalls {
836 text,
837 tool_calls,
838 reasoning_content,
839 } => {
840 assert_eq!(text.as_deref(), Some("calling shell"));
841 assert_eq!(tool_calls.len(), 1);
842 assert_eq!(tool_calls[0].id, "tc-1");
843 assert_eq!(tool_calls[0].name, "shell");
844 assert_eq!(tool_calls[0].arguments, r#"{"command":"ls"}"#);
845 assert_eq!(reasoning_content.as_deref(), Some("think think"));
846 }
847 other => panic!("expected AssistantToolCalls, got {other:?}"),
848 }
849
850 match &data.messages[2] {
852 ConversationMessage::ToolResults(results) => {
853 assert_eq!(results.len(), 1);
854 assert_eq!(results[0].tool_call_id, "tc-1");
855 assert_eq!(results[0].content, "file.txt\n");
856 }
857 other => panic!("expected ToolResults, got {other:?}"),
858 }
859 }
860
861 #[test]
862 fn no_data_duplication_tool_call_payload_only_in_tool_calls_table() {
863 let (_tmp, store) = open_store();
866 store
867 .create_session("sess-dup", "alpha", "/tmp/proj")
868 .unwrap();
869
870 store
871 .append_turn(
872 "sess-dup",
873 &[ConversationMessage::AssistantToolCalls {
874 text: Some("running".into()),
875 tool_calls: vec![ToolCall {
876 id: "tc-x".into(),
877 name: "shell".into(),
878 arguments: r#"{"command":"echo hi"}"#.into(),
879 extra_content: None,
880 }],
881 reasoning_content: None,
882 }],
883 )
884 .unwrap();
885
886 let conn = store.conn.lock();
887 let msg_content: String = conn
888 .query_row(
889 "SELECT content FROM acp_messages WHERE role = 'assistant' LIMIT 1",
890 [],
891 |r| r.get(0),
892 )
893 .unwrap();
894 assert_eq!(msg_content, "running");
895 assert!(
896 !msg_content.contains("echo hi"),
897 "message content must not contain tool-call args"
898 );
899 }
900
901 #[test]
902 fn append_turn_empty_slice_is_noop() {
903 let (_tmp, store) = open_store();
904 store
905 .create_session("sess-empty", "alpha", "/tmp/proj")
906 .unwrap();
907 store.append_turn("sess-empty", &[]).unwrap();
908 let data = store.load_session("sess-empty").unwrap().unwrap();
909 assert!(data.messages.is_empty());
910 }
911
912 #[test]
913 fn last_activity_updated_on_append() {
914 let (_tmp, store) = open_store();
915 store
916 .create_session("sess-activity", "alpha", "/tmp/proj")
917 .unwrap();
918 let before = store
919 .load_session("sess-activity")
920 .unwrap()
921 .unwrap()
922 .last_activity;
923 std::thread::sleep(std::time::Duration::from_millis(10));
924 store
925 .append_turn(
926 "sess-activity",
927 &[ConversationMessage::Chat(ChatMessage::user("hi"))],
928 )
929 .unwrap();
930 let after = store
931 .load_session("sess-activity")
932 .unwrap()
933 .unwrap()
934 .last_activity;
935 assert!(after >= before);
936 }
937
938 #[test]
939 fn append_turn_unknown_session_errors_atomically() {
940 let (_tmp, store) = open_store();
941 let result = store.append_turn(
942 "does-not-exist",
943 &[ConversationMessage::Chat(ChatMessage::user("hello"))],
944 );
945 assert!(result.is_err());
946 let conn = store.conn.lock();
947 let count: i64 = conn
948 .query_row("SELECT COUNT(*) FROM acp_messages", [], |r| r.get(0))
949 .unwrap();
950 assert_eq!(count, 0, "no orphan rows after failed append_turn");
951 }
952
953 #[test]
954 fn delete_session_cascades_to_children() {
955 let (_tmp, store) = open_store();
956 store
957 .create_session("sess-del", "alpha", "/tmp/proj")
958 .unwrap();
959 store
960 .append_turn(
961 "sess-del",
962 &[
963 ConversationMessage::AssistantToolCalls {
964 text: Some("calling".into()),
965 tool_calls: vec![ToolCall {
966 id: "tc-1".into(),
967 name: "shell".into(),
968 arguments: "{}".into(),
969 extra_content: None,
970 }],
971 reasoning_content: None,
972 },
973 ConversationMessage::ToolResults(vec![ToolResultMessage {
974 tool_call_id: "tc-1".into(),
975 content: "ok".into(),
976 }]),
977 ],
978 )
979 .unwrap();
980 store
981 .append_event("sess-del", Action::Disconnect, EventOutcome::Success, None)
982 .unwrap();
983
984 assert!(store.delete_session("sess-del").unwrap());
985
986 let conn = store.conn.lock();
987 for table in ["acp_messages", "acp_tool_calls", "acp_session_events"] {
988 let count: i64 = conn
989 .query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |r| r.get(0))
990 .unwrap();
991 assert_eq!(count, 0, "cascade should empty {table}");
992 }
993 }
994
995 #[test]
996 fn delete_nonexistent_session_returns_false() {
997 let (_tmp, store) = open_store();
998 assert!(!store.delete_session("ghost").unwrap());
999 }
1000
1001 #[test]
1002 fn mark_session_killed_persists_without_deleting_history() {
1003 let (tmp, store) = open_store();
1004 store
1005 .create_session("sess-kill", "alpha", "/tmp/proj")
1006 .unwrap();
1007 store
1008 .append_turn(
1009 "sess-kill",
1010 &[ConversationMessage::Chat(ChatMessage::user("keep this"))],
1011 )
1012 .unwrap();
1013
1014 assert!(!store.is_session_killed("sess-kill").unwrap());
1015 assert!(store.mark_session_killed("sess-kill").unwrap());
1016 assert!(store.is_session_killed("sess-kill").unwrap());
1017
1018 let data = store.load_session("sess-kill").unwrap().unwrap();
1019 assert_eq!(
1020 data.messages.len(),
1021 1,
1022 "kill marker must not delete durable transcript history"
1023 );
1024
1025 drop(store);
1026 let reopened = AcpSessionStore::new(tmp.path()).unwrap();
1027 assert!(
1028 reopened.is_session_killed("sess-kill").unwrap(),
1029 "kill marker must survive store reopen"
1030 );
1031 assert!(
1032 reopened.load_session("sess-kill").unwrap().is_some(),
1033 "durable history remains loadable after reopen"
1034 );
1035 }
1036
1037 #[test]
1038 fn mark_nonexistent_session_killed_returns_false() {
1039 let (_tmp, store) = open_store();
1040 assert!(!store.mark_session_killed("ghost").unwrap());
1041 assert!(!store.is_session_killed("ghost").unwrap());
1042 }
1043
1044 #[test]
1045 fn touch_session_updates_last_activity() {
1046 let (_tmp, store) = open_store();
1047 store
1048 .create_session("sess-touch", "alpha", "/tmp/proj")
1049 .unwrap();
1050 let before = store
1051 .load_session("sess-touch")
1052 .unwrap()
1053 .unwrap()
1054 .last_activity;
1055 std::thread::sleep(std::time::Duration::from_millis(10));
1056 store.touch_session("sess-touch").unwrap();
1057 let after = store
1058 .load_session("sess-touch")
1059 .unwrap()
1060 .unwrap()
1061 .last_activity;
1062 assert!(after >= before);
1063 }
1064
1065 #[test]
1066 fn set_token_count_persists_and_load_reads_it() {
1067 let (_tmp, store) = open_store();
1068 store
1069 .create_session("sess-tok", "alpha", "/tmp/proj")
1070 .unwrap();
1071 assert_eq!(
1072 store.load_session("sess-tok").unwrap().unwrap().token_count,
1073 0
1074 );
1075
1076 store.set_token_count("sess-tok", 152_306).unwrap();
1077 assert_eq!(
1078 store.load_session("sess-tok").unwrap().unwrap().token_count,
1079 152_306,
1080 "ctx-bar value must round-trip through the store"
1081 );
1082
1083 store.set_token_count("sess-tok", 42).unwrap();
1085 assert_eq!(
1086 store.load_session("sess-tok").unwrap().unwrap().token_count,
1087 42
1088 );
1089 }
1090
1091 #[test]
1092 fn set_token_count_errors_on_unknown_session() {
1093 let (_tmp, store) = open_store();
1097 let err = store.set_token_count("nonexistent", 100).unwrap_err();
1098 assert!(
1099 err.to_string().contains("nonexistent"),
1100 "error must name the missing session_uuid; got: {err}"
1101 );
1102 }
1103
1104 #[test]
1105 fn append_event_writes_action_outcome_payload() {
1106 let (_tmp, store) = open_store();
1107 store
1108 .create_session("sess-evt", "alpha", "/tmp/proj")
1109 .unwrap();
1110
1111 store
1112 .append_event(
1113 "sess-evt",
1114 Action::Cancel,
1115 EventOutcome::Failure,
1116 Some("turn cancelled by user"),
1117 )
1118 .unwrap();
1119
1120 let conn = store.conn.lock();
1121 let (action, outcome, payload): (String, String, Option<String>) = conn
1122 .query_row(
1123 "SELECT action, outcome, payload FROM acp_session_events LIMIT 1",
1124 [],
1125 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1126 )
1127 .unwrap();
1128 assert_eq!(action, "cancel");
1129 assert_eq!(outcome, "failure");
1130 assert_eq!(payload.as_deref(), Some("turn cancelled by user"));
1131 }
1132
1133 #[test]
1134 fn list_sessions_returns_summaries_ordered_by_recent_activity() {
1135 let (_tmp, store) = open_store();
1136 store.create_session("sess-old", "alpha", "/tmp/a").unwrap();
1137 std::thread::sleep(std::time::Duration::from_millis(10));
1138 store.create_session("sess-new", "beta", "/tmp/b").unwrap();
1139 store
1140 .append_turn(
1141 "sess-new",
1142 &[ConversationMessage::Chat(ChatMessage::user("hi"))],
1143 )
1144 .unwrap();
1145 store.set_token_count("sess-new", 1234).unwrap();
1146
1147 let list = store.list_sessions().unwrap();
1148 assert_eq!(list.len(), 2);
1149 assert_eq!(list[0].session_uuid, "sess-new");
1151 assert_eq!(list[0].agent_alias, "beta");
1152 assert_eq!(list[0].workspace_dir, "/tmp/b");
1153 assert_eq!(list[0].message_count, 1);
1154 assert_eq!(list[0].token_count, 1234);
1155 assert_eq!(list[1].session_uuid, "sess-old");
1156 assert_eq!(list[1].message_count, 0);
1157 }
1158
1159 #[test]
1160 fn list_sessions_empty_when_no_sessions() {
1161 let (_tmp, store) = open_store();
1162 assert!(store.list_sessions().unwrap().is_empty());
1163 }
1164}