zeroclaw_runtime/daemon/
registry.rs1use anyhow::Result;
2use serde_json::Value;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::atomic::AtomicUsize;
7use tokio::sync::{broadcast, watch};
8use tokio_util::sync::CancellationToken;
9use zeroclaw_config::schema::{Config, MqttConfig};
10
11use crate::rpc::context::RpcContext;
12use crate::rpc::tui_identity::TuiRegistry;
13
14pub type StarterFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
15
16pub type GatewayStarter = Box<
22 dyn Fn(
23 String,
24 u16,
25 Config,
26 Option<broadcast::Sender<Value>>,
27 Option<watch::Sender<bool>>,
28 Option<Arc<TuiRegistry>>,
29 ) -> StarterFuture
30 + Send
31 + Sync,
32>;
33
34pub type ChannelsStarter = Box<dyn Fn(Config, CancellationToken) -> StarterFuture + Send + Sync>;
36
37pub type RpcStarter = Box<
39 dyn Fn(Arc<RpcContext>, CancellationToken, Arc<AtomicUsize>) -> StarterFuture + Send + Sync,
40>;
41
42pub type MqttStarter = Box<dyn Fn(MqttConfig) -> StarterFuture + Send + Sync>;
44
/// Holds at most one starter callback per daemon service.
///
/// Every slot starts out as `None` (via the derived `Default`), is filled by
/// the matching `register_*` method and is emptied again by the matching
/// `take_*` method.
#[derive(Default)]
pub struct DaemonRegistry {
    /// Gateway starter; set by `register_gateway`, removed by `take_gateway_start`.
    gateway_start: Option<GatewayStarter>,
    /// Channels starter; set by `register_channels`, removed by `take_channels_start`.
    channels_start: Option<ChannelsStarter>,
    /// Socket RPC starter; set by `register_socket`, removed by `take_socket_start`.
    socket_start: Option<RpcStarter>,
    /// WSS RPC starter; set by `register_wss`, removed by `take_wss_start`.
    wss_start: Option<RpcStarter>,
    /// MQTT starter; set by `register_mqtt`, removed by `take_mqtt_start`.
    mqtt_start: Option<MqttStarter>,
}
58
59impl DaemonRegistry {
60 pub fn new() -> Self {
63 Self::default()
64 }
65
66 pub fn register_gateway(&mut self, starter: GatewayStarter) -> &mut Self {
67 self.gateway_start = Some(starter);
68 self
69 }
70
71 #[cfg(test)]
72 fn has_gateway_start(&self) -> bool {
73 self.gateway_start.is_some()
74 }
75
76 pub fn register_channels(&mut self, starter: ChannelsStarter) -> &mut Self {
77 self.channels_start = Some(starter);
78 self
79 }
80
81 #[cfg(test)]
82 fn has_channels_start(&self) -> bool {
83 self.channels_start.is_some()
84 }
85
86 pub fn register_socket(&mut self, starter: RpcStarter) -> &mut Self {
87 self.socket_start = Some(starter);
88 self
89 }
90
91 pub(crate) fn has_socket_start(&self) -> bool {
92 self.socket_start.is_some()
93 }
94
95 pub fn register_wss(&mut self, starter: RpcStarter) -> &mut Self {
96 self.wss_start = Some(starter);
97 self
98 }
99
100 pub(crate) fn has_wss_start(&self) -> bool {
101 self.wss_start.is_some()
102 }
103
104 pub fn register_mqtt(&mut self, starter: MqttStarter) -> &mut Self {
105 self.mqtt_start = Some(starter);
106 self
107 }
108
109 #[cfg(test)]
110 fn has_mqtt_start(&self) -> bool {
111 self.mqtt_start.is_some()
112 }
113
114 pub(crate) fn take_gateway_start(&mut self) -> Option<GatewayStarter> {
115 self.gateway_start.take()
116 }
117
118 pub(crate) fn take_channels_start(&mut self) -> Option<ChannelsStarter> {
119 self.channels_start.take()
120 }
121
122 pub(crate) fn take_socket_start(&mut self) -> Option<RpcStarter> {
123 self.socket_start.take()
124 }
125
126 pub(crate) fn take_wss_start(&mut self) -> Option<RpcStarter> {
127 self.wss_start.take()
128 }
129
130 pub(crate) fn take_mqtt_start(&mut self) -> Option<MqttStarter> {
131 self.mqtt_start.take()
132 }
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// Future that completes immediately with `Ok(())`; shared by all stub starters.
    fn ready() -> StarterFuture {
        Box::pin(async { Ok(()) })
    }

    fn gateway_starter() -> GatewayStarter {
        Box::new(|_, _, _, _, _, _| ready())
    }

    fn channels_starter() -> ChannelsStarter {
        Box::new(|_, _| ready())
    }

    fn rpc_starter() -> RpcStarter {
        Box::new(|_, _, _| ready())
    }

    fn mqtt_starter() -> MqttStarter {
        Box::new(|_| ready())
    }

    /// Registry with a stub starter in every slot, built through the chained
    /// `register_*` calls so the builder style itself is exercised.
    fn fully_registered() -> DaemonRegistry {
        let mut registry = DaemonRegistry::new();
        registry
            .register_gateway(gateway_starter())
            .register_channels(channels_starter())
            .register_socket(rpc_starter())
            .register_wss(rpc_starter())
            .register_mqtt(mqtt_starter());
        registry
    }

    /// Snapshot of which slots are occupied, in the order
    /// gateway, channels, socket, wss, mqtt.
    fn slot_flags(registry: &DaemonRegistry) -> [bool; 5] {
        [
            registry.has_gateway_start(),
            registry.has_channels_start(),
            registry.has_socket_start(),
            registry.has_wss_start(),
            registry.has_mqtt_start(),
        ]
    }

    #[test]
    fn new_registry_has_no_start_hooks() {
        let registry = DaemonRegistry::new();

        assert_eq!(slot_flags(&registry), [false; 5]);
    }

    #[test]
    fn builder_records_typed_start_hooks() {
        let registry = fully_registered();

        assert_eq!(slot_flags(&registry), [true; 5]);
    }

    #[test]
    fn taking_start_hooks_consumes_slots() {
        let mut registry = fully_registered();

        // Each take must hand back the starter that was registered...
        assert!(registry.take_gateway_start().is_some());
        assert!(registry.take_channels_start().is_some());
        assert!(registry.take_socket_start().is_some());
        assert!(registry.take_wss_start().is_some());
        assert!(registry.take_mqtt_start().is_some());

        // ...and leave its slot empty afterwards.
        assert_eq!(slot_flags(&registry), [false; 5]);
    }
}