Skip to main content

zeroclaw_infra/
session_sqlite.rs

1//! SQLite-backed session persistence with FTS5 search.
2//!
3//! Stores sessions in `{workspace}/sessions/sessions.db` using WAL mode.
4//! Provides full-text search via FTS5 and automatic TTL-based cleanup.
5//! Designed as the default backend, replacing JSONL for new installations.
6
7use crate::session_backend::{
8    SessionBackend, SessionContext, SessionMetadata, SessionQuery, SessionState,
9};
10use anyhow::{Context, Result};
11use chrono::{DateTime, Duration, Utc};
12use parking_lot::Mutex;
13use rusqlite::{Connection, params};
14use std::path::Path;
15use zeroclaw_api::model_provider::ChatMessage;
16
17/// SQLite-backed session store with FTS5 and WAL mode.
18pub struct SqliteSessionBackend {
19    conn: Mutex<Connection>,
20}
21
22impl SqliteSessionBackend {
23    /// Open or create the sessions database.
24    pub fn new(workspace_dir: &Path) -> Result<Self> {
25        let sessions_dir = workspace_dir.join("sessions");
26        std::fs::create_dir_all(&sessions_dir).context("Failed to create sessions directory")?;
27        let db_path = sessions_dir.join("sessions.db");
28
29        let conn = Connection::open(&db_path)
30            .with_context(|| format!("Failed to open session DB: {}", db_path.display()))?;
31
32        conn.execute_batch(
33            "PRAGMA journal_mode = WAL;
34             PRAGMA synchronous = NORMAL;
35             PRAGMA temp_store = MEMORY;
36             PRAGMA mmap_size = 4194304;",
37        )?;
38
39        conn.execute_batch(
40            "CREATE TABLE IF NOT EXISTS sessions (
41                id          INTEGER PRIMARY KEY AUTOINCREMENT,
42                session_key TEXT NOT NULL,
43                role        TEXT NOT NULL,
44                content     TEXT NOT NULL,
45                created_at  TEXT NOT NULL
46             );
47             CREATE INDEX IF NOT EXISTS idx_sessions_key ON sessions(session_key);
48             CREATE INDEX IF NOT EXISTS idx_sessions_key_id ON sessions(session_key, id);
49
50             CREATE TABLE IF NOT EXISTS session_metadata (
51                session_key  TEXT PRIMARY KEY,
52                created_at   TEXT NOT NULL,
53                last_activity TEXT NOT NULL,
54                message_count INTEGER NOT NULL DEFAULT 0,
55                name         TEXT
56             );
57
58             CREATE VIRTUAL TABLE IF NOT EXISTS sessions_fts USING fts5(
59                session_key, content, content=sessions, content_rowid=id
60             );
61
62             CREATE TRIGGER IF NOT EXISTS sessions_ai AFTER INSERT ON sessions BEGIN
63                INSERT INTO sessions_fts(rowid, session_key, content)
64                VALUES (new.id, new.session_key, new.content);
65             END;
66             CREATE TRIGGER IF NOT EXISTS sessions_ad AFTER DELETE ON sessions BEGIN
67                INSERT INTO sessions_fts(sessions_fts, rowid, session_key, content)
68                VALUES ('delete', old.id, old.session_key, old.content);
69             END;
70             CREATE TRIGGER IF NOT EXISTS sessions_au AFTER UPDATE ON sessions BEGIN
71                INSERT INTO sessions_fts(sessions_fts, rowid, session_key, content)
72                VALUES ('delete', old.id, old.session_key, old.content);
73                INSERT INTO sessions_fts(rowid, session_key, content)
74                VALUES (new.id, new.session_key, new.content);
75             END;",
76        )
77        .context("Failed to initialize session schema")?;
78
79        // Migration: add name column to existing databases
80        let has_name: bool = conn
81            .query_row(
82                "SELECT COUNT(*) > 0 FROM pragma_table_info('session_metadata') WHERE name = 'name'",
83                [],
84                |row| row.get(0),
85            )
86            .unwrap_or(false);
87        if !has_name {
88            let _ = conn.execute("ALTER TABLE session_metadata ADD COLUMN name TEXT", []);
89        }
90
91        // Migration: add state tracking columns
92        let has_state: bool = conn
93            .query_row(
94                "SELECT COUNT(*) > 0 FROM pragma_table_info('session_metadata') WHERE name = 'state'",
95                [],
96                |row| row.get(0),
97            )
98            .unwrap_or(false);
99        if !has_state {
100            let _ = conn.execute(
101                "ALTER TABLE session_metadata ADD COLUMN state TEXT NOT NULL DEFAULT 'idle'",
102                [],
103            );
104            let _ = conn.execute("ALTER TABLE session_metadata ADD COLUMN turn_id TEXT", []);
105            let _ = conn.execute(
106                "ALTER TABLE session_metadata ADD COLUMN turn_started_at TEXT",
107                [],
108            );
109        }
110
111        // Migration: add agent_alias column for per-agent attribution
112        let has_agent_alias: bool = conn
113            .query_row(
114                "SELECT COUNT(*) > 0 FROM pragma_table_info('session_metadata') WHERE name = 'agent_alias'",
115                [],
116                |row| row.get(0),
117            )
118            .unwrap_or(false);
119        if !has_agent_alias {
120            let _ = conn.execute(
121                "ALTER TABLE session_metadata ADD COLUMN agent_alias TEXT",
122                [],
123            );
124            let _ = conn.execute(
125                "CREATE INDEX IF NOT EXISTS idx_session_metadata_agent_alias \
126                 ON session_metadata(agent_alias)",
127                [],
128            );
129        }
130
131        // Migration: structured routing columns. Each session metadata row
132        // gets the channel ref (`<type>.<alias>` like `discord.clamps`),
133        // the platform-side room/thread id, and the inbound sender id so
134        // dashboard filters and audit queries don't have to re-parse the
135        // `session_key` composition that orchestrator::conversation_history_key
136        // builds.  All three are nullable for backfill compatibility.
137        for (column, ddl) in [
138            (
139                "channel_id",
140                "ALTER TABLE session_metadata ADD COLUMN channel_id TEXT",
141            ),
142            (
143                "room_id",
144                "ALTER TABLE session_metadata ADD COLUMN room_id TEXT",
145            ),
146            (
147                "sender_id",
148                "ALTER TABLE session_metadata ADD COLUMN sender_id TEXT",
149            ),
150        ] {
151            let present: bool = conn
152                .query_row(
153                    "SELECT COUNT(*) > 0 FROM pragma_table_info('session_metadata') \
154                     WHERE name = ?1",
155                    params![column],
156                    |row| row.get(0),
157                )
158                .unwrap_or(false);
159            if !present {
160                let _ = conn.execute(ddl, []);
161            }
162        }
163        let _ = conn.execute(
164            "CREATE INDEX IF NOT EXISTS idx_session_metadata_channel_id \
165             ON session_metadata(channel_id)",
166            [],
167        );
168        let _ = conn.execute(
169            "CREATE INDEX IF NOT EXISTS idx_session_metadata_room_id \
170             ON session_metadata(room_id)",
171            [],
172        );
173        let _ = conn.execute(
174            "CREATE INDEX IF NOT EXISTS idx_session_metadata_sender_id \
175             ON session_metadata(sender_id)",
176            [],
177        );
178
179        Ok(Self {
180            conn: Mutex::new(conn),
181        })
182    }
183
184    /// Migrate JSONL session files into SQLite. Renames migrated files to `.jsonl.migrated`.
185    pub fn migrate_from_jsonl(&self, workspace_dir: &Path) -> Result<usize> {
186        let sessions_dir = workspace_dir.join("sessions");
187        let entries = match std::fs::read_dir(&sessions_dir) {
188            Ok(e) => e,
189            Err(_) => return Ok(0),
190        };
191
192        let mut migrated = 0;
193        for entry in entries {
194            let entry = match entry {
195                Ok(e) => e,
196                Err(_) => continue,
197            };
198            let name = match entry.file_name().into_string() {
199                Ok(n) => n,
200                Err(_) => continue,
201            };
202            let Some(key) = name.strip_suffix(".jsonl") else {
203                continue;
204            };
205
206            let path = entry.path();
207            let file = match std::fs::File::open(&path) {
208                Ok(f) => f,
209                Err(_) => continue,
210            };
211
212            let reader = std::io::BufReader::new(file);
213            let mut count = 0;
214            for line in std::io::BufRead::lines(reader) {
215                let Ok(line) = line else { continue };
216                let trimmed = line.trim();
217                if trimmed.is_empty() {
218                    continue;
219                }
220                if let Ok(msg) = serde_json::from_str::<ChatMessage>(trimmed)
221                    && self.append(key, &msg).is_ok()
222                {
223                    count += 1;
224                }
225            }
226
227            if count > 0 {
228                let migrated_path = path.with_extension("jsonl.migrated");
229                let _ = std::fs::rename(&path, &migrated_path);
230                migrated += 1;
231            }
232        }
233
234        Ok(migrated)
235    }
236}
237
238impl SessionBackend for SqliteSessionBackend {
239    fn load(&self, session_key: &str) -> Vec<ChatMessage> {
240        let conn = self.conn.lock();
241        let mut stmt = match conn
242            .prepare("SELECT role, content FROM sessions WHERE session_key = ?1 ORDER BY id ASC")
243        {
244            Ok(s) => s,
245            Err(_) => return Vec::new(),
246        };
247
248        let rows = match stmt.query_map(params![session_key], |row| {
249            Ok(ChatMessage {
250                role: row.get(0)?,
251                content: row.get(1)?,
252            })
253        }) {
254            Ok(r) => r,
255            Err(_) => return Vec::new(),
256        };
257
258        rows.filter_map(|r| r.ok()).collect()
259    }
260
261    fn load_with_timestamps(
262        &self,
263        session_key: &str,
264    ) -> Vec<crate::session_backend::TimestampedMessage> {
265        use crate::session_backend::TimestampedMessage;
266        let conn = self.conn.lock();
267        let mut stmt = match conn.prepare(
268            "SELECT role, content, created_at FROM sessions WHERE session_key = ?1 ORDER BY id ASC",
269        ) {
270            Ok(s) => s,
271            Err(_) => return Vec::new(),
272        };
273
274        let rows = match stmt.query_map(params![session_key], |row| {
275            let role: String = row.get(0)?;
276            let content: String = row.get(1)?;
277            let created_at_raw: Option<String> = row.get(2).ok();
278            let created_at = created_at_raw
279                .as_deref()
280                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
281                .map(|dt| dt.with_timezone(&Utc));
282            Ok(TimestampedMessage {
283                message: ChatMessage { role, content },
284                created_at,
285            })
286        }) {
287            Ok(r) => r,
288            Err(_) => return Vec::new(),
289        };
290
291        rows.filter_map(|r| r.ok()).collect()
292    }
293
294    fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
295        let conn = self.conn.lock();
296        let now = Utc::now().to_rfc3339();
297
298        conn.execute(
299            "INSERT INTO sessions (session_key, role, content, created_at)
300             VALUES (?1, ?2, ?3, ?4)",
301            params![session_key, message.role, message.content, now],
302        )
303        .map_err(std::io::Error::other)?;
304
305        // Upsert metadata
306        conn.execute(
307            "INSERT INTO session_metadata (session_key, created_at, last_activity, message_count)
308             VALUES (?1, ?2, ?3, 1)
309             ON CONFLICT(session_key) DO UPDATE SET
310                last_activity = excluded.last_activity,
311                message_count = message_count + 1",
312            params![session_key, now, now],
313        )
314        .map_err(std::io::Error::other)?;
315
316        Ok(())
317    }
318
319    fn remove_last(&self, session_key: &str) -> std::io::Result<bool> {
320        let conn = self.conn.lock();
321
322        let last_id: Option<i64> = conn
323            .query_row(
324                "SELECT id FROM sessions WHERE session_key = ?1 ORDER BY id DESC LIMIT 1",
325                params![session_key],
326                |row| row.get(0),
327            )
328            .ok();
329
330        let Some(id) = last_id else {
331            return Ok(false);
332        };
333
334        conn.execute("DELETE FROM sessions WHERE id = ?1", params![id])
335            .map_err(std::io::Error::other)?;
336
337        // Update metadata count
338        conn.execute(
339            "UPDATE session_metadata SET message_count = MAX(0, message_count - 1)
340             WHERE session_key = ?1",
341            params![session_key],
342        )
343        .map_err(std::io::Error::other)?;
344
345        Ok(true)
346    }
347
348    /// Efficiently update the last message in-place (single UPDATE instead of
349    /// DELETE + INSERT). Used for incremental persistence during streaming.
350    fn update_last(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<bool> {
351        let conn = self.conn.lock();
352
353        let last_id: Option<i64> = conn
354            .query_row(
355                "SELECT id FROM sessions WHERE session_key = ?1 ORDER BY id DESC LIMIT 1",
356                params![session_key],
357                |row| row.get(0),
358            )
359            .ok();
360
361        let Some(id) = last_id else {
362            return Ok(false);
363        };
364
365        conn.execute(
366            "UPDATE sessions SET role = ?1, content = ?2 WHERE id = ?3",
367            params![message.role, message.content, id],
368        )
369        .map_err(std::io::Error::other)?;
370
371        // NOTE: FTS index becomes stale here (no UPDATE trigger, only
372        // INSERT/DELETE triggers). This is acceptable — update_last is
373        // used for transient streaming snapshots. The final content will
374        // be correct in the sessions table for load().
375
376        let now = chrono::Utc::now().to_rfc3339();
377        conn.execute(
378            "UPDATE session_metadata SET last_activity = ?1 WHERE session_key = ?2",
379            params![now, session_key],
380        )
381        .map_err(std::io::Error::other)?;
382
383        Ok(true)
384    }
385
386    fn list_sessions(&self) -> Vec<String> {
387        let conn = self.conn.lock();
388        let mut stmt = match conn
389            .prepare("SELECT session_key FROM session_metadata ORDER BY last_activity DESC")
390        {
391            Ok(s) => s,
392            Err(_) => return Vec::new(),
393        };
394
395        let rows = match stmt.query_map([], |row| row.get(0)) {
396            Ok(r) => r,
397            Err(_) => return Vec::new(),
398        };
399
400        rows.filter_map(|r| r.ok()).collect()
401    }
402
403    fn list_sessions_with_metadata(&self) -> Vec<SessionMetadata> {
404        let conn = self.conn.lock();
405        let mut stmt = match conn.prepare(
406            "SELECT session_key, created_at, last_activity, message_count, name, agent_alias, channel_id, room_id, sender_id
407             FROM session_metadata ORDER BY last_activity DESC",
408        ) {
409            Ok(s) => s,
410            Err(_) => return Vec::new(),
411        };
412
413        let rows = match stmt.query_map([], |row| {
414            let key: String = row.get(0)?;
415            let created_str: String = row.get(1)?;
416            let activity_str: String = row.get(2)?;
417            let count: i64 = row.get(3)?;
418            let name: Option<String> = row.get(4)?;
419            let agent_alias: Option<String> = row.get(5)?;
420            let channel_id: Option<String> = row.get(6)?;
421            let room_id: Option<String> = row.get(7)?;
422            let sender_id: Option<String> = row.get(8)?;
423
424            let created = DateTime::parse_from_rfc3339(&created_str)
425                .map(|dt| dt.with_timezone(&Utc))
426                .unwrap_or_else(|_| Utc::now());
427            let activity = DateTime::parse_from_rfc3339(&activity_str)
428                .map(|dt| dt.with_timezone(&Utc))
429                .unwrap_or_else(|_| Utc::now());
430
431            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
432            Ok(SessionMetadata {
433                key,
434                name,
435                created_at: created,
436                last_activity: activity,
437                message_count: count as usize,
438                agent_alias,
439                channel_id,
440                room_id,
441                sender_id,
442            })
443        }) {
444            Ok(r) => r,
445            Err(_) => return Vec::new(),
446        };
447
448        rows.filter_map(|r| r.ok()).collect()
449    }
450
451    fn cleanup_stale(&self, ttl_hours: u32) -> std::io::Result<usize> {
452        let conn = self.conn.lock();
453        let cutoff = (Utc::now() - Duration::hours(i64::from(ttl_hours))).to_rfc3339();
454
455        // Find stale sessions
456        let stale_keys: Vec<String> = {
457            let mut stmt = conn
458                .prepare("SELECT session_key FROM session_metadata WHERE last_activity < ?1")
459                .map_err(std::io::Error::other)?;
460            let rows = stmt
461                .query_map(params![cutoff], |row| row.get(0))
462                .map_err(std::io::Error::other)?;
463            rows.filter_map(|r| r.ok()).collect()
464        };
465
466        let count = stale_keys.len();
467        for key in &stale_keys {
468            let _ = conn.execute("DELETE FROM sessions WHERE session_key = ?1", params![key]);
469            let _ = conn.execute(
470                "DELETE FROM session_metadata WHERE session_key = ?1",
471                params![key],
472            );
473        }
474
475        Ok(count)
476    }
477
478    fn clear_messages(&self, session_key: &str) -> std::io::Result<usize> {
479        let conn = self.conn.lock();
480
481        conn.execute(
482            "DELETE FROM sessions WHERE session_key = ?1",
483            params![session_key],
484        )
485        .map_err(std::io::Error::other)?;
486
487        let count = conn.changes() as usize;
488
489        if count > 0 {
490            conn.execute(
491                "UPDATE session_metadata SET message_count = 0, last_activity = ?1 WHERE session_key = ?2",
492                params![Utc::now().to_rfc3339(), session_key],
493            )
494            .map_err(std::io::Error::other)?;
495        }
496
497        Ok(count)
498    }
499
500    fn delete_session(&self, session_key: &str) -> std::io::Result<bool> {
501        let conn = self.conn.lock();
502
503        // Check if session exists
504        let exists: bool = conn
505            .query_row(
506                "SELECT COUNT(*) > 0 FROM session_metadata WHERE session_key = ?1",
507                params![session_key],
508                |row| row.get(0),
509            )
510            .unwrap_or(false);
511
512        if !exists {
513            return Ok(false);
514        }
515
516        // Delete messages (FTS5 trigger handles sessions_fts cleanup)
517        conn.execute(
518            "DELETE FROM sessions WHERE session_key = ?1",
519            params![session_key],
520        )
521        .map_err(std::io::Error::other)?;
522
523        // Delete metadata
524        conn.execute(
525            "DELETE FROM session_metadata WHERE session_key = ?1",
526            params![session_key],
527        )
528        .map_err(std::io::Error::other)?;
529
530        Ok(true)
531    }
532
533    /// Cheap existence probe used by the gateway to skip cancelled-append
534    /// writes against a session the user just deleted (#7126). Mirrors the
535    /// row that `delete_session` wipes — once the metadata row is gone the
536    /// session is considered deleted, even if a stray DELETE on the
537    /// `sessions` table might still race ahead.
538    fn session_exists(&self, session_key: &str) -> bool {
539        let conn = self.conn.lock();
540        conn.query_row(
541            "SELECT 1 FROM session_metadata WHERE session_key = ?1 LIMIT 1",
542            params![session_key],
543            |_| Ok(()),
544        )
545        .is_ok()
546    }
547
548    fn set_session_name(&self, session_key: &str, name: &str) -> std::io::Result<()> {
549        let conn = self.conn.lock();
550        let name_val = if name.is_empty() { None } else { Some(name) };
551        conn.execute(
552            "UPDATE session_metadata SET name = ?1 WHERE session_key = ?2",
553            params![name_val, session_key],
554        )
555        .map_err(std::io::Error::other)?;
556        Ok(())
557    }
558
559    fn get_session_name(&self, session_key: &str) -> std::io::Result<Option<String>> {
560        let conn = self.conn.lock();
561        conn.query_row(
562            "SELECT name FROM session_metadata WHERE session_key = ?1",
563            params![session_key],
564            |row| row.get(0),
565        )
566        .map_err(std::io::Error::other)
567    }
568
569    fn get_session_metadata(&self, session_key: &str) -> Option<SessionMetadata> {
570        let conn = self.conn.lock();
571        conn.query_row(
572            "SELECT session_key, created_at, last_activity, message_count, name, agent_alias, channel_id, room_id, sender_id
573             FROM session_metadata WHERE session_key = ?1",
574            params![session_key],
575            |row| {
576                let key: String = row.get(0)?;
577                let created_str: String = row.get(1)?;
578                let activity_str: String = row.get(2)?;
579                let count: i64 = row.get(3)?;
580                let name: Option<String> = row.get(4)?;
581                let agent_alias: Option<String> = row.get(5)?;
582                let channel_id: Option<String> = row.get(6)?;
583                let room_id: Option<String> = row.get(7)?;
584                let sender_id: Option<String> = row.get(8)?;
585
586                let created = DateTime::parse_from_rfc3339(&created_str)
587                    .map(|dt| dt.with_timezone(&Utc))
588                    .unwrap_or_else(|_| Utc::now());
589                let activity = DateTime::parse_from_rfc3339(&activity_str)
590                    .map(|dt| dt.with_timezone(&Utc))
591                    .unwrap_or_else(|_| Utc::now());
592
593                #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
594                Ok(SessionMetadata {
595                    key,
596                    name,
597                    created_at: created,
598                    last_activity: activity,
599                    message_count: count as usize,
600                    agent_alias,
601                    channel_id,
602                    room_id,
603                    sender_id,
604                })
605            },
606        )
607        .ok()
608    }
609
610    fn set_session_state(
611        &self,
612        session_key: &str,
613        state: &str,
614        turn_id: Option<&str>,
615    ) -> std::io::Result<()> {
616        let conn = self.conn.lock();
617        let now = Utc::now().to_rfc3339();
618        let started_at = if state == "running" {
619            Some(now.as_str())
620        } else {
621            None
622        };
623        conn.execute(
624            "UPDATE session_metadata SET state = ?1, turn_id = ?2, turn_started_at = ?3
625             WHERE session_key = ?4",
626            params![state, turn_id, started_at, session_key],
627        )
628        .map_err(std::io::Error::other)?;
629        Ok(())
630    }
631
632    fn get_session_state(&self, session_key: &str) -> std::io::Result<Option<SessionState>> {
633        let conn = self.conn.lock();
634        conn.query_row(
635            "SELECT state, turn_id, turn_started_at FROM session_metadata WHERE session_key = ?1",
636            params![session_key],
637            |row| {
638                let state: String = row.get(0)?;
639                let turn_id: Option<String> = row.get(1)?;
640                let started_str: Option<String> = row.get(2)?;
641                let turn_started_at = started_str.and_then(|s| {
642                    chrono::DateTime::parse_from_rfc3339(&s)
643                        .ok()
644                        .map(|dt| dt.with_timezone(&Utc))
645                });
646                Ok(SessionState {
647                    state,
648                    turn_id,
649                    turn_started_at,
650                })
651            },
652        )
653        .map(Some)
654        .or_else(|e| match e {
655            rusqlite::Error::QueryReturnedNoRows => Ok(None),
656            other => Err(std::io::Error::other(other)),
657        })
658    }
659
660    fn list_running_sessions(&self) -> Vec<SessionMetadata> {
661        let conn = self.conn.lock();
662        let mut stmt = match conn.prepare(
663            "SELECT session_key, created_at, last_activity, message_count, name, agent_alias, channel_id, room_id, sender_id
664             FROM session_metadata WHERE state = 'running' ORDER BY turn_started_at DESC",
665        ) {
666            Ok(s) => s,
667            Err(_) => return Vec::new(),
668        };
669
670        let rows = match stmt.query_map([], |row| {
671            let key: String = row.get(0)?;
672            let created_str: String = row.get(1)?;
673            let activity_str: String = row.get(2)?;
674            let count: i64 = row.get(3)?;
675            let name: Option<String> = row.get(4)?;
676            let agent_alias: Option<String> = row.get(5)?;
677            let channel_id: Option<String> = row.get(6)?;
678            let room_id: Option<String> = row.get(7)?;
679            let sender_id: Option<String> = row.get(8)?;
680            let created = DateTime::parse_from_rfc3339(&created_str)
681                .map(|dt| dt.with_timezone(&Utc))
682                .unwrap_or_else(|_| Utc::now());
683            let activity = DateTime::parse_from_rfc3339(&activity_str)
684                .map(|dt| dt.with_timezone(&Utc))
685                .unwrap_or_else(|_| Utc::now());
686            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
687            Ok(SessionMetadata {
688                key,
689                name,
690                created_at: created,
691                last_activity: activity,
692                message_count: count as usize,
693                agent_alias,
694                channel_id,
695                room_id,
696                sender_id,
697            })
698        }) {
699            Ok(r) => r,
700            Err(_) => return Vec::new(),
701        };
702
703        rows.filter_map(|r| r.ok()).collect()
704    }
705
706    fn list_stuck_sessions(&self, threshold_secs: u64) -> Vec<SessionMetadata> {
707        let conn = self.conn.lock();
708        #[allow(clippy::cast_possible_wrap)]
709        let cutoff = (Utc::now() - chrono::Duration::seconds(threshold_secs as i64)).to_rfc3339();
710        let mut stmt = match conn.prepare(
711            "SELECT session_key, created_at, last_activity, message_count, name, agent_alias, channel_id, room_id, sender_id
712             FROM session_metadata
713             WHERE state = 'running' AND turn_started_at < ?1
714             ORDER BY turn_started_at ASC",
715        ) {
716            Ok(s) => s,
717            Err(_) => return Vec::new(),
718        };
719
720        let rows = match stmt.query_map(params![cutoff], |row| {
721            let key: String = row.get(0)?;
722            let created_str: String = row.get(1)?;
723            let activity_str: String = row.get(2)?;
724            let count: i64 = row.get(3)?;
725            let name: Option<String> = row.get(4)?;
726            let agent_alias: Option<String> = row.get(5)?;
727            let channel_id: Option<String> = row.get(6)?;
728            let room_id: Option<String> = row.get(7)?;
729            let sender_id: Option<String> = row.get(8)?;
730            let created = DateTime::parse_from_rfc3339(&created_str)
731                .map(|dt| dt.with_timezone(&Utc))
732                .unwrap_or_else(|_| Utc::now());
733            let activity = DateTime::parse_from_rfc3339(&activity_str)
734                .map(|dt| dt.with_timezone(&Utc))
735                .unwrap_or_else(|_| Utc::now());
736            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
737            Ok(SessionMetadata {
738                key,
739                name,
740                created_at: created,
741                last_activity: activity,
742                message_count: count as usize,
743                agent_alias,
744                channel_id,
745                room_id,
746                sender_id,
747            })
748        }) {
749            Ok(r) => r,
750            Err(_) => return Vec::new(),
751        };
752
753        rows.filter_map(|r| r.ok()).collect()
754    }
755
756    fn search(&self, query: &SessionQuery) -> Vec<SessionMetadata> {
757        let Some(keyword) = &query.keyword else {
758            return self.list_sessions_with_metadata();
759        };
760
761        let conn = self.conn.lock();
762        #[allow(clippy::cast_possible_wrap)]
763        let limit = query.limit.unwrap_or(50) as i64;
764
765        // FTS5 search
766        let mut stmt = match conn.prepare(
767            "SELECT DISTINCT f.session_key
768             FROM sessions_fts f
769             WHERE sessions_fts MATCH ?1
770             LIMIT ?2",
771        ) {
772            Ok(s) => s,
773            Err(_) => return Vec::new(),
774        };
775
776        // Quote each word for FTS5
777        let fts_query: String = keyword
778            .split_whitespace()
779            .map(|w| format!("\"{w}\""))
780            .collect::<Vec<_>>()
781            .join(" OR ");
782
783        let keys: Vec<String> = match stmt.query_map(params![fts_query, limit], |row| row.get(0)) {
784            Ok(r) => r.filter_map(|r| r.ok()).collect(),
785            Err(_) => return Vec::new(),
786        };
787
788        // Look up metadata for matched sessions
789        keys.iter()
790            .filter_map(|key| {
791                conn.query_row(
792                    "SELECT created_at, last_activity, message_count, name, agent_alias, channel_id, room_id, sender_id FROM session_metadata WHERE session_key = ?1",
793                    params![key],
794                    |row| {
795                        let created_str: String = row.get(0)?;
796                        let activity_str: String = row.get(1)?;
797                        let count: i64 = row.get(2)?;
798                        let name: Option<String> = row.get(3)?;
799                        let agent_alias: Option<String> = row.get(4)?;
800                        let channel_id: Option<String> = row.get(5)?;
801                        let room_id: Option<String> = row.get(6)?;
802                        let sender_id: Option<String> = row.get(7)?;
803                        Ok(SessionMetadata {
804                            key: key.clone(),
805                            name,
806                            created_at: DateTime::parse_from_rfc3339(&created_str)
807                                .map(|dt| dt.with_timezone(&Utc))
808                                .unwrap_or_else(|_| Utc::now()),
809                            last_activity: DateTime::parse_from_rfc3339(&activity_str)
810                                .map(|dt| dt.with_timezone(&Utc))
811                                .unwrap_or_else(|_| Utc::now()),
812                            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
813                            message_count: count as usize,
814                            agent_alias,
815                            channel_id,
816                            room_id,
817                            sender_id,
818                        })
819                    },
820                )
821                .ok()
822            })
823            .collect()
824    }
825
826    fn set_session_agent_alias(&self, session_key: &str, agent_alias: &str) -> std::io::Result<()> {
827        let conn = self.conn.lock();
828        let alias_val = if agent_alias.is_empty() {
829            None
830        } else {
831            Some(agent_alias)
832        };
833        let now = Utc::now().to_rfc3339();
834        conn.execute(
835            "INSERT INTO session_metadata (session_key, created_at, last_activity, message_count, agent_alias)
836             VALUES (?1, ?2, ?3, 0, ?4)
837             ON CONFLICT(session_key) DO UPDATE SET agent_alias = excluded.agent_alias",
838            params![session_key, now, now, alias_val],
839        )
840        .map_err(std::io::Error::other)?;
841        Ok(())
842    }
843
844    fn get_session_agent_alias(&self, session_key: &str) -> std::io::Result<Option<String>> {
845        let conn = self.conn.lock();
846        conn.query_row(
847            "SELECT agent_alias FROM session_metadata WHERE session_key = ?1",
848            params![session_key],
849            |row| row.get(0),
850        )
851        .or_else(|e| match e {
852            rusqlite::Error::QueryReturnedNoRows => Ok(None),
853            other => Err(std::io::Error::other(other)),
854        })
855    }
856
857    fn set_session_context(
858        &self,
859        session_key: &str,
860        context: SessionContext<'_>,
861    ) -> std::io::Result<()> {
862        let conn = self.conn.lock();
863        fn normalize(v: Option<&str>) -> Option<&str> {
864            v.map(str::trim).filter(|s| !s.is_empty())
865        }
866        let channel_id = normalize(context.channel_id);
867        let room_id = normalize(context.room_id);
868        let sender_id = normalize(context.sender_id);
869        let now = Utc::now().to_rfc3339();
870        // Insert a metadata stub row when missing so the per-platform
871        // fields land even before the first message append fires the
872        // upsert path. The COALESCE clauses preserve any field a prior
873        // append/set already stamped — channel-side updates only fill in
874        // gaps, they don't overwrite earlier routing context.
875        conn.execute(
876            "INSERT INTO session_metadata
877                (session_key, created_at, last_activity, message_count, channel_id, room_id, sender_id)
878             VALUES (?1, ?2, ?3, 0, ?4, ?5, ?6)
879             ON CONFLICT(session_key) DO UPDATE SET
880                channel_id = COALESCE(excluded.channel_id, session_metadata.channel_id),
881                room_id    = COALESCE(excluded.room_id,    session_metadata.room_id),
882                sender_id  = COALESCE(excluded.sender_id,  session_metadata.sender_id)",
883            params![session_key, now, now, channel_id, room_id, sender_id],
884        )
885        .map_err(std::io::Error::other)?;
886        Ok(())
887    }
888}
889
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Opens a backend rooted in a brand-new temporary workspace.
    ///
    /// The `TempDir` guard is handed back together with the backend; callers
    /// keep it bound for the whole test so the workspace outlives the backend.
    fn open_backend() -> (TempDir, SqliteSessionBackend) {
        let workspace = TempDir::new().unwrap();
        let backend = SqliteSessionBackend::new(workspace.path()).unwrap();
        (workspace, backend)
    }

    /// Appended messages come back from `load` in order, with their roles.
    #[test]
    fn round_trip_sqlite() {
        let (_workspace, store) = open_backend();

        for message in [ChatMessage::user("hello"), ChatMessage::assistant("hi")] {
            store.append("user1", &message).unwrap();
        }

        let history = store.load("user1");
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].role, "user");
        assert_eq!(history[1].role, "assistant");
    }

    /// `remove_last` drops only the most recent message.
    #[test]
    fn remove_last_sqlite() {
        let (_workspace, store) = open_backend();

        for text in ["a", "b"] {
            store.append("u", &ChatMessage::user(text)).unwrap();
        }

        assert!(store.remove_last("u").unwrap());
        let history = store.load("u");
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].content, "a");
    }

    /// `remove_last` on an unknown session reports that nothing was removed.
    #[test]
    fn remove_last_empty_sqlite() {
        let (_workspace, store) = open_backend();
        assert!(!store.remove_last("nonexistent").unwrap());
    }

    /// Every session that received a message shows up in `list_sessions`.
    #[test]
    fn list_sessions_sqlite() {
        let (_workspace, store) = open_backend();

        for (key, text) in [("a", "hi"), ("b", "hey")] {
            store.append(key, &ChatMessage::user(text)).unwrap();
        }

        let keys = store.list_sessions();
        assert_eq!(keys.len(), 2);
    }

    /// The metadata row's message count follows the number of appends.
    #[test]
    fn metadata_tracks_counts() {
        let (_workspace, store) = open_backend();

        for text in ["a", "b", "c"] {
            store.append("s1", &ChatMessage::user(text)).unwrap();
        }

        let rows = store.list_sessions_with_metadata();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].message_count, 3);
    }

    /// A keyword search returns only the session whose content matches.
    #[test]
    fn fts5_search_finds_content() {
        let (_workspace, store) = open_backend();

        store
            .append(
                "code_chat",
                &ChatMessage::user("How do I parse JSON in Rust?"),
            )
            .unwrap();
        store
            .append("weather", &ChatMessage::user("What's the weather today?"))
            .unwrap();

        let query = SessionQuery {
            keyword: Some("Rust".into()),
            limit: Some(10),
        };
        let hits = store.search(&query);
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].key, "code_chat");
    }

    /// Updating a message row must be reflected by subsequent searches.
    #[test]
    fn fts5_update_trigger_syncs_index() {
        let (_workspace, store) = open_backend();

        store
            .append("chat", &ChatMessage::user("hello world"))
            .unwrap();

        let search_for = |word: &'static str| {
            store.search(&SessionQuery {
                keyword: Some(word.into()),
                limit: Some(10),
            })
        };

        // Verify initial content is searchable
        let hits = search_for("hello");
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].key, "chat");

        // Directly update the session content (simulates update_last behavior)
        {
            let db = store.conn.lock();
            db.execute(
                "UPDATE sessions SET content = ?1 WHERE session_key = ?2",
                params!["goodbye world", "chat"],
            )
            .unwrap();
        }

        // Old keyword should no longer match
        assert!(search_for("hello").is_empty());

        // New keyword should match after UPDATE trigger syncs FTS index
        let hits = search_for("goodbye");
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].key, "chat");
    }

    /// Sessions older than the TTL are purged; fresh ones survive.
    #[test]
    fn cleanup_stale_removes_old_sessions() {
        let (_workspace, store) = open_backend();

        // Insert a session with old timestamp
        {
            let db = store.conn.lock();
            let stale_stamp = (Utc::now() - Duration::hours(100)).to_rfc3339();
            db.execute(
                "INSERT INTO sessions (session_key, role, content, created_at) VALUES (?1, ?2, ?3, ?4)",
                params!["old_session", "user", "ancient", stale_stamp],
            )
            .unwrap();
            db.execute(
                "INSERT INTO session_metadata (session_key, created_at, last_activity, message_count) VALUES (?1, ?2, ?3, 1)",
                params!["old_session", stale_stamp, stale_stamp],
            )
            .unwrap();
        }

        store
            .append("new_session", &ChatMessage::user("fresh"))
            .unwrap();

        let purged = store.cleanup_stale(48).unwrap(); // 48h TTL
        assert_eq!(purged, 1);

        let remaining = store.list_sessions();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0], "new_session");
    }

    /// Clearing messages empties the history but keeps the named metadata row.
    #[test]
    fn clear_messages_removes_rows_keeps_metadata() {
        let (_workspace, store) = open_backend();

        store.append("s1", &ChatMessage::user("hello")).unwrap();
        store.append("s1", &ChatMessage::assistant("hi")).unwrap();
        store.set_session_name("s1", "My Session").unwrap();

        let removed = store.clear_messages("s1").unwrap();
        assert_eq!(removed, 2);
        assert!(store.load("s1").is_empty());
        // Session still exists in metadata with name preserved
        let rows = store.list_sessions_with_metadata();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].message_count, 0);
        assert_eq!(rows[0].name.as_deref(), Some("My Session"));
    }

    /// Clearing an unknown session removes nothing.
    #[test]
    fn clear_messages_empty_returns_zero() {
        let (_workspace, store) = open_backend();
        assert_eq!(store.clear_messages("nonexistent").unwrap(), 0);
    }

    /// Clearing one session leaves the messages of another intact.
    #[test]
    fn clear_messages_does_not_affect_other_sessions() {
        let (_workspace, store) = open_backend();

        for (key, text) in [("s1", "hello"), ("s2", "world")] {
            store.append(key, &ChatMessage::user(text)).unwrap();
        }

        store.clear_messages("s1").unwrap();
        assert!(store.load("s1").is_empty());
        assert_eq!(store.load("s2").len(), 1);
    }

    /// A cleared session accepts new messages and counts them from zero.
    #[test]
    fn clear_messages_then_append_works() {
        let (_workspace, store) = open_backend();

        store.append("s1", &ChatMessage::user("old")).unwrap();
        store.clear_messages("s1").unwrap();
        store.append("s1", &ChatMessage::user("new")).unwrap();

        let history = store.load("s1");
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].content, "new");
        // Metadata count should reflect the new message
        let rows = store.list_sessions_with_metadata();
        assert_eq!(rows[0].message_count, 1);
    }

    /// Deleting a session removes its messages and its listing entry only.
    #[test]
    fn delete_session_removes_all_data() {
        let (_workspace, store) = open_backend();

        store.append("s1", &ChatMessage::user("hello")).unwrap();
        store.append("s1", &ChatMessage::assistant("hi")).unwrap();
        store.append("s2", &ChatMessage::user("other")).unwrap();

        assert!(store.delete_session("s1").unwrap());
        assert!(store.load("s1").is_empty());
        assert_eq!(store.list_sessions().len(), 1);
        assert_eq!(store.list_sessions()[0], "s2");
    }

    /// Deleting an unknown session reports `false`.
    #[test]
    fn delete_session_returns_false_for_missing() {
        let (_workspace, store) = open_backend();
        assert!(!store.delete_session("nonexistent").unwrap());
    }

    /// #7126: `session_exists` must reflect the same row that
    /// `delete_session` wipes, so the gateway's cancelled-append guard
    /// stops resurrecting just-deleted sessions.
    #[test]
    fn session_exists_tracks_metadata_row() {
        let (_workspace, store) = open_backend();

        assert!(!store.session_exists("ghost"));

        store
            .append("ghost", &ChatMessage::user("first"))
            .unwrap();
        assert!(store.session_exists("ghost"));

        assert!(store.delete_session("ghost").unwrap());
        assert!(!store.session_exists("ghost"));
    }

    /// A legacy JSONL file is imported into SQLite and renamed afterwards.
    #[test]
    fn migrate_from_jsonl_imports_and_renames() {
        let workspace = TempDir::new().unwrap();
        let sessions_dir = workspace.path().join("sessions");
        std::fs::create_dir_all(&sessions_dir).unwrap();

        // Create a JSONL file
        let legacy_file = sessions_dir.join("test_user.jsonl");
        std::fs::write(
            &legacy_file,
            "{\"role\":\"user\",\"content\":\"hello\"}\n{\"role\":\"assistant\",\"content\":\"hi\"}\n",
        )
        .unwrap();

        let store = SqliteSessionBackend::new(workspace.path()).unwrap();
        let imported = store.migrate_from_jsonl(workspace.path()).unwrap();
        assert_eq!(imported, 1);

        // JSONL should be renamed
        assert!(!legacy_file.exists());
        assert!(sessions_dir.join("test_user.jsonl.migrated").exists());

        // Messages should be in SQLite
        let history = store.load("test_user");
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].content, "hello");
    }

    /// A session name set once is visible in the metadata listing.
    #[test]
    fn set_session_name_persists() {
        let (_workspace, store) = open_backend();

        store.append("s1", &ChatMessage::user("hello")).unwrap();
        store.set_session_name("s1", "My Session").unwrap();

        let rows = store.list_sessions_with_metadata();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].name.as_deref(), Some("My Session"));
    }

    /// Renaming a session replaces the earlier name.
    #[test]
    fn set_session_name_updates_existing() {
        let (_workspace, store) = open_backend();

        store.append("s1", &ChatMessage::user("hello")).unwrap();
        for name in ["First", "Second"] {
            store.set_session_name("s1", name).unwrap();
        }

        let rows = store.list_sessions_with_metadata();
        assert_eq!(rows[0].name.as_deref(), Some("Second"));
    }

    /// A session that was never named reports no name.
    #[test]
    fn sessions_without_name_return_none() {
        let (_workspace, store) = open_backend();

        store.append("s1", &ChatMessage::user("hello")).unwrap();

        let rows = store.list_sessions_with_metadata();
        assert_eq!(rows.len(), 1);
        assert!(rows[0].name.is_none());
    }

    // ── session state tests ─────────────────────────────────────────

    /// Moving to `running` records the turn id and a start timestamp.
    #[test]
    fn session_state_idle_to_running() {
        let (_workspace, store) = open_backend();
        store.append("s1", &ChatMessage::user("hello")).unwrap();

        store
            .set_session_state("s1", "running", Some("turn-1"))
            .unwrap();
        let snapshot = store.get_session_state("s1").unwrap().unwrap();
        assert_eq!(snapshot.state, "running");
        assert_eq!(snapshot.turn_id.as_deref(), Some("turn-1"));
        assert!(snapshot.turn_started_at.is_some());
    }

    /// Returning to `idle` clears the turn id and start timestamp.
    #[test]
    fn session_state_running_to_idle() {
        let (_workspace, store) = open_backend();
        store.append("s1", &ChatMessage::user("hello")).unwrap();

        store
            .set_session_state("s1", "running", Some("turn-1"))
            .unwrap();
        store.set_session_state("s1", "idle", None).unwrap();

        let snapshot = store.get_session_state("s1").unwrap().unwrap();
        assert_eq!(snapshot.state, "idle");
        assert!(snapshot.turn_id.is_none());
        assert!(snapshot.turn_started_at.is_none());
    }

    /// Moving to `error` keeps the turn id that was supplied.
    #[test]
    fn session_state_running_to_error() {
        let (_workspace, store) = open_backend();
        store.append("s1", &ChatMessage::user("hello")).unwrap();

        for state in ["running", "error"] {
            store
                .set_session_state("s1", state, Some("turn-1"))
                .unwrap();
        }

        let snapshot = store.get_session_state("s1").unwrap().unwrap();
        assert_eq!(snapshot.state, "error");
        assert_eq!(snapshot.turn_id.as_deref(), Some("turn-1"));
    }

    /// Only sessions explicitly marked `running` are listed as running.
    #[test]
    fn list_running_sessions_returns_running_only() {
        let (_workspace, store) = open_backend();

        for (key, text) in [("s1", "a"), ("s2", "b"), ("s3", "c")] {
            store.append(key, &ChatMessage::user(text)).unwrap();
        }

        for (key, turn) in [("s1", "t1"), ("s2", "t2")] {
            store.set_session_state(key, "running", Some(turn)).unwrap();
        }
        // s3 stays idle (default)

        let running = store.list_running_sessions();
        assert_eq!(running.len(), 2);
        let running_keys: Vec<&str> = running.iter().map(|m| m.key.as_str()).collect();
        assert!(running_keys.contains(&"s1"));
        assert!(running_keys.contains(&"s2"));
    }

    /// A running turn older than the threshold is reported as stuck.
    #[test]
    fn list_stuck_sessions_detects_old_running() {
        let (_workspace, store) = open_backend();
        store.append("s1", &ChatMessage::user("a")).unwrap();

        // Manually set an old turn_started_at
        {
            let db = store.conn.lock();
            let started = (Utc::now() - Duration::seconds(600)).to_rfc3339();
            db.execute(
                "UPDATE session_metadata SET state = 'running', turn_id = 'old', turn_started_at = ?1 WHERE session_key = 's1'",
                params![started],
            )
            .unwrap();
        }

        let stuck = store.list_stuck_sessions(300); // 5 min threshold
        assert_eq!(stuck.len(), 1);
        assert_eq!(stuck[0].key, "s1");

        // Not stuck if threshold is longer
        let not_stuck = store.list_stuck_sessions(900); // 15 min threshold
        assert_eq!(not_stuck.len(), 0);
    }

    /// Asking for the state of an unknown session yields `None`.
    #[test]
    fn get_session_state_nonexistent() {
        let (_workspace, store) = open_backend();
        let snapshot = store.get_session_state("nonexistent").unwrap();
        assert!(snapshot.is_none());
    }

    /// Re-opening an existing database keeps messages and defaults to idle.
    #[test]
    fn session_state_migration_preserves_data() {
        let workspace = TempDir::new().unwrap();
        // Create backend (runs migration)
        let first = SqliteSessionBackend::new(workspace.path()).unwrap();
        first.append("s1", &ChatMessage::user("hello")).unwrap();

        // Re-open (migration should be idempotent)
        drop(first);
        let reopened = SqliteSessionBackend::new(workspace.path()).unwrap();
        let history = reopened.load("s1");
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].content, "hello");

        // State should default to idle
        let snapshot = reopened.get_session_state("s1").unwrap().unwrap();
        assert_eq!(snapshot.state, "idle");
    }

    /// Setting an empty name removes the previously stored name.
    #[test]
    fn empty_name_clears_to_none() {
        let (_workspace, store) = open_backend();

        store.append("s1", &ChatMessage::user("hello")).unwrap();
        for name in ["Named", ""] {
            store.set_session_name("s1", name).unwrap();
        }

        let rows = store.list_sessions_with_metadata();
        assert!(rows[0].name.is_none());
    }

    // ── get_session_metadata tests ─────────────────────────────────

    /// The single-session lookup carries key, name and message count.
    #[test]
    fn get_session_metadata_returns_full_metadata() {
        let (_workspace, store) = open_backend();

        store.append("s1", &ChatMessage::user("hello")).unwrap();
        store.append("s1", &ChatMessage::assistant("hi")).unwrap();
        store.set_session_name("s1", "My Chat").unwrap();

        let row = store.get_session_metadata("s1").unwrap();
        assert_eq!(row.key, "s1");
        assert_eq!(row.name.as_deref(), Some("My Chat"));
        assert_eq!(row.message_count, 2);
    }

    /// The single-session lookup yields `None` for an unknown key.
    #[test]
    fn get_session_metadata_returns_none_for_missing() {
        let (_workspace, store) = open_backend();
        assert!(store.get_session_metadata("nonexistent").is_none());
    }

    /// The alias is visible through every metadata read path.
    #[test]
    fn agent_alias_roundtrips_through_metadata() {
        let (_workspace, store) = open_backend();

        store.append("s1", &ChatMessage::user("hello")).unwrap();
        store.set_session_agent_alias("s1", "scout").unwrap();

        let single = store.get_session_metadata("s1").unwrap();
        assert_eq!(single.agent_alias.as_deref(), Some("scout"));

        let listing = store.list_sessions_with_metadata();
        let listed = listing.iter().find(|m| m.key == "s1").unwrap();
        assert_eq!(listed.agent_alias.as_deref(), Some("scout"));

        // Standalone getter also works.
        let direct = store.get_session_agent_alias("s1").unwrap();
        assert_eq!(direct.as_deref(), Some("scout"));
    }

    /// Setting the alias on a session with no messages still records it.
    #[test]
    fn agent_alias_set_before_any_append_upserts_metadata() {
        let (_workspace, store) = open_backend();

        // No prior append — metadata row does not exist yet. UPSERT
        // path must still record the alias so the WS handshake can
        // attribute the session before the first user message lands.
        store.set_session_agent_alias("s1", "scout").unwrap();

        let direct = store.get_session_agent_alias("s1").unwrap();
        assert_eq!(direct.as_deref(), Some("scout"));
    }

    /// Context fields are stored, and a later partial update keeps them.
    #[test]
    fn session_context_roundtrips_channel_room_sender() {
        let (_workspace, store) = open_backend();

        store.append("s1", &ChatMessage::user("hello")).unwrap();
        let full_context = SessionContext {
            channel_id: Some("discord.clamps"),
            room_id: Some("1234567890"),
            sender_id: Some("@user:matrix"),
        };
        store.set_session_context("s1", full_context).unwrap();

        let row = store.get_session_metadata("s1").unwrap();
        assert_eq!(row.channel_id.as_deref(), Some("discord.clamps"));
        assert_eq!(row.room_id.as_deref(), Some("1234567890"));
        assert_eq!(row.sender_id.as_deref(), Some("@user:matrix"));

        // Second call with partial context must NOT clear the columns
        // already filled in — set_session_context is additive.
        let partial_context = SessionContext {
            channel_id: None,
            room_id: Some("1234567890"),
            sender_id: None,
        };
        store.set_session_context("s1", partial_context).unwrap();

        let row = store.get_session_metadata("s1").unwrap();
        assert_eq!(row.channel_id.as_deref(), Some("discord.clamps"));
        assert_eq!(row.sender_id.as_deref(), Some("@user:matrix"));
    }

    /// Setting context on a session with no messages creates its metadata.
    #[test]
    fn session_context_creates_metadata_row_before_first_append() {
        let (_workspace, store) = open_backend();

        let early_context = SessionContext {
            channel_id: Some("telegram.production"),
            room_id: None,
            sender_id: Some("@alice"),
        };
        store.set_session_context("s1", early_context).unwrap();

        let row = store.get_session_metadata("s1").unwrap();
        assert_eq!(row.channel_id.as_deref(), Some("telegram.production"));
        assert_eq!(row.sender_id.as_deref(), Some("@alice"));
        assert!(row.room_id.is_none());
    }

    /// The single-session lookup agrees with the listing for the same key.
    #[test]
    fn get_session_metadata_matches_list() {
        let (_workspace, store) = open_backend();

        for (key, text) in [("s1", "a"), ("s1", "b"), ("s2", "c")] {
            store.append(key, &ChatMessage::user(text)).unwrap();
        }

        let single = store.get_session_metadata("s1").unwrap();
        let listing = store.list_sessions_with_metadata();
        let listed = listing.iter().find(|m| m.key == "s1").unwrap();

        assert_eq!(single.message_count, listed.message_count);
        assert_eq!(single.name, listed.name);
        assert_eq!(single.created_at, listed.created_at);
        assert_eq!(single.last_activity, listed.last_activity);
    }
}