Skip to main content

zeroclaw_runtime/rpc/
session.rs

1//! RPC session state.
2
3use crate::agent::agent::Agent;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::sync::Mutex;
9use zeroclaw_infra::session_queue::SessionActorQueue;
10use zeroclaw_providers::ModelProvider;
11
/// Why a session's in-flight turn cancel token was fired. Recorded at the
/// firing site and drained at the turn-verdict site so the durable audit row
/// names the trigger instead of leaving a bare "cancelled" with no provenance.
/// Each variant is a distinct, named path — there is deliberately no catch-all
/// "unknown": a fired token must be attributable.
///
/// The serde representation is the snake_case variant name
/// (`rename_all = "snake_case"`), which is the same spelling that
/// [`CancelCause::as_str`] returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CancelCause {
    /// Explicit `session/cancel` RPC from the client (e.g. zerocode Ctrl+D).
    /// Recorded by `SessionStore::cancel_session`.
    ClientRpc,
    /// Explicit `session/kill` from the dashboard or admin RPC.
    /// Recorded by `SessionStore::kill_session`.
    AdminKill,
    /// The session was explicitly removed/torn down while a turn was live.
    /// Recorded by `SessionStore::remove`.
    SessionRemoved,
}
27
28impl CancelCause {
29    pub fn as_str(self) -> &'static str {
30        match self {
31            CancelCause::ClientRpc => "client_rpc",
32            CancelCause::AdminKill => "admin_kill",
33            CancelCause::SessionRemoved => "session_removed",
34        }
35    }
36}
37
/// Per-session runtime overrides. All fields are optional — `None` means
/// "use config default". Overrides are session-scoped, do not persist,
/// and evaporate when the session ends.
///
/// `reasoning_effort` is deferred — it requires `ModelProvider` trait
/// changes to support mutation after construction.
///
/// The same type is used both as the stored state and as the patch passed to
/// `SessionStore::set_overrides`; in a patch, `None` means "leave unchanged".
/// `None` fields are omitted from the serialized form.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct SessionOverrides {
    /// Model name override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    /// Model-provider override (tests use aliases such as
    /// `"anthropic.default"`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_provider: Option<String>,
    /// Sampling temperature override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub temperature: Option<f64>,
}
53
/// An entry in the per-session upload index (content-addressed by SHA-256).
#[derive(Clone, Debug)]
pub struct UploadEntry {
    /// Key under which the entry is stored in `RpcSession::uploads`.
    pub ref_id: String,
    // NOTE(review): presumably the placeholder text that stands in for the
    // upload inside a prompt — confirm against the dispatcher.
    pub marker: String,
    // NOTE(review): presumably where the uploaded bytes were written inside
    // the workspace — confirm against the dispatcher.
    pub workspace_path: String,
    /// Size of the upload in bytes.
    pub size_bytes: u64,
}
62
/// In-memory state for one RPC session held by [`SessionStore`].
pub struct RpcSession {
    /// The session's agent, shared with whoever clones the `Arc`
    /// (e.g. via `SessionStore::get_agent`).
    pub agent: Arc<Mutex<Agent>>,
    /// When the session object was constructed.
    pub created_at: Instant,
    /// Last activity stamp; refreshed by `SessionStore::touch`.
    pub last_active: Instant,
    /// Alias of the agent bound to this session.
    pub agent_alias: String,
    /// Workspace directory associated with this session.
    pub workspace_dir: String,
    /// Session-scoped overrides merged by `SessionStore::set_overrides`.
    pub overrides: SessionOverrides,
    /// Upload index keyed by `UploadEntry::ref_id`.
    pub uploads: HashMap<String, UploadEntry>,
    /// Mode the session was created in; used to pick eviction siblings.
    pub chat_mode: crate::rpc::types::ChatMode,
    /// Owning TUI, or `None` for an unowned session.
    pub owner_tui_id: Option<String>,
}
74
75impl RpcSession {
76    pub fn new(
77        agent: Agent,
78        alias: &str,
79        workspace: &str,
80        chat_mode: crate::rpc::types::ChatMode,
81    ) -> Self {
82        Self {
83            agent: Arc::new(Mutex::new(agent)),
84            created_at: Instant::now(),
85            last_active: Instant::now(),
86            agent_alias: alias.to_string(),
87            workspace_dir: workspace.to_string(),
88            overrides: SessionOverrides::default(),
89            uploads: HashMap::new(),
90            chat_mode,
91            owner_tui_id: None,
92        }
93    }
94
95    /// Bind this session to a TUI owner.
96    pub fn with_owner(mut self, tui_id: Option<String>) -> Self {
97        self.owner_tui_id = tui_id;
98        self
99    }
100}
101
/// Registry of live RPC sessions plus the bookkeeping needed to cancel an
/// in-flight turn and attribute that cancellation.
pub struct SessionStore {
    /// Live sessions keyed by session id (async mutex).
    sessions: Mutex<HashMap<String, RpcSession>>,
    /// Cancel token of each session's in-flight turn, tagged with the
    /// generation returned by `register_cancel_token`. An entry's presence
    /// is what `has_inflight_turn` reports.
    cancel_tokens: std::sync::Mutex<HashMap<String, (u64, tokio_util::sync::CancellationToken)>>,
    /// Counter from which token generations are drawn.
    cancel_generation: std::sync::atomic::AtomicU64,
    /// Records WHY each session's cancel token was fired. Populated at the
    /// firing site immediately before `token.cancel()`; drained by the
    /// turn-verdict site. Every known firing site records before firing; a
    /// fired token with no entry means a new path was added without wiring
    /// the cause — treat it as a bug, not as user attribution.
    cancel_causes: std::sync::Mutex<HashMap<String, CancelCause>>,
    /// Upper bound on the number of sessions enforced by `insert`.
    max_sessions: usize,
    // NOTE(review): shared actor queue handed in at construction; it is not
    // used inside this file — confirm its role with the dispatcher.
    pub session_queue: Arc<SessionActorQueue>,
}
115
116impl SessionStore {
117    pub fn new(max_sessions: usize, session_queue: Arc<SessionActorQueue>) -> Self {
118        Self {
119            sessions: Mutex::new(HashMap::new()),
120            cancel_tokens: std::sync::Mutex::new(HashMap::new()),
121            cancel_generation: std::sync::atomic::AtomicU64::new(0),
122            cancel_causes: std::sync::Mutex::new(HashMap::new()),
123            max_sessions,
124            session_queue,
125        }
126    }
127
128    pub async fn insert(&self, id: String, session: RpcSession) -> Result<(), &'static str> {
129        let mut sessions = self.sessions.lock().await;
130        if sessions.len() >= self.max_sessions {
131            return Err("session limit reached");
132        }
133        sessions.insert(id, session);
134        Ok(())
135    }
136
137    pub async fn get_agent(&self, id: &str) -> Option<Arc<Mutex<Agent>>> {
138        self.sessions.lock().await.get(id).map(|s| s.agent.clone())
139    }
140
141    pub async fn touch(&self, id: &str) {
142        if let Some(s) = self.sessions.lock().await.get_mut(id) {
143            s.last_active = Instant::now();
144        }
145    }
146
147    /// Apply overrides to the session and immediately mutate the agent.
148    /// Returns the merged overrides for confirmation.
149    ///
150    /// Note: `model_provider` is recorded here but the live provider swap is
151    /// driven by the dispatcher via [`Self::apply_model_provider`], because
152    /// rebuilding the `ModelProvider` box needs `Config` access that the
153    /// session store deliberately does not hold.
154    pub async fn set_overrides(
155        &self,
156        id: &str,
157        patch: SessionOverrides,
158    ) -> Option<SessionOverrides> {
159        let mut sessions = self.sessions.lock().await;
160        let session = sessions.get_mut(id)?;
161        if let Some(ref m) = patch.model {
162            session.overrides.model = Some(m.clone());
163        }
164        if let Some(ref p) = patch.model_provider {
165            session.overrides.model_provider = Some(p.clone());
166            // A provider switch without an explicit model must not carry the
167            // previous provider's model forward (e.g. switching to an Ollama
168            // alias while a Claude model override lingers). Clear it so the
169            // dispatcher resolves the new alias's configured model.
170            if patch.model.is_none() {
171                session.overrides.model = None;
172            }
173        }
174        if let Some(t) = patch.temperature {
175            session.overrides.temperature = Some(t);
176        }
177        // Apply to agent immediately.
178        let overrides = session.overrides.clone();
179        let agent = session.agent.clone();
180        drop(sessions);
181        let mut guard = agent.lock().await;
182        if let Some(ref m) = overrides.model {
183            guard.set_model_name(m.clone());
184        }
185        if overrides.temperature.is_some() {
186            guard.set_temperature(overrides.temperature);
187        }
188        Some(overrides)
189    }
190
191    /// Swap a freshly built `ModelProvider` box (and its name) onto the
192    /// session's agent. Called by the dispatcher after it constructs the
193    /// box from config, keeping model_provider-build logic out of the store.
194    pub async fn apply_model_provider(
195        &self,
196        id: &str,
197        model_provider: Box<dyn ModelProvider>,
198        model_provider_name: String,
199    ) -> bool {
200        let agent = {
201            let sessions = self.sessions.lock().await;
202            match sessions.get(id) {
203                Some(s) => s.agent.clone(),
204                None => return false,
205            }
206        };
207        let mut guard = agent.lock().await;
208        guard.set_model_provider(model_provider);
209        guard.set_model_provider_name(model_provider_name);
210        true
211    }
212
213    pub async fn get_overrides(&self, id: &str) -> Option<SessionOverrides> {
214        self.sessions
215            .lock()
216            .await
217            .get(id)
218            .map(|s| s.overrides.clone())
219    }
220
221    /// Look up an existing upload by ref_id. Returns `None` if the session
222    /// or entry doesn't exist.
223    pub async fn get_upload(&self, session_id: &str, ref_id: &str) -> Option<UploadEntry> {
224        self.sessions
225            .lock()
226            .await
227            .get(session_id)
228            .and_then(|s| s.uploads.get(ref_id).cloned())
229    }
230
231    /// Insert (or overwrite) an upload entry in the session's index.
232    pub async fn insert_upload(&self, session_id: &str, entry: UploadEntry) {
233        if let Some(s) = self.sessions.lock().await.get_mut(session_id) {
234            s.uploads.insert(entry.ref_id.clone(), entry);
235        }
236    }
237
238    /// Get the workspace directory for a session.
239    pub async fn get_workspace_dir(&self, session_id: &str) -> Option<String> {
240        self.sessions
241            .lock()
242            .await
243            .get(session_id)
244            .map(|s| s.workspace_dir.clone())
245    }
246
247    /// Get the agent alias bound to a session, if known. Used by the
248    /// dispatcher to route uploads to the agent's own workspace dir
249    /// rather than to the user's session cwd (which is often a git
250    /// repo we shouldn't be writing into).
251    pub async fn get_agent_alias(&self, session_id: &str) -> Option<String> {
252        self.sessions
253            .lock()
254            .await
255            .get(session_id)
256            .map(|s| s.agent_alias.clone())
257    }
258
259    pub async fn seed_history(&self, id: &str, msgs: &[zeroclaw_api::model_provider::ChatMessage]) {
260        if let Some(s) = self.sessions.lock().await.get(id) {
261            s.agent.lock().await.seed_history(msgs);
262        }
263    }
264
265    pub async fn seed_conversation_history(
266        &self,
267        id: &str,
268        msgs: Vec<zeroclaw_api::model_provider::ConversationMessage>,
269    ) {
270        if let Some(s) = self.sessions.lock().await.get(id) {
271            s.agent.lock().await.seed_conversation_history(msgs);
272        }
273    }
274
275    pub async fn chat_mode(&self, id: &str) -> Option<crate::rpc::types::ChatMode> {
276        self.sessions
277            .lock()
278            .await
279            .get(id)
280            .map(|s| s.chat_mode.clone())
281    }
282
283    pub async fn history_len(&self, id: &str) -> Option<usize> {
284        let sessions = self.sessions.lock().await;
285        let s = sessions.get(id)?;
286        Some(s.agent.lock().await.history().len())
287    }
288
289    pub async fn history_slice_from(
290        &self,
291        id: &str,
292        from: usize,
293    ) -> Option<Vec<zeroclaw_api::model_provider::ConversationMessage>> {
294        let sessions = self.sessions.lock().await;
295        let s = sessions.get(id)?;
296        let h = s.agent.lock().await;
297        // Saturate: `trim_history` can shift indices past `from` between polls.
298        let history = h.history();
299        Some(history[from.min(history.len())..].to_vec())
300    }
301
302    pub async fn remove(&self, id: &str) -> bool {
303        if let Some((_, token)) = self
304            .cancel_tokens
305            .lock()
306            .unwrap_or_else(|e| e.into_inner())
307            .remove(id)
308        {
309            self.record_cancel_cause(id, CancelCause::SessionRemoved);
310            token.cancel();
311        }
312        self.sessions.lock().await.remove(id).is_some()
313    }
314
315    /// Drop every *idle* session owned by `tui_id` in the same `chat_mode` as a
316    /// freshly created session, except `except_id` itself. zerocode keeps one
317    /// active session per mode per TUI: creating or loading another session of
318    /// that mode abandons the prior one until it is explicitly reloaded, so the
319    /// prior agent and its history are dead weight in RSS. Chat and Code
320    /// sessions are orthogonal, so a Chat switch must never evict the live Code
321    /// session and vice versa.
322    ///
323    /// A session with a registered cancel token has a turn in flight: a spawned
324    /// `session/prompt` task still holds an `Arc<Mutex<Agent>>` clone, so
325    /// removing the map's strong ref would neither free the agent nor be safe to
326    /// trim against, and force-cancelling another TUI's mid-turn work is exactly
327    /// the freeze the reaper guards against. Such sessions are skipped; they
328    /// finish their turn and are reclaimed later. Returns the
329    /// `(session_key, agent_alias)` of each session actually dropped, so the
330    /// caller can attribute the eviction and knows the agents are freed before
331    /// it trims.
332    pub async fn evict_same_mode_sibling(
333        &self,
334        tui_id: &str,
335        chat_mode: &crate::rpc::types::ChatMode,
336        except_id: &str,
337    ) -> Vec<(String, String)> {
338        let in_flight: std::collections::HashSet<String> = self
339            .cancel_tokens
340            .lock()
341            .unwrap_or_else(|e| e.into_inner())
342            .keys()
343            .cloned()
344            .collect();
345        let mut sessions = self.sessions.lock().await;
346        let victims: Vec<String> = sessions
347            .iter()
348            .filter(|(key, s)| {
349                key.as_str() != except_id
350                    && s.owner_tui_id.as_deref() == Some(tui_id)
351                    && &s.chat_mode == chat_mode
352                    && !in_flight.contains(key.as_str())
353            })
354            .map(|(key, _)| key.clone())
355            .collect();
356        let mut evicted = Vec::with_capacity(victims.len());
357        for key in victims {
358            if let Some(s) = sessions.remove(&key) {
359                evicted.push((key, s.agent_alias));
360            }
361        }
362        evicted
363    }
364
365    /// Read the `owner_tui_id` stamp from a session. Returns `None` if the
366    /// session doesn't exist, `Some(None)` if it exists but is unowned (e.g.
367    /// created by an anonymous connection), `Some(Some(id))` if owned by `id`.
368    pub async fn session_owner_tui_id(&self, session_id: &str) -> Option<Option<String>> {
369        let sessions = self.sessions.lock().await;
370        sessions.get(session_id).map(|s| s.owner_tui_id.clone())
371    }
372
373    pub async fn list_ids(&self) -> Vec<String> {
374        self.sessions.lock().await.keys().cloned().collect()
375    }
376
377    pub fn register_cancel_token(
378        &self,
379        id: &str,
380        token: tokio_util::sync::CancellationToken,
381    ) -> u64 {
382        let generation = self
383            .cancel_generation
384            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
385            .wrapping_add(1);
386        if let Some((_, stale)) = self
387            .cancel_tokens
388            .lock()
389            .unwrap_or_else(|e| e.into_inner())
390            .insert(id.to_string(), (generation, token))
391        {
392            stale.cancel();
393        }
394        generation
395    }
396
397    pub fn remove_cancel_token(&self, id: &str, generation: u64) {
398        {
399            let mut tokens = self.cancel_tokens.lock().unwrap_or_else(|e| e.into_inner());
400            match tokens.get(id) {
401                Some((g, _)) if *g == generation => {
402                    tokens.remove(id);
403                }
404                _ => return,
405            }
406        }
407        self.cancel_causes
408            .lock()
409            .unwrap_or_else(|e| e.into_inner())
410            .remove(id);
411    }
412
413    pub fn cancel_session(&self, id: &str) -> bool {
414        self.record_cancel_cause(id, CancelCause::ClientRpc);
415        self.cancel_tokens
416            .lock()
417            .unwrap_or_else(|e| e.into_inner())
418            .get(id)
419            .map(|(_, t)| {
420                t.cancel();
421                true
422            })
423            .unwrap_or(false)
424    }
425
426    /// Returns true if a cancel token is registered — i.e. a turn is in flight.
427    pub fn has_inflight_turn(&self, id: &str) -> bool {
428        self.cancel_tokens
429            .lock()
430            .unwrap_or_else(|e| e.into_inner())
431            .contains_key(id)
432    }
433
434    /// Force-terminate a session: if a turn is in flight, record `AdminKill`
435    /// and fire the cancel token so the verdict site can attribute the cause;
436    /// then remove the session from the store.
437    /// Returns `true` if the session existed and was removed, `false` if not found.
438    /// History on disk is NOT touched — this is an in-memory eviction only.
439    pub async fn kill_session(&self, id: &str) -> bool {
440        if let Some((_, token)) = self
441            .cancel_tokens
442            .lock()
443            .unwrap_or_else(|e| e.into_inner())
444            .remove(id)
445        {
446            self.record_cancel_cause(id, CancelCause::AdminKill);
447            token.cancel();
448        }
449        self.sessions.lock().await.remove(id).is_some()
450    }
451
452    /// Record the cause for an imminent cancel-token fire. Call immediately
453    /// before firing so the verdict site can attribute the cancel.
454    pub fn record_cancel_cause(&self, id: &str, cause: CancelCause) {
455        self.cancel_causes
456            .lock()
457            .unwrap_or_else(|e| e.into_inner())
458            .insert(id.to_string(), cause);
459    }
460
461    /// Drain the recorded cancel cause for a session. Returns `None` only
462    /// when no cancel actually fired (clean completion); every firing path
463    /// records before `token.cancel()`, so `Some(_)` after a fired token is
464    /// the invariant the verdict audit relies on.
465    pub fn take_cancel_cause(&self, id: &str) -> Option<CancelCause> {
466        self.cancel_causes
467            .lock()
468            .unwrap_or_else(|e| e.into_inner())
469            .remove(id)
470    }
471
472    pub async fn count(&self) -> usize {
473        self.sessions.lock().await.len()
474    }
475
476    /// Count active sessions grouped by agent alias.
477    pub async fn count_by_agent(&self) -> HashMap<String, usize> {
478        let sessions = self.sessions.lock().await;
479        let mut counts: HashMap<String, usize> = HashMap::new();
480        for session in sessions.values() {
481            *counts.entry(session.agent_alias.clone()).or_insert(0) += 1;
482        }
483        counts
484    }
485}
486
487#[cfg(test)]
488mod tests {
489    use super::*;
490
491    fn make_store(max: usize) -> SessionStore {
492        SessionStore::new(max, Arc::new(SessionActorQueue::new(4, 10, 60)))
493    }
494
495    fn make_agent() -> Agent {
496        use crate::agent::dispatcher::NativeToolDispatcher;
497        use crate::observability::NoopObserver;
498
499        let mem_cfg = zeroclaw_config::schema::MemoryConfig {
500            backend: "none".into(),
501            ..zeroclaw_config::schema::MemoryConfig::default()
502        };
503        let mem = Arc::from(
504            zeroclaw_memory::create_memory(&mem_cfg, &std::env::temp_dir(), None).unwrap(),
505        );
506
507        Agent::builder()
508            .model_provider(Box::new(StubProvider))
509            .tools(vec![])
510            .memory(mem)
511            .observer(Arc::new(NoopObserver {}) as Arc<dyn crate::observability::Observer>)
512            .tool_dispatcher(Box::new(NativeToolDispatcher))
513            .workspace_dir(std::env::temp_dir())
514            .build()
515            .unwrap()
516    }
517
    /// Minimal provider that satisfies the builder. Never called in these tests.
    struct StubProvider;

    // Canned `ModelProvider` responses: an empty string for the
    // system-prompt variant, and a fixed "stub" reply with no tool calls,
    // usage, or reasoning content for the request variant.
    #[async_trait::async_trait]
    impl zeroclaw_providers::ModelProvider for StubProvider {
        async fn chat_with_system(
            &self,
            _: Option<&str>,
            _: &str,
            _: &str,
            _: Option<f64>,
        ) -> anyhow::Result<String> {
            Ok(String::new())
        }
        async fn chat(
            &self,
            _: zeroclaw_providers::ChatRequest<'_>,
            _: &str,
            _: Option<f64>,
        ) -> anyhow::Result<zeroclaw_providers::ChatResponse> {
            Ok(zeroclaw_providers::ChatResponse {
                text: Some("stub".into()),
                tool_calls: vec![],
                usage: None,
                reasoning_content: None,
            })
        }
    }
    // Attribution identity for the stub: a custom model provider aliased
    // "stub".
    impl zeroclaw_api::attribution::Attributable for StubProvider {
        fn role(&self) -> zeroclaw_api::attribution::Role {
            zeroclaw_api::attribution::Role::Provider(
                zeroclaw_api::attribution::ProviderKind::Model(
                    zeroclaw_api::attribution::ModelProviderKind::Custom,
                ),
            )
        }
        fn alias(&self) -> &str {
            "stub"
        }
    }
558
559    #[tokio::test]
560    async fn insert_and_count() {
561        let store = make_store(4);
562        assert_eq!(store.count().await, 0);
563
564        store
565            .insert(
566                "s1".into(),
567                RpcSession::new(make_agent(), "a", ".", crate::rpc::types::ChatMode::Chat),
568            )
569            .await
570            .unwrap();
571        assert_eq!(store.count().await, 1);
572    }
573
574    #[tokio::test]
575    async fn insert_rejects_over_limit() {
576        let store = make_store(1);
577        store
578            .insert(
579                "s1".into(),
580                RpcSession::new(make_agent(), "a", ".", crate::rpc::types::ChatMode::Chat),
581            )
582            .await
583            .unwrap();
584        let err = store
585            .insert(
586                "s2".into(),
587                RpcSession::new(make_agent(), "a", ".", crate::rpc::types::ChatMode::Chat),
588            )
589            .await;
590        assert!(err.is_err());
591    }
592
593    #[tokio::test]
594    async fn get_agent_returns_arc() {
595        let store = make_store(4);
596        store
597            .insert(
598                "s1".into(),
599                RpcSession::new(make_agent(), "a", ".", crate::rpc::types::ChatMode::Chat),
600            )
601            .await
602            .unwrap();
603        assert!(store.get_agent("s1").await.is_some());
604        assert!(store.get_agent("nonexistent").await.is_none());
605    }
606
607    #[tokio::test]
608    async fn set_overrides_applies_model_and_temperature_live() {
609        let store = make_store(4);
610        store
611            .insert(
612                "s1".into(),
613                RpcSession::new(make_agent(), "a", ".", crate::rpc::types::ChatMode::Chat),
614            )
615            .await
616            .unwrap();
617
618        let merged = store
619            .set_overrides(
620                "s1",
621                SessionOverrides {
622                    model: Some("model-x".into()),
623                    temperature: Some(0.42),
624                    ..Default::default()
625                },
626            )
627            .await
628            .expect("session exists");
629        assert_eq!(merged.model.as_deref(), Some("model-x"));
630        assert_eq!(merged.temperature, Some(0.42));
631
632        // The override is applied to the live agent immediately.
633        let agent = store.get_agent("s1").await.unwrap();
634        let (_, _, model_name) = agent.lock().await.attribution_fields();
635        assert_eq!(model_name, "model-x");
636    }
637
638    #[tokio::test]
639    async fn set_overrides_records_model_provider_without_rebuilding() {
640        // The store records the model_provider override but does NOT rebuild the
641        // provider box — that is the dispatcher's job (needs Config). Here we
642        // only assert the field round-trips through the merge.
643        let store = make_store(4);
644        store
645            .insert(
646                "s1".into(),
647                RpcSession::new(make_agent(), "a", ".", crate::rpc::types::ChatMode::Chat),
648            )
649            .await
650            .unwrap();
651
652        let merged = store
653            .set_overrides(
654                "s1",
655                SessionOverrides {
656                    model_provider: Some("anthropic.default".into()),
657                    ..Default::default()
658                },
659            )
660            .await
661            .expect("session exists");
662        assert_eq!(merged.model_provider.as_deref(), Some("anthropic.default"));
663    }
664
665    #[tokio::test]
666    async fn provider_switch_without_model_clears_prior_model() {
667        // Switching provider with no explicit model must drop the prior
668        // model override so the dispatcher resolves the new alias's
669        // configured model (e.g. Ollama alias must not keep a Claude model).
670        let store = make_store(4);
671        store
672            .insert(
673                "s1".into(),
674                RpcSession::new(make_agent(), "a", ".", crate::rpc::types::ChatMode::Chat),
675            )
676            .await
677            .unwrap();
678        store
679            .set_overrides(
680                "s1",
681                SessionOverrides {
682                    model: Some("claude-opus-4-5".into()),
683                    ..Default::default()
684                },
685            )
686            .await
687            .expect("session exists");
688        let merged = store
689            .set_overrides(
690                "s1",
691                SessionOverrides {
692                    model_provider: Some("ollama.default".into()),
693                    ..Default::default()
694                },
695            )
696            .await
697            .expect("session exists");
698        assert_eq!(merged.model_provider.as_deref(), Some("ollama.default"));
699        assert_eq!(
700            merged.model, None,
701            "a provider-only switch must clear the lingering model override"
702        );
703    }
704
705    #[tokio::test]
706    async fn set_overrides_missing_session_is_none() {
707        let store = make_store(4);
708        assert!(
709            store
710                .set_overrides("ghost", SessionOverrides::default())
711                .await
712                .is_none()
713        );
714    }
715
716    #[tokio::test]
717    async fn remove_cleans_up() {
718        let store = make_store(4);
719        store
720            .insert(
721                "s1".into(),
722                RpcSession::new(make_agent(), "a", ".", crate::rpc::types::ChatMode::Chat),
723            )
724            .await
725            .unwrap();
726
727        let token = tokio_util::sync::CancellationToken::new();
728        store.register_cancel_token("s1", token.clone());
729
730        assert!(store.remove("s1").await);
731        assert_eq!(store.count().await, 0);
732        // Cancel token was also removed -- cancelling is a no-op now.
733        assert!(!store.cancel_session("s1"));
734    }
735
736    #[tokio::test]
737    async fn remove_nonexistent_returns_false() {
738        let store = make_store(4);
739        assert!(!store.remove("ghost").await);
740    }
741
742    #[tokio::test]
743    async fn evict_same_mode_sibling_drops_only_same_mode_owner() {
744        use crate::rpc::types::ChatMode;
745        let store = make_store(8);
746        let mk = |mode: ChatMode, owner: &str| {
747            RpcSession::new(make_agent(), "a", ".", mode).with_owner(Some(owner.to_string()))
748        };
749        store
750            .insert("old_chat".into(), mk(ChatMode::Chat, "tui1"))
751            .await
752            .unwrap();
753        store
754            .insert("old_code".into(), mk(ChatMode::Acp, "tui1"))
755            .await
756            .unwrap();
757        store
758            .insert("other_chat".into(), mk(ChatMode::Chat, "tui2"))
759            .await
760            .unwrap();
761        store
762            .insert("new_chat".into(), mk(ChatMode::Chat, "tui1"))
763            .await
764            .unwrap();
765
766        let evicted = store
767            .evict_same_mode_sibling("tui1", &ChatMode::Chat, "new_chat")
768            .await;
769
770        let ids: Vec<&str> = evicted.iter().map(|(id, _)| id.as_str()).collect();
771        assert_eq!(ids, vec!["old_chat"]);
772        assert!(
773            store.get_agent("new_chat").await.is_some(),
774            "new session preserved"
775        );
776        assert!(
777            store.get_agent("old_code").await.is_some(),
778            "cross-mode Code session preserved"
779        );
780        assert!(
781            store.get_agent("other_chat").await.is_some(),
782            "other TUI session preserved"
783        );
784        assert!(
785            store.get_agent("old_chat").await.is_none(),
786            "abandoned same-mode session evicted"
787        );
788    }
789
790    #[tokio::test]
791    async fn evict_same_mode_sibling_skips_in_flight_turn() {
792        use crate::rpc::types::ChatMode;
793        let store = make_store(8);
794        let mk = |mode: ChatMode, owner: &str| {
795            RpcSession::new(make_agent(), "a", ".", mode).with_owner(Some(owner.to_string()))
796        };
797        store
798            .insert("busy_chat".into(), mk(ChatMode::Chat, "tui1"))
799            .await
800            .unwrap();
801        store
802            .insert("new_chat".into(), mk(ChatMode::Chat, "tui1"))
803            .await
804            .unwrap();
805        // A registered cancel token marks a turn in flight: a spawned prompt
806        // task still holds an Agent clone, so this session must NOT be force
807        // evicted (that is the reaper's documented mid-turn freeze).
808        let token = tokio_util::sync::CancellationToken::new();
809        store.register_cancel_token("busy_chat", token.clone());
810
811        let evicted = store
812            .evict_same_mode_sibling("tui1", &ChatMode::Chat, "new_chat")
813            .await;
814
815        assert!(
816            evicted.is_empty(),
817            "in-flight same-mode session must be left to finish its turn"
818        );
819        assert!(
820            store.get_agent("busy_chat").await.is_some(),
821            "mid-turn session preserved"
822        );
823        assert!(
824            !token.is_cancelled(),
825            "eviction must not fire a mid-turn cancel token"
826        );
827    }
828
829    #[tokio::test]
830    async fn cancel_token_lifecycle() {
831        let store = make_store(4);
832        let token = tokio_util::sync::CancellationToken::new();
833        let generation = store.register_cancel_token("s1", token.clone());
834
835        assert!(!token.is_cancelled());
836        assert!(store.cancel_session("s1"));
837        assert!(token.is_cancelled());
838
839        // Second cancel returns false (token was consumed by remove).
840        store.remove_cancel_token("s1", generation);
841        assert!(!store.cancel_session("s1"));
842    }
843
844    #[tokio::test]
845    async fn reregister_force_cancels_prior_turn() {
846        let store = make_store(4);
847        let old = tokio_util::sync::CancellationToken::new();
848        let old_gen = store.register_cancel_token("s", old.clone());
849
850        let new = tokio_util::sync::CancellationToken::new();
851        let new_gen = store.register_cancel_token("s", new.clone());
852
853        assert!(old.is_cancelled(), "re-register must kill the prior turn");
854        assert!(!new.is_cancelled());
855        assert_ne!(old_gen, new_gen);
856
857        store.remove_cancel_token("s", old_gen);
858        assert!(
859            store.cancel_session("s"),
860            "stale-generation remove must not orphan the live turn's token"
861        );
862        assert!(new.is_cancelled());
863    }
864
865    #[tokio::test]
866    async fn stale_remove_is_a_noop() {
867        let store = make_store(4);
868        let token = tokio_util::sync::CancellationToken::new();
869        let generation = store.register_cancel_token("s", token.clone());
870        store.remove_cancel_token("s", generation.wrapping_sub(1));
871        assert!(
872            store.cancel_session("s"),
873            "a remove with a non-matching generation must leave the token intact"
874        );
875    }
876
877    #[tokio::test]
878    async fn cancel_nonexistent_returns_false() {
879        let store = make_store(4);
880        assert!(!store.cancel_session("nope"));
881    }
882
883    #[tokio::test]
884    async fn list_ids() {
885        let store = make_store(4);
886        store
887            .insert(
888                "b".into(),
889                RpcSession::new(make_agent(), "a", ".", crate::rpc::types::ChatMode::Chat),
890            )
891            .await
892            .unwrap();
893        store
894            .insert(
895                "a".into(),
896                RpcSession::new(make_agent(), "a", ".", crate::rpc::types::ChatMode::Chat),
897            )
898            .await
899            .unwrap();
900        let mut ids = store.list_ids().await;
901        ids.sort();
902        assert_eq!(ids, vec!["a", "b"]);
903    }
904
905    #[tokio::test]
906    async fn touch_updates_last_active() {
907        let store = make_store(4);
908        store
909            .insert(
910                "s1".into(),
911                RpcSession::new(make_agent(), "a", ".", crate::rpc::types::ChatMode::Chat),
912            )
913            .await
914            .unwrap();
915
916        let before = { store.sessions.lock().await.get("s1").unwrap().last_active };
917        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
918        store.touch("s1").await;
919        let after = { store.sessions.lock().await.get("s1").unwrap().last_active };
920        assert!(after > before);
921    }
922
923    /// A session must persist indefinitely after transport disconnect —
924    /// no orphan grace, no idle TTL. The reaper no longer exists.
925    #[tokio::test]
926    async fn session_persists_after_transport_disconnect() {
927        let store = make_store(4);
928        store
929            .insert(
930                "s1".into(),
931                RpcSession::new(make_agent(), "a", ".", crate::rpc::types::ChatMode::Chat)
932                    .with_owner(Some("tui-x".to_string())),
933            )
934            .await
935            .unwrap();
936        // Simulate transport disconnect — store must still hold the session.
937        assert_eq!(
938            store.count().await,
939            1,
940            "session must survive transport disconnect"
941        );
942    }
943
944    /// kill_session fires the cancel token and removes the session.
945    #[tokio::test]
946    async fn kill_session_cancels_inflight_and_removes() {
947        let store = make_store(4);
948        store
949            .insert(
950                "live".into(),
951                RpcSession::new(make_agent(), "a", ".", crate::rpc::types::ChatMode::Chat),
952            )
953            .await
954            .unwrap();
955        let token = tokio_util::sync::CancellationToken::new();
956        store.register_cancel_token("live", token.clone());
957
958        let removed = store.kill_session("live").await;
959        assert!(removed, "kill_session must return true for a real session");
960        assert!(
961            token.is_cancelled(),
962            "kill_session must fire the cancel token"
963        );
964        assert_eq!(store.count().await, 0, "session must be removed");
965    }
966
967    /// kill_session on a session with no in-flight turn still removes it.
968    #[tokio::test]
969    async fn kill_session_idle_session_removed() {
970        let store = make_store(4);
971        store
972            .insert(
973                "cold".into(),
974                RpcSession::new(make_agent(), "a", ".", crate::rpc::types::ChatMode::Chat),
975            )
976            .await
977            .unwrap();
978        // No cancel token registered.
979        let removed = store.kill_session("cold").await;
980        assert!(removed);
981        assert_eq!(store.count().await, 0);
982    }
983
984    /// kill_session returns false for a session that doesn't exist.
985    #[tokio::test]
986    async fn kill_session_missing_returns_false() {
987        let store = make_store(4);
988        assert!(!store.kill_session("ghost").await);
989    }
990
991    /// kill_session must record AdminKill so the turn-verdict site can attribute
992    /// the cancel. The cause must survive until take_cancel_cause drains it.
993    #[tokio::test]
994    async fn kill_session_cause_is_admin_kill() {
995        let store = make_store(4);
996        store
997            .insert(
998                "s".into(),
999                RpcSession::new(make_agent(), "a", ".", crate::rpc::types::ChatMode::Chat),
1000            )
1001            .await
1002            .unwrap();
1003        let token = tokio_util::sync::CancellationToken::new();
1004        store.register_cancel_token("s", token.clone());
1005
1006        store.kill_session("s").await;
1007        // The verdict site must see AdminKill, not None.
1008        assert_eq!(
1009            store.take_cancel_cause("s"),
1010            Some(CancelCause::AdminKill),
1011            "kill_session must preserve AdminKill cause for verdict-site attribution"
1012        );
1013    }
1014}