1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use parking_lot::Mutex;
4use rusqlite::{Connection, params};
5use std::path::Path;
6use zeroclaw_api::model_provider::ConversationMessage;
7
8pub struct AcpSessionStore {
9 conn: Mutex<Connection>,
10}
11
12pub struct AcpSessionData {
13 pub workspace_dir: String,
14 pub created_at: DateTime<Utc>,
15 pub last_activity: DateTime<Utc>,
16 pub messages: Vec<ConversationMessage>,
17}
18
19impl AcpSessionStore {
20 pub fn new(workspace_dir: &Path) -> Result<Self> {
21 let sessions_dir = workspace_dir.join("sessions");
22 std::fs::create_dir_all(&sessions_dir).context("Failed to create sessions directory")?;
23 let db_path = sessions_dir.join("acp-sessions.db");
24
25 let conn = Connection::open(&db_path)
26 .with_context(|| format!("Failed to open ACP session DB: {}", db_path.display()))?;
27
28 conn.execute_batch(
29 "PRAGMA journal_mode = WAL;
30 PRAGMA synchronous = NORMAL;
31 PRAGMA foreign_keys = ON;
32 PRAGMA temp_store = MEMORY;",
33 )
34 .context("Failed to configure ACP session DB pragmas")?;
35
36 conn.execute_batch(
37 "CREATE TABLE IF NOT EXISTS acp_sessions (
38 session_id TEXT PRIMARY KEY,
39 workspace_dir TEXT NOT NULL,
40 created_at TEXT NOT NULL,
41 last_activity TEXT NOT NULL
42 );
43
44 CREATE TABLE IF NOT EXISTS acp_messages (
45 id INTEGER PRIMARY KEY AUTOINCREMENT,
46 session_id TEXT NOT NULL REFERENCES acp_sessions(session_id) ON DELETE CASCADE,
47 message_json TEXT NOT NULL,
48 created_at TEXT NOT NULL
49 );
50
51 CREATE INDEX IF NOT EXISTS idx_acp_messages_session
52 ON acp_messages(session_id, id);",
53 )
54 .context("Failed to create ACP session schema")?;
55
56 Ok(Self {
57 conn: Mutex::new(conn),
58 })
59 }
60
61 pub fn create_session(&self, session_id: &str, workspace_dir: &str) -> Result<()> {
63 let now = Utc::now().to_rfc3339();
64 let conn = self.conn.lock();
65 conn.execute(
66 "INSERT INTO acp_sessions (session_id, workspace_dir, created_at, last_activity)
67 VALUES (?1, ?2, ?3, ?3)",
68 params![session_id, workspace_dir, now],
69 )
70 .context("Failed to create ACP session")?;
71 Ok(())
72 }
73
74 pub fn load_session(&self, session_id: &str) -> Result<Option<AcpSessionData>> {
77 let conn = self.conn.lock();
78
79 let row = conn.query_row(
80 "SELECT workspace_dir, created_at, last_activity
81 FROM acp_sessions WHERE session_id = ?1",
82 params![session_id],
83 |row| {
84 Ok((
85 row.get::<_, String>(0)?,
86 row.get::<_, String>(1)?,
87 row.get::<_, String>(2)?,
88 ))
89 },
90 );
91
92 let (workspace_dir, created_at_str, last_activity_str) = match row {
93 Ok(r) => r,
94 Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
95 Err(e) => return Err(e).context("Failed to query ACP session"),
96 };
97
98 let created_at = created_at_str.parse::<DateTime<Utc>>().unwrap_or_else(|e| {
99 ::zeroclaw_log::record!(
100 WARN,
101 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
102 .with_attrs(
103 ::serde_json::json!({"session_id": session_id, "error": e.to_string()})
104 ),
105 "Failed to parse created_at"
106 );
107 Utc::now()
108 });
109 let last_activity = last_activity_str
110 .parse::<DateTime<Utc>>()
111 .unwrap_or_else(|e| {
112 ::zeroclaw_log::record!(
113 WARN,
114 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
115 .with_attrs(
116 ::serde_json::json!({"session_id": session_id, "error": e.to_string()})
117 ),
118 "Failed to parse last_activity"
119 );
120 Utc::now()
121 });
122
123 let mut stmt = conn
124 .prepare(
125 "SELECT message_json FROM acp_messages
126 WHERE session_id = ?1 ORDER BY id ASC",
127 )
128 .context("Failed to prepare message query")?;
129
130 let messages: Vec<ConversationMessage> = stmt
131 .query_map(params![session_id], |row| row.get::<_, String>(0))?
132 .filter_map(|r| match r {
133 Ok(json) => match serde_json::from_str::<ConversationMessage>(&json) {
134 Ok(msg) => Some(msg),
135 Err(e) => {
136 ::zeroclaw_log::record!(
137 WARN,
138 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
139 .with_attrs(::serde_json::json!({"session_id": session_id, "error": e.to_string()})),
140 "Skipping corrupt ACP message"
141 );
142 None
143 }
144 },
145 Err(e) => {
146 ::zeroclaw_log::record!(
147 WARN,
148 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
149 .with_attrs(::serde_json::json!({"session_id": session_id, "error": e.to_string()})),
150 "Failed to read ACP message row"
151 );
152 None
153 }
154 })
155 .collect();
156
157 Ok(Some(AcpSessionData {
158 workspace_dir,
159 created_at,
160 last_activity,
161 messages,
162 }))
163 }
164
165 pub fn append_turn(&self, session_id: &str, messages: &[ConversationMessage]) -> Result<()> {
169 if messages.is_empty() {
170 return Ok(());
171 }
172
173 let now = Utc::now().to_rfc3339();
174 let mut conn = self.conn.lock();
175 let tx = conn
176 .transaction()
177 .context("Failed to begin append_turn transaction")?;
178
179 for msg in messages {
180 let json =
181 serde_json::to_string(msg).context("Failed to serialize ConversationMessage")?;
182 tx.execute(
183 "INSERT INTO acp_messages (session_id, message_json, created_at)
184 VALUES (?1, ?2, ?3)",
185 params![session_id, json, now],
186 )
187 .context("Failed to insert ACP message")?;
188 }
189
190 tx.execute(
191 "UPDATE acp_sessions SET last_activity = ?1 WHERE session_id = ?2",
192 params![now, session_id],
193 )
194 .context("Failed to update last_activity")?;
195
196 tx.commit().context("Failed to commit append_turn")?;
197 Ok(())
198 }
199
200 pub fn delete_session(&self, session_id: &str) -> Result<bool> {
203 let conn = self.conn.lock();
204 let rows = conn
205 .execute(
206 "DELETE FROM acp_sessions WHERE session_id = ?1",
207 params![session_id],
208 )
209 .context("Failed to delete ACP session")?;
210 Ok(rows > 0)
211 }
212
213 pub fn touch_session(&self, session_id: &str) -> Result<()> {
215 let now = Utc::now().to_rfc3339();
216 let conn = self.conn.lock();
217 conn.execute(
218 "UPDATE acp_sessions SET last_activity = ?1 WHERE session_id = ?2",
219 params![now, session_id],
220 )
221 .context("Failed to touch ACP session")?;
222 Ok(())
223 }
224}
225
226#[cfg(test)]
227mod tests {
228 use super::*;
229 use tempfile::TempDir;
230
231 #[test]
232 fn new_creates_db_and_tables() {
233 let tmp = TempDir::new().unwrap();
234 let store = AcpSessionStore::new(tmp.path()).unwrap();
235
236 let conn = store.conn.lock();
237 let session_table: String = conn
238 .query_row(
239 "SELECT name FROM sqlite_master WHERE type='table' AND name='acp_sessions'",
240 [],
241 |r| r.get(0),
242 )
243 .unwrap();
244 assert_eq!(session_table, "acp_sessions");
245
246 let msg_table: String = conn
247 .query_row(
248 "SELECT name FROM sqlite_master WHERE type='table' AND name='acp_messages'",
249 [],
250 |r| r.get(0),
251 )
252 .unwrap();
253 assert_eq!(msg_table, "acp_messages");
254 }
255
256 #[test]
257 fn create_and_load_session_metadata() {
258 let tmp = TempDir::new().unwrap();
259 let store = AcpSessionStore::new(tmp.path()).unwrap();
260
261 store
262 .create_session("sess-abc", "/home/user/project")
263 .unwrap();
264
265 let data = store.load_session("sess-abc").unwrap().unwrap();
266 assert_eq!(data.workspace_dir, "/home/user/project");
267 assert!(data.messages.is_empty());
268 }
269
270 #[test]
271 fn load_nonexistent_session_returns_none() {
272 let tmp = TempDir::new().unwrap();
273 let store = AcpSessionStore::new(tmp.path()).unwrap();
274 assert!(store.load_session("nonexistent").unwrap().is_none());
275 }
276
277 #[test]
278 fn append_turn_and_load_messages() {
279 let tmp = TempDir::new().unwrap();
280 let store = AcpSessionStore::new(tmp.path()).unwrap();
281 store.create_session("sess-msgs", "/tmp/proj").unwrap();
282
283 let msgs = vec![
284 ConversationMessage::Chat(zeroclaw_api::model_provider::ChatMessage::user("hello")),
285 ConversationMessage::Chat(zeroclaw_api::model_provider::ChatMessage::assistant("hi")),
286 ];
287 store.append_turn("sess-msgs", &msgs).unwrap();
288
289 let data = store.load_session("sess-msgs").unwrap().unwrap();
290 assert_eq!(data.messages.len(), 2);
291 assert!(
292 matches!(&data.messages[0], ConversationMessage::Chat(m) if m.role == "user" && m.content == "hello")
293 );
294 assert!(
295 matches!(&data.messages[1], ConversationMessage::Chat(m) if m.role == "assistant" && m.content == "hi")
296 );
297 }
298
299 #[test]
300 fn all_conversation_message_variants_round_trip() {
301 use zeroclaw_api::model_provider::{ChatMessage, ToolCall, ToolResultMessage};
302 let tmp = TempDir::new().unwrap();
303 let store = AcpSessionStore::new(tmp.path()).unwrap();
304 store.create_session("sess-variants", "/tmp/proj").unwrap();
305
306 let msgs = vec![
307 ConversationMessage::Chat(ChatMessage::user("task")),
308 ConversationMessage::AssistantToolCalls {
309 text: Some("calling shell".into()),
310 tool_calls: vec![ToolCall {
311 id: "tc-1".into(),
312 name: "shell".into(),
313 arguments: r#"{"command":"ls"}"#.into(),
314 extra_content: None,
315 }],
316 reasoning_content: None,
317 },
318 ConversationMessage::ToolResults(vec![ToolResultMessage {
319 tool_call_id: "tc-1".into(),
320 content: "file.txt\n".into(),
321 }]),
322 ConversationMessage::Chat(ChatMessage::assistant("done")),
323 ];
324 store.append_turn("sess-variants", &msgs).unwrap();
325
326 let data = store.load_session("sess-variants").unwrap().unwrap();
327 assert_eq!(data.messages.len(), 4);
328 assert!(
329 matches!(&data.messages[1], ConversationMessage::AssistantToolCalls { tool_calls, .. } if tool_calls[0].id == "tc-1")
330 );
331 assert!(
332 matches!(&data.messages[2], ConversationMessage::ToolResults(r) if r[0].content == "file.txt\n")
333 );
334 }
335
336 #[test]
337 fn append_turn_empty_slice_is_noop() {
338 let tmp = TempDir::new().unwrap();
339 let store = AcpSessionStore::new(tmp.path()).unwrap();
340 store.create_session("sess-empty", "/tmp/proj").unwrap();
341
342 store.append_turn("sess-empty", &[]).unwrap();
343
344 let data = store.load_session("sess-empty").unwrap().unwrap();
345 assert!(data.messages.is_empty());
346 }
347
348 #[test]
349 fn last_activity_updated_on_append() {
350 let tmp = TempDir::new().unwrap();
351 let store = AcpSessionStore::new(tmp.path()).unwrap();
352 store.create_session("sess-activity", "/tmp/proj").unwrap();
353
354 let before = store
355 .load_session("sess-activity")
356 .unwrap()
357 .unwrap()
358 .last_activity;
359
360 std::thread::sleep(std::time::Duration::from_millis(10));
362
363 let msg = ConversationMessage::Chat(zeroclaw_api::model_provider::ChatMessage::user("hi"));
364 store.append_turn("sess-activity", &[msg]).unwrap();
365
366 let after = store
367 .load_session("sess-activity")
368 .unwrap()
369 .unwrap()
370 .last_activity;
371 assert!(after >= before);
372 }
373
374 #[test]
375 fn append_turn_rolls_back_on_unknown_session() {
376 let tmp = TempDir::new().unwrap();
379 let store = AcpSessionStore::new(tmp.path()).unwrap();
380
381 let msg =
382 ConversationMessage::Chat(zeroclaw_api::model_provider::ChatMessage::user("hello"));
383 let result = store.append_turn("does-not-exist", &[msg]);
384 assert!(result.is_err());
385
386 let conn = store.conn.lock();
388 let count: i64 = conn
389 .query_row(
390 "SELECT COUNT(*) FROM acp_messages WHERE session_id = 'does-not-exist'",
391 [],
392 |r| r.get(0),
393 )
394 .unwrap();
395 assert_eq!(count, 0);
396 }
397
398 #[test]
399 fn delete_session_removes_session_and_messages() {
400 let tmp = TempDir::new().unwrap();
401 let store = AcpSessionStore::new(tmp.path()).unwrap();
402 store.create_session("sess-del", "/tmp/proj").unwrap();
403 let msg = ConversationMessage::Chat(zeroclaw_api::model_provider::ChatMessage::user("hi"));
404 store.append_turn("sess-del", &[msg]).unwrap();
405
406 let deleted = store.delete_session("sess-del").unwrap();
407 assert!(deleted);
408 assert!(store.load_session("sess-del").unwrap().is_none());
409
410 let conn = store.conn.lock();
412 let count: i64 = conn
413 .query_row(
414 "SELECT COUNT(*) FROM acp_messages WHERE session_id = 'sess-del'",
415 [],
416 |r| r.get(0),
417 )
418 .unwrap();
419 assert_eq!(count, 0);
420 }
421
422 #[test]
423 fn delete_nonexistent_session_returns_false() {
424 let tmp = TempDir::new().unwrap();
425 let store = AcpSessionStore::new(tmp.path()).unwrap();
426 assert!(!store.delete_session("ghost").unwrap());
427 }
428
429 #[test]
430 fn touch_session_updates_last_activity() {
431 let tmp = TempDir::new().unwrap();
432 let store = AcpSessionStore::new(tmp.path()).unwrap();
433 store.create_session("sess-touch", "/tmp/proj").unwrap();
434
435 let before = store
436 .load_session("sess-touch")
437 .unwrap()
438 .unwrap()
439 .last_activity;
440 std::thread::sleep(std::time::Duration::from_millis(10));
441 store.touch_session("sess-touch").unwrap();
442 let after = store
443 .load_session("sess-touch")
444 .unwrap()
445 .unwrap()
446 .last_activity;
447
448 assert!(after >= before);
449 }
450}