1use crate::agent::agent::Agent;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::sync::Mutex;
9use zeroclaw_infra::session_queue::SessionActorQueue;
10use zeroclaw_providers::ModelProvider;
11
/// Reason an in-flight turn's cancellation token was fired.
///
/// A cause is stored per session id by `SessionStore::record_cancel_cause`
/// and read back (and cleared) by `SessionStore::take_cancel_cause`.
/// Serializes as a snake_case string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CancelCause {
    /// Recorded by `SessionStore::cancel_session`.
    ClientRpc,
    /// Recorded by `SessionStore::kill_session`.
    AdminKill,
    /// Recorded by `SessionStore::remove`.
    SessionRemoved,
}
27
28impl CancelCause {
29 pub fn as_str(self) -> &'static str {
30 match self {
31 CancelCause::ClientRpc => "client_rpc",
32 CancelCause::AdminKill => "admin_kill",
33 CancelCause::SessionRemoved => "session_removed",
34 }
35 }
36}
37
/// Per-session overrides layered on top of an agent's configuration.
///
/// Every field is optional; `None` means "no override". `None` fields are
/// omitted entirely when the struct is serialized.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct SessionOverrides {
    /// Model name override, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    /// Model-provider override, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_provider: Option<String>,
    /// Sampling temperature override, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub temperature: Option<f64>,
}
53
/// Bookkeeping record for one upload attached to a session.
#[derive(Clone, Debug)]
pub struct UploadEntry {
    /// Key under which the entry is stored in `RpcSession::uploads`.
    pub ref_id: String,
    // NOTE(review): presumably the placeholder text that refers to this upload
    // inside a message — confirm against the upload handler.
    pub marker: String,
    // NOTE(review): presumably the file's location inside the session
    // workspace — confirm against the upload handler.
    pub workspace_path: String,
    /// Size of the upload in bytes.
    pub size_bytes: u64,
}
62
/// State held by the store for one live RPC session.
pub struct RpcSession {
    /// The session's agent, shared so callers can lock it without holding the
    /// store-wide session map lock.
    pub agent: Arc<Mutex<Agent>>,
    /// When the session object was constructed.
    pub created_at: Instant,
    /// Last time `SessionStore::touch` was called for this session
    /// (initialized at construction).
    pub last_active: Instant,
    /// Alias of the agent this session runs.
    pub agent_alias: String,
    /// Workspace directory associated with the session.
    pub workspace_dir: String,
    /// Accumulated overrides applied through `SessionStore::set_overrides`.
    pub overrides: SessionOverrides,
    /// Uploads registered for this session, keyed by `UploadEntry::ref_id`.
    pub uploads: HashMap<String, UploadEntry>,
    /// Chat mode the session was created with.
    pub chat_mode: crate::rpc::types::ChatMode,
    /// Id of the owning TUI, if any; used by
    /// `SessionStore::evict_same_mode_sibling` to find sibling sessions.
    pub owner_tui_id: Option<String>,
}
74
75impl RpcSession {
76 pub fn new(
77 agent: Agent,
78 alias: &str,
79 workspace: &str,
80 chat_mode: crate::rpc::types::ChatMode,
81 ) -> Self {
82 Self {
83 agent: Arc::new(Mutex::new(agent)),
84 created_at: Instant::now(),
85 last_active: Instant::now(),
86 agent_alias: alias.to_string(),
87 workspace_dir: workspace.to_string(),
88 overrides: SessionOverrides::default(),
89 uploads: HashMap::new(),
90 chat_mode,
91 owner_tui_id: None,
92 }
93 }
94
95 pub fn with_owner(mut self, tui_id: Option<String>) -> Self {
97 self.owner_tui_id = tui_id;
98 self
99 }
100}
101
/// In-memory registry of live RPC sessions plus the cancellation state of
/// their in-flight turns.
pub struct SessionStore {
    /// All live sessions keyed by session id (async mutex).
    sessions: Mutex<HashMap<String, RpcSession>>,
    /// Cancellation token of the in-flight turn per session id, paired with
    /// the generation number handed out when it was registered.
    cancel_tokens: std::sync::Mutex<HashMap<String, (u64, tokio_util::sync::CancellationToken)>>,
    /// Monotonic counter used to mint generation numbers for `cancel_tokens`.
    cancel_generation: std::sync::atomic::AtomicU64,
    /// Most recently recorded cancel cause per session id.
    cancel_causes: std::sync::Mutex<HashMap<String, CancelCause>>,
    /// Maximum number of sessions `insert` admits.
    max_sessions: usize,
    // NOTE(review): not used by any method in this file; presumably consumed
    // by callers to serialize per-session work — confirm at the call sites.
    pub session_queue: Arc<SessionActorQueue>,
}
115
116impl SessionStore {
117 pub fn new(max_sessions: usize, session_queue: Arc<SessionActorQueue>) -> Self {
118 Self {
119 sessions: Mutex::new(HashMap::new()),
120 cancel_tokens: std::sync::Mutex::new(HashMap::new()),
121 cancel_generation: std::sync::atomic::AtomicU64::new(0),
122 cancel_causes: std::sync::Mutex::new(HashMap::new()),
123 max_sessions,
124 session_queue,
125 }
126 }
127
128 pub async fn insert(&self, id: String, session: RpcSession) -> Result<(), &'static str> {
129 let mut sessions = self.sessions.lock().await;
130 if sessions.len() >= self.max_sessions {
131 return Err("session limit reached");
132 }
133 sessions.insert(id, session);
134 Ok(())
135 }
136
137 pub async fn get_agent(&self, id: &str) -> Option<Arc<Mutex<Agent>>> {
138 self.sessions.lock().await.get(id).map(|s| s.agent.clone())
139 }
140
141 pub async fn touch(&self, id: &str) {
142 if let Some(s) = self.sessions.lock().await.get_mut(id) {
143 s.last_active = Instant::now();
144 }
145 }
146
147 pub async fn set_overrides(
155 &self,
156 id: &str,
157 patch: SessionOverrides,
158 ) -> Option<SessionOverrides> {
159 let mut sessions = self.sessions.lock().await;
160 let session = sessions.get_mut(id)?;
161 if let Some(ref m) = patch.model {
162 session.overrides.model = Some(m.clone());
163 }
164 if let Some(ref p) = patch.model_provider {
165 session.overrides.model_provider = Some(p.clone());
166 if patch.model.is_none() {
171 session.overrides.model = None;
172 }
173 }
174 if let Some(t) = patch.temperature {
175 session.overrides.temperature = Some(t);
176 }
177 let overrides = session.overrides.clone();
179 let agent = session.agent.clone();
180 drop(sessions);
181 let mut guard = agent.lock().await;
182 if let Some(ref m) = overrides.model {
183 guard.set_model_name(m.clone());
184 }
185 if overrides.temperature.is_some() {
186 guard.set_temperature(overrides.temperature);
187 }
188 Some(overrides)
189 }
190
191 pub async fn apply_model_provider(
195 &self,
196 id: &str,
197 model_provider: Box<dyn ModelProvider>,
198 model_provider_name: String,
199 ) -> bool {
200 let agent = {
201 let sessions = self.sessions.lock().await;
202 match sessions.get(id) {
203 Some(s) => s.agent.clone(),
204 None => return false,
205 }
206 };
207 let mut guard = agent.lock().await;
208 guard.set_model_provider(model_provider);
209 guard.set_model_provider_name(model_provider_name);
210 true
211 }
212
213 pub async fn get_overrides(&self, id: &str) -> Option<SessionOverrides> {
214 self.sessions
215 .lock()
216 .await
217 .get(id)
218 .map(|s| s.overrides.clone())
219 }
220
221 pub async fn get_upload(&self, session_id: &str, ref_id: &str) -> Option<UploadEntry> {
224 self.sessions
225 .lock()
226 .await
227 .get(session_id)
228 .and_then(|s| s.uploads.get(ref_id).cloned())
229 }
230
231 pub async fn insert_upload(&self, session_id: &str, entry: UploadEntry) {
233 if let Some(s) = self.sessions.lock().await.get_mut(session_id) {
234 s.uploads.insert(entry.ref_id.clone(), entry);
235 }
236 }
237
238 pub async fn get_workspace_dir(&self, session_id: &str) -> Option<String> {
240 self.sessions
241 .lock()
242 .await
243 .get(session_id)
244 .map(|s| s.workspace_dir.clone())
245 }
246
247 pub async fn get_agent_alias(&self, session_id: &str) -> Option<String> {
252 self.sessions
253 .lock()
254 .await
255 .get(session_id)
256 .map(|s| s.agent_alias.clone())
257 }
258
259 pub async fn seed_history(&self, id: &str, msgs: &[zeroclaw_api::model_provider::ChatMessage]) {
260 if let Some(s) = self.sessions.lock().await.get(id) {
261 s.agent.lock().await.seed_history(msgs);
262 }
263 }
264
265 pub async fn seed_conversation_history(
266 &self,
267 id: &str,
268 msgs: Vec<zeroclaw_api::model_provider::ConversationMessage>,
269 ) {
270 if let Some(s) = self.sessions.lock().await.get(id) {
271 s.agent.lock().await.seed_conversation_history(msgs);
272 }
273 }
274
275 pub async fn chat_mode(&self, id: &str) -> Option<crate::rpc::types::ChatMode> {
276 self.sessions
277 .lock()
278 .await
279 .get(id)
280 .map(|s| s.chat_mode.clone())
281 }
282
283 pub async fn history_len(&self, id: &str) -> Option<usize> {
284 let sessions = self.sessions.lock().await;
285 let s = sessions.get(id)?;
286 Some(s.agent.lock().await.history().len())
287 }
288
289 pub async fn history_slice_from(
290 &self,
291 id: &str,
292 from: usize,
293 ) -> Option<Vec<zeroclaw_api::model_provider::ConversationMessage>> {
294 let sessions = self.sessions.lock().await;
295 let s = sessions.get(id)?;
296 let h = s.agent.lock().await;
297 let history = h.history();
299 Some(history[from.min(history.len())..].to_vec())
300 }
301
302 pub async fn remove(&self, id: &str) -> bool {
303 if let Some((_, token)) = self
304 .cancel_tokens
305 .lock()
306 .unwrap_or_else(|e| e.into_inner())
307 .remove(id)
308 {
309 self.record_cancel_cause(id, CancelCause::SessionRemoved);
310 token.cancel();
311 }
312 self.sessions.lock().await.remove(id).is_some()
313 }
314
315 pub async fn evict_same_mode_sibling(
333 &self,
334 tui_id: &str,
335 chat_mode: &crate::rpc::types::ChatMode,
336 except_id: &str,
337 ) -> Vec<(String, String)> {
338 let in_flight: std::collections::HashSet<String> = self
339 .cancel_tokens
340 .lock()
341 .unwrap_or_else(|e| e.into_inner())
342 .keys()
343 .cloned()
344 .collect();
345 let mut sessions = self.sessions.lock().await;
346 let victims: Vec<String> = sessions
347 .iter()
348 .filter(|(key, s)| {
349 key.as_str() != except_id
350 && s.owner_tui_id.as_deref() == Some(tui_id)
351 && &s.chat_mode == chat_mode
352 && !in_flight.contains(key.as_str())
353 })
354 .map(|(key, _)| key.clone())
355 .collect();
356 let mut evicted = Vec::with_capacity(victims.len());
357 for key in victims {
358 if let Some(s) = sessions.remove(&key) {
359 evicted.push((key, s.agent_alias));
360 }
361 }
362 evicted
363 }
364
365 pub async fn session_owner_tui_id(&self, session_id: &str) -> Option<Option<String>> {
369 let sessions = self.sessions.lock().await;
370 sessions.get(session_id).map(|s| s.owner_tui_id.clone())
371 }
372
373 pub async fn list_ids(&self) -> Vec<String> {
374 self.sessions.lock().await.keys().cloned().collect()
375 }
376
377 pub fn register_cancel_token(
378 &self,
379 id: &str,
380 token: tokio_util::sync::CancellationToken,
381 ) -> u64 {
382 let generation = self
383 .cancel_generation
384 .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
385 .wrapping_add(1);
386 if let Some((_, stale)) = self
387 .cancel_tokens
388 .lock()
389 .unwrap_or_else(|e| e.into_inner())
390 .insert(id.to_string(), (generation, token))
391 {
392 stale.cancel();
393 }
394 generation
395 }
396
397 pub fn remove_cancel_token(&self, id: &str, generation: u64) {
398 {
399 let mut tokens = self.cancel_tokens.lock().unwrap_or_else(|e| e.into_inner());
400 match tokens.get(id) {
401 Some((g, _)) if *g == generation => {
402 tokens.remove(id);
403 }
404 _ => return,
405 }
406 }
407 self.cancel_causes
408 .lock()
409 .unwrap_or_else(|e| e.into_inner())
410 .remove(id);
411 }
412
413 pub fn cancel_session(&self, id: &str) -> bool {
414 self.record_cancel_cause(id, CancelCause::ClientRpc);
415 self.cancel_tokens
416 .lock()
417 .unwrap_or_else(|e| e.into_inner())
418 .get(id)
419 .map(|(_, t)| {
420 t.cancel();
421 true
422 })
423 .unwrap_or(false)
424 }
425
426 pub fn has_inflight_turn(&self, id: &str) -> bool {
428 self.cancel_tokens
429 .lock()
430 .unwrap_or_else(|e| e.into_inner())
431 .contains_key(id)
432 }
433
434 pub async fn kill_session(&self, id: &str) -> bool {
440 if let Some((_, token)) = self
441 .cancel_tokens
442 .lock()
443 .unwrap_or_else(|e| e.into_inner())
444 .remove(id)
445 {
446 self.record_cancel_cause(id, CancelCause::AdminKill);
447 token.cancel();
448 }
449 self.sessions.lock().await.remove(id).is_some()
450 }
451
452 pub fn record_cancel_cause(&self, id: &str, cause: CancelCause) {
455 self.cancel_causes
456 .lock()
457 .unwrap_or_else(|e| e.into_inner())
458 .insert(id.to_string(), cause);
459 }
460
461 pub fn take_cancel_cause(&self, id: &str) -> Option<CancelCause> {
466 self.cancel_causes
467 .lock()
468 .unwrap_or_else(|e| e.into_inner())
469 .remove(id)
470 }
471
472 pub async fn count(&self) -> usize {
473 self.sessions.lock().await.len()
474 }
475
476 pub async fn count_by_agent(&self) -> HashMap<String, usize> {
478 let sessions = self.sessions.lock().await;
479 let mut counts: HashMap<String, usize> = HashMap::new();
480 for session in sessions.values() {
481 *counts.entry(session.agent_alias.clone()).or_insert(0) += 1;
482 }
483 counts
484 }
485}
486
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rpc::types::ChatMode;
    use tokio_util::sync::CancellationToken;

    // ---- fixtures -------------------------------------------------------

    /// Store with capacity `max` and a small session queue.
    fn make_store(max: usize) -> SessionStore {
        SessionStore::new(max, Arc::new(SessionActorQueue::new(4, 10, 60)))
    }

    /// Minimal agent: stub provider, no tools, memory backend "none".
    fn make_agent() -> Agent {
        use crate::agent::dispatcher::NativeToolDispatcher;
        use crate::observability::NoopObserver;

        let mem_cfg = zeroclaw_config::schema::MemoryConfig {
            backend: "none".into(),
            ..zeroclaw_config::schema::MemoryConfig::default()
        };
        let mem = Arc::from(
            zeroclaw_memory::create_memory(&mem_cfg, &std::env::temp_dir(), None).unwrap(),
        );

        Agent::builder()
            .model_provider(Box::new(StubProvider))
            .tools(vec![])
            .memory(mem)
            .observer(Arc::new(NoopObserver {}) as Arc<dyn crate::observability::Observer>)
            .tool_dispatcher(Box::new(NativeToolDispatcher))
            .workspace_dir(std::env::temp_dir())
            .build()
            .unwrap()
    }

    /// Ownerless chat-mode session with alias "a" and workspace ".".
    fn chat_session() -> RpcSession {
        RpcSession::new(make_agent(), "a", ".", ChatMode::Chat)
    }

    /// Session in `mode` owned by TUI `owner`.
    fn owned_session(mode: ChatMode, owner: &str) -> RpcSession {
        RpcSession::new(make_agent(), "a", ".", mode).with_owner(Some(owner.to_string()))
    }

    /// Inserts `session` under `id`, panicking if the store rejects it.
    async fn insert_session(store: &SessionStore, id: &str, session: RpcSession) {
        store.insert(id.into(), session).await.unwrap();
    }

    /// Reads the `last_active` timestamp of an existing session.
    async fn last_active_of(store: &SessionStore, id: &str) -> Instant {
        let sessions = store.sessions.lock().await;
        sessions.get(id).unwrap().last_active
    }

    /// Model provider that answers every request with a canned response.
    struct StubProvider;

    #[async_trait::async_trait]
    impl zeroclaw_providers::ModelProvider for StubProvider {
        async fn chat_with_system(
            &self,
            _: Option<&str>,
            _: &str,
            _: &str,
            _: Option<f64>,
        ) -> anyhow::Result<String> {
            Ok(String::new())
        }
        async fn chat(
            &self,
            _: zeroclaw_providers::ChatRequest<'_>,
            _: &str,
            _: Option<f64>,
        ) -> anyhow::Result<zeroclaw_providers::ChatResponse> {
            Ok(zeroclaw_providers::ChatResponse {
                text: Some("stub".into()),
                tool_calls: vec![],
                usage: None,
                reasoning_content: None,
            })
        }
    }
    impl zeroclaw_api::attribution::Attributable for StubProvider {
        fn role(&self) -> zeroclaw_api::attribution::Role {
            zeroclaw_api::attribution::Role::Provider(
                zeroclaw_api::attribution::ProviderKind::Model(
                    zeroclaw_api::attribution::ModelProviderKind::Custom,
                ),
            )
        }
        fn alias(&self) -> &str {
            "stub"
        }
    }

    // ---- insert / lookup -------------------------------------------------

    #[tokio::test]
    async fn insert_and_count() {
        let store = make_store(4);
        assert_eq!(store.count().await, 0);

        insert_session(&store, "s1", chat_session()).await;
        assert_eq!(store.count().await, 1);
    }

    #[tokio::test]
    async fn insert_rejects_over_limit() {
        let store = make_store(1);
        insert_session(&store, "s1", chat_session()).await;

        let err = store.insert("s2".into(), chat_session()).await;
        assert!(err.is_err());
    }

    #[tokio::test]
    async fn get_agent_returns_arc() {
        let store = make_store(4);
        insert_session(&store, "s1", chat_session()).await;

        assert!(store.get_agent("s1").await.is_some());
        assert!(store.get_agent("nonexistent").await.is_none());
    }

    // ---- overrides -------------------------------------------------------

    #[tokio::test]
    async fn set_overrides_applies_model_and_temperature_live() {
        let store = make_store(4);
        insert_session(&store, "s1", chat_session()).await;

        let patch = SessionOverrides {
            model: Some("model-x".into()),
            temperature: Some(0.42),
            ..Default::default()
        };
        let merged = store
            .set_overrides("s1", patch)
            .await
            .expect("session exists");
        assert_eq!(merged.model.as_deref(), Some("model-x"));
        assert_eq!(merged.temperature, Some(0.42));

        // The live agent must report the overridden model name.
        let agent = store.get_agent("s1").await.unwrap();
        let (_, _, model_name) = agent.lock().await.attribution_fields();
        assert_eq!(model_name, "model-x");
    }

    #[tokio::test]
    async fn set_overrides_records_model_provider_without_rebuilding() {
        let store = make_store(4);
        insert_session(&store, "s1", chat_session()).await;

        let patch = SessionOverrides {
            model_provider: Some("anthropic.default".into()),
            ..Default::default()
        };
        let merged = store
            .set_overrides("s1", patch)
            .await
            .expect("session exists");
        assert_eq!(merged.model_provider.as_deref(), Some("anthropic.default"));
    }

    #[tokio::test]
    async fn provider_switch_without_model_clears_prior_model() {
        let store = make_store(4);
        insert_session(&store, "s1", chat_session()).await;

        let model_only = SessionOverrides {
            model: Some("claude-opus-4-5".into()),
            ..Default::default()
        };
        store
            .set_overrides("s1", model_only)
            .await
            .expect("session exists");

        let provider_only = SessionOverrides {
            model_provider: Some("ollama.default".into()),
            ..Default::default()
        };
        let merged = store
            .set_overrides("s1", provider_only)
            .await
            .expect("session exists");
        assert_eq!(merged.model_provider.as_deref(), Some("ollama.default"));
        assert_eq!(
            merged.model, None,
            "a provider-only switch must clear the lingering model override"
        );
    }

    #[tokio::test]
    async fn set_overrides_missing_session_is_none() {
        let store = make_store(4);
        let result = store
            .set_overrides("ghost", SessionOverrides::default())
            .await;
        assert!(result.is_none());
    }

    // ---- removal and eviction -------------------------------------------

    #[tokio::test]
    async fn remove_cleans_up() {
        let store = make_store(4);
        insert_session(&store, "s1", chat_session()).await;

        let token = CancellationToken::new();
        store.register_cancel_token("s1", token.clone());

        assert!(store.remove("s1").await);
        assert_eq!(store.count().await, 0);
        // The token was unregistered along with the session.
        assert!(!store.cancel_session("s1"));
    }

    #[tokio::test]
    async fn remove_nonexistent_returns_false() {
        let store = make_store(4);
        assert!(!store.remove("ghost").await);
    }

    #[tokio::test]
    async fn evict_same_mode_sibling_drops_only_same_mode_owner() {
        let store = make_store(8);
        insert_session(&store, "old_chat", owned_session(ChatMode::Chat, "tui1")).await;
        insert_session(&store, "old_code", owned_session(ChatMode::Acp, "tui1")).await;
        insert_session(&store, "other_chat", owned_session(ChatMode::Chat, "tui2")).await;
        insert_session(&store, "new_chat", owned_session(ChatMode::Chat, "tui1")).await;

        let evicted = store
            .evict_same_mode_sibling("tui1", &ChatMode::Chat, "new_chat")
            .await;

        let ids: Vec<&str> = evicted.iter().map(|(id, _)| id.as_str()).collect();
        assert_eq!(ids, vec!["old_chat"]);
        assert!(
            store.get_agent("new_chat").await.is_some(),
            "new session preserved"
        );
        assert!(
            store.get_agent("old_code").await.is_some(),
            "cross-mode Code session preserved"
        );
        assert!(
            store.get_agent("other_chat").await.is_some(),
            "other TUI session preserved"
        );
        assert!(
            store.get_agent("old_chat").await.is_none(),
            "abandoned same-mode session evicted"
        );
    }

    #[tokio::test]
    async fn evict_same_mode_sibling_skips_in_flight_turn() {
        let store = make_store(8);
        insert_session(&store, "busy_chat", owned_session(ChatMode::Chat, "tui1")).await;
        insert_session(&store, "new_chat", owned_session(ChatMode::Chat, "tui1")).await;

        // A registered token marks "busy_chat" as mid-turn.
        let token = CancellationToken::new();
        store.register_cancel_token("busy_chat", token.clone());

        let evicted = store
            .evict_same_mode_sibling("tui1", &ChatMode::Chat, "new_chat")
            .await;

        assert!(
            evicted.is_empty(),
            "in-flight same-mode session must be left to finish its turn"
        );
        assert!(
            store.get_agent("busy_chat").await.is_some(),
            "mid-turn session preserved"
        );
        assert!(
            !token.is_cancelled(),
            "eviction must not fire a mid-turn cancel token"
        );
    }

    // ---- cancel tokens ---------------------------------------------------

    #[tokio::test]
    async fn cancel_token_lifecycle() {
        let store = make_store(4);
        let token = CancellationToken::new();
        let generation = store.register_cancel_token("s1", token.clone());

        assert!(!token.is_cancelled());
        assert!(store.cancel_session("s1"));
        assert!(token.is_cancelled());

        store.remove_cancel_token("s1", generation);
        assert!(!store.cancel_session("s1"));
    }

    #[tokio::test]
    async fn reregister_force_cancels_prior_turn() {
        let store = make_store(4);
        let old = CancellationToken::new();
        let old_gen = store.register_cancel_token("s", old.clone());

        let new = CancellationToken::new();
        let new_gen = store.register_cancel_token("s", new.clone());

        assert!(old.is_cancelled(), "re-register must kill the prior turn");
        assert!(!new.is_cancelled());
        assert_ne!(old_gen, new_gen);

        // The superseded turn cleaning up must not touch the live token.
        store.remove_cancel_token("s", old_gen);
        assert!(
            store.cancel_session("s"),
            "stale-generation remove must not orphan the live turn's token"
        );
        assert!(new.is_cancelled());
    }

    #[tokio::test]
    async fn stale_remove_is_a_noop() {
        let store = make_store(4);
        let token = CancellationToken::new();
        let generation = store.register_cancel_token("s", token.clone());

        store.remove_cancel_token("s", generation.wrapping_sub(1));
        assert!(
            store.cancel_session("s"),
            "a remove with a non-matching generation must leave the token intact"
        );
    }

    #[tokio::test]
    async fn cancel_nonexistent_returns_false() {
        let store = make_store(4);
        assert!(!store.cancel_session("nope"));
    }

    // ---- listing and activity -------------------------------------------

    #[tokio::test]
    async fn list_ids() {
        let store = make_store(4);
        insert_session(&store, "b", chat_session()).await;
        insert_session(&store, "a", chat_session()).await;

        // Map iteration order is unspecified, so sort before comparing.
        let mut ids = store.list_ids().await;
        ids.sort();
        assert_eq!(ids, vec!["a", "b"]);
    }

    #[tokio::test]
    async fn touch_updates_last_active() {
        let store = make_store(4);
        insert_session(&store, "s1", chat_session()).await;

        let before = last_active_of(&store, "s1").await;
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        store.touch("s1").await;
        let after = last_active_of(&store, "s1").await;
        assert!(after > before);
    }

    #[tokio::test]
    async fn session_persists_after_transport_disconnect() {
        let store = make_store(4);
        let session = chat_session().with_owner(Some("tui-x".to_string()));
        insert_session(&store, "s1", session).await;

        assert_eq!(
            store.count().await,
            1,
            "session must survive transport disconnect"
        );
    }

    // ---- kill_session ----------------------------------------------------

    #[tokio::test]
    async fn kill_session_cancels_inflight_and_removes() {
        let store = make_store(4);
        insert_session(&store, "live", chat_session()).await;
        let token = CancellationToken::new();
        store.register_cancel_token("live", token.clone());

        let removed = store.kill_session("live").await;
        assert!(removed, "kill_session must return true for a real session");
        assert!(
            token.is_cancelled(),
            "kill_session must fire the cancel token"
        );
        assert_eq!(store.count().await, 0, "session must be removed");
    }

    #[tokio::test]
    async fn kill_session_idle_session_removed() {
        let store = make_store(4);
        insert_session(&store, "cold", chat_session()).await;

        let removed = store.kill_session("cold").await;
        assert!(removed);
        assert_eq!(store.count().await, 0);
    }

    #[tokio::test]
    async fn kill_session_missing_returns_false() {
        let store = make_store(4);
        assert!(!store.kill_session("ghost").await);
    }

    #[tokio::test]
    async fn kill_session_cause_is_admin_kill() {
        let store = make_store(4);
        insert_session(&store, "s", chat_session()).await;
        let token = CancellationToken::new();
        store.register_cancel_token("s", token.clone());

        store.kill_session("s").await;
        assert_eq!(
            store.take_cancel_cause("s"),
            Some(CancelCause::AdminKill),
            "kill_session must preserve AdminKill cause for verdict-site attribution"
        );
    }
}