Skip to main content

zeroclaw_infra/
session_sqlite.rs

1//! SQLite-backed session persistence with FTS5 search.
2//!
3//! Stores sessions in `{workspace}/sessions/sessions.db` using WAL mode.
4//! Provides full-text search via FTS5 and automatic TTL-based cleanup.
5//! Designed as the default backend, replacing JSONL for new installations.
6
7use crate::session_backend::{
8    SessionBackend, SessionContext, SessionMetadata, SessionQuery, SessionState,
9};
10use anyhow::{Context, Result};
11use chrono::{DateTime, Duration, Utc};
12use parking_lot::Mutex;
13use rusqlite::{Connection, params};
14use std::path::Path;
15use zeroclaw_api::model_provider::ChatMessage;
16
17/// SQLite-backed session store with FTS5 and WAL mode.
18pub struct SqliteSessionBackend {
19    conn: Mutex<Connection>,
20}
21
22impl SqliteSessionBackend {
23    /// Open or create the sessions database.
24    pub fn new(workspace_dir: &Path) -> Result<Self> {
25        let sessions_dir = workspace_dir.join("sessions");
26        std::fs::create_dir_all(&sessions_dir).context("Failed to create sessions directory")?;
27        let db_path = sessions_dir.join("sessions.db");
28
29        let conn = Connection::open(&db_path)
30            .with_context(|| format!("Failed to open session DB: {}", db_path.display()))?;
31
32        conn.execute_batch(
33            "PRAGMA journal_mode = WAL;
34             PRAGMA synchronous = NORMAL;
35             PRAGMA temp_store = MEMORY;
36             PRAGMA mmap_size = 4194304;",
37        )?;
38
39        conn.execute_batch(
40            "CREATE TABLE IF NOT EXISTS sessions (
41                id          INTEGER PRIMARY KEY AUTOINCREMENT,
42                session_key TEXT NOT NULL,
43                role        TEXT NOT NULL,
44                content     TEXT NOT NULL,
45                created_at  TEXT NOT NULL
46             );
47             CREATE INDEX IF NOT EXISTS idx_sessions_key ON sessions(session_key);
48             CREATE INDEX IF NOT EXISTS idx_sessions_key_id ON sessions(session_key, id);
49
50             CREATE TABLE IF NOT EXISTS session_metadata (
51                session_key  TEXT PRIMARY KEY,
52                created_at   TEXT NOT NULL,
53                last_activity TEXT NOT NULL,
54                message_count INTEGER NOT NULL DEFAULT 0,
55                name         TEXT
56             );
57
58             CREATE VIRTUAL TABLE IF NOT EXISTS sessions_fts USING fts5(
59                session_key, content, content=sessions, content_rowid=id
60             );
61
62             CREATE TRIGGER IF NOT EXISTS sessions_ai AFTER INSERT ON sessions BEGIN
63                INSERT INTO sessions_fts(rowid, session_key, content)
64                VALUES (new.id, new.session_key, new.content);
65             END;
66             CREATE TRIGGER IF NOT EXISTS sessions_ad AFTER DELETE ON sessions BEGIN
67                INSERT INTO sessions_fts(sessions_fts, rowid, session_key, content)
68                VALUES ('delete', old.id, old.session_key, old.content);
69             END;
70             CREATE TRIGGER IF NOT EXISTS sessions_au AFTER UPDATE ON sessions BEGIN
71                INSERT INTO sessions_fts(sessions_fts, rowid, session_key, content)
72                VALUES ('delete', old.id, old.session_key, old.content);
73                INSERT INTO sessions_fts(rowid, session_key, content)
74                VALUES (new.id, new.session_key, new.content);
75             END;",
76        )
77        .context("Failed to initialize session schema")?;
78
79        // Migration: add name column to existing databases
80        let has_name: bool = conn
81            .query_row(
82                "SELECT COUNT(*) > 0 FROM pragma_table_info('session_metadata') WHERE name = 'name'",
83                [],
84                |row| row.get(0),
85            )
86            .unwrap_or(false);
87        if !has_name {
88            let _ = conn.execute("ALTER TABLE session_metadata ADD COLUMN name TEXT", []);
89        }
90
91        // Migration: add state tracking columns
92        let has_state: bool = conn
93            .query_row(
94                "SELECT COUNT(*) > 0 FROM pragma_table_info('session_metadata') WHERE name = 'state'",
95                [],
96                |row| row.get(0),
97            )
98            .unwrap_or(false);
99        if !has_state {
100            let _ = conn.execute(
101                "ALTER TABLE session_metadata ADD COLUMN state TEXT NOT NULL DEFAULT 'idle'",
102                [],
103            );
104            let _ = conn.execute("ALTER TABLE session_metadata ADD COLUMN turn_id TEXT", []);
105            let _ = conn.execute(
106                "ALTER TABLE session_metadata ADD COLUMN turn_started_at TEXT",
107                [],
108            );
109        }
110
111        // Migration: add agent_alias column for per-agent attribution
112        let has_agent_alias: bool = conn
113            .query_row(
114                "SELECT COUNT(*) > 0 FROM pragma_table_info('session_metadata') WHERE name = 'agent_alias'",
115                [],
116                |row| row.get(0),
117            )
118            .unwrap_or(false);
119        if !has_agent_alias {
120            let _ = conn.execute(
121                "ALTER TABLE session_metadata ADD COLUMN agent_alias TEXT",
122                [],
123            );
124            let _ = conn.execute(
125                "CREATE INDEX IF NOT EXISTS idx_session_metadata_agent_alias \
126                 ON session_metadata(agent_alias)",
127                [],
128            );
129        }
130
131        // Migration: structured routing columns. Each session metadata row
132        // gets the channel ref (`<type>.<alias>` like `discord.clamps`),
133        // the platform-side room/thread id, and the inbound sender id so
134        // dashboard filters and audit queries don't have to re-parse the
135        // `session_key` composition that orchestrator::conversation_history_key
136        // builds.  All three are nullable for backfill compatibility.
137        for (column, ddl) in [
138            (
139                "channel_id",
140                "ALTER TABLE session_metadata ADD COLUMN channel_id TEXT",
141            ),
142            (
143                "room_id",
144                "ALTER TABLE session_metadata ADD COLUMN room_id TEXT",
145            ),
146            (
147                "sender_id",
148                "ALTER TABLE session_metadata ADD COLUMN sender_id TEXT",
149            ),
150        ] {
151            let present: bool = conn
152                .query_row(
153                    "SELECT COUNT(*) > 0 FROM pragma_table_info('session_metadata') \
154                     WHERE name = ?1",
155                    params![column],
156                    |row| row.get(0),
157                )
158                .unwrap_or(false);
159            if !present {
160                let _ = conn.execute(ddl, []);
161            }
162        }
163        let _ = conn.execute(
164            "CREATE INDEX IF NOT EXISTS idx_session_metadata_channel_id \
165             ON session_metadata(channel_id)",
166            [],
167        );
168        let _ = conn.execute(
169            "CREATE INDEX IF NOT EXISTS idx_session_metadata_room_id \
170             ON session_metadata(room_id)",
171            [],
172        );
173        let _ = conn.execute(
174            "CREATE INDEX IF NOT EXISTS idx_session_metadata_sender_id \
175             ON session_metadata(sender_id)",
176            [],
177        );
178
179        Ok(Self {
180            conn: Mutex::new(conn),
181        })
182    }
183
184    /// Migrate JSONL session files into SQLite. Renames migrated files to `.jsonl.migrated`.
185    pub fn migrate_from_jsonl(&self, workspace_dir: &Path) -> Result<usize> {
186        let sessions_dir = workspace_dir.join("sessions");
187        let entries = match std::fs::read_dir(&sessions_dir) {
188            Ok(e) => e,
189            Err(_) => return Ok(0),
190        };
191
192        let mut migrated = 0;
193        for entry in entries {
194            let entry = match entry {
195                Ok(e) => e,
196                Err(_) => continue,
197            };
198            let name = match entry.file_name().into_string() {
199                Ok(n) => n,
200                Err(_) => continue,
201            };
202            let Some(key) = name.strip_suffix(".jsonl") else {
203                continue;
204            };
205
206            let path = entry.path();
207            let file = match std::fs::File::open(&path) {
208                Ok(f) => f,
209                Err(_) => continue,
210            };
211
212            let reader = std::io::BufReader::new(file);
213            let mut count = 0;
214            for line in std::io::BufRead::lines(reader) {
215                let Ok(line) = line else { continue };
216                let trimmed = line.trim();
217                if trimmed.is_empty() {
218                    continue;
219                }
220                if let Ok(msg) = serde_json::from_str::<ChatMessage>(trimmed)
221                    && self.append(key, &msg).is_ok()
222                {
223                    count += 1;
224                }
225            }
226
227            if count > 0 {
228                let migrated_path = path.with_extension("jsonl.migrated");
229                let _ = std::fs::rename(&path, &migrated_path);
230                migrated += 1;
231            }
232        }
233
234        Ok(migrated)
235    }
236}
237
238impl SessionBackend for SqliteSessionBackend {
239    fn load(&self, session_key: &str) -> Vec<ChatMessage> {
240        let conn = self.conn.lock();
241        let mut stmt = match conn
242            .prepare("SELECT role, content FROM sessions WHERE session_key = ?1 ORDER BY id ASC")
243        {
244            Ok(s) => s,
245            Err(_) => return Vec::new(),
246        };
247
248        let rows = match stmt.query_map(params![session_key], |row| {
249            Ok(ChatMessage {
250                role: row.get(0)?,
251                content: row.get(1)?,
252            })
253        }) {
254            Ok(r) => r,
255            Err(_) => return Vec::new(),
256        };
257
258        rows.filter_map(|r| r.ok()).collect()
259    }
260
261    fn load_with_timestamps(
262        &self,
263        session_key: &str,
264    ) -> Vec<crate::session_backend::TimestampedMessage> {
265        use crate::session_backend::TimestampedMessage;
266        let conn = self.conn.lock();
267        let mut stmt = match conn.prepare(
268            "SELECT role, content, created_at FROM sessions WHERE session_key = ?1 ORDER BY id ASC",
269        ) {
270            Ok(s) => s,
271            Err(_) => return Vec::new(),
272        };
273
274        let rows = match stmt.query_map(params![session_key], |row| {
275            let role: String = row.get(0)?;
276            let content: String = row.get(1)?;
277            let created_at_raw: Option<String> = row.get(2).ok();
278            let created_at = created_at_raw
279                .as_deref()
280                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
281                .map(|dt| dt.with_timezone(&Utc));
282            Ok(TimestampedMessage {
283                message: ChatMessage { role, content },
284                created_at,
285            })
286        }) {
287            Ok(r) => r,
288            Err(_) => return Vec::new(),
289        };
290
291        rows.filter_map(|r| r.ok()).collect()
292    }
293
294    fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
295        let conn = self.conn.lock();
296        let now = Utc::now().to_rfc3339();
297
298        conn.execute(
299            "INSERT INTO sessions (session_key, role, content, created_at)
300             VALUES (?1, ?2, ?3, ?4)",
301            params![session_key, message.role, message.content, now],
302        )
303        .map_err(std::io::Error::other)?;
304
305        // Upsert metadata
306        conn.execute(
307            "INSERT INTO session_metadata (session_key, created_at, last_activity, message_count)
308             VALUES (?1, ?2, ?3, 1)
309             ON CONFLICT(session_key) DO UPDATE SET
310                last_activity = excluded.last_activity,
311                message_count = message_count + 1",
312            params![session_key, now, now],
313        )
314        .map_err(std::io::Error::other)?;
315
316        Ok(())
317    }
318
319    fn remove_last(&self, session_key: &str) -> std::io::Result<bool> {
320        let conn = self.conn.lock();
321
322        let last_id: Option<i64> = conn
323            .query_row(
324                "SELECT id FROM sessions WHERE session_key = ?1 ORDER BY id DESC LIMIT 1",
325                params![session_key],
326                |row| row.get(0),
327            )
328            .ok();
329
330        let Some(id) = last_id else {
331            return Ok(false);
332        };
333
334        conn.execute("DELETE FROM sessions WHERE id = ?1", params![id])
335            .map_err(std::io::Error::other)?;
336
337        // Update metadata count
338        conn.execute(
339            "UPDATE session_metadata SET message_count = MAX(0, message_count - 1)
340             WHERE session_key = ?1",
341            params![session_key],
342        )
343        .map_err(std::io::Error::other)?;
344
345        Ok(true)
346    }
347
348    /// Efficiently update the last message in-place (single UPDATE instead of
349    /// DELETE + INSERT). Used for incremental persistence during streaming.
350    fn update_last(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<bool> {
351        let conn = self.conn.lock();
352
353        let last_id: Option<i64> = conn
354            .query_row(
355                "SELECT id FROM sessions WHERE session_key = ?1 ORDER BY id DESC LIMIT 1",
356                params![session_key],
357                |row| row.get(0),
358            )
359            .ok();
360
361        let Some(id) = last_id else {
362            return Ok(false);
363        };
364
365        conn.execute(
366            "UPDATE sessions SET role = ?1, content = ?2 WHERE id = ?3",
367            params![message.role, message.content, id],
368        )
369        .map_err(std::io::Error::other)?;
370
371        // NOTE: FTS index becomes stale here (no UPDATE trigger, only
372        // INSERT/DELETE triggers). This is acceptable — update_last is
373        // used for transient streaming snapshots. The final content will
374        // be correct in the sessions table for load().
375
376        let now = chrono::Utc::now().to_rfc3339();
377        conn.execute(
378            "UPDATE session_metadata SET last_activity = ?1 WHERE session_key = ?2",
379            params![now, session_key],
380        )
381        .map_err(std::io::Error::other)?;
382
383        Ok(true)
384    }
385
386    fn list_sessions(&self) -> Vec<String> {
387        let conn = self.conn.lock();
388        let mut stmt = match conn
389            .prepare("SELECT session_key FROM session_metadata ORDER BY last_activity DESC")
390        {
391            Ok(s) => s,
392            Err(_) => return Vec::new(),
393        };
394
395        let rows = match stmt.query_map([], |row| row.get(0)) {
396            Ok(r) => r,
397            Err(_) => return Vec::new(),
398        };
399
400        rows.filter_map(|r| r.ok()).collect()
401    }
402
403    fn list_sessions_with_metadata(&self) -> Vec<SessionMetadata> {
404        let conn = self.conn.lock();
405        let mut stmt = match conn.prepare(
406            "SELECT session_key, created_at, last_activity, message_count, name, agent_alias, channel_id, room_id, sender_id
407             FROM session_metadata ORDER BY last_activity DESC",
408        ) {
409            Ok(s) => s,
410            Err(_) => return Vec::new(),
411        };
412
413        let rows = match stmt.query_map([], |row| {
414            let key: String = row.get(0)?;
415            let created_str: String = row.get(1)?;
416            let activity_str: String = row.get(2)?;
417            let count: i64 = row.get(3)?;
418            let name: Option<String> = row.get(4)?;
419            let agent_alias: Option<String> = row.get(5)?;
420            let channel_id: Option<String> = row.get(6)?;
421            let room_id: Option<String> = row.get(7)?;
422            let sender_id: Option<String> = row.get(8)?;
423
424            let created = DateTime::parse_from_rfc3339(&created_str)
425                .map(|dt| dt.with_timezone(&Utc))
426                .unwrap_or_else(|_| Utc::now());
427            let activity = DateTime::parse_from_rfc3339(&activity_str)
428                .map(|dt| dt.with_timezone(&Utc))
429                .unwrap_or_else(|_| Utc::now());
430
431            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
432            Ok(SessionMetadata {
433                key,
434                name,
435                created_at: created,
436                last_activity: activity,
437                message_count: count as usize,
438                agent_alias,
439                channel_id,
440                room_id,
441                sender_id,
442            })
443        }) {
444            Ok(r) => r,
445            Err(_) => return Vec::new(),
446        };
447
448        rows.filter_map(|r| r.ok()).collect()
449    }
450
451    fn cleanup_stale(&self, ttl_hours: u32) -> std::io::Result<usize> {
452        let conn = self.conn.lock();
453        let cutoff = (Utc::now() - Duration::hours(i64::from(ttl_hours))).to_rfc3339();
454
455        // Find stale sessions
456        let stale_keys: Vec<String> = {
457            let mut stmt = conn
458                .prepare("SELECT session_key FROM session_metadata WHERE last_activity < ?1")
459                .map_err(std::io::Error::other)?;
460            let rows = stmt
461                .query_map(params![cutoff], |row| row.get(0))
462                .map_err(std::io::Error::other)?;
463            rows.filter_map(|r| r.ok()).collect()
464        };
465
466        let count = stale_keys.len();
467        for key in &stale_keys {
468            let _ = conn.execute("DELETE FROM sessions WHERE session_key = ?1", params![key]);
469            let _ = conn.execute(
470                "DELETE FROM session_metadata WHERE session_key = ?1",
471                params![key],
472            );
473        }
474
475        Ok(count)
476    }
477
478    fn clear_messages(&self, session_key: &str) -> std::io::Result<usize> {
479        let conn = self.conn.lock();
480
481        conn.execute(
482            "DELETE FROM sessions WHERE session_key = ?1",
483            params![session_key],
484        )
485        .map_err(std::io::Error::other)?;
486
487        let count = conn.changes() as usize;
488
489        if count > 0 {
490            conn.execute(
491                "UPDATE session_metadata SET message_count = 0, last_activity = ?1 WHERE session_key = ?2",
492                params![Utc::now().to_rfc3339(), session_key],
493            )
494            .map_err(std::io::Error::other)?;
495        }
496
497        Ok(count)
498    }
499
500    fn delete_session(&self, session_key: &str) -> std::io::Result<bool> {
501        let conn = self.conn.lock();
502
503        // Check if session exists
504        let exists: bool = conn
505            .query_row(
506                "SELECT COUNT(*) > 0 FROM session_metadata WHERE session_key = ?1",
507                params![session_key],
508                |row| row.get(0),
509            )
510            .unwrap_or(false);
511
512        if !exists {
513            return Ok(false);
514        }
515
516        // Delete messages (FTS5 trigger handles sessions_fts cleanup)
517        conn.execute(
518            "DELETE FROM sessions WHERE session_key = ?1",
519            params![session_key],
520        )
521        .map_err(std::io::Error::other)?;
522
523        // Delete metadata
524        conn.execute(
525            "DELETE FROM session_metadata WHERE session_key = ?1",
526            params![session_key],
527        )
528        .map_err(std::io::Error::other)?;
529
530        Ok(true)
531    }
532
533    fn set_session_name(&self, session_key: &str, name: &str) -> std::io::Result<()> {
534        let conn = self.conn.lock();
535        let name_val = if name.is_empty() { None } else { Some(name) };
536        conn.execute(
537            "UPDATE session_metadata SET name = ?1 WHERE session_key = ?2",
538            params![name_val, session_key],
539        )
540        .map_err(std::io::Error::other)?;
541        Ok(())
542    }
543
544    fn get_session_name(&self, session_key: &str) -> std::io::Result<Option<String>> {
545        let conn = self.conn.lock();
546        conn.query_row(
547            "SELECT name FROM session_metadata WHERE session_key = ?1",
548            params![session_key],
549            |row| row.get(0),
550        )
551        .map_err(std::io::Error::other)
552    }
553
554    fn get_session_metadata(&self, session_key: &str) -> Option<SessionMetadata> {
555        let conn = self.conn.lock();
556        conn.query_row(
557            "SELECT session_key, created_at, last_activity, message_count, name, agent_alias, channel_id, room_id, sender_id
558             FROM session_metadata WHERE session_key = ?1",
559            params![session_key],
560            |row| {
561                let key: String = row.get(0)?;
562                let created_str: String = row.get(1)?;
563                let activity_str: String = row.get(2)?;
564                let count: i64 = row.get(3)?;
565                let name: Option<String> = row.get(4)?;
566                let agent_alias: Option<String> = row.get(5)?;
567                let channel_id: Option<String> = row.get(6)?;
568                let room_id: Option<String> = row.get(7)?;
569                let sender_id: Option<String> = row.get(8)?;
570
571                let created = DateTime::parse_from_rfc3339(&created_str)
572                    .map(|dt| dt.with_timezone(&Utc))
573                    .unwrap_or_else(|_| Utc::now());
574                let activity = DateTime::parse_from_rfc3339(&activity_str)
575                    .map(|dt| dt.with_timezone(&Utc))
576                    .unwrap_or_else(|_| Utc::now());
577
578                #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
579                Ok(SessionMetadata {
580                    key,
581                    name,
582                    created_at: created,
583                    last_activity: activity,
584                    message_count: count as usize,
585                    agent_alias,
586                    channel_id,
587                    room_id,
588                    sender_id,
589                })
590            },
591        )
592        .ok()
593    }
594
595    fn set_session_state(
596        &self,
597        session_key: &str,
598        state: &str,
599        turn_id: Option<&str>,
600    ) -> std::io::Result<()> {
601        let conn = self.conn.lock();
602        let now = Utc::now().to_rfc3339();
603        let started_at = if state == "running" {
604            Some(now.as_str())
605        } else {
606            None
607        };
608        conn.execute(
609            "UPDATE session_metadata SET state = ?1, turn_id = ?2, turn_started_at = ?3
610             WHERE session_key = ?4",
611            params![state, turn_id, started_at, session_key],
612        )
613        .map_err(std::io::Error::other)?;
614        Ok(())
615    }
616
617    fn get_session_state(&self, session_key: &str) -> std::io::Result<Option<SessionState>> {
618        let conn = self.conn.lock();
619        conn.query_row(
620            "SELECT state, turn_id, turn_started_at FROM session_metadata WHERE session_key = ?1",
621            params![session_key],
622            |row| {
623                let state: String = row.get(0)?;
624                let turn_id: Option<String> = row.get(1)?;
625                let started_str: Option<String> = row.get(2)?;
626                let turn_started_at = started_str.and_then(|s| {
627                    chrono::DateTime::parse_from_rfc3339(&s)
628                        .ok()
629                        .map(|dt| dt.with_timezone(&Utc))
630                });
631                Ok(SessionState {
632                    state,
633                    turn_id,
634                    turn_started_at,
635                })
636            },
637        )
638        .map(Some)
639        .or_else(|e| match e {
640            rusqlite::Error::QueryReturnedNoRows => Ok(None),
641            other => Err(std::io::Error::other(other)),
642        })
643    }
644
645    fn list_running_sessions(&self) -> Vec<SessionMetadata> {
646        let conn = self.conn.lock();
647        let mut stmt = match conn.prepare(
648            "SELECT session_key, created_at, last_activity, message_count, name, agent_alias, channel_id, room_id, sender_id
649             FROM session_metadata WHERE state = 'running' ORDER BY turn_started_at DESC",
650        ) {
651            Ok(s) => s,
652            Err(_) => return Vec::new(),
653        };
654
655        let rows = match stmt.query_map([], |row| {
656            let key: String = row.get(0)?;
657            let created_str: String = row.get(1)?;
658            let activity_str: String = row.get(2)?;
659            let count: i64 = row.get(3)?;
660            let name: Option<String> = row.get(4)?;
661            let agent_alias: Option<String> = row.get(5)?;
662            let channel_id: Option<String> = row.get(6)?;
663            let room_id: Option<String> = row.get(7)?;
664            let sender_id: Option<String> = row.get(8)?;
665            let created = DateTime::parse_from_rfc3339(&created_str)
666                .map(|dt| dt.with_timezone(&Utc))
667                .unwrap_or_else(|_| Utc::now());
668            let activity = DateTime::parse_from_rfc3339(&activity_str)
669                .map(|dt| dt.with_timezone(&Utc))
670                .unwrap_or_else(|_| Utc::now());
671            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
672            Ok(SessionMetadata {
673                key,
674                name,
675                created_at: created,
676                last_activity: activity,
677                message_count: count as usize,
678                agent_alias,
679                channel_id,
680                room_id,
681                sender_id,
682            })
683        }) {
684            Ok(r) => r,
685            Err(_) => return Vec::new(),
686        };
687
688        rows.filter_map(|r| r.ok()).collect()
689    }
690
691    fn list_stuck_sessions(&self, threshold_secs: u64) -> Vec<SessionMetadata> {
692        let conn = self.conn.lock();
693        #[allow(clippy::cast_possible_wrap)]
694        let cutoff = (Utc::now() - chrono::Duration::seconds(threshold_secs as i64)).to_rfc3339();
695        let mut stmt = match conn.prepare(
696            "SELECT session_key, created_at, last_activity, message_count, name, agent_alias, channel_id, room_id, sender_id
697             FROM session_metadata
698             WHERE state = 'running' AND turn_started_at < ?1
699             ORDER BY turn_started_at ASC",
700        ) {
701            Ok(s) => s,
702            Err(_) => return Vec::new(),
703        };
704
705        let rows = match stmt.query_map(params![cutoff], |row| {
706            let key: String = row.get(0)?;
707            let created_str: String = row.get(1)?;
708            let activity_str: String = row.get(2)?;
709            let count: i64 = row.get(3)?;
710            let name: Option<String> = row.get(4)?;
711            let agent_alias: Option<String> = row.get(5)?;
712            let channel_id: Option<String> = row.get(6)?;
713            let room_id: Option<String> = row.get(7)?;
714            let sender_id: Option<String> = row.get(8)?;
715            let created = DateTime::parse_from_rfc3339(&created_str)
716                .map(|dt| dt.with_timezone(&Utc))
717                .unwrap_or_else(|_| Utc::now());
718            let activity = DateTime::parse_from_rfc3339(&activity_str)
719                .map(|dt| dt.with_timezone(&Utc))
720                .unwrap_or_else(|_| Utc::now());
721            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
722            Ok(SessionMetadata {
723                key,
724                name,
725                created_at: created,
726                last_activity: activity,
727                message_count: count as usize,
728                agent_alias,
729                channel_id,
730                room_id,
731                sender_id,
732            })
733        }) {
734            Ok(r) => r,
735            Err(_) => return Vec::new(),
736        };
737
738        rows.filter_map(|r| r.ok()).collect()
739    }
740
741    fn search(&self, query: &SessionQuery) -> Vec<SessionMetadata> {
742        let Some(keyword) = &query.keyword else {
743            return self.list_sessions_with_metadata();
744        };
745
746        let conn = self.conn.lock();
747        #[allow(clippy::cast_possible_wrap)]
748        let limit = query.limit.unwrap_or(50) as i64;
749
750        // FTS5 search
751        let mut stmt = match conn.prepare(
752            "SELECT DISTINCT f.session_key
753             FROM sessions_fts f
754             WHERE sessions_fts MATCH ?1
755             LIMIT ?2",
756        ) {
757            Ok(s) => s,
758            Err(_) => return Vec::new(),
759        };
760
761        // Quote each word for FTS5
762        let fts_query: String = keyword
763            .split_whitespace()
764            .map(|w| format!("\"{w}\""))
765            .collect::<Vec<_>>()
766            .join(" OR ");
767
768        let keys: Vec<String> = match stmt.query_map(params![fts_query, limit], |row| row.get(0)) {
769            Ok(r) => r.filter_map(|r| r.ok()).collect(),
770            Err(_) => return Vec::new(),
771        };
772
773        // Look up metadata for matched sessions
774        keys.iter()
775            .filter_map(|key| {
776                conn.query_row(
777                    "SELECT created_at, last_activity, message_count, name, agent_alias, channel_id, room_id, sender_id FROM session_metadata WHERE session_key = ?1",
778                    params![key],
779                    |row| {
780                        let created_str: String = row.get(0)?;
781                        let activity_str: String = row.get(1)?;
782                        let count: i64 = row.get(2)?;
783                        let name: Option<String> = row.get(3)?;
784                        let agent_alias: Option<String> = row.get(4)?;
785                        let channel_id: Option<String> = row.get(5)?;
786                        let room_id: Option<String> = row.get(6)?;
787                        let sender_id: Option<String> = row.get(7)?;
788                        Ok(SessionMetadata {
789                            key: key.clone(),
790                            name,
791                            created_at: DateTime::parse_from_rfc3339(&created_str)
792                                .map(|dt| dt.with_timezone(&Utc))
793                                .unwrap_or_else(|_| Utc::now()),
794                            last_activity: DateTime::parse_from_rfc3339(&activity_str)
795                                .map(|dt| dt.with_timezone(&Utc))
796                                .unwrap_or_else(|_| Utc::now()),
797                            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
798                            message_count: count as usize,
799                            agent_alias,
800                            channel_id,
801                            room_id,
802                            sender_id,
803                        })
804                    },
805                )
806                .ok()
807            })
808            .collect()
809    }
810
811    fn set_session_agent_alias(&self, session_key: &str, agent_alias: &str) -> std::io::Result<()> {
812        let conn = self.conn.lock();
813        let alias_val = if agent_alias.is_empty() {
814            None
815        } else {
816            Some(agent_alias)
817        };
818        let now = Utc::now().to_rfc3339();
819        conn.execute(
820            "INSERT INTO session_metadata (session_key, created_at, last_activity, message_count, agent_alias)
821             VALUES (?1, ?2, ?3, 0, ?4)
822             ON CONFLICT(session_key) DO UPDATE SET agent_alias = excluded.agent_alias",
823            params![session_key, now, now, alias_val],
824        )
825        .map_err(std::io::Error::other)?;
826        Ok(())
827    }
828
829    fn get_session_agent_alias(&self, session_key: &str) -> std::io::Result<Option<String>> {
830        let conn = self.conn.lock();
831        conn.query_row(
832            "SELECT agent_alias FROM session_metadata WHERE session_key = ?1",
833            params![session_key],
834            |row| row.get(0),
835        )
836        .or_else(|e| match e {
837            rusqlite::Error::QueryReturnedNoRows => Ok(None),
838            other => Err(std::io::Error::other(other)),
839        })
840    }
841
842    fn set_session_context(
843        &self,
844        session_key: &str,
845        context: SessionContext<'_>,
846    ) -> std::io::Result<()> {
847        let conn = self.conn.lock();
848        fn normalize(v: Option<&str>) -> Option<&str> {
849            v.map(str::trim).filter(|s| !s.is_empty())
850        }
851        let channel_id = normalize(context.channel_id);
852        let room_id = normalize(context.room_id);
853        let sender_id = normalize(context.sender_id);
854        let now = Utc::now().to_rfc3339();
855        // Insert a metadata stub row when missing so the per-platform
856        // fields land even before the first message append fires the
857        // upsert path. The COALESCE clauses preserve any field a prior
858        // append/set already stamped — channel-side updates only fill in
859        // gaps, they don't overwrite earlier routing context.
860        conn.execute(
861            "INSERT INTO session_metadata
862                (session_key, created_at, last_activity, message_count, channel_id, room_id, sender_id)
863             VALUES (?1, ?2, ?3, 0, ?4, ?5, ?6)
864             ON CONFLICT(session_key) DO UPDATE SET
865                channel_id = COALESCE(excluded.channel_id, session_metadata.channel_id),
866                room_id    = COALESCE(excluded.room_id,    session_metadata.room_id),
867                sender_id  = COALESCE(excluded.sender_id,  session_metadata.sender_id)",
868            params![session_key, now, now, channel_id, room_id, sender_id],
869        )
870        .map_err(std::io::Error::other)?;
871        Ok(())
872    }
873}
874
#[cfg(test)]
mod tests {
    //! Unit tests for `SqliteSessionBackend`. Every test runs against its own
    //! database inside a fresh temporary workspace directory.

