Skip to main content

zeroclaw_runtime/rpc/
context.rs

1//! Shared context threaded from `daemon::run()` through the Unix socket
2//! listener into each per-connection [`super::dispatch::RpcDispatcher`].
3//!
4//! Every subsystem handle the RPC layer might need lives here. Fields
5//! beyond `config` and `sessions` are `Option` so the context works in
6//! tests and minimal (kernel-only) daemon configurations.
7
8use std::collections::HashMap;
9use std::sync::Arc;
10
11use parking_lot::RwLock;
12use serde_json::Value;
13use tokio::sync::oneshot;
14
15use zeroclaw_api::channel::ChannelApprovalResponse;
16use zeroclaw_config::cost::tracker::CostTracker;
17use zeroclaw_config::schema::Config;
18use zeroclaw_infra::acp_session_store::AcpSessionStore;
19use zeroclaw_infra::session_backend::SessionBackend;
20
21use super::session::SessionStore;
22use super::tui_identity::TuiRegistry;
23
24/// Registry for in-flight tool approval requests.
25///
26/// The RpcApprovalChannel inserts a (request_id, oneshot::Sender) pair
27/// before sending the approval_request notification.
28/// handle_session_approve resolves it when the client sends session/approve.
29#[derive(Default)]
30pub struct ApprovalPendingMap {
31    inner: std::sync::Mutex<HashMap<String, oneshot::Sender<ChannelApprovalResponse>>>,
32}
33
34impl ApprovalPendingMap {
35    pub fn insert(&self, request_id: String, tx: oneshot::Sender<ChannelApprovalResponse>) {
36        self.inner
37            .lock()
38            .unwrap_or_else(|e| e.into_inner())
39            .insert(request_id, tx);
40    }
41
42    pub fn resolve(&self, request_id: &str, response: ChannelApprovalResponse) {
43        let tx = self
44            .inner
45            .lock()
46            .unwrap_or_else(|e| e.into_inner())
47            .remove(request_id);
48        if let Some(tx) = tx {
49            let _ = tx.send(response);
50        }
51    }
52}
53
54/// Daemon-wide state shared across all RPC connections.
55pub struct RpcContext {
56    /// Live config behind a read-write lock so `config/set` can mutate
57    /// without a full daemon reload. Mirrors the gateway's
58    /// `Arc<RwLock<Config>>` pattern.
59    pub config: Arc<RwLock<Config>>,
60
61    /// In-memory session store for active RPC sessions.
62    pub sessions: Arc<SessionStore>,
63
64    /// Persistent session backend (SQLite / JSONL) for history and
65    /// session metadata. `None` when persistence is disabled.
66    pub session_backend: Option<Arc<dyn SessionBackend>>,
67
68    /// Memory subsystem (`dyn Memory` from `zeroclaw-api`).
69    pub memory: Option<Arc<dyn zeroclaw_api::memory_traits::Memory>>,
70
71    /// Cost tracking. `None` when cost tracking is disabled.
72    pub cost_tracker: Option<Arc<CostTracker>>,
73
74    /// Daemon-wide event broadcast. RPC handlers subscribe to forward
75    /// events as JSON-RPC notifications (`logs/subscribe`).
76    pub event_tx: Option<tokio::sync::broadcast::Sender<Value>>,
77
78    /// Write `true` to trigger a daemon-level config reload. Mirrors
79    /// the gateway's `/admin/reload` mechanism.
80    pub reload_tx: Option<tokio::sync::watch::Sender<bool>>,
81
82    /// In-flight approval requests waiting for session/approve RPC calls.
83    pub approval_pending: Arc<ApprovalPendingMap>,
84
85    /// Live TUI client registry. Tracks connected TUI sessions by UID.
86    /// **Source of truth** for "which TUIs are connected right now."
87    pub tui_registry: Arc<TuiRegistry>,
88
89    /// ACP session persistence. Opened (and the DB file created) at
90    /// daemon boot under `<data_dir>/sessions/acp-sessions.db`. `None`
91    /// when the store could not be opened (read-only FS, bad perms) —
92    /// callers must treat persistence as best-effort.
93    pub acp_session_store: Option<Arc<AcpSessionStore>>,
94}
95
96impl RpcContext {
97    /// Minimal context for tests — only config and sessions, everything
98    /// else `None`.
99    #[cfg(test)]
100    pub fn minimal(config: Config, sessions: Arc<SessionStore>) -> Arc<Self> {
101        Arc::new(Self {
102            config: Arc::new(RwLock::new(config)),
103            sessions,
104            session_backend: None,
105            memory: None,
106            cost_tracker: None,
107            event_tx: None,
108            reload_tx: None,
109            approval_pending: Arc::new(ApprovalPendingMap::default()),
110            tui_registry: Arc::new(TuiRegistry::new_unsigned()),
111            acp_session_store: None,
112        })
113    }
114
115    #[cfg(test)]
116    pub fn for_persistence_tests(
117        config: Config,
118        sessions: Arc<SessionStore>,
119        session_backend: Option<Arc<dyn SessionBackend>>,
120        acp_session_store: Option<Arc<AcpSessionStore>>,
121    ) -> Arc<Self> {
122        Arc::new(Self {
123            config: Arc::new(RwLock::new(config)),
124            sessions,
125            session_backend,
126            memory: None,
127            cost_tracker: None,
128            event_tx: None,
129            reload_tx: None,
130            approval_pending: Arc::new(ApprovalPendingMap::default()),
131            tui_registry: Arc::new(TuiRegistry::new_unsigned()),
132            acp_session_store,
133        })
134    }
135}
136
137#[cfg(test)]
138mod tests {
139    use super::*;
140    use tokio::sync::oneshot;
141    use zeroclaw_api::channel::ChannelApprovalResponse;
142
143    #[test]
144    fn pending_map_insert_and_resolve() {
145        let map = ApprovalPendingMap::default();
146        let (tx, mut rx) = oneshot::channel::<ChannelApprovalResponse>();
147        map.insert("req-1".to_string(), tx);
148        map.resolve("req-1", ChannelApprovalResponse::Approve);
149        assert_eq!(rx.try_recv().unwrap(), ChannelApprovalResponse::Approve);
150    }
151
152    #[test]
153    fn pending_map_resolve_unknown_key_is_noop() {
154        let map = ApprovalPendingMap::default();
155        map.resolve("nonexistent", ChannelApprovalResponse::Deny);
156    }
157
158    #[test]
159    fn pending_map_insert_then_drop_is_safe() {
160        let map = ApprovalPendingMap::default();
161        let (tx, _rx) = oneshot::channel::<ChannelApprovalResponse>();
162        map.insert("req-2".to_string(), tx);
163        // _rx is dropped — resolve sends to a closed channel; must not panic
164        map.resolve("req-2", ChannelApprovalResponse::Approve);
165    }
166}