zeroclaw_runtime/rpc/
context.rs1use std::collections::HashMap;
9use std::sync::Arc;
10
11use parking_lot::RwLock;
12use serde_json::Value;
13use tokio::sync::oneshot;
14
15use zeroclaw_api::channel::ChannelApprovalResponse;
16use zeroclaw_config::cost::tracker::CostTracker;
17use zeroclaw_config::schema::Config;
18use zeroclaw_infra::acp_session_store::AcpSessionStore;
19use zeroclaw_infra::session_backend::SessionBackend;
20
21use super::session::SessionStore;
22use super::tui_identity::TuiRegistry;
23
24#[derive(Default)]
30pub struct ApprovalPendingMap {
31 inner: std::sync::Mutex<HashMap<String, oneshot::Sender<ChannelApprovalResponse>>>,
32}
33
34impl ApprovalPendingMap {
35 pub fn insert(&self, request_id: String, tx: oneshot::Sender<ChannelApprovalResponse>) {
36 self.inner
37 .lock()
38 .unwrap_or_else(|e| e.into_inner())
39 .insert(request_id, tx);
40 }
41
42 pub fn resolve(&self, request_id: &str, response: ChannelApprovalResponse) {
43 let tx = self
44 .inner
45 .lock()
46 .unwrap_or_else(|e| e.into_inner())
47 .remove(request_id);
48 if let Some(tx) = tx {
49 let _ = tx.send(response);
50 }
51 }
52}
53
54pub struct RpcContext {
56 pub config: Arc<RwLock<Config>>,
60
61 pub sessions: Arc<SessionStore>,
63
64 pub session_backend: Option<Arc<dyn SessionBackend>>,
67
68 pub memory: Option<Arc<dyn zeroclaw_api::memory_traits::Memory>>,
70
71 pub cost_tracker: Option<Arc<CostTracker>>,
73
74 pub event_tx: Option<tokio::sync::broadcast::Sender<Value>>,
77
78 pub reload_tx: Option<tokio::sync::watch::Sender<bool>>,
81
82 pub approval_pending: Arc<ApprovalPendingMap>,
84
85 pub tui_registry: Arc<TuiRegistry>,
88
89 pub acp_session_store: Option<Arc<AcpSessionStore>>,
94}
95
96impl RpcContext {
97 #[cfg(test)]
100 pub fn minimal(config: Config, sessions: Arc<SessionStore>) -> Arc<Self> {
101 Arc::new(Self {
102 config: Arc::new(RwLock::new(config)),
103 sessions,
104 session_backend: None,
105 memory: None,
106 cost_tracker: None,
107 event_tx: None,
108 reload_tx: None,
109 approval_pending: Arc::new(ApprovalPendingMap::default()),
110 tui_registry: Arc::new(TuiRegistry::new_unsigned()),
111 acp_session_store: None,
112 })
113 }
114
115 #[cfg(test)]
116 pub fn for_persistence_tests(
117 config: Config,
118 sessions: Arc<SessionStore>,
119 session_backend: Option<Arc<dyn SessionBackend>>,
120 acp_session_store: Option<Arc<AcpSessionStore>>,
121 ) -> Arc<Self> {
122 Arc::new(Self {
123 config: Arc::new(RwLock::new(config)),
124 sessions,
125 session_backend,
126 memory: None,
127 cost_tracker: None,
128 event_tx: None,
129 reload_tx: None,
130 approval_pending: Arc::new(ApprovalPendingMap::default()),
131 tui_registry: Arc::new(TuiRegistry::new_unsigned()),
132 acp_session_store,
133 })
134 }
135}
136
/// Unit tests for [`ApprovalPendingMap`].
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::oneshot;
    use zeroclaw_api::channel::ChannelApprovalResponse;

    /// A registered sender receives the response passed to `resolve`.
    #[test]
    fn pending_map_insert_and_resolve() {
        let map = ApprovalPendingMap::default();
        let (tx, mut rx) = oneshot::channel::<ChannelApprovalResponse>();
        map.insert("req-1".to_string(), tx);
        map.resolve("req-1", ChannelApprovalResponse::Approve);
        assert_eq!(rx.try_recv().unwrap(), ChannelApprovalResponse::Approve);
    }

    /// Resolving an id that was never inserted must not panic.
    #[test]
    fn pending_map_resolve_unknown_key_is_noop() {
        let map = ApprovalPendingMap::default();
        map.resolve("nonexistent", ChannelApprovalResponse::Deny);
    }

    /// The response is sent while the receiver is still alive; the receiver
    /// is then dropped at end of scope without ever being read.
    #[test]
    fn pending_map_insert_then_drop_is_safe() {
        let map = ApprovalPendingMap::default();
        let (tx, _rx) = oneshot::channel::<ChannelApprovalResponse>();
        map.insert("req-2".to_string(), tx);
        map.resolve("req-2", ChannelApprovalResponse::Approve);
    }

    /// The receiver is dropped *before* `resolve`, so the send inside
    /// `resolve` cannot be delivered; this must not panic.
    #[test]
    fn pending_map_resolve_after_receiver_dropped_is_safe() {
        let map = ApprovalPendingMap::default();
        let (tx, rx) = oneshot::channel::<ChannelApprovalResponse>();
        map.insert("req-3".to_string(), tx);
        drop(rx);
        map.resolve("req-3", ChannelApprovalResponse::Deny);
    }

    /// `resolve` removes the entry, so a second call with the same id is a
    /// no-op and the receiver only sees the first response.
    #[test]
    fn pending_map_resolve_consumes_entry() {
        let map = ApprovalPendingMap::default();
        let (tx, mut rx) = oneshot::channel::<ChannelApprovalResponse>();
        map.insert("req-4".to_string(), tx);
        map.resolve("req-4", ChannelApprovalResponse::Approve);
        map.resolve("req-4", ChannelApprovalResponse::Deny);
        assert_eq!(rx.try_recv().unwrap(), ChannelApprovalResponse::Approve);
    }
}