Skip to main content

zeroclaw_infra/
session_store.rs

1//! JSONL-based session persistence for channel conversations.
2//!
3//! Each session (keyed by `channel_sender` or `channel_thread_sender`) is stored
4//! as an append-only JSONL file in `{workspace}/sessions/`. Messages are appended
5//! one-per-line as JSON, never modifying old lines. On daemon restart, sessions
6//! are loaded from disk to restore conversation context.
7
8use crate::session_backend::SessionBackend;
9use std::io::{BufRead, Write};
10use std::path::{Path, PathBuf};
11use zeroclaw_api::model_provider::ChatMessage;
12pub use zeroclaw_api::session_keys::sanitize_session_key;
13
14/// Append-only JSONL session store for channel conversations.
15pub struct SessionStore {
16    sessions_dir: PathBuf,
17}
18
19impl SessionStore {
20    /// Create a new session store, ensuring the sessions directory exists.
21    pub fn new(workspace_dir: &Path) -> std::io::Result<Self> {
22        let sessions_dir = workspace_dir.join("sessions");
23        std::fs::create_dir_all(&sessions_dir)?;
24        Ok(Self { sessions_dir })
25    }
26
27    /// Compute the file path for a session key, sanitizing for filesystem safety.
28    fn session_path(&self, session_key: &str) -> PathBuf {
29        self.sessions_dir
30            .join(format!("{}.jsonl", sanitize_session_key(session_key)))
31    }
32
33    /// Load all messages for a session from its JSONL file.
34    /// Returns an empty vec if the file does not exist or is unreadable.
35    pub fn load(&self, session_key: &str) -> Vec<ChatMessage> {
36        let path = self.session_path(session_key);
37        let file = match std::fs::File::open(&path) {
38            Ok(f) => f,
39            Err(_) => return Vec::new(),
40        };
41
42        let reader = std::io::BufReader::new(file);
43        let mut messages = Vec::new();
44
45        for line in reader.lines() {
46            let Ok(line) = line else { continue };
47            let trimmed = line.trim();
48            if trimmed.is_empty() {
49                continue;
50            }
51            if let Ok(msg) = serde_json::from_str::<ChatMessage>(trimmed) {
52                messages.push(msg);
53            }
54        }
55
56        messages
57    }
58
59    /// Append a single message to the session JSONL file.
60    pub fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
61        let path = self.session_path(session_key);
62        let mut file = std::fs::OpenOptions::new()
63            .create(true)
64            .append(true)
65            .open(&path)?;
66
67        let json = serde_json::to_string(message)
68            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
69
70        writeln!(file, "{json}")?;
71        Ok(())
72    }
73
74    /// Remove the last message from a session's JSONL file.
75    ///
76    /// Rewrite approach: load all messages, drop the last, rewrite. This is
77    /// O(n) but rollbacks are rare.
78    pub fn remove_last(&self, session_key: &str) -> std::io::Result<bool> {
79        let mut messages = self.load(session_key);
80        if messages.is_empty() {
81            return Ok(false);
82        }
83        messages.pop();
84        self.rewrite(session_key, &messages)?;
85        Ok(true)
86    }
87
88    /// Compact a session file by rewriting only valid messages (removes corrupt lines).
89    pub fn compact(&self, session_key: &str) -> std::io::Result<()> {
90        let messages = self.load(session_key);
91        self.rewrite(session_key, &messages)
92    }
93
94    fn rewrite(&self, session_key: &str, messages: &[ChatMessage]) -> std::io::Result<()> {
95        let path = self.session_path(session_key);
96        let mut file = std::fs::File::create(&path)?;
97        for msg in messages {
98            let json = serde_json::to_string(msg)
99                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
100            writeln!(file, "{json}")?;
101        }
102        Ok(())
103    }
104
105    /// Clear all messages from a session by truncating its JSONL file.
106    /// The file is preserved (empty) so the session key remains in `list_sessions`.
107    pub fn clear_messages(&self, session_key: &str) -> std::io::Result<usize> {
108        let count = self.load(session_key).len();
109        if count > 0 {
110            self.rewrite(session_key, &[])?;
111        }
112        Ok(count)
113    }
114
115    /// Delete a session's JSONL file. Returns `true` if the file existed.
116    pub fn delete_session(&self, session_key: &str) -> std::io::Result<bool> {
117        let path = self.session_path(session_key);
118        if !path.exists() {
119            return Ok(false);
120        }
121        std::fs::remove_file(&path)?;
122        Ok(true)
123    }
124
125    /// Return the modification time of a session's JSONL file.
126    pub fn session_mtime(&self, session_key: &str) -> Option<std::time::SystemTime> {
127        std::fs::metadata(self.session_path(session_key))
128            .and_then(|m| m.modified())
129            .ok()
130    }
131
132    /// List all session keys that have files on disk.
133    pub fn list_sessions(&self) -> Vec<String> {
134        let entries = match std::fs::read_dir(&self.sessions_dir) {
135            Ok(e) => e,
136            Err(_) => return Vec::new(),
137        };
138
139        entries
140            .filter_map(|entry| {
141                let entry = entry.ok()?;
142                let name = entry.file_name().into_string().ok()?;
143                name.strip_suffix(".jsonl").map(String::from)
144            })
145            .collect()
146    }
147}
148
149impl SessionBackend for SessionStore {
150    fn load(&self, session_key: &str) -> Vec<ChatMessage> {
151        self.load(session_key)
152    }
153
154    fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
155        self.append(session_key, message)
156    }
157
158    fn remove_last(&self, session_key: &str) -> std::io::Result<bool> {
159        self.remove_last(session_key)
160    }
161
162    fn list_sessions(&self) -> Vec<String> {
163        self.list_sessions()
164    }
165
166    /// Override the trait default so JSONL-backed channel hydration picks
167    /// the most-recent sessions when truncating to MAX_CONVERSATION_SENDERS.
168    /// The trait default stamps every key with `Utc::now()`, which makes
169    /// the orchestrator's `sort_by_key(|m| Reverse(m.last_activity))`
170    /// arbitrary once more than that many sessions are persisted.
171    fn list_sessions_with_metadata(&self) -> Vec<crate::session_backend::SessionMetadata> {
172        use chrono::{DateTime, Utc};
173        self.list_sessions()
174            .into_iter()
175            .map(|key| {
176                let last_activity: DateTime<Utc> = self
177                    .session_mtime(&key)
178                    .map(DateTime::<Utc>::from)
179                    .unwrap_or_else(Utc::now);
180                crate::session_backend::SessionMetadata {
181                    name: None,
182                    created_at: last_activity,
183                    last_activity,
184                    message_count: 0,
185                    key,
186                    agent_alias: None,
187                    channel_id: None,
188                    room_id: None,
189                    sender_id: None,
190                }
191            })
192            .collect()
193    }
194
195    fn compact(&self, session_key: &str) -> std::io::Result<()> {
196        self.compact(session_key)
197    }
198
199    fn clear_messages(&self, session_key: &str) -> std::io::Result<usize> {
200        self.clear_messages(session_key)
201    }
202
203    fn delete_session(&self, session_key: &str) -> std::io::Result<bool> {
204        self.delete_session(session_key)
205    }
206
207    /// Quick existence probe mirroring how `delete_session` decides whether
208    /// the session is on disk (#7126). Checking file presence is the same
209    /// O(1) `stat` that `delete_session` itself performs.
210    fn session_exists(&self, session_key: &str) -> bool {
211        self.session_path(session_key).exists()
212    }
213}
214
215#[cfg(test)]
216mod tests {
217    use super::*;
218    use tempfile::TempDir;
219
220    #[test]
221    fn round_trip_append_and_load() {
222        let tmp = TempDir::new().unwrap();
223        let store = SessionStore::new(tmp.path()).unwrap();
224
225        store
226            .append("telegram_user123", &ChatMessage::user("hello"))
227            .unwrap();
228        store
229            .append("telegram_user123", &ChatMessage::assistant("hi there"))
230            .unwrap();
231
232        let messages = store.load("telegram_user123");
233        assert_eq!(messages.len(), 2);
234        assert_eq!(messages[0].role, "user");
235        assert_eq!(messages[0].content, "hello");
236        assert_eq!(messages[1].role, "assistant");
237        assert_eq!(messages[1].content, "hi there");
238    }
239
240    #[test]
241    fn load_nonexistent_session_returns_empty() {
242        let tmp = TempDir::new().unwrap();
243        let store = SessionStore::new(tmp.path()).unwrap();
244
245        let messages = store.load("nonexistent");
246        assert!(messages.is_empty());
247    }
248
249    #[test]
250    fn key_sanitization() {
251        let tmp = TempDir::new().unwrap();
252        let store = SessionStore::new(tmp.path()).unwrap();
253
254        store
255            .append("slack/thread:123/user", &ChatMessage::user("test"))
256            .unwrap();
257
258        let messages = store.load("slack/thread:123/user");
259        assert_eq!(messages.len(), 1);
260    }
261
262    #[test]
263    fn sanitize_session_key_is_idempotent() {
264        let raw = "slack_C123_1.2_user one";
265        let once = sanitize_session_key(raw);
266        let twice = sanitize_session_key(&once);
267        assert_eq!(once, "slack_C123_1_2_user_one");
268        assert_eq!(once, twice);
269    }
270
271    #[test]
272    fn restart_simulation_matches_when_caller_pre_sanitizes() {
273        let tmp = TempDir::new().unwrap();
274        let runtime_key = sanitize_session_key("slack_C123_1.2_user one");
275
276        {
277            let store = SessionStore::new(tmp.path()).unwrap();
278            store
279                .append(&runtime_key, &ChatMessage::user("first"))
280                .unwrap();
281            store
282                .append(&runtime_key, &ChatMessage::assistant("ack"))
283                .unwrap();
284        }
285
286        let store = SessionStore::new(tmp.path()).unwrap();
287        let listed = store.list_sessions();
288        assert_eq!(listed, vec![runtime_key.clone()]);
289
290        let msgs = store.load(&listed[0]);
291        assert_eq!(msgs.len(), 2);
292        assert_eq!(msgs[0].content, "first");
293        assert_eq!(msgs[1].content, "ack");
294    }
295
296    #[test]
297    fn list_sessions_returns_keys() {
298        let tmp = TempDir::new().unwrap();
299        let store = SessionStore::new(tmp.path()).unwrap();
300
301        store
302            .append("telegram_alice", &ChatMessage::user("hi"))
303            .unwrap();
304        store
305            .append("discord_bob", &ChatMessage::user("hey"))
306            .unwrap();
307
308        let mut sessions = store.list_sessions();
309        sessions.sort();
310        assert_eq!(sessions.len(), 2);
311        assert!(sessions.contains(&"discord_bob".to_string()));
312        assert!(sessions.contains(&"telegram_alice".to_string()));
313    }
314
315    #[test]
316    fn append_is_truly_append_only() {
317        let tmp = TempDir::new().unwrap();
318        let store = SessionStore::new(tmp.path()).unwrap();
319        let key = "test_session";
320
321        store.append(key, &ChatMessage::user("msg1")).unwrap();
322        store.append(key, &ChatMessage::user("msg2")).unwrap();
323
324        // Read raw file to verify append-only format
325        let path = store.session_path(key);
326        let content = std::fs::read_to_string(&path).unwrap();
327        let lines: Vec<&str> = content.trim().lines().collect();
328        assert_eq!(lines.len(), 2);
329    }
330
331    #[test]
332    fn remove_last_drops_final_message() {
333        let tmp = TempDir::new().unwrap();
334        let store = SessionStore::new(tmp.path()).unwrap();
335
336        store
337            .append("rm_test", &ChatMessage::user("first"))
338            .unwrap();
339        store
340            .append("rm_test", &ChatMessage::user("second"))
341            .unwrap();
342
343        assert!(store.remove_last("rm_test").unwrap());
344        let messages = store.load("rm_test");
345        assert_eq!(messages.len(), 1);
346        assert_eq!(messages[0].content, "first");
347    }
348
349    #[test]
350    fn remove_last_empty_returns_false() {
351        let tmp = TempDir::new().unwrap();
352        let store = SessionStore::new(tmp.path()).unwrap();
353        assert!(!store.remove_last("nonexistent").unwrap());
354    }
355
356    #[test]
357    fn compact_removes_corrupt_lines() {
358        let tmp = TempDir::new().unwrap();
359        let store = SessionStore::new(tmp.path()).unwrap();
360        let key = "compact_test";
361
362        let path = store.session_path(key);
363        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
364        let mut file = std::fs::File::create(&path).unwrap();
365        writeln!(file, r#"{{"role":"user","content":"ok"}}"#).unwrap();
366        writeln!(file, "corrupt line").unwrap();
367        writeln!(file, r#"{{"role":"assistant","content":"hi"}}"#).unwrap();
368
369        store.compact(key).unwrap();
370
371        let raw = std::fs::read_to_string(&path).unwrap();
372        assert_eq!(raw.trim().lines().count(), 2);
373    }
374
375    #[test]
376    fn session_backend_trait_works_via_dyn() {
377        let tmp = TempDir::new().unwrap();
378        let store = SessionStore::new(tmp.path()).unwrap();
379        let backend: &dyn SessionBackend = &store;
380
381        backend
382            .append("trait_test", &ChatMessage::user("hello"))
383            .unwrap();
384        let msgs = backend.load("trait_test");
385        assert_eq!(msgs.len(), 1);
386    }
387
388    #[test]
389    fn handles_corrupt_lines_gracefully() {
390        let tmp = TempDir::new().unwrap();
391        let store = SessionStore::new(tmp.path()).unwrap();
392        let key = "corrupt_test";
393
394        // Write valid message + corrupt line + valid message
395        let path = store.session_path(key);
396        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
397        let mut file = std::fs::File::create(&path).unwrap();
398        writeln!(file, r#"{{"role":"user","content":"hello"}}"#).unwrap();
399        writeln!(file, "this is not valid json").unwrap();
400        writeln!(file, r#"{{"role":"assistant","content":"world"}}"#).unwrap();
401
402        let messages = store.load(key);
403        assert_eq!(messages.len(), 2);
404        assert_eq!(messages[0].content, "hello");
405        assert_eq!(messages[1].content, "world");
406    }
407
408    #[test]
409    fn clear_messages_truncates_file() {
410        let tmp = TempDir::new().unwrap();
411        let store = SessionStore::new(tmp.path()).unwrap();
412        let key = "clear_test";
413
414        store.append(key, &ChatMessage::user("hello")).unwrap();
415        store.append(key, &ChatMessage::assistant("world")).unwrap();
416
417        let cleared = store.clear_messages(key).unwrap();
418        assert_eq!(cleared, 2);
419        assert!(store.load(key).is_empty());
420        // File still exists — session key remains in list_sessions
421        assert!(store.session_path(key).exists());
422    }
423
424    #[test]
425    fn clear_messages_empty_returns_zero() {
426        let tmp = TempDir::new().unwrap();
427        let store = SessionStore::new(tmp.path()).unwrap();
428        assert_eq!(store.clear_messages("nonexistent").unwrap(), 0);
429    }
430
431    #[test]
432    fn clear_messages_does_not_affect_other_sessions() {
433        let tmp = TempDir::new().unwrap();
434        let store = SessionStore::new(tmp.path()).unwrap();
435
436        store
437            .append("alice", &ChatMessage::user("alice msg"))
438            .unwrap();
439        store.append("bob", &ChatMessage::user("bob msg")).unwrap();
440
441        store.clear_messages("alice").unwrap();
442        assert!(store.load("alice").is_empty());
443        assert_eq!(store.load("bob").len(), 1);
444    }
445
446    #[test]
447    fn clear_messages_then_append_works() {
448        let tmp = TempDir::new().unwrap();
449        let store = SessionStore::new(tmp.path()).unwrap();
450        let key = "reuse_test";
451
452        store.append(key, &ChatMessage::user("old")).unwrap();
453        store.clear_messages(key).unwrap();
454        store.append(key, &ChatMessage::user("new")).unwrap();
455
456        let messages = store.load(key);
457        assert_eq!(messages.len(), 1);
458        assert_eq!(messages[0].content, "new");
459    }
460
461    #[test]
462    fn delete_session_removes_jsonl_file() {
463        let tmp = TempDir::new().unwrap();
464        let store = SessionStore::new(tmp.path()).unwrap();
465        let key = "delete_test";
466
467        store.append(key, &ChatMessage::user("hello")).unwrap();
468        assert_eq!(store.load(key).len(), 1);
469
470        let deleted = store.delete_session(key).unwrap();
471        assert!(deleted);
472        assert!(store.load(key).is_empty());
473        assert!(!store.session_path(key).exists());
474    }
475
476    #[test]
477    fn delete_session_nonexistent_returns_false() {
478        let tmp = TempDir::new().unwrap();
479        let store = SessionStore::new(tmp.path()).unwrap();
480
481        let deleted = store.delete_session("nonexistent").unwrap();
482        assert!(!deleted);
483    }
484
485    #[test]
486    fn delete_session_via_trait() {
487        let tmp = TempDir::new().unwrap();
488        let store = SessionStore::new(tmp.path()).unwrap();
489        let backend: &dyn SessionBackend = &store;
490
491        backend
492            .append("trait_delete", &ChatMessage::user("hello"))
493            .unwrap();
494        assert_eq!(backend.load("trait_delete").len(), 1);
495
496        let deleted = backend.delete_session("trait_delete").unwrap();
497        assert!(deleted);
498        assert!(backend.load("trait_delete").is_empty());
499    }
500
501    // ── session_exists (#7126) ─────────────────────────────────────
502    #[test]
503    fn session_exists_tracks_lifecycle() {
504        let tmp = TempDir::new().unwrap();
505        let store = SessionStore::new(tmp.path()).unwrap();
506        let backend: &dyn SessionBackend = &store;
507
508        assert!(!backend.session_exists("ghost"));
509
510        backend
511            .append("ghost", &ChatMessage::user("first"))
512            .unwrap();
513        assert!(backend.session_exists("ghost"));
514
515        backend.delete_session("ghost").unwrap();
516        assert!(!backend.session_exists("ghost"));
517    }
518
519    // ── get_session_metadata (trait default) tests ──────────────────
520
521    #[test]
522    fn get_session_metadata_returns_none_for_missing() {
523        let tmp = TempDir::new().unwrap();
524        let store = SessionStore::new(tmp.path()).unwrap();
525        let backend: &dyn SessionBackend = &store;
526        assert!(backend.get_session_metadata("nonexistent").is_none());
527    }
528
529    #[test]
530    fn get_session_metadata_returns_correct_count() {
531        let tmp = TempDir::new().unwrap();
532        let store = SessionStore::new(tmp.path()).unwrap();
533        let backend: &dyn SessionBackend = &store;
534
535        backend
536            .append("test_session", &ChatMessage::user("hello"))
537            .unwrap();
538        backend
539            .append("test_session", &ChatMessage::assistant("hi"))
540            .unwrap();
541
542        let meta = backend.get_session_metadata("test_session").unwrap();
543        assert_eq!(meta.key, "test_session");
544        assert_eq!(meta.message_count, 2);
545        assert!(meta.name.is_none());
546    }
547}