Skip to main content

zeroclaw_infra/
session_store.rs

1//! JSONL-based session persistence for channel conversations.
2//!
3//! Each session (keyed by `channel_sender` or `channel_thread_sender`) is stored
4//! as an append-only JSONL file in `{workspace}/sessions/`. Messages are appended
5//! one-per-line as JSON, never modifying old lines. On daemon restart, sessions
6//! are loaded from disk to restore conversation context.
7
8use crate::session_backend::SessionBackend;
9use std::io::{BufRead, Write};
10use std::path::{Path, PathBuf};
11use zeroclaw_api::model_provider::ChatMessage;
12pub use zeroclaw_api::session_keys::sanitize_session_key;
13
14/// Append-only JSONL session store for channel conversations.
15pub struct SessionStore {
16    sessions_dir: PathBuf,
17}
18
19impl SessionStore {
20    /// Create a new session store, ensuring the sessions directory exists.
21    pub fn new(workspace_dir: &Path) -> std::io::Result<Self> {
22        let sessions_dir = workspace_dir.join("sessions");
23        std::fs::create_dir_all(&sessions_dir)?;
24        Ok(Self { sessions_dir })
25    }
26
27    /// Compute the file path for a session key, sanitizing for filesystem safety.
28    fn session_path(&self, session_key: &str) -> PathBuf {
29        self.sessions_dir
30            .join(format!("{}.jsonl", sanitize_session_key(session_key)))
31    }
32
33    /// Load all messages for a session from its JSONL file.
34    /// Returns an empty vec if the file does not exist or is unreadable.
35    pub fn load(&self, session_key: &str) -> Vec<ChatMessage> {
36        let path = self.session_path(session_key);
37        let file = match std::fs::File::open(&path) {
38            Ok(f) => f,
39            Err(_) => return Vec::new(),
40        };
41
42        let reader = std::io::BufReader::new(file);
43        let mut messages = Vec::new();
44
45        for line in reader.lines() {
46            let Ok(line) = line else { continue };
47            let trimmed = line.trim();
48            if trimmed.is_empty() {
49                continue;
50            }
51            if let Ok(msg) = serde_json::from_str::<ChatMessage>(trimmed) {
52                messages.push(msg);
53            }
54        }
55
56        messages
57    }
58
59    /// Append a single message to the session JSONL file.
60    pub fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
61        let path = self.session_path(session_key);
62        let mut file = std::fs::OpenOptions::new()
63            .create(true)
64            .append(true)
65            .open(&path)?;
66
67        let json = serde_json::to_string(message)
68            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
69
70        writeln!(file, "{json}")?;
71        Ok(())
72    }
73
74    /// Remove the last message from a session's JSONL file.
75    ///
76    /// Rewrite approach: load all messages, drop the last, rewrite. This is
77    /// O(n) but rollbacks are rare.
78    pub fn remove_last(&self, session_key: &str) -> std::io::Result<bool> {
79        let mut messages = self.load(session_key);
80        if messages.is_empty() {
81            return Ok(false);
82        }
83        messages.pop();
84        self.rewrite(session_key, &messages)?;
85        Ok(true)
86    }
87
88    /// Compact a session file by rewriting only valid messages (removes corrupt lines).
89    pub fn compact(&self, session_key: &str) -> std::io::Result<()> {
90        let messages = self.load(session_key);
91        self.rewrite(session_key, &messages)
92    }
93
94    fn rewrite(&self, session_key: &str, messages: &[ChatMessage]) -> std::io::Result<()> {
95        let path = self.session_path(session_key);
96        let mut file = std::fs::File::create(&path)?;
97        for msg in messages {
98            let json = serde_json::to_string(msg)
99                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
100            writeln!(file, "{json}")?;
101        }
102        Ok(())
103    }
104
105    /// Clear all messages from a session by truncating its JSONL file.
106    /// The file is preserved (empty) so the session key remains in `list_sessions`.
107    pub fn clear_messages(&self, session_key: &str) -> std::io::Result<usize> {
108        let count = self.load(session_key).len();
109        if count > 0 {
110            self.rewrite(session_key, &[])?;
111        }
112        Ok(count)
113    }
114
115    /// Delete a session's JSONL file. Returns `true` if the file existed.
116    pub fn delete_session(&self, session_key: &str) -> std::io::Result<bool> {
117        let path = self.session_path(session_key);
118        if !path.exists() {
119            return Ok(false);
120        }
121        std::fs::remove_file(&path)?;
122        Ok(true)
123    }
124
125    /// Return the modification time of a session's JSONL file.
126    pub fn session_mtime(&self, session_key: &str) -> Option<std::time::SystemTime> {
127        std::fs::metadata(self.session_path(session_key))
128            .and_then(|m| m.modified())
129            .ok()
130    }
131
132    /// List all session keys that have files on disk.
133    pub fn list_sessions(&self) -> Vec<String> {
134        let entries = match std::fs::read_dir(&self.sessions_dir) {
135            Ok(e) => e,
136            Err(_) => return Vec::new(),
137        };
138
139        entries
140            .filter_map(|entry| {
141                let entry = entry.ok()?;
142                let name = entry.file_name().into_string().ok()?;
143                name.strip_suffix(".jsonl").map(String::from)
144            })
145            .collect()
146    }
147}
148
149impl SessionBackend for SessionStore {
150    fn load(&self, session_key: &str) -> Vec<ChatMessage> {
151        self.load(session_key)
152    }
153
154    fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
155        self.append(session_key, message)
156    }
157
158    fn remove_last(&self, session_key: &str) -> std::io::Result<bool> {
159        self.remove_last(session_key)
160    }
161
162    fn list_sessions(&self) -> Vec<String> {
163        self.list_sessions()
164    }
165
166    /// Override the trait default so JSONL-backed channel hydration picks
167    /// the most-recent sessions when truncating to MAX_CONVERSATION_SENDERS.
168    /// The trait default stamps every key with `Utc::now()`, which makes
169    /// the orchestrator's `sort_by_key(|m| Reverse(m.last_activity))`
170    /// arbitrary once more than that many sessions are persisted.
171    fn list_sessions_with_metadata(&self) -> Vec<crate::session_backend::SessionMetadata> {
172        use chrono::{DateTime, Utc};
173        self.list_sessions()
174            .into_iter()
175            .map(|key| {
176                let last_activity: DateTime<Utc> = self
177                    .session_mtime(&key)
178                    .map(DateTime::<Utc>::from)
179                    .unwrap_or_else(Utc::now);
180                crate::session_backend::SessionMetadata {
181                    name: None,
182                    created_at: last_activity,
183                    last_activity,
184                    message_count: 0,
185                    key,
186                    agent_alias: None,
187                    channel_id: None,
188                    room_id: None,
189                    sender_id: None,
190                }
191            })
192            .collect()
193    }
194
195    fn compact(&self, session_key: &str) -> std::io::Result<()> {
196        self.compact(session_key)
197    }
198
199    fn clear_messages(&self, session_key: &str) -> std::io::Result<usize> {
200        self.clear_messages(session_key)
201    }
202
203    fn delete_session(&self, session_key: &str) -> std::io::Result<bool> {
204        self.delete_session(session_key)
205    }
206}
207
208#[cfg(test)]
209mod tests {
210    use super::*;
211    use tempfile::TempDir;
212
213    #[test]
214    fn round_trip_append_and_load() {
215        let tmp = TempDir::new().unwrap();
216        let store = SessionStore::new(tmp.path()).unwrap();
217
218        store
219            .append("telegram_user123", &ChatMessage::user("hello"))
220            .unwrap();
221        store
222            .append("telegram_user123", &ChatMessage::assistant("hi there"))
223            .unwrap();
224
225        let messages = store.load("telegram_user123");
226        assert_eq!(messages.len(), 2);
227        assert_eq!(messages[0].role, "user");
228        assert_eq!(messages[0].content, "hello");
229        assert_eq!(messages[1].role, "assistant");
230        assert_eq!(messages[1].content, "hi there");
231    }
232
233    #[test]
234    fn load_nonexistent_session_returns_empty() {
235        let tmp = TempDir::new().unwrap();
236        let store = SessionStore::new(tmp.path()).unwrap();
237
238        let messages = store.load("nonexistent");
239        assert!(messages.is_empty());
240    }
241
242    #[test]
243    fn key_sanitization() {
244        let tmp = TempDir::new().unwrap();
245        let store = SessionStore::new(tmp.path()).unwrap();
246
247        store
248            .append("slack/thread:123/user", &ChatMessage::user("test"))
249            .unwrap();
250
251        let messages = store.load("slack/thread:123/user");
252        assert_eq!(messages.len(), 1);
253    }
254
255    #[test]
256    fn sanitize_session_key_is_idempotent() {
257        let raw = "slack_C123_1.2_user one";
258        let once = sanitize_session_key(raw);
259        let twice = sanitize_session_key(&once);
260        assert_eq!(once, "slack_C123_1_2_user_one");
261        assert_eq!(once, twice);
262    }
263
264    #[test]
265    fn restart_simulation_matches_when_caller_pre_sanitizes() {
266        let tmp = TempDir::new().unwrap();
267        let runtime_key = sanitize_session_key("slack_C123_1.2_user one");
268
269        {
270            let store = SessionStore::new(tmp.path()).unwrap();
271            store
272                .append(&runtime_key, &ChatMessage::user("first"))
273                .unwrap();
274            store
275                .append(&runtime_key, &ChatMessage::assistant("ack"))
276                .unwrap();
277        }
278
279        let store = SessionStore::new(tmp.path()).unwrap();
280        let listed = store.list_sessions();
281        assert_eq!(listed, vec![runtime_key.clone()]);
282
283        let msgs = store.load(&listed[0]);
284        assert_eq!(msgs.len(), 2);
285        assert_eq!(msgs[0].content, "first");
286        assert_eq!(msgs[1].content, "ack");
287    }
288
289    #[test]
290    fn list_sessions_returns_keys() {
291        let tmp = TempDir::new().unwrap();
292        let store = SessionStore::new(tmp.path()).unwrap();
293
294        store
295            .append("telegram_alice", &ChatMessage::user("hi"))
296            .unwrap();
297        store
298            .append("discord_bob", &ChatMessage::user("hey"))
299            .unwrap();
300
301        let mut sessions = store.list_sessions();
302        sessions.sort();
303        assert_eq!(sessions.len(), 2);
304        assert!(sessions.contains(&"discord_bob".to_string()));
305        assert!(sessions.contains(&"telegram_alice".to_string()));
306    }
307
308    #[test]
309    fn append_is_truly_append_only() {
310        let tmp = TempDir::new().unwrap();
311        let store = SessionStore::new(tmp.path()).unwrap();
312        let key = "test_session";
313
314        store.append(key, &ChatMessage::user("msg1")).unwrap();
315        store.append(key, &ChatMessage::user("msg2")).unwrap();
316
317        // Read raw file to verify append-only format
318        let path = store.session_path(key);
319        let content = std::fs::read_to_string(&path).unwrap();
320        let lines: Vec<&str> = content.trim().lines().collect();
321        assert_eq!(lines.len(), 2);
322    }
323
324    #[test]
325    fn remove_last_drops_final_message() {
326        let tmp = TempDir::new().unwrap();
327        let store = SessionStore::new(tmp.path()).unwrap();
328
329        store
330            .append("rm_test", &ChatMessage::user("first"))
331            .unwrap();
332        store
333            .append("rm_test", &ChatMessage::user("second"))
334            .unwrap();
335
336        assert!(store.remove_last("rm_test").unwrap());
337        let messages = store.load("rm_test");
338        assert_eq!(messages.len(), 1);
339        assert_eq!(messages[0].content, "first");
340    }
341
342    #[test]
343    fn remove_last_empty_returns_false() {
344        let tmp = TempDir::new().unwrap();
345        let store = SessionStore::new(tmp.path()).unwrap();
346        assert!(!store.remove_last("nonexistent").unwrap());
347    }
348
349    #[test]
350    fn compact_removes_corrupt_lines() {
351        let tmp = TempDir::new().unwrap();
352        let store = SessionStore::new(tmp.path()).unwrap();
353        let key = "compact_test";
354
355        let path = store.session_path(key);
356        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
357        let mut file = std::fs::File::create(&path).unwrap();
358        writeln!(file, r#"{{"role":"user","content":"ok"}}"#).unwrap();
359        writeln!(file, "corrupt line").unwrap();
360        writeln!(file, r#"{{"role":"assistant","content":"hi"}}"#).unwrap();
361
362        store.compact(key).unwrap();
363
364        let raw = std::fs::read_to_string(&path).unwrap();
365        assert_eq!(raw.trim().lines().count(), 2);
366    }
367
368    #[test]
369    fn session_backend_trait_works_via_dyn() {
370        let tmp = TempDir::new().unwrap();
371        let store = SessionStore::new(tmp.path()).unwrap();
372        let backend: &dyn SessionBackend = &store;
373
374        backend
375            .append("trait_test", &ChatMessage::user("hello"))
376            .unwrap();
377        let msgs = backend.load("trait_test");
378        assert_eq!(msgs.len(), 1);
379    }
380
381    #[test]
382    fn handles_corrupt_lines_gracefully() {
383        let tmp = TempDir::new().unwrap();
384        let store = SessionStore::new(tmp.path()).unwrap();
385        let key = "corrupt_test";
386
387        // Write valid message + corrupt line + valid message
388        let path = store.session_path(key);
389        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
390        let mut file = std::fs::File::create(&path).unwrap();
391        writeln!(file, r#"{{"role":"user","content":"hello"}}"#).unwrap();
392        writeln!(file, "this is not valid json").unwrap();
393        writeln!(file, r#"{{"role":"assistant","content":"world"}}"#).unwrap();
394
395        let messages = store.load(key);
396        assert_eq!(messages.len(), 2);
397        assert_eq!(messages[0].content, "hello");
398        assert_eq!(messages[1].content, "world");
399    }
400
401    #[test]
402    fn clear_messages_truncates_file() {
403        let tmp = TempDir::new().unwrap();
404        let store = SessionStore::new(tmp.path()).unwrap();
405        let key = "clear_test";
406
407        store.append(key, &ChatMessage::user("hello")).unwrap();
408        store.append(key, &ChatMessage::assistant("world")).unwrap();
409
410        let cleared = store.clear_messages(key).unwrap();
411        assert_eq!(cleared, 2);
412        assert!(store.load(key).is_empty());
413        // File still exists — session key remains in list_sessions
414        assert!(store.session_path(key).exists());
415    }
416
417    #[test]
418    fn clear_messages_empty_returns_zero() {
419        let tmp = TempDir::new().unwrap();
420        let store = SessionStore::new(tmp.path()).unwrap();
421        assert_eq!(store.clear_messages("nonexistent").unwrap(), 0);
422    }
423
424    #[test]
425    fn clear_messages_does_not_affect_other_sessions() {
426        let tmp = TempDir::new().unwrap();
427        let store = SessionStore::new(tmp.path()).unwrap();
428
429        store
430            .append("alice", &ChatMessage::user("alice msg"))
431            .unwrap();
432        store.append("bob", &ChatMessage::user("bob msg")).unwrap();
433
434        store.clear_messages("alice").unwrap();
435        assert!(store.load("alice").is_empty());
436        assert_eq!(store.load("bob").len(), 1);
437    }
438
439    #[test]
440    fn clear_messages_then_append_works() {
441        let tmp = TempDir::new().unwrap();
442        let store = SessionStore::new(tmp.path()).unwrap();
443        let key = "reuse_test";
444
445        store.append(key, &ChatMessage::user("old")).unwrap();
446        store.clear_messages(key).unwrap();
447        store.append(key, &ChatMessage::user("new")).unwrap();
448
449        let messages = store.load(key);
450        assert_eq!(messages.len(), 1);
451        assert_eq!(messages[0].content, "new");
452    }
453
454    #[test]
455    fn delete_session_removes_jsonl_file() {
456        let tmp = TempDir::new().unwrap();
457        let store = SessionStore::new(tmp.path()).unwrap();
458        let key = "delete_test";
459
460        store.append(key, &ChatMessage::user("hello")).unwrap();
461        assert_eq!(store.load(key).len(), 1);
462
463        let deleted = store.delete_session(key).unwrap();
464        assert!(deleted);
465        assert!(store.load(key).is_empty());
466        assert!(!store.session_path(key).exists());
467    }
468
469    #[test]
470    fn delete_session_nonexistent_returns_false() {
471        let tmp = TempDir::new().unwrap();
472        let store = SessionStore::new(tmp.path()).unwrap();
473
474        let deleted = store.delete_session("nonexistent").unwrap();
475        assert!(!deleted);
476    }
477
478    #[test]
479    fn delete_session_via_trait() {
480        let tmp = TempDir::new().unwrap();
481        let store = SessionStore::new(tmp.path()).unwrap();
482        let backend: &dyn SessionBackend = &store;
483
484        backend
485            .append("trait_delete", &ChatMessage::user("hello"))
486            .unwrap();
487        assert_eq!(backend.load("trait_delete").len(), 1);
488
489        let deleted = backend.delete_session("trait_delete").unwrap();
490        assert!(deleted);
491        assert!(backend.load("trait_delete").is_empty());
492    }
493
494    // ── get_session_metadata (trait default) tests ──────────────────
495
496    #[test]
497    fn get_session_metadata_returns_none_for_missing() {
498        let tmp = TempDir::new().unwrap();
499        let store = SessionStore::new(tmp.path()).unwrap();
500        let backend: &dyn SessionBackend = &store;
501        assert!(backend.get_session_metadata("nonexistent").is_none());
502    }
503
504    #[test]
505    fn get_session_metadata_returns_correct_count() {
506        let tmp = TempDir::new().unwrap();
507        let store = SessionStore::new(tmp.path()).unwrap();
508        let backend: &dyn SessionBackend = &store;
509
510        backend
511            .append("test_session", &ChatMessage::user("hello"))
512            .unwrap();
513        backend
514            .append("test_session", &ChatMessage::assistant("hi"))
515            .unwrap();
516
517        let meta = backend.get_session_metadata("test_session").unwrap();
518        assert_eq!(meta.key, "test_session");
519        assert_eq!(meta.message_count, 2);
520        assert!(meta.name.is_none());
521    }
522}