Skip to main content

zeroclaw_infra/
acp_session_store.rs

1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use parking_lot::Mutex;
4use rusqlite::{Connection, params};
5use std::path::Path;
6use zeroclaw_api::model_provider::ConversationMessage;
7
8pub struct AcpSessionStore {
9    conn: Mutex<Connection>,
10}
11
12pub struct AcpSessionData {
13    pub workspace_dir: String,
14    pub created_at: DateTime<Utc>,
15    pub last_activity: DateTime<Utc>,
16    pub messages: Vec<ConversationMessage>,
17}
18
19impl AcpSessionStore {
20    pub fn new(workspace_dir: &Path) -> Result<Self> {
21        let sessions_dir = workspace_dir.join("sessions");
22        std::fs::create_dir_all(&sessions_dir).context("Failed to create sessions directory")?;
23        let db_path = sessions_dir.join("acp-sessions.db");
24
25        let conn = Connection::open(&db_path)
26            .with_context(|| format!("Failed to open ACP session DB: {}", db_path.display()))?;
27
28        conn.execute_batch(
29            "PRAGMA journal_mode = WAL;
30             PRAGMA synchronous = NORMAL;
31             PRAGMA foreign_keys = ON;
32             PRAGMA temp_store = MEMORY;",
33        )
34        .context("Failed to configure ACP session DB pragmas")?;
35
36        conn.execute_batch(
37            "CREATE TABLE IF NOT EXISTS acp_sessions (
38                session_id    TEXT PRIMARY KEY,
39                workspace_dir TEXT NOT NULL,
40                created_at    TEXT NOT NULL,
41                last_activity TEXT NOT NULL
42             );
43
44             CREATE TABLE IF NOT EXISTS acp_messages (
45                id            INTEGER PRIMARY KEY AUTOINCREMENT,
46                session_id    TEXT NOT NULL REFERENCES acp_sessions(session_id) ON DELETE CASCADE,
47                message_json  TEXT NOT NULL,
48                created_at    TEXT NOT NULL
49             );
50
51             CREATE INDEX IF NOT EXISTS idx_acp_messages_session
52                ON acp_messages(session_id, id);",
53        )
54        .context("Failed to create ACP session schema")?;
55
56        Ok(Self {
57            conn: Mutex::new(conn),
58        })
59    }
60
61    /// Record a new session. Call immediately after session/new succeeds.
62    pub fn create_session(&self, session_id: &str, workspace_dir: &str) -> Result<()> {
63        let now = Utc::now().to_rfc3339();
64        let conn = self.conn.lock();
65        conn.execute(
66            "INSERT INTO acp_sessions (session_id, workspace_dir, created_at, last_activity)
67             VALUES (?1, ?2, ?3, ?3)",
68            params![session_id, workspace_dir, now],
69        )
70        .context("Failed to create ACP session")?;
71        Ok(())
72    }
73
74    /// Load session metadata and full message history for restore.
75    /// Returns `None` if the session_id is not found.
76    pub fn load_session(&self, session_id: &str) -> Result<Option<AcpSessionData>> {
77        let conn = self.conn.lock();
78
79        let row = conn.query_row(
80            "SELECT workspace_dir, created_at, last_activity
81             FROM acp_sessions WHERE session_id = ?1",
82            params![session_id],
83            |row| {
84                Ok((
85                    row.get::<_, String>(0)?,
86                    row.get::<_, String>(1)?,
87                    row.get::<_, String>(2)?,
88                ))
89            },
90        );
91
92        let (workspace_dir, created_at_str, last_activity_str) = match row {
93            Ok(r) => r,
94            Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
95            Err(e) => return Err(e).context("Failed to query ACP session"),
96        };
97
98        let created_at = created_at_str.parse::<DateTime<Utc>>().unwrap_or_else(|e| {
99            ::zeroclaw_log::record!(
100                WARN,
101                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
102                    .with_attrs(
103                        ::serde_json::json!({"session_id": session_id, "error": e.to_string()})
104                    ),
105                "Failed to parse created_at"
106            );
107            Utc::now()
108        });
109        let last_activity = last_activity_str
110            .parse::<DateTime<Utc>>()
111            .unwrap_or_else(|e| {
112                ::zeroclaw_log::record!(
113                    WARN,
114                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
115                        .with_attrs(
116                            ::serde_json::json!({"session_id": session_id, "error": e.to_string()})
117                        ),
118                    "Failed to parse last_activity"
119                );
120                Utc::now()
121            });
122
123        let mut stmt = conn
124            .prepare(
125                "SELECT message_json FROM acp_messages
126                 WHERE session_id = ?1 ORDER BY id ASC",
127            )
128            .context("Failed to prepare message query")?;
129
130        let messages: Vec<ConversationMessage> = stmt
131            .query_map(params![session_id], |row| row.get::<_, String>(0))?
132            .filter_map(|r| match r {
133                Ok(json) => match serde_json::from_str::<ConversationMessage>(&json) {
134                    Ok(msg) => Some(msg),
135                    Err(e) => {
136                        ::zeroclaw_log::record!(
137                            WARN,
138                            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
139                                .with_attrs(::serde_json::json!({"session_id": session_id, "error": e.to_string()})),
140                            "Skipping corrupt ACP message"
141                        );
142                        None
143                    }
144                },
145                Err(e) => {
146                    ::zeroclaw_log::record!(
147                        WARN,
148                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
149                            .with_attrs(::serde_json::json!({"session_id": session_id, "error": e.to_string()})),
150                        "Failed to read ACP message row"
151                    );
152                    None
153                }
154            })
155            .collect();
156
157        Ok(Some(AcpSessionData {
158            workspace_dir,
159            created_at,
160            last_activity,
161            messages,
162        }))
163    }
164
165    /// Append all ConversationMessages from one completed turn.
166    /// Single transaction: N message INSERTs + last_activity UPDATE.
167    /// Returns early without writing if `messages` is empty.
168    pub fn append_turn(&self, session_id: &str, messages: &[ConversationMessage]) -> Result<()> {
169        if messages.is_empty() {
170            return Ok(());
171        }
172
173        let now = Utc::now().to_rfc3339();
174        let mut conn = self.conn.lock();
175        let tx = conn
176            .transaction()
177            .context("Failed to begin append_turn transaction")?;
178
179        for msg in messages {
180            let json =
181                serde_json::to_string(msg).context("Failed to serialize ConversationMessage")?;
182            tx.execute(
183                "INSERT INTO acp_messages (session_id, message_json, created_at)
184                 VALUES (?1, ?2, ?3)",
185                params![session_id, json, now],
186            )
187            .context("Failed to insert ACP message")?;
188        }
189
190        tx.execute(
191            "UPDATE acp_sessions SET last_activity = ?1 WHERE session_id = ?2",
192            params![now, session_id],
193        )
194        .context("Failed to update last_activity")?;
195
196        tx.commit().context("Failed to commit append_turn")?;
197        Ok(())
198    }
199
200    /// Delete a session and all its messages. Returns `true` if the session existed.
201    /// Intended for operator tooling — not triggered by `session/close`.
202    pub fn delete_session(&self, session_id: &str) -> Result<bool> {
203        let conn = self.conn.lock();
204        let rows = conn
205            .execute(
206                "DELETE FROM acp_sessions WHERE session_id = ?1",
207                params![session_id],
208            )
209            .context("Failed to delete ACP session")?;
210        Ok(rows > 0)
211    }
212
213    /// Update `last_activity` without appending messages.
214    pub fn touch_session(&self, session_id: &str) -> Result<()> {
215        let now = Utc::now().to_rfc3339();
216        let conn = self.conn.lock();
217        conn.execute(
218            "UPDATE acp_sessions SET last_activity = ?1 WHERE session_id = ?2",
219            params![now, session_id],
220        )
221        .context("Failed to touch ACP session")?;
222        Ok(())
223    }
224}
225
226#[cfg(test)]
227mod tests {
228    use super::*;
229    use tempfile::TempDir;
230
231    #[test]
232    fn new_creates_db_and_tables() {
233        let tmp = TempDir::new().unwrap();
234        let store = AcpSessionStore::new(tmp.path()).unwrap();
235
236        let conn = store.conn.lock();
237        let session_table: String = conn
238            .query_row(
239                "SELECT name FROM sqlite_master WHERE type='table' AND name='acp_sessions'",
240                [],
241                |r| r.get(0),
242            )
243            .unwrap();
244        assert_eq!(session_table, "acp_sessions");
245
246        let msg_table: String = conn
247            .query_row(
248                "SELECT name FROM sqlite_master WHERE type='table' AND name='acp_messages'",
249                [],
250                |r| r.get(0),
251            )
252            .unwrap();
253        assert_eq!(msg_table, "acp_messages");
254    }
255
256    #[test]
257    fn create_and_load_session_metadata() {
258        let tmp = TempDir::new().unwrap();
259        let store = AcpSessionStore::new(tmp.path()).unwrap();
260
261        store
262            .create_session("sess-abc", "/home/user/project")
263            .unwrap();
264
265        let data = store.load_session("sess-abc").unwrap().unwrap();
266        assert_eq!(data.workspace_dir, "/home/user/project");
267        assert!(data.messages.is_empty());
268    }
269
270    #[test]
271    fn load_nonexistent_session_returns_none() {
272        let tmp = TempDir::new().unwrap();
273        let store = AcpSessionStore::new(tmp.path()).unwrap();
274        assert!(store.load_session("nonexistent").unwrap().is_none());
275    }
276
277    #[test]
278    fn append_turn_and_load_messages() {
279        let tmp = TempDir::new().unwrap();
280        let store = AcpSessionStore::new(tmp.path()).unwrap();
281        store.create_session("sess-msgs", "/tmp/proj").unwrap();
282
283        let msgs = vec![
284            ConversationMessage::Chat(zeroclaw_api::model_provider::ChatMessage::user("hello")),
285            ConversationMessage::Chat(zeroclaw_api::model_provider::ChatMessage::assistant("hi")),
286        ];
287        store.append_turn("sess-msgs", &msgs).unwrap();
288
289        let data = store.load_session("sess-msgs").unwrap().unwrap();
290        assert_eq!(data.messages.len(), 2);
291        assert!(
292            matches!(&data.messages[0], ConversationMessage::Chat(m) if m.role == "user" && m.content == "hello")
293        );
294        assert!(
295            matches!(&data.messages[1], ConversationMessage::Chat(m) if m.role == "assistant" && m.content == "hi")
296        );
297    }
298
299    #[test]
300    fn all_conversation_message_variants_round_trip() {
301        use zeroclaw_api::model_provider::{ChatMessage, ToolCall, ToolResultMessage};
302        let tmp = TempDir::new().unwrap();
303        let store = AcpSessionStore::new(tmp.path()).unwrap();
304        store.create_session("sess-variants", "/tmp/proj").unwrap();
305
306        let msgs = vec![
307            ConversationMessage::Chat(ChatMessage::user("task")),
308            ConversationMessage::AssistantToolCalls {
309                text: Some("calling shell".into()),
310                tool_calls: vec![ToolCall {
311                    id: "tc-1".into(),
312                    name: "shell".into(),
313                    arguments: r#"{"command":"ls"}"#.into(),
314                    extra_content: None,
315                }],
316                reasoning_content: None,
317            },
318            ConversationMessage::ToolResults(vec![ToolResultMessage {
319                tool_call_id: "tc-1".into(),
320                content: "file.txt\n".into(),
321            }]),
322            ConversationMessage::Chat(ChatMessage::assistant("done")),
323        ];
324        store.append_turn("sess-variants", &msgs).unwrap();
325
326        let data = store.load_session("sess-variants").unwrap().unwrap();
327        assert_eq!(data.messages.len(), 4);
328        assert!(
329            matches!(&data.messages[1], ConversationMessage::AssistantToolCalls { tool_calls, .. } if tool_calls[0].id == "tc-1")
330        );
331        assert!(
332            matches!(&data.messages[2], ConversationMessage::ToolResults(r) if r[0].content == "file.txt\n")
333        );
334    }
335
336    #[test]
337    fn append_turn_empty_slice_is_noop() {
338        let tmp = TempDir::new().unwrap();
339        let store = AcpSessionStore::new(tmp.path()).unwrap();
340        store.create_session("sess-empty", "/tmp/proj").unwrap();
341
342        store.append_turn("sess-empty", &[]).unwrap();
343
344        let data = store.load_session("sess-empty").unwrap().unwrap();
345        assert!(data.messages.is_empty());
346    }
347
348    #[test]
349    fn last_activity_updated_on_append() {
350        let tmp = TempDir::new().unwrap();
351        let store = AcpSessionStore::new(tmp.path()).unwrap();
352        store.create_session("sess-activity", "/tmp/proj").unwrap();
353
354        let before = store
355            .load_session("sess-activity")
356            .unwrap()
357            .unwrap()
358            .last_activity;
359
360        // Brief sleep to ensure timestamp advances
361        std::thread::sleep(std::time::Duration::from_millis(10));
362
363        let msg = ConversationMessage::Chat(zeroclaw_api::model_provider::ChatMessage::user("hi"));
364        store.append_turn("sess-activity", &[msg]).unwrap();
365
366        let after = store
367            .load_session("sess-activity")
368            .unwrap()
369            .unwrap()
370            .last_activity;
371        assert!(after >= before);
372    }
373
374    #[test]
375    fn append_turn_rolls_back_on_unknown_session() {
376        // Foreign key constraint: inserting messages for a nonexistent session
377        // must fail atomically — no orphaned message rows.
378        let tmp = TempDir::new().unwrap();
379        let store = AcpSessionStore::new(tmp.path()).unwrap();
380
381        let msg =
382            ConversationMessage::Chat(zeroclaw_api::model_provider::ChatMessage::user("hello"));
383        let result = store.append_turn("does-not-exist", &[msg]);
384        assert!(result.is_err());
385
386        // No orphaned rows
387        let conn = store.conn.lock();
388        let count: i64 = conn
389            .query_row(
390                "SELECT COUNT(*) FROM acp_messages WHERE session_id = 'does-not-exist'",
391                [],
392                |r| r.get(0),
393            )
394            .unwrap();
395        assert_eq!(count, 0);
396    }
397
398    #[test]
399    fn delete_session_removes_session_and_messages() {
400        let tmp = TempDir::new().unwrap();
401        let store = AcpSessionStore::new(tmp.path()).unwrap();
402        store.create_session("sess-del", "/tmp/proj").unwrap();
403        let msg = ConversationMessage::Chat(zeroclaw_api::model_provider::ChatMessage::user("hi"));
404        store.append_turn("sess-del", &[msg]).unwrap();
405
406        let deleted = store.delete_session("sess-del").unwrap();
407        assert!(deleted);
408        assert!(store.load_session("sess-del").unwrap().is_none());
409
410        // Cascade: messages gone too
411        let conn = store.conn.lock();
412        let count: i64 = conn
413            .query_row(
414                "SELECT COUNT(*) FROM acp_messages WHERE session_id = 'sess-del'",
415                [],
416                |r| r.get(0),
417            )
418            .unwrap();
419        assert_eq!(count, 0);
420    }
421
422    #[test]
423    fn delete_nonexistent_session_returns_false() {
424        let tmp = TempDir::new().unwrap();
425        let store = AcpSessionStore::new(tmp.path()).unwrap();
426        assert!(!store.delete_session("ghost").unwrap());
427    }
428
429    #[test]
430    fn touch_session_updates_last_activity() {
431        let tmp = TempDir::new().unwrap();
432        let store = AcpSessionStore::new(tmp.path()).unwrap();
433        store.create_session("sess-touch", "/tmp/proj").unwrap();
434
435        let before = store
436            .load_session("sess-touch")
437            .unwrap()
438            .unwrap()
439            .last_activity;
440        std::thread::sleep(std::time::Duration::from_millis(10));
441        store.touch_session("sess-touch").unwrap();
442        let after = store
443            .load_session("sess-touch")
444            .unwrap()
445            .unwrap()
446            .last_activity;
447
448        assert!(after >= before);
449    }
450}