    use super::*;
    use tempfile::TempDir;

    /// Build a backend rooted in a brand-new temporary workspace.
    ///
    /// The `TempDir` guard is returned next to the backend so the caller can
    /// keep it alive for the duration of the test (dropping the guard removes
    /// the directory that holds the database file).
    fn fresh_backend() -> (TempDir, SqliteSessionBackend) {
        let tmp = TempDir::new().unwrap();
        let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
        (tmp, backend)
    }

    // ── message storage tests ───────────────────────────────────────

    /// Appended messages come back from `load` in insertion order.
    #[test]
    fn round_trip_sqlite() {
        let (_tmp, backend) = fresh_backend();

        backend
            .append("user1", &ChatMessage::user("hello"))
            .unwrap();
        backend
            .append("user1", &ChatMessage::assistant("hi"))
            .unwrap();

        let msgs = backend.load("user1");
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].role, "user");
        assert_eq!(msgs[1].role, "assistant");
    }

    /// `remove_last` drops only the most recent message.
    #[test]
    fn remove_last_sqlite() {
        let (_tmp, backend) = fresh_backend();

        backend.append("u", &ChatMessage::user("a")).unwrap();
        backend.append("u", &ChatMessage::user("b")).unwrap();

        assert!(backend.remove_last("u").unwrap());
        let msgs = backend.load("u");
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, "a");
    }

    /// `remove_last` on an unknown session reports that nothing was removed.
    #[test]
    fn remove_last_empty_sqlite() {
        let (_tmp, backend) = fresh_backend();
        assert!(!backend.remove_last("nonexistent").unwrap());
    }

    /// Each distinct session key shows up once in `list_sessions`.
    #[test]
    fn list_sessions_sqlite() {
        let (_tmp, backend) = fresh_backend();

        backend.append("a", &ChatMessage::user("hi")).unwrap();
        backend.append("b", &ChatMessage::user("hey")).unwrap();

        let sessions = backend.list_sessions();
        assert_eq!(sessions.len(), 2);
    }

    /// The metadata row's message count follows the number of appends.
    #[test]
    fn metadata_tracks_counts() {
        let (_tmp, backend) = fresh_backend();

        backend.append("s1", &ChatMessage::user("a")).unwrap();
        backend.append("s1", &ChatMessage::user("b")).unwrap();
        backend.append("s1", &ChatMessage::user("c")).unwrap();

        let meta = backend.list_sessions_with_metadata();
        assert_eq!(meta.len(), 1);
        assert_eq!(meta[0].message_count, 3);
    }

    // ── full-text search tests ──────────────────────────────────────

    /// A keyword search returns only the session whose content matches.
    #[test]
    fn fts5_search_finds_content() {
        let (_tmp, backend) = fresh_backend();

        backend
            .append(
                "code_chat",
                &ChatMessage::user("How do I parse JSON in Rust?"),
            )
            .unwrap();
        backend
            .append("weather", &ChatMessage::user("What's the weather today?"))
            .unwrap();

        let results = backend.search(&SessionQuery {
            keyword: Some("Rust".into()),
            limit: Some(10),
        });
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].key, "code_chat");
    }

    /// Updating a row's content must be reflected in the search index.
    #[test]
    fn fts5_update_trigger_syncs_index() {
        let (_tmp, backend) = fresh_backend();

        backend
            .append("chat", &ChatMessage::user("hello world"))
            .unwrap();

        // Verify initial content is searchable
        let results = backend.search(&SessionQuery {
            keyword: Some("hello".into()),
            limit: Some(10),
        });
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].key, "chat");

        // Directly update the session content (simulates update_last behavior)
        {
            let conn = backend.conn.lock();
            conn.execute(
                "UPDATE sessions SET content = ?1 WHERE session_key = ?2",
                params!["goodbye world", "chat"],
            )
            .unwrap();
        }

        // Old keyword should no longer match
        let results = backend.search(&SessionQuery {
            keyword: Some("hello".into()),
            limit: Some(10),
        });
        assert!(results.is_empty());

        // New keyword should match after UPDATE trigger syncs FTS index
        let results = backend.search(&SessionQuery {
            keyword: Some("goodbye".into()),
            limit: Some(10),
        });
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].key, "chat");
    }

    // ── cleanup / clear / delete tests ──────────────────────────────

    /// Sessions whose last activity is older than the TTL are purged.
    #[test]
    fn cleanup_stale_removes_old_sessions() {
        let (_tmp, backend) = fresh_backend();

        // Insert a session with old timestamp
        {
            let conn = backend.conn.lock();
            let old_time = (Utc::now() - Duration::hours(100)).to_rfc3339();
            conn.execute(
                "INSERT INTO sessions (session_key, role, content, created_at) VALUES (?1, ?2, ?3, ?4)",
                params!["old_session", "user", "ancient", old_time],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO session_metadata (session_key, created_at, last_activity, message_count) VALUES (?1, ?2, ?3, 1)",
                params!["old_session", old_time, old_time],
            )
            .unwrap();
        }

        backend
            .append("new_session", &ChatMessage::user("fresh"))
            .unwrap();

        let cleaned = backend.cleanup_stale(48).unwrap(); // 48h TTL
        assert_eq!(cleaned, 1);

        let sessions = backend.list_sessions();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0], "new_session");
    }

    /// Clearing messages empties the history but keeps the metadata row.
    #[test]
    fn clear_messages_removes_rows_keeps_metadata() {
        let (_tmp, backend) = fresh_backend();

        backend.append("s1", &ChatMessage::user("hello")).unwrap();
        backend.append("s1", &ChatMessage::assistant("hi")).unwrap();
        backend.set_session_name("s1", "My Session").unwrap();

        let cleared = backend.clear_messages("s1").unwrap();
        assert_eq!(cleared, 2);
        assert!(backend.load("s1").is_empty());
        // Session still exists in metadata with name preserved
        let meta = backend.list_sessions_with_metadata();
        assert_eq!(meta.len(), 1);
        assert_eq!(meta[0].message_count, 0);
        assert_eq!(meta[0].name.as_deref(), Some("My Session"));
    }

    /// Clearing an unknown session reports zero removed rows.
    #[test]
    fn clear_messages_empty_returns_zero() {
        let (_tmp, backend) = fresh_backend();
        assert_eq!(backend.clear_messages("nonexistent").unwrap(), 0);
    }

    /// Clearing one session leaves every other session untouched.
    #[test]
    fn clear_messages_does_not_affect_other_sessions() {
        let (_tmp, backend) = fresh_backend();

        backend.append("s1", &ChatMessage::user("hello")).unwrap();
        backend.append("s2", &ChatMessage::user("world")).unwrap();

        backend.clear_messages("s1").unwrap();
        assert!(backend.load("s1").is_empty());
        assert_eq!(backend.load("s2").len(), 1);
    }

    /// A cleared session accepts new messages and counts them from zero.
    #[test]
    fn clear_messages_then_append_works() {
        let (_tmp, backend) = fresh_backend();

        backend.append("s1", &ChatMessage::user("old")).unwrap();
        backend.clear_messages("s1").unwrap();
        backend.append("s1", &ChatMessage::user("new")).unwrap();

        let messages = backend.load("s1");
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].content, "new");
        // Metadata count should reflect the new message
        let meta = backend.list_sessions_with_metadata();
        assert_eq!(meta[0].message_count, 1);
    }

    /// Deleting a session removes its messages and its listing entry.
    #[test]
    fn delete_session_removes_all_data() {
        let (_tmp, backend) = fresh_backend();

        backend.append("s1", &ChatMessage::user("hello")).unwrap();
        backend.append("s1", &ChatMessage::assistant("hi")).unwrap();
        backend.append("s2", &ChatMessage::user("other")).unwrap();

        assert!(backend.delete_session("s1").unwrap());
        assert!(backend.load("s1").is_empty());
        assert_eq!(backend.list_sessions().len(), 1);
        assert_eq!(backend.list_sessions()[0], "s2");
    }

    /// Deleting an unknown session reports that nothing was deleted.
    #[test]
    fn delete_session_returns_false_for_missing() {
        let (_tmp, backend) = fresh_backend();
        assert!(!backend.delete_session("nonexistent").unwrap());
    }

    // ── JSONL migration tests ───────────────────────────────────────

    /// JSONL files are imported into SQLite and renamed to `*.migrated`.
    #[test]
    fn migrate_from_jsonl_imports_and_renames() {
        // The JSONL fixture has to exist before the backend is opened, so
        // this test manages its own workspace instead of `fresh_backend`.
        let tmp = TempDir::new().unwrap();
        let sessions_dir = tmp.path().join("sessions");
        std::fs::create_dir_all(&sessions_dir).unwrap();

        // Create a JSONL file
        let jsonl_path = sessions_dir.join("test_user.jsonl");
        std::fs::write(
            &jsonl_path,
            "{\"role\":\"user\",\"content\":\"hello\"}\n{\"role\":\"assistant\",\"content\":\"hi\"}\n",
        )
        .unwrap();

        let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
        let migrated = backend.migrate_from_jsonl(tmp.path()).unwrap();
        assert_eq!(migrated, 1);

        // JSONL should be renamed
        assert!(!jsonl_path.exists());
        assert!(sessions_dir.join("test_user.jsonl.migrated").exists());

        // Messages should be in SQLite
        let msgs = backend.load("test_user");
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].content, "hello");
    }

    // ── session name tests ──────────────────────────────────────────

    /// A name set on a session is visible through the metadata listing.
    #[test]
    fn set_session_name_persists() {
        let (_tmp, backend) = fresh_backend();

        backend.append("s1", &ChatMessage::user("hello")).unwrap();
        backend.set_session_name("s1", "My Session").unwrap();

        let meta = backend.list_sessions_with_metadata();
        assert_eq!(meta.len(), 1);
        assert_eq!(meta[0].name.as_deref(), Some("My Session"));
    }

    /// Setting a name twice keeps the most recent value.
    #[test]
    fn set_session_name_updates_existing() {
        let (_tmp, backend) = fresh_backend();

        backend.append("s1", &ChatMessage::user("hello")).unwrap();
        backend.set_session_name("s1", "First").unwrap();
        backend.set_session_name("s1", "Second").unwrap();

        let meta = backend.list_sessions_with_metadata();
        assert_eq!(meta[0].name.as_deref(), Some("Second"));
    }

    /// A session that was never named reports `None` for its name.
    #[test]
    fn sessions_without_name_return_none() {
        let (_tmp, backend) = fresh_backend();

        backend.append("s1", &ChatMessage::user("hello")).unwrap();

        let meta = backend.list_sessions_with_metadata();
        assert_eq!(meta.len(), 1);
        assert!(meta[0].name.is_none());
    }

    // ── session state tests ─────────────────────────────────────────

    /// Moving to "running" records the turn id and a start timestamp.
    #[test]
    fn session_state_idle_to_running() {
        let (_tmp, backend) = fresh_backend();
        backend.append("s1", &ChatMessage::user("hello")).unwrap();

        backend
            .set_session_state("s1", "running", Some("turn-1"))
            .unwrap();
        let state = backend.get_session_state("s1").unwrap().unwrap();
        assert_eq!(state.state, "running");
        assert_eq!(state.turn_id.as_deref(), Some("turn-1"));
        assert!(state.turn_started_at.is_some());
    }

    /// Returning to "idle" clears the turn id and the start timestamp.
    #[test]
    fn session_state_running_to_idle() {
        let (_tmp, backend) = fresh_backend();
        backend.append("s1", &ChatMessage::user("hello")).unwrap();

        backend
            .set_session_state("s1", "running", Some("turn-1"))
            .unwrap();
        backend.set_session_state("s1", "idle", None).unwrap();

        let state = backend.get_session_state("s1").unwrap().unwrap();
        assert_eq!(state.state, "idle");
        assert!(state.turn_id.is_none());
        assert!(state.turn_started_at.is_none());
    }

    /// Moving to "error" keeps the turn id that was supplied.
    #[test]
    fn session_state_running_to_error() {
        let (_tmp, backend) = fresh_backend();
        backend.append("s1", &ChatMessage::user("hello")).unwrap();

        backend
            .set_session_state("s1", "running", Some("turn-1"))
            .unwrap();
        backend
            .set_session_state("s1", "error", Some("turn-1"))
            .unwrap();

        let state = backend.get_session_state("s1").unwrap().unwrap();
        assert_eq!(state.state, "error");
        assert_eq!(state.turn_id.as_deref(), Some("turn-1"));
    }

    /// Only sessions currently marked "running" are listed as running.
    #[test]
    fn list_running_sessions_returns_running_only() {
        let (_tmp, backend) = fresh_backend();

        backend.append("s1", &ChatMessage::user("a")).unwrap();
        backend.append("s2", &ChatMessage::user("b")).unwrap();
        backend.append("s3", &ChatMessage::user("c")).unwrap();

        backend
            .set_session_state("s1", "running", Some("t1"))
            .unwrap();
        backend
            .set_session_state("s2", "running", Some("t2"))
            .unwrap();
        // s3 stays idle (default)

        let running = backend.list_running_sessions();
        assert_eq!(running.len(), 2);
        let keys: Vec<&str> = running.iter().map(|m| m.key.as_str()).collect();
        assert!(keys.contains(&"s1"));
        assert!(keys.contains(&"s2"));
    }

    /// A running session counts as stuck once its turn exceeds the threshold.
    #[test]
    fn list_stuck_sessions_detects_old_running() {
        let (_tmp, backend) = fresh_backend();
        backend.append("s1", &ChatMessage::user("a")).unwrap();

        // Manually set an old turn_started_at
        {
            let conn = backend.conn.lock();
            let old_time = (Utc::now() - Duration::seconds(600)).to_rfc3339();
            conn.execute(
                "UPDATE session_metadata SET state = 'running', turn_id = 'old', turn_started_at = ?1 WHERE session_key = 's1'",
                params![old_time],
            )
            .unwrap();
        }

        let stuck = backend.list_stuck_sessions(300); // 5 min threshold
        assert_eq!(stuck.len(), 1);
        assert_eq!(stuck[0].key, "s1");

        // Not stuck if threshold is longer
        let not_stuck = backend.list_stuck_sessions(900); // 15 min threshold
        assert_eq!(not_stuck.len(), 0);
    }

    /// Asking for the state of an unknown session yields `None`.
    #[test]
    fn get_session_state_nonexistent() {
        let (_tmp, backend) = fresh_backend();
        let state = backend.get_session_state("nonexistent").unwrap();
        assert!(state.is_none());
    }

    /// Re-opening an existing database keeps its data and default state.
    #[test]
    fn session_state_migration_preserves_data() {
        // Create backend (runs migration)
        let (tmp, backend) = fresh_backend();
        backend.append("s1", &ChatMessage::user("hello")).unwrap();

        // Re-open (migration should be idempotent)
        drop(backend);
        let backend2 = SqliteSessionBackend::new(tmp.path()).unwrap();
        let msgs = backend2.load("s1");
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, "hello");

        // State should default to idle
        let state = backend2.get_session_state("s1").unwrap().unwrap();
        assert_eq!(state.state, "idle");
    }

    /// Setting an empty name clears a previously stored name.
    #[test]
    fn empty_name_clears_to_none() {
        let (_tmp, backend) = fresh_backend();

        backend.append("s1", &ChatMessage::user("hello")).unwrap();
        backend.set_session_name("s1", "Named").unwrap();
        backend.set_session_name("s1", "").unwrap();

        let meta = backend.list_sessions_with_metadata();
        assert!(meta[0].name.is_none());
    }

    // ── get_session_metadata tests ─────────────────────────────────

    /// The single-session lookup exposes key, name and message count.
    #[test]
    fn get_session_metadata_returns_full_metadata() {
        let (_tmp, backend) = fresh_backend();

        backend.append("s1", &ChatMessage::user("hello")).unwrap();
        backend.append("s1", &ChatMessage::assistant("hi")).unwrap();
        backend.set_session_name("s1", "My Chat").unwrap();

        let meta = backend.get_session_metadata("s1").unwrap();
        assert_eq!(meta.key, "s1");
        assert_eq!(meta.name.as_deref(), Some("My Chat"));
        assert_eq!(meta.message_count, 2);
    }

    /// The single-session lookup yields `None` for an unknown key.
    #[test]
    fn get_session_metadata_returns_none_for_missing() {
        let (_tmp, backend) = fresh_backend();
        assert!(backend.get_session_metadata("nonexistent").is_none());
    }

    // ── agent alias / routing context tests ─────────────────────────

    /// An alias is visible via metadata, the listing and the direct getter.
    #[test]
    fn agent_alias_roundtrips_through_metadata() {
        let (_tmp, backend) = fresh_backend();

        backend.append("s1", &ChatMessage::user("hello")).unwrap();
        backend.set_session_agent_alias("s1", "scout").unwrap();

        let meta = backend.get_session_metadata("s1").unwrap();
        assert_eq!(meta.agent_alias.as_deref(), Some("scout"));

        let listed = backend.list_sessions_with_metadata();
        let row = listed.iter().find(|m| m.key == "s1").unwrap();
        assert_eq!(row.agent_alias.as_deref(), Some("scout"));

        // Standalone getter also works.
        let alias = backend.get_session_agent_alias("s1").unwrap();
        assert_eq!(alias.as_deref(), Some("scout"));
    }

    /// Setting an alias on a session with no messages still stores it.
    #[test]
    fn agent_alias_set_before_any_append_upserts_metadata() {
        let (_tmp, backend) = fresh_backend();

        // No prior append — metadata row does not exist yet. UPSERT
        // path must still record the alias so the WS handshake can
        // attribute the session before the first user message lands.
        backend.set_session_agent_alias("s1", "scout").unwrap();

        let alias = backend.get_session_agent_alias("s1").unwrap();
        assert_eq!(alias.as_deref(), Some("scout"));
    }

    /// Routing context round-trips, and a partial update never erases it.
    #[test]
    fn session_context_roundtrips_channel_room_sender() {
        let (_tmp, backend) = fresh_backend();

        backend.append("s1", &ChatMessage::user("hello")).unwrap();
        backend
            .set_session_context(
                "s1",
                SessionContext {
                    channel_id: Some("discord.clamps"),
                    room_id: Some("1234567890"),
                    sender_id: Some("@user:matrix"),
                },
            )
            .unwrap();

        let meta = backend.get_session_metadata("s1").unwrap();
        assert_eq!(meta.channel_id.as_deref(), Some("discord.clamps"));
        assert_eq!(meta.room_id.as_deref(), Some("1234567890"));
        assert_eq!(meta.sender_id.as_deref(), Some("@user:matrix"));

        // Second call with partial context must NOT clear the columns
        // already filled in — set_session_context is additive.
        backend
            .set_session_context(
                "s1",
                SessionContext {
                    channel_id: None,
                    room_id: Some("1234567890"),
                    sender_id: None,
                },
            )
            .unwrap();
        let meta = backend.get_session_metadata("s1").unwrap();
        assert_eq!(meta.channel_id.as_deref(), Some("discord.clamps"));
        // The re-supplied room id must survive the partial update as well.
        assert_eq!(meta.room_id.as_deref(), Some("1234567890"));
        assert_eq!(meta.sender_id.as_deref(), Some("@user:matrix"));
    }

    /// Routing context can be stored before any message exists.
    #[test]
    fn session_context_creates_metadata_row_before_first_append() {
        let (_tmp, backend) = fresh_backend();

        backend
            .set_session_context(
                "s1",
                SessionContext {
                    channel_id: Some("telegram.production"),
                    room_id: None,
                    sender_id: Some("@alice"),
                },
            )
            .unwrap();

        let meta = backend.get_session_metadata("s1").unwrap();
        assert_eq!(meta.channel_id.as_deref(), Some("telegram.production"));
        assert_eq!(meta.sender_id.as_deref(), Some("@alice"));
        assert!(meta.room_id.is_none());
    }

    /// The single-session lookup agrees with the listing for the same key.
    #[test]
    fn get_session_metadata_matches_list() {
        let (_tmp, backend) = fresh_backend();

        backend.append("s1", &ChatMessage::user("a")).unwrap();
        backend.append("s1", &ChatMessage::user("b")).unwrap();
        backend.append("s2", &ChatMessage::user("c")).unwrap();

        let single = backend.get_session_metadata("s1").unwrap();
        let all = backend.list_sessions_with_metadata();
        let from_list = all.iter().find(|m| m.key == "s1").unwrap();

        assert_eq!(single.message_count, from_list.message_count);
        assert_eq!(single.name, from_list.name);
        assert_eq!(single.created_at, from_list.created_at);
        assert_eq!(single.last_activity, from_list.last_activity);
    }
}