1use crate::session_backend::{
8 SessionBackend, SessionContext, SessionMetadata, SessionQuery, SessionState,
9};
10use anyhow::{Context, Result};
11use chrono::{DateTime, Duration, Utc};
12use parking_lot::Mutex;
13use rusqlite::{Connection, params};
14use std::path::Path;
15use zeroclaw_api::model_provider::ChatMessage;
16
17pub struct SqliteSessionBackend {
19 conn: Mutex<Connection>,
20}
21
22impl SqliteSessionBackend {
23 pub fn new(workspace_dir: &Path) -> Result<Self> {
25 let sessions_dir = workspace_dir.join("sessions");
26 std::fs::create_dir_all(&sessions_dir).context("Failed to create sessions directory")?;
27 let db_path = sessions_dir.join("sessions.db");
28
29 let conn = Connection::open(&db_path)
30 .with_context(|| format!("Failed to open session DB: {}", db_path.display()))?;
31
32 conn.execute_batch(
33 "PRAGMA journal_mode = WAL;
34 PRAGMA synchronous = NORMAL;
35 PRAGMA temp_store = MEMORY;
36 PRAGMA mmap_size = 4194304;",
37 )?;
38
39 conn.execute_batch(
40 "CREATE TABLE IF NOT EXISTS sessions (
41 id INTEGER PRIMARY KEY AUTOINCREMENT,
42 session_key TEXT NOT NULL,
43 role TEXT NOT NULL,
44 content TEXT NOT NULL,
45 created_at TEXT NOT NULL
46 );
47 CREATE INDEX IF NOT EXISTS idx_sessions_key ON sessions(session_key);
48 CREATE INDEX IF NOT EXISTS idx_sessions_key_id ON sessions(session_key, id);
49
50 CREATE TABLE IF NOT EXISTS session_metadata (
51 session_key TEXT PRIMARY KEY,
52 created_at TEXT NOT NULL,
53 last_activity TEXT NOT NULL,
54 message_count INTEGER NOT NULL DEFAULT 0,
55 name TEXT
56 );
57
58 CREATE VIRTUAL TABLE IF NOT EXISTS sessions_fts USING fts5(
59 session_key, content, content=sessions, content_rowid=id
60 );
61
62 CREATE TRIGGER IF NOT EXISTS sessions_ai AFTER INSERT ON sessions BEGIN
63 INSERT INTO sessions_fts(rowid, session_key, content)
64 VALUES (new.id, new.session_key, new.content);
65 END;
66 CREATE TRIGGER IF NOT EXISTS sessions_ad AFTER DELETE ON sessions BEGIN
67 INSERT INTO sessions_fts(sessions_fts, rowid, session_key, content)
68 VALUES ('delete', old.id, old.session_key, old.content);
69 END;
70 CREATE TRIGGER IF NOT EXISTS sessions_au AFTER UPDATE ON sessions BEGIN
71 INSERT INTO sessions_fts(sessions_fts, rowid, session_key, content)
72 VALUES ('delete', old.id, old.session_key, old.content);
73 INSERT INTO sessions_fts(rowid, session_key, content)
74 VALUES (new.id, new.session_key, new.content);
75 END;",
76 )
77 .context("Failed to initialize session schema")?;
78
79 let has_name: bool = conn
81 .query_row(
82 "SELECT COUNT(*) > 0 FROM pragma_table_info('session_metadata') WHERE name = 'name'",
83 [],
84 |row| row.get(0),
85 )
86 .unwrap_or(false);
87 if !has_name {
88 let _ = conn.execute("ALTER TABLE session_metadata ADD COLUMN name TEXT", []);
89 }
90
91 let has_state: bool = conn
93 .query_row(
94 "SELECT COUNT(*) > 0 FROM pragma_table_info('session_metadata') WHERE name = 'state'",
95 [],
96 |row| row.get(0),
97 )
98 .unwrap_or(false);
99 if !has_state {
100 let _ = conn.execute(
101 "ALTER TABLE session_metadata ADD COLUMN state TEXT NOT NULL DEFAULT 'idle'",
102 [],
103 );
104 let _ = conn.execute("ALTER TABLE session_metadata ADD COLUMN turn_id TEXT", []);
105 let _ = conn.execute(
106 "ALTER TABLE session_metadata ADD COLUMN turn_started_at TEXT",
107 [],
108 );
109 }
110
111 let has_agent_alias: bool = conn
113 .query_row(
114 "SELECT COUNT(*) > 0 FROM pragma_table_info('session_metadata') WHERE name = 'agent_alias'",
115 [],
116 |row| row.get(0),
117 )
118 .unwrap_or(false);
119 if !has_agent_alias {
120 let _ = conn.execute(
121 "ALTER TABLE session_metadata ADD COLUMN agent_alias TEXT",
122 [],
123 );
124 let _ = conn.execute(
125 "CREATE INDEX IF NOT EXISTS idx_session_metadata_agent_alias \
126 ON session_metadata(agent_alias)",
127 [],
128 );
129 }
130
131 for (column, ddl) in [
138 (
139 "channel_id",
140 "ALTER TABLE session_metadata ADD COLUMN channel_id TEXT",
141 ),
142 (
143 "room_id",
144 "ALTER TABLE session_metadata ADD COLUMN room_id TEXT",
145 ),
146 (
147 "sender_id",
148 "ALTER TABLE session_metadata ADD COLUMN sender_id TEXT",
149 ),
150 ] {
151 let present: bool = conn
152 .query_row(
153 "SELECT COUNT(*) > 0 FROM pragma_table_info('session_metadata') \
154 WHERE name = ?1",
155 params![column],
156 |row| row.get(0),
157 )
158 .unwrap_or(false);
159 if !present {
160 let _ = conn.execute(ddl, []);
161 }
162 }
163 let _ = conn.execute(
164 "CREATE INDEX IF NOT EXISTS idx_session_metadata_channel_id \
165 ON session_metadata(channel_id)",
166 [],
167 );
168 let _ = conn.execute(
169 "CREATE INDEX IF NOT EXISTS idx_session_metadata_room_id \
170 ON session_metadata(room_id)",
171 [],
172 );
173 let _ = conn.execute(
174 "CREATE INDEX IF NOT EXISTS idx_session_metadata_sender_id \
175 ON session_metadata(sender_id)",
176 [],
177 );
178
179 Ok(Self {
180 conn: Mutex::new(conn),
181 })
182 }
183
184 pub fn migrate_from_jsonl(&self, workspace_dir: &Path) -> Result<usize> {
186 let sessions_dir = workspace_dir.join("sessions");
187 let entries = match std::fs::read_dir(&sessions_dir) {
188 Ok(e) => e,
189 Err(_) => return Ok(0),
190 };
191
192 let mut migrated = 0;
193 for entry in entries {
194 let entry = match entry {
195 Ok(e) => e,
196 Err(_) => continue,
197 };
198 let name = match entry.file_name().into_string() {
199 Ok(n) => n,
200 Err(_) => continue,
201 };
202 let Some(key) = name.strip_suffix(".jsonl") else {
203 continue;
204 };
205
206 let path = entry.path();
207 let file = match std::fs::File::open(&path) {
208 Ok(f) => f,
209 Err(_) => continue,
210 };
211
212 let reader = std::io::BufReader::new(file);
213 let mut count = 0;
214 for line in std::io::BufRead::lines(reader) {
215 let Ok(line) = line else { continue };
216 let trimmed = line.trim();
217 if trimmed.is_empty() {
218 continue;
219 }
220 if let Ok(msg) = serde_json::from_str::<ChatMessage>(trimmed)
221 && self.append(key, &msg).is_ok()
222 {
223 count += 1;
224 }
225 }
226
227 if count > 0 {
228 let migrated_path = path.with_extension("jsonl.migrated");
229 let _ = std::fs::rename(&path, &migrated_path);
230 migrated += 1;
231 }
232 }
233
234 Ok(migrated)
235 }
236}
237
238impl SessionBackend for SqliteSessionBackend {
239 fn load(&self, session_key: &str) -> Vec<ChatMessage> {
240 let conn = self.conn.lock();
241 let mut stmt = match conn
242 .prepare("SELECT role, content FROM sessions WHERE session_key = ?1 ORDER BY id ASC")
243 {
244 Ok(s) => s,
245 Err(_) => return Vec::new(),
246 };
247
248 let rows = match stmt.query_map(params![session_key], |row| {
249 Ok(ChatMessage {
250 role: row.get(0)?,
251 content: row.get(1)?,
252 })
253 }) {
254 Ok(r) => r,
255 Err(_) => return Vec::new(),
256 };
257
258 rows.filter_map(|r| r.ok()).collect()
259 }
260
261 fn load_with_timestamps(
262 &self,
263 session_key: &str,
264 ) -> Vec<crate::session_backend::TimestampedMessage> {
265 use crate::session_backend::TimestampedMessage;
266 let conn = self.conn.lock();
267 let mut stmt = match conn.prepare(
268 "SELECT role, content, created_at FROM sessions WHERE session_key = ?1 ORDER BY id ASC",
269 ) {
270 Ok(s) => s,
271 Err(_) => return Vec::new(),
272 };
273
274 let rows = match stmt.query_map(params![session_key], |row| {
275 let role: String = row.get(0)?;
276 let content: String = row.get(1)?;
277 let created_at_raw: Option<String> = row.get(2).ok();
278 let created_at = created_at_raw
279 .as_deref()
280 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
281 .map(|dt| dt.with_timezone(&Utc));
282 Ok(TimestampedMessage {
283 message: ChatMessage { role, content },
284 created_at,
285 })
286 }) {
287 Ok(r) => r,
288 Err(_) => return Vec::new(),
289 };
290
291 rows.filter_map(|r| r.ok()).collect()
292 }
293
294 fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
295 let conn = self.conn.lock();
296 let now = Utc::now().to_rfc3339();
297
298 conn.execute(
299 "INSERT INTO sessions (session_key, role, content, created_at)
300 VALUES (?1, ?2, ?3, ?4)",
301 params![session_key, message.role, message.content, now],
302 )
303 .map_err(std::io::Error::other)?;
304
305 conn.execute(
307 "INSERT INTO session_metadata (session_key, created_at, last_activity, message_count)
308 VALUES (?1, ?2, ?3, 1)
309 ON CONFLICT(session_key) DO UPDATE SET
310 last_activity = excluded.last_activity,
311 message_count = message_count + 1",
312 params![session_key, now, now],
313 )
314 .map_err(std::io::Error::other)?;
315
316 Ok(())
317 }
318
319 fn remove_last(&self, session_key: &str) -> std::io::Result<bool> {
320 let conn = self.conn.lock();
321
322 let last_id: Option<i64> = conn
323 .query_row(
324 "SELECT id FROM sessions WHERE session_key = ?1 ORDER BY id DESC LIMIT 1",
325 params![session_key],
326 |row| row.get(0),
327 )
328 .ok();
329
330 let Some(id) = last_id else {
331 return Ok(false);
332 };
333
334 conn.execute("DELETE FROM sessions WHERE id = ?1", params![id])
335 .map_err(std::io::Error::other)?;
336
337 conn.execute(
339 "UPDATE session_metadata SET message_count = MAX(0, message_count - 1)
340 WHERE session_key = ?1",
341 params![session_key],
342 )
343 .map_err(std::io::Error::other)?;
344
345 Ok(true)
346 }
347
348 fn update_last(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<bool> {
351 let conn = self.conn.lock();
352
353 let last_id: Option<i64> = conn
354 .query_row(
355 "SELECT id FROM sessions WHERE session_key = ?1 ORDER BY id DESC LIMIT 1",
356 params![session_key],
357 |row| row.get(0),
358 )
359 .ok();
360
361 let Some(id) = last_id else {
362 return Ok(false);
363 };
364
365 conn.execute(
366 "UPDATE sessions SET role = ?1, content = ?2 WHERE id = ?3",
367 params![message.role, message.content, id],
368 )
369 .map_err(std::io::Error::other)?;
370
371 let now = chrono::Utc::now().to_rfc3339();
377 conn.execute(
378 "UPDATE session_metadata SET last_activity = ?1 WHERE session_key = ?2",
379 params![now, session_key],
380 )
381 .map_err(std::io::Error::other)?;
382
383 Ok(true)
384 }
385
386 fn list_sessions(&self) -> Vec<String> {
387 let conn = self.conn.lock();
388 let mut stmt = match conn
389 .prepare("SELECT session_key FROM session_metadata ORDER BY last_activity DESC")
390 {
391 Ok(s) => s,
392 Err(_) => return Vec::new(),
393 };
394
395 let rows = match stmt.query_map([], |row| row.get(0)) {
396 Ok(r) => r,
397 Err(_) => return Vec::new(),
398 };
399
400 rows.filter_map(|r| r.ok()).collect()
401 }
402
403 fn list_sessions_with_metadata(&self) -> Vec<SessionMetadata> {
404 let conn = self.conn.lock();
405 let mut stmt = match conn.prepare(
406 "SELECT session_key, created_at, last_activity, message_count, name, agent_alias, channel_id, room_id, sender_id
407 FROM session_metadata ORDER BY last_activity DESC",
408 ) {
409 Ok(s) => s,
410 Err(_) => return Vec::new(),
411 };
412
413 let rows = match stmt.query_map([], |row| {
414 let key: String = row.get(0)?;
415 let created_str: String = row.get(1)?;
416 let activity_str: String = row.get(2)?;
417 let count: i64 = row.get(3)?;
418 let name: Option<String> = row.get(4)?;
419 let agent_alias: Option<String> = row.get(5)?;
420 let channel_id: Option<String> = row.get(6)?;
421 let room_id: Option<String> = row.get(7)?;
422 let sender_id: Option<String> = row.get(8)?;
423
424 let created = DateTime::parse_from_rfc3339(&created_str)
425 .map(|dt| dt.with_timezone(&Utc))
426 .unwrap_or_else(|_| Utc::now());
427 let activity = DateTime::parse_from_rfc3339(&activity_str)
428 .map(|dt| dt.with_timezone(&Utc))
429 .unwrap_or_else(|_| Utc::now());
430
431 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
432 Ok(SessionMetadata {
433 key,
434 name,
435 created_at: created,
436 last_activity: activity,
437 message_count: count as usize,
438 agent_alias,
439 channel_id,
440 room_id,
441 sender_id,
442 })
443 }) {
444 Ok(r) => r,
445 Err(_) => return Vec::new(),
446 };
447
448 rows.filter_map(|r| r.ok()).collect()
449 }
450
451 fn cleanup_stale(&self, ttl_hours: u32) -> std::io::Result<usize> {
452 let conn = self.conn.lock();
453 let cutoff = (Utc::now() - Duration::hours(i64::from(ttl_hours))).to_rfc3339();
454
455 let stale_keys: Vec<String> = {
457 let mut stmt = conn
458 .prepare("SELECT session_key FROM session_metadata WHERE last_activity < ?1")
459 .map_err(std::io::Error::other)?;
460 let rows = stmt
461 .query_map(params![cutoff], |row| row.get(0))
462 .map_err(std::io::Error::other)?;
463 rows.filter_map(|r| r.ok()).collect()
464 };
465
466 let count = stale_keys.len();
467 for key in &stale_keys {
468 let _ = conn.execute("DELETE FROM sessions WHERE session_key = ?1", params![key]);
469 let _ = conn.execute(
470 "DELETE FROM session_metadata WHERE session_key = ?1",
471 params![key],
472 );
473 }
474
475 Ok(count)
476 }
477
478 fn clear_messages(&self, session_key: &str) -> std::io::Result<usize> {
479 let conn = self.conn.lock();
480
481 conn.execute(
482 "DELETE FROM sessions WHERE session_key = ?1",
483 params![session_key],
484 )
485 .map_err(std::io::Error::other)?;
486
487 let count = conn.changes() as usize;
488
489 if count > 0 {
490 conn.execute(
491 "UPDATE session_metadata SET message_count = 0, last_activity = ?1 WHERE session_key = ?2",
492 params![Utc::now().to_rfc3339(), session_key],
493 )
494 .map_err(std::io::Error::other)?;
495 }
496
497 Ok(count)
498 }
499
500 fn delete_session(&self, session_key: &str) -> std::io::Result<bool> {
501 let conn = self.conn.lock();
502
503 let exists: bool = conn
505 .query_row(
506 "SELECT COUNT(*) > 0 FROM session_metadata WHERE session_key = ?1",
507 params![session_key],
508 |row| row.get(0),
509 )
510 .unwrap_or(false);
511
512 if !exists {
513 return Ok(false);
514 }
515
516 conn.execute(
518 "DELETE FROM sessions WHERE session_key = ?1",
519 params![session_key],
520 )
521 .map_err(std::io::Error::other)?;
522
523 conn.execute(
525 "DELETE FROM session_metadata WHERE session_key = ?1",
526 params![session_key],
527 )
528 .map_err(std::io::Error::other)?;
529
530 Ok(true)
531 }
532
533 fn set_session_name(&self, session_key: &str, name: &str) -> std::io::Result<()> {
534 let conn = self.conn.lock();
535 let name_val = if name.is_empty() { None } else { Some(name) };
536 conn.execute(
537 "UPDATE session_metadata SET name = ?1 WHERE session_key = ?2",
538 params![name_val, session_key],
539 )
540 .map_err(std::io::Error::other)?;
541 Ok(())
542 }
543
544 fn get_session_name(&self, session_key: &str) -> std::io::Result<Option<String>> {
545 let conn = self.conn.lock();
546 conn.query_row(
547 "SELECT name FROM session_metadata WHERE session_key = ?1",
548 params![session_key],
549 |row| row.get(0),
550 )
551 .map_err(std::io::Error::other)
552 }
553
554 fn get_session_metadata(&self, session_key: &str) -> Option<SessionMetadata> {
555 let conn = self.conn.lock();
556 conn.query_row(
557 "SELECT session_key, created_at, last_activity, message_count, name, agent_alias, channel_id, room_id, sender_id
558 FROM session_metadata WHERE session_key = ?1",
559 params![session_key],
560 |row| {
561 let key: String = row.get(0)?;
562 let created_str: String = row.get(1)?;
563 let activity_str: String = row.get(2)?;
564 let count: i64 = row.get(3)?;
565 let name: Option<String> = row.get(4)?;
566 let agent_alias: Option<String> = row.get(5)?;
567 let channel_id: Option<String> = row.get(6)?;
568 let room_id: Option<String> = row.get(7)?;
569 let sender_id: Option<String> = row.get(8)?;
570
571 let created = DateTime::parse_from_rfc3339(&created_str)
572 .map(|dt| dt.with_timezone(&Utc))
573 .unwrap_or_else(|_| Utc::now());
574 let activity = DateTime::parse_from_rfc3339(&activity_str)
575 .map(|dt| dt.with_timezone(&Utc))
576 .unwrap_or_else(|_| Utc::now());
577
578 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
579 Ok(SessionMetadata {
580 key,
581 name,
582 created_at: created,
583 last_activity: activity,
584 message_count: count as usize,
585 agent_alias,
586 channel_id,
587 room_id,
588 sender_id,
589 })
590 },
591 )
592 .ok()
593 }
594
595 fn set_session_state(
596 &self,
597 session_key: &str,
598 state: &str,
599 turn_id: Option<&str>,
600 ) -> std::io::Result<()> {
601 let conn = self.conn.lock();
602 let now = Utc::now().to_rfc3339();
603 let started_at = if state == "running" {
604 Some(now.as_str())
605 } else {
606 None
607 };
608 conn.execute(
609 "UPDATE session_metadata SET state = ?1, turn_id = ?2, turn_started_at = ?3
610 WHERE session_key = ?4",
611 params![state, turn_id, started_at, session_key],
612 )
613 .map_err(std::io::Error::other)?;
614 Ok(())
615 }
616
617 fn get_session_state(&self, session_key: &str) -> std::io::Result<Option<SessionState>> {
618 let conn = self.conn.lock();
619 conn.query_row(
620 "SELECT state, turn_id, turn_started_at FROM session_metadata WHERE session_key = ?1",
621 params![session_key],
622 |row| {
623 let state: String = row.get(0)?;
624 let turn_id: Option<String> = row.get(1)?;
625 let started_str: Option<String> = row.get(2)?;
626 let turn_started_at = started_str.and_then(|s| {
627 chrono::DateTime::parse_from_rfc3339(&s)
628 .ok()
629 .map(|dt| dt.with_timezone(&Utc))
630 });
631 Ok(SessionState {
632 state,
633 turn_id,
634 turn_started_at,
635 })
636 },
637 )
638 .map(Some)
639 .or_else(|e| match e {
640 rusqlite::Error::QueryReturnedNoRows => Ok(None),
641 other => Err(std::io::Error::other(other)),
642 })
643 }
644
645 fn list_running_sessions(&self) -> Vec<SessionMetadata> {
646 let conn = self.conn.lock();
647 let mut stmt = match conn.prepare(
648 "SELECT session_key, created_at, last_activity, message_count, name, agent_alias, channel_id, room_id, sender_id
649 FROM session_metadata WHERE state = 'running' ORDER BY turn_started_at DESC",
650 ) {
651 Ok(s) => s,
652 Err(_) => return Vec::new(),
653 };
654
655 let rows = match stmt.query_map([], |row| {
656 let key: String = row.get(0)?;
657 let created_str: String = row.get(1)?;
658 let activity_str: String = row.get(2)?;
659 let count: i64 = row.get(3)?;
660 let name: Option<String> = row.get(4)?;
661 let agent_alias: Option<String> = row.get(5)?;
662 let channel_id: Option<String> = row.get(6)?;
663 let room_id: Option<String> = row.get(7)?;
664 let sender_id: Option<String> = row.get(8)?;
665 let created = DateTime::parse_from_rfc3339(&created_str)
666 .map(|dt| dt.with_timezone(&Utc))
667 .unwrap_or_else(|_| Utc::now());
668 let activity = DateTime::parse_from_rfc3339(&activity_str)
669 .map(|dt| dt.with_timezone(&Utc))
670 .unwrap_or_else(|_| Utc::now());
671 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
672 Ok(SessionMetadata {
673 key,
674 name,
675 created_at: created,
676 last_activity: activity,
677 message_count: count as usize,
678 agent_alias,
679 channel_id,
680 room_id,
681 sender_id,
682 })
683 }) {
684 Ok(r) => r,
685 Err(_) => return Vec::new(),
686 };
687
688 rows.filter_map(|r| r.ok()).collect()
689 }
690
691 fn list_stuck_sessions(&self, threshold_secs: u64) -> Vec<SessionMetadata> {
692 let conn = self.conn.lock();
693 #[allow(clippy::cast_possible_wrap)]
694 let cutoff = (Utc::now() - chrono::Duration::seconds(threshold_secs as i64)).to_rfc3339();
695 let mut stmt = match conn.prepare(
696 "SELECT session_key, created_at, last_activity, message_count, name, agent_alias, channel_id, room_id, sender_id
697 FROM session_metadata
698 WHERE state = 'running' AND turn_started_at < ?1
699 ORDER BY turn_started_at ASC",
700 ) {
701 Ok(s) => s,
702 Err(_) => return Vec::new(),
703 };
704
705 let rows = match stmt.query_map(params![cutoff], |row| {
706 let key: String = row.get(0)?;
707 let created_str: String = row.get(1)?;
708 let activity_str: String = row.get(2)?;
709 let count: i64 = row.get(3)?;
710 let name: Option<String> = row.get(4)?;
711 let agent_alias: Option<String> = row.get(5)?;
712 let channel_id: Option<String> = row.get(6)?;
713 let room_id: Option<String> = row.get(7)?;
714 let sender_id: Option<String> = row.get(8)?;
715 let created = DateTime::parse_from_rfc3339(&created_str)
716 .map(|dt| dt.with_timezone(&Utc))
717 .unwrap_or_else(|_| Utc::now());
718 let activity = DateTime::parse_from_rfc3339(&activity_str)
719 .map(|dt| dt.with_timezone(&Utc))
720 .unwrap_or_else(|_| Utc::now());
721 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
722 Ok(SessionMetadata {
723 key,
724 name,
725 created_at: created,
726 last_activity: activity,
727 message_count: count as usize,
728 agent_alias,
729 channel_id,
730 room_id,
731 sender_id,
732 })
733 }) {
734 Ok(r) => r,
735 Err(_) => return Vec::new(),
736 };
737
738 rows.filter_map(|r| r.ok()).collect()
739 }
740
741 fn search(&self, query: &SessionQuery) -> Vec<SessionMetadata> {
742 let Some(keyword) = &query.keyword else {
743 return self.list_sessions_with_metadata();
744 };
745
746 let conn = self.conn.lock();
747 #[allow(clippy::cast_possible_wrap)]
748 let limit = query.limit.unwrap_or(50) as i64;
749
750 let mut stmt = match conn.prepare(
752 "SELECT DISTINCT f.session_key
753 FROM sessions_fts f
754 WHERE sessions_fts MATCH ?1
755 LIMIT ?2",
756 ) {
757 Ok(s) => s,
758 Err(_) => return Vec::new(),
759 };
760
761 let fts_query: String = keyword
763 .split_whitespace()
764 .map(|w| format!("\"{w}\""))
765 .collect::<Vec<_>>()
766 .join(" OR ");
767
768 let keys: Vec<String> = match stmt.query_map(params![fts_query, limit], |row| row.get(0)) {
769 Ok(r) => r.filter_map(|r| r.ok()).collect(),
770 Err(_) => return Vec::new(),
771 };
772
773 keys.iter()
775 .filter_map(|key| {
776 conn.query_row(
777 "SELECT created_at, last_activity, message_count, name, agent_alias, channel_id, room_id, sender_id FROM session_metadata WHERE session_key = ?1",
778 params![key],
779 |row| {
780 let created_str: String = row.get(0)?;
781 let activity_str: String = row.get(1)?;
782 let count: i64 = row.get(2)?;
783 let name: Option<String> = row.get(3)?;
784 let agent_alias: Option<String> = row.get(4)?;
785 let channel_id: Option<String> = row.get(5)?;
786 let room_id: Option<String> = row.get(6)?;
787 let sender_id: Option<String> = row.get(7)?;
788 Ok(SessionMetadata {
789 key: key.clone(),
790 name,
791 created_at: DateTime::parse_from_rfc3339(&created_str)
792 .map(|dt| dt.with_timezone(&Utc))
793 .unwrap_or_else(|_| Utc::now()),
794 last_activity: DateTime::parse_from_rfc3339(&activity_str)
795 .map(|dt| dt.with_timezone(&Utc))
796 .unwrap_or_else(|_| Utc::now()),
797 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
798 message_count: count as usize,
799 agent_alias,
800 channel_id,
801 room_id,
802 sender_id,
803 })
804 },
805 )
806 .ok()
807 })
808 .collect()
809 }
810
811 fn set_session_agent_alias(&self, session_key: &str, agent_alias: &str) -> std::io::Result<()> {
812 let conn = self.conn.lock();
813 let alias_val = if agent_alias.is_empty() {
814 None
815 } else {
816 Some(agent_alias)
817 };
818 let now = Utc::now().to_rfc3339();
819 conn.execute(
820 "INSERT INTO session_metadata (session_key, created_at, last_activity, message_count, agent_alias)
821 VALUES (?1, ?2, ?3, 0, ?4)
822 ON CONFLICT(session_key) DO UPDATE SET agent_alias = excluded.agent_alias",
823 params![session_key, now, now, alias_val],
824 )
825 .map_err(std::io::Error::other)?;
826 Ok(())
827 }
828
829 fn get_session_agent_alias(&self, session_key: &str) -> std::io::Result<Option<String>> {
830 let conn = self.conn.lock();
831 conn.query_row(
832 "SELECT agent_alias FROM session_metadata WHERE session_key = ?1",
833 params![session_key],
834 |row| row.get(0),
835 )
836 .or_else(|e| match e {
837 rusqlite::Error::QueryReturnedNoRows => Ok(None),
838 other => Err(std::io::Error::other(other)),
839 })
840 }
841
842 fn set_session_context(
843 &self,
844 session_key: &str,
845 context: SessionContext<'_>,
846 ) -> std::io::Result<()> {
847 let conn = self.conn.lock();
848 fn normalize(v: Option<&str>) -> Option<&str> {
849 v.map(str::trim).filter(|s| !s.is_empty())
850 }
851 let channel_id = normalize(context.channel_id);
852 let room_id = normalize(context.room_id);
853 let sender_id = normalize(context.sender_id);
854 let now = Utc::now().to_rfc3339();
855 conn.execute(
861 "INSERT INTO session_metadata
862 (session_key, created_at, last_activity, message_count, channel_id, room_id, sender_id)
863 VALUES (?1, ?2, ?3, 0, ?4, ?5, ?6)
864 ON CONFLICT(session_key) DO UPDATE SET
865 channel_id = COALESCE(excluded.channel_id, session_metadata.channel_id),
866 room_id = COALESCE(excluded.room_id, session_metadata.room_id),
867 sender_id = COALESCE(excluded.sender_id, session_metadata.sender_id)",
868 params![session_key, now, now, channel_id, room_id, sender_id],
869 )
870 .map_err(std::io::Error::other)?;
871 Ok(())
872 }
873}
874
875#[cfg(test)]
876mod tests {
877 use super::*;
878 use tempfile::TempDir;
879
880 #[test]
881 fn round_trip_sqlite() {
882 let tmp = TempDir::new().unwrap();
883 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
884
885 backend
886 .append("user1", &ChatMessage::user("hello"))
887 .unwrap();
888 backend
889 .append("user1", &ChatMessage::assistant("hi"))
890 .unwrap();
891
892 let msgs = backend.load("user1");
893 assert_eq!(msgs.len(), 2);
894 assert_eq!(msgs[0].role, "user");
895 assert_eq!(msgs[1].role, "assistant");
896 }
897
898 #[test]
899 fn remove_last_sqlite() {
900 let tmp = TempDir::new().unwrap();
901 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
902
903 backend.append("u", &ChatMessage::user("a")).unwrap();
904 backend.append("u", &ChatMessage::user("b")).unwrap();
905
906 assert!(backend.remove_last("u").unwrap());
907 let msgs = backend.load("u");
908 assert_eq!(msgs.len(), 1);
909 assert_eq!(msgs[0].content, "a");
910 }
911
912 #[test]
913 fn remove_last_empty_sqlite() {
914 let tmp = TempDir::new().unwrap();
915 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
916 assert!(!backend.remove_last("nonexistent").unwrap());
917 }
918
919 #[test]
920 fn list_sessions_sqlite() {
921 let tmp = TempDir::new().unwrap();
922 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
923
924 backend.append("a", &ChatMessage::user("hi")).unwrap();
925 backend.append("b", &ChatMessage::user("hey")).unwrap();
926
927 let sessions = backend.list_sessions();
928 assert_eq!(sessions.len(), 2);
929 }
930
931 #[test]
932 fn metadata_tracks_counts() {
933 let tmp = TempDir::new().unwrap();
934 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
935
936 backend.append("s1", &ChatMessage::user("a")).unwrap();
937 backend.append("s1", &ChatMessage::user("b")).unwrap();
938 backend.append("s1", &ChatMessage::user("c")).unwrap();
939
940 let meta = backend.list_sessions_with_metadata();
941 assert_eq!(meta.len(), 1);
942 assert_eq!(meta[0].message_count, 3);
943 }
944
945 #[test]
946 fn fts5_search_finds_content() {
947 let tmp = TempDir::new().unwrap();
948 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
949
950 backend
951 .append(
952 "code_chat",
953 &ChatMessage::user("How do I parse JSON in Rust?"),
954 )
955 .unwrap();
956 backend
957 .append("weather", &ChatMessage::user("What's the weather today?"))
958 .unwrap();
959
960 let results = backend.search(&SessionQuery {
961 keyword: Some("Rust".into()),
962 limit: Some(10),
963 });
964 assert_eq!(results.len(), 1);
965 assert_eq!(results[0].key, "code_chat");
966 }
967
968 #[test]
969 fn fts5_update_trigger_syncs_index() {
970 let tmp = TempDir::new().unwrap();
971 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
972
973 backend
974 .append("chat", &ChatMessage::user("hello world"))
975 .unwrap();
976
977 let results = backend.search(&SessionQuery {
979 keyword: Some("hello".into()),
980 limit: Some(10),
981 });
982 assert_eq!(results.len(), 1);
983 assert_eq!(results[0].key, "chat");
984
985 {
987 let conn = backend.conn.lock();
988 conn.execute(
989 "UPDATE sessions SET content = ?1 WHERE session_key = ?2",
990 params!["goodbye world", "chat"],
991 )
992 .unwrap();
993 }
994
995 let results = backend.search(&SessionQuery {
997 keyword: Some("hello".into()),
998 limit: Some(10),
999 });
1000 assert!(results.is_empty());
1001
1002 let results = backend.search(&SessionQuery {
1004 keyword: Some("goodbye".into()),
1005 limit: Some(10),
1006 });
1007 assert_eq!(results.len(), 1);
1008 assert_eq!(results[0].key, "chat");
1009 }
1010
1011 #[test]
1012 fn cleanup_stale_removes_old_sessions() {
1013 let tmp = TempDir::new().unwrap();
1014 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1015
1016 {
1018 let conn = backend.conn.lock();
1019 let old_time = (Utc::now() - Duration::hours(100)).to_rfc3339();
1020 conn.execute(
1021 "INSERT INTO sessions (session_key, role, content, created_at) VALUES (?1, ?2, ?3, ?4)",
1022 params!["old_session", "user", "ancient", old_time],
1023 ).unwrap();
1024 conn.execute(
1025 "INSERT INTO session_metadata (session_key, created_at, last_activity, message_count) VALUES (?1, ?2, ?3, 1)",
1026 params!["old_session", old_time, old_time],
1027 ).unwrap();
1028 }
1029
1030 backend
1031 .append("new_session", &ChatMessage::user("fresh"))
1032 .unwrap();
1033
1034 let cleaned = backend.cleanup_stale(48).unwrap(); assert_eq!(cleaned, 1);
1036
1037 let sessions = backend.list_sessions();
1038 assert_eq!(sessions.len(), 1);
1039 assert_eq!(sessions[0], "new_session");
1040 }
1041
1042 #[test]
1043 fn clear_messages_removes_rows_keeps_metadata() {
1044 let tmp = TempDir::new().unwrap();
1045 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1046
1047 backend.append("s1", &ChatMessage::user("hello")).unwrap();
1048 backend.append("s1", &ChatMessage::assistant("hi")).unwrap();
1049 backend.set_session_name("s1", "My Session").unwrap();
1050
1051 let cleared = backend.clear_messages("s1").unwrap();
1052 assert_eq!(cleared, 2);
1053 assert!(backend.load("s1").is_empty());
1054 let meta = backend.list_sessions_with_metadata();
1056 assert_eq!(meta.len(), 1);
1057 assert_eq!(meta[0].message_count, 0);
1058 assert_eq!(meta[0].name.as_deref(), Some("My Session"));
1059 }
1060
1061 #[test]
1062 fn clear_messages_empty_returns_zero() {
1063 let tmp = TempDir::new().unwrap();
1064 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1065 assert_eq!(backend.clear_messages("nonexistent").unwrap(), 0);
1066 }
1067
1068 #[test]
1069 fn clear_messages_does_not_affect_other_sessions() {
1070 let tmp = TempDir::new().unwrap();
1071 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1072
1073 backend.append("s1", &ChatMessage::user("hello")).unwrap();
1074 backend.append("s2", &ChatMessage::user("world")).unwrap();
1075
1076 backend.clear_messages("s1").unwrap();
1077 assert!(backend.load("s1").is_empty());
1078 assert_eq!(backend.load("s2").len(), 1);
1079 }
1080
1081 #[test]
1082 fn clear_messages_then_append_works() {
1083 let tmp = TempDir::new().unwrap();
1084 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1085
1086 backend.append("s1", &ChatMessage::user("old")).unwrap();
1087 backend.clear_messages("s1").unwrap();
1088 backend.append("s1", &ChatMessage::user("new")).unwrap();
1089
1090 let messages = backend.load("s1");
1091 assert_eq!(messages.len(), 1);
1092 assert_eq!(messages[0].content, "new");
1093 let meta = backend.list_sessions_with_metadata();
1095 assert_eq!(meta[0].message_count, 1);
1096 }
1097
1098 #[test]
1099 fn delete_session_removes_all_data() {
1100 let tmp = TempDir::new().unwrap();
1101 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1102
1103 backend.append("s1", &ChatMessage::user("hello")).unwrap();
1104 backend.append("s1", &ChatMessage::assistant("hi")).unwrap();
1105 backend.append("s2", &ChatMessage::user("other")).unwrap();
1106
1107 assert!(backend.delete_session("s1").unwrap());
1108 assert!(backend.load("s1").is_empty());
1109 assert_eq!(backend.list_sessions().len(), 1);
1110 assert_eq!(backend.list_sessions()[0], "s2");
1111 }
1112
1113 #[test]
1114 fn delete_session_returns_false_for_missing() {
1115 let tmp = TempDir::new().unwrap();
1116 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1117 assert!(!backend.delete_session("nonexistent").unwrap());
1118 }
1119
1120 #[test]
1121 fn migrate_from_jsonl_imports_and_renames() {
1122 let tmp = TempDir::new().unwrap();
1123 let sessions_dir = tmp.path().join("sessions");
1124 std::fs::create_dir_all(&sessions_dir).unwrap();
1125
1126 let jsonl_path = sessions_dir.join("test_user.jsonl");
1128 std::fs::write(
1129 &jsonl_path,
1130 "{\"role\":\"user\",\"content\":\"hello\"}\n{\"role\":\"assistant\",\"content\":\"hi\"}\n",
1131 )
1132 .unwrap();
1133
1134 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1135 let migrated = backend.migrate_from_jsonl(tmp.path()).unwrap();
1136 assert_eq!(migrated, 1);
1137
1138 assert!(!jsonl_path.exists());
1140 assert!(sessions_dir.join("test_user.jsonl.migrated").exists());
1141
1142 let msgs = backend.load("test_user");
1144 assert_eq!(msgs.len(), 2);
1145 assert_eq!(msgs[0].content, "hello");
1146 }
1147
1148 #[test]
1149 fn set_session_name_persists() {
1150 let tmp = TempDir::new().unwrap();
1151 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1152
1153 backend.append("s1", &ChatMessage::user("hello")).unwrap();
1154 backend.set_session_name("s1", "My Session").unwrap();
1155
1156 let meta = backend.list_sessions_with_metadata();
1157 assert_eq!(meta.len(), 1);
1158 assert_eq!(meta[0].name.as_deref(), Some("My Session"));
1159 }
1160
1161 #[test]
1162 fn set_session_name_updates_existing() {
1163 let tmp = TempDir::new().unwrap();
1164 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1165
1166 backend.append("s1", &ChatMessage::user("hello")).unwrap();
1167 backend.set_session_name("s1", "First").unwrap();
1168 backend.set_session_name("s1", "Second").unwrap();
1169
1170 let meta = backend.list_sessions_with_metadata();
1171 assert_eq!(meta[0].name.as_deref(), Some("Second"));
1172 }
1173
1174 #[test]
1175 fn sessions_without_name_return_none() {
1176 let tmp = TempDir::new().unwrap();
1177 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1178
1179 backend.append("s1", &ChatMessage::user("hello")).unwrap();
1180
1181 let meta = backend.list_sessions_with_metadata();
1182 assert_eq!(meta.len(), 1);
1183 assert!(meta[0].name.is_none());
1184 }
1185
1186 #[test]
1189 fn session_state_idle_to_running() {
1190 let tmp = TempDir::new().unwrap();
1191 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1192 backend.append("s1", &ChatMessage::user("hello")).unwrap();
1193
1194 backend
1195 .set_session_state("s1", "running", Some("turn-1"))
1196 .unwrap();
1197 let state = backend.get_session_state("s1").unwrap().unwrap();
1198 assert_eq!(state.state, "running");
1199 assert_eq!(state.turn_id.as_deref(), Some("turn-1"));
1200 assert!(state.turn_started_at.is_some());
1201 }
1202
1203 #[test]
1204 fn session_state_running_to_idle() {
1205 let tmp = TempDir::new().unwrap();
1206 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1207 backend.append("s1", &ChatMessage::user("hello")).unwrap();
1208
1209 backend
1210 .set_session_state("s1", "running", Some("turn-1"))
1211 .unwrap();
1212 backend.set_session_state("s1", "idle", None).unwrap();
1213
1214 let state = backend.get_session_state("s1").unwrap().unwrap();
1215 assert_eq!(state.state, "idle");
1216 assert!(state.turn_id.is_none());
1217 assert!(state.turn_started_at.is_none());
1218 }
1219
1220 #[test]
1221 fn session_state_running_to_error() {
1222 let tmp = TempDir::new().unwrap();
1223 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1224 backend.append("s1", &ChatMessage::user("hello")).unwrap();
1225
1226 backend
1227 .set_session_state("s1", "running", Some("turn-1"))
1228 .unwrap();
1229 backend
1230 .set_session_state("s1", "error", Some("turn-1"))
1231 .unwrap();
1232
1233 let state = backend.get_session_state("s1").unwrap().unwrap();
1234 assert_eq!(state.state, "error");
1235 assert_eq!(state.turn_id.as_deref(), Some("turn-1"));
1236 }
1237
1238 #[test]
1239 fn list_running_sessions_returns_running_only() {
1240 let tmp = TempDir::new().unwrap();
1241 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1242
1243 backend.append("s1", &ChatMessage::user("a")).unwrap();
1244 backend.append("s2", &ChatMessage::user("b")).unwrap();
1245 backend.append("s3", &ChatMessage::user("c")).unwrap();
1246
1247 backend
1248 .set_session_state("s1", "running", Some("t1"))
1249 .unwrap();
1250 backend
1251 .set_session_state("s2", "running", Some("t2"))
1252 .unwrap();
1253 let running = backend.list_running_sessions();
1256 assert_eq!(running.len(), 2);
1257 let keys: Vec<&str> = running.iter().map(|m| m.key.as_str()).collect();
1258 assert!(keys.contains(&"s1"));
1259 assert!(keys.contains(&"s2"));
1260 }
1261
1262 #[test]
1263 fn list_stuck_sessions_detects_old_running() {
1264 let tmp = TempDir::new().unwrap();
1265 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1266 backend.append("s1", &ChatMessage::user("a")).unwrap();
1267
1268 {
1270 let conn = backend.conn.lock();
1271 let old_time = (Utc::now() - Duration::seconds(600)).to_rfc3339();
1272 conn.execute(
1273 "UPDATE session_metadata SET state = 'running', turn_id = 'old', turn_started_at = ?1 WHERE session_key = 's1'",
1274 params![old_time],
1275 ).unwrap();
1276 }
1277
1278 let stuck = backend.list_stuck_sessions(300); assert_eq!(stuck.len(), 1);
1280 assert_eq!(stuck[0].key, "s1");
1281
1282 let not_stuck = backend.list_stuck_sessions(900); assert_eq!(not_stuck.len(), 0);
1285 }
1286
1287 #[test]
1288 fn get_session_state_nonexistent() {
1289 let tmp = TempDir::new().unwrap();
1290 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1291 let state = backend.get_session_state("nonexistent").unwrap();
1292 assert!(state.is_none());
1293 }
1294
1295 #[test]
1296 fn session_state_migration_preserves_data() {
1297 let tmp = TempDir::new().unwrap();
1298 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1300 backend.append("s1", &ChatMessage::user("hello")).unwrap();
1301
1302 drop(backend);
1304 let backend2 = SqliteSessionBackend::new(tmp.path()).unwrap();
1305 let msgs = backend2.load("s1");
1306 assert_eq!(msgs.len(), 1);
1307 assert_eq!(msgs[0].content, "hello");
1308
1309 let state = backend2.get_session_state("s1").unwrap().unwrap();
1311 assert_eq!(state.state, "idle");
1312 }
1313
1314 #[test]
1315 fn empty_name_clears_to_none() {
1316 let tmp = TempDir::new().unwrap();
1317 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1318
1319 backend.append("s1", &ChatMessage::user("hello")).unwrap();
1320 backend.set_session_name("s1", "Named").unwrap();
1321 backend.set_session_name("s1", "").unwrap();
1322
1323 let meta = backend.list_sessions_with_metadata();
1324 assert!(meta[0].name.is_none());
1325 }
1326
1327 #[test]
1330 fn get_session_metadata_returns_full_metadata() {
1331 let tmp = TempDir::new().unwrap();
1332 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1333
1334 backend.append("s1", &ChatMessage::user("hello")).unwrap();
1335 backend.append("s1", &ChatMessage::assistant("hi")).unwrap();
1336 backend.set_session_name("s1", "My Chat").unwrap();
1337
1338 let meta = backend.get_session_metadata("s1").unwrap();
1339 assert_eq!(meta.key, "s1");
1340 assert_eq!(meta.name.as_deref(), Some("My Chat"));
1341 assert_eq!(meta.message_count, 2);
1342 }
1343
1344 #[test]
1345 fn get_session_metadata_returns_none_for_missing() {
1346 let tmp = TempDir::new().unwrap();
1347 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1348 assert!(backend.get_session_metadata("nonexistent").is_none());
1349 }
1350
1351 #[test]
1352 fn agent_alias_roundtrips_through_metadata() {
1353 let tmp = TempDir::new().unwrap();
1354 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1355
1356 backend.append("s1", &ChatMessage::user("hello")).unwrap();
1357 backend.set_session_agent_alias("s1", "scout").unwrap();
1358
1359 let meta = backend.get_session_metadata("s1").unwrap();
1360 assert_eq!(meta.agent_alias.as_deref(), Some("scout"));
1361
1362 let listed = backend.list_sessions_with_metadata();
1363 let row = listed.iter().find(|m| m.key == "s1").unwrap();
1364 assert_eq!(row.agent_alias.as_deref(), Some("scout"));
1365
1366 let alias = backend.get_session_agent_alias("s1").unwrap();
1368 assert_eq!(alias.as_deref(), Some("scout"));
1369 }
1370
1371 #[test]
1372 fn agent_alias_set_before_any_append_upserts_metadata() {
1373 let tmp = TempDir::new().unwrap();
1374 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1375
1376 backend.set_session_agent_alias("s1", "scout").unwrap();
1380
1381 let alias = backend.get_session_agent_alias("s1").unwrap();
1382 assert_eq!(alias.as_deref(), Some("scout"));
1383 }
1384
1385 #[test]
1386 fn session_context_roundtrips_channel_room_sender() {
1387 let tmp = TempDir::new().unwrap();
1388 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1389
1390 backend.append("s1", &ChatMessage::user("hello")).unwrap();
1391 backend
1392 .set_session_context(
1393 "s1",
1394 SessionContext {
1395 channel_id: Some("discord.clamps"),
1396 room_id: Some("1234567890"),
1397 sender_id: Some("@user:matrix"),
1398 },
1399 )
1400 .unwrap();
1401
1402 let meta = backend.get_session_metadata("s1").unwrap();
1403 assert_eq!(meta.channel_id.as_deref(), Some("discord.clamps"));
1404 assert_eq!(meta.room_id.as_deref(), Some("1234567890"));
1405 assert_eq!(meta.sender_id.as_deref(), Some("@user:matrix"));
1406
1407 backend
1410 .set_session_context(
1411 "s1",
1412 SessionContext {
1413 channel_id: None,
1414 room_id: Some("1234567890"),
1415 sender_id: None,
1416 },
1417 )
1418 .unwrap();
1419 let meta = backend.get_session_metadata("s1").unwrap();
1420 assert_eq!(meta.channel_id.as_deref(), Some("discord.clamps"));
1421 assert_eq!(meta.sender_id.as_deref(), Some("@user:matrix"));
1422 }
1423
1424 #[test]
1425 fn session_context_creates_metadata_row_before_first_append() {
1426 let tmp = TempDir::new().unwrap();
1427 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1428
1429 backend
1430 .set_session_context(
1431 "s1",
1432 SessionContext {
1433 channel_id: Some("telegram.production"),
1434 room_id: None,
1435 sender_id: Some("@alice"),
1436 },
1437 )
1438 .unwrap();
1439
1440 let meta = backend.get_session_metadata("s1").unwrap();
1441 assert_eq!(meta.channel_id.as_deref(), Some("telegram.production"));
1442 assert_eq!(meta.sender_id.as_deref(), Some("@alice"));
1443 assert!(meta.room_id.is_none());
1444 }
1445
1446 #[test]
1447 fn get_session_metadata_matches_list() {
1448 let tmp = TempDir::new().unwrap();
1449 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
1450
1451 backend.append("s1", &ChatMessage::user("a")).unwrap();
1452 backend.append("s1", &ChatMessage::user("b")).unwrap();
1453 backend.append("s2", &ChatMessage::user("c")).unwrap();
1454
1455 let single = backend.get_session_metadata("s1").unwrap();
1456 let all = backend.list_sessions_with_metadata();
1457 let from_list = all.iter().find(|m| m.key == "s1").unwrap();
1458
1459 assert_eq!(single.message_count, from_list.message_count);
1460 assert_eq!(single.name, from_list.name);
1461 assert_eq!(single.created_at, from_list.created_at);
1462 assert_eq!(single.last_activity, from_list.last_activity);
1463 }
1464}