Skip to main content

zeroclaw_runtime/daemon/
registry.rs

1use anyhow::Result;
2use serde_json::Value;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::atomic::AtomicUsize;
7use tokio::sync::{broadcast, watch};
8use tokio_util::sync::CancellationToken;
9use zeroclaw_config::schema::{Config, MqttConfig};
10
11use crate::rpc::context::RpcContext;
12use crate::rpc::tui_identity::TuiRegistry;
13
14pub type StarterFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
15
16/// Starts the gateway HTTP server for one daemon run/reload iteration.
17///
18/// The optional broadcast sender carries daemon events, the optional reload
19/// sender lets the gateway trigger in-process reloads, and the optional TUI
20/// registry powers the gateway's TUI identity endpoints.
21pub type GatewayStarter = Box<
22    dyn Fn(
23            String,
24            u16,
25            Config,
26            Option<broadcast::Sender<Value>>,
27            Option<watch::Sender<bool>>,
28            Option<Arc<TuiRegistry>>,
29        ) -> StarterFuture
30        + Send
31        + Sync,
32>;
33
34/// Starts the supervised channel orchestrator for one daemon run/reload iteration.
35pub type ChannelsStarter = Box<dyn Fn(Config, CancellationToken) -> StarterFuture + Send + Sync>;
36
37/// Starts an RPC transport using the shared daemon RPC context.
38pub type RpcStarter = Box<
39    dyn Fn(Arc<RpcContext>, CancellationToken, Arc<AtomicUsize>) -> StarterFuture + Send + Sync,
40>;
41
42/// Starts the MQTT SOP listener for one configured MQTT channel alias.
43pub type MqttStarter = Box<dyn Fn(MqttConfig) -> StarterFuture + Send + Sync>;
44
45/// Typed startup registry injected by the binary crate.
46///
47/// This registry is the source of truth for startup hook values for the current
48/// daemon run/reload iteration. It deliberately does not copy config-derived
49/// facts; `Config` remains the source of truth for which subsystems are enabled.
50#[derive(Default)]
51pub struct DaemonRegistry {
52    gateway_start: Option<GatewayStarter>,
53    channels_start: Option<ChannelsStarter>,
54    socket_start: Option<RpcStarter>,
55    wss_start: Option<RpcStarter>,
56    mqtt_start: Option<MqttStarter>,
57}
58
59impl DaemonRegistry {
60    /// Create an empty registry. Missing starters are treated as unwired
61    /// optional subsystems by `daemon::run`.
62    pub fn new() -> Self {
63        Self::default()
64    }
65
66    pub fn register_gateway(&mut self, starter: GatewayStarter) -> &mut Self {
67        self.gateway_start = Some(starter);
68        self
69    }
70
71    #[cfg(test)]
72    fn has_gateway_start(&self) -> bool {
73        self.gateway_start.is_some()
74    }
75
76    pub fn register_channels(&mut self, starter: ChannelsStarter) -> &mut Self {
77        self.channels_start = Some(starter);
78        self
79    }
80
81    #[cfg(test)]
82    fn has_channels_start(&self) -> bool {
83        self.channels_start.is_some()
84    }
85
86    pub fn register_socket(&mut self, starter: RpcStarter) -> &mut Self {
87        self.socket_start = Some(starter);
88        self
89    }
90
91    pub(crate) fn has_socket_start(&self) -> bool {
92        self.socket_start.is_some()
93    }
94
95    pub fn register_wss(&mut self, starter: RpcStarter) -> &mut Self {
96        self.wss_start = Some(starter);
97        self
98    }
99
100    pub(crate) fn has_wss_start(&self) -> bool {
101        self.wss_start.is_some()
102    }
103
104    pub fn register_mqtt(&mut self, starter: MqttStarter) -> &mut Self {
105        self.mqtt_start = Some(starter);
106        self
107    }
108
109    #[cfg(test)]
110    fn has_mqtt_start(&self) -> bool {
111        self.mqtt_start.is_some()
112    }
113
114    pub(crate) fn take_gateway_start(&mut self) -> Option<GatewayStarter> {
115        self.gateway_start.take()
116    }
117
118    pub(crate) fn take_channels_start(&mut self) -> Option<ChannelsStarter> {
119        self.channels_start.take()
120    }
121
122    pub(crate) fn take_socket_start(&mut self) -> Option<RpcStarter> {
123        self.socket_start.take()
124    }
125
126    pub(crate) fn take_wss_start(&mut self) -> Option<RpcStarter> {
127        self.wss_start.take()
128    }
129
130    pub(crate) fn take_mqtt_start(&mut self) -> Option<MqttStarter> {
131        self.mqtt_start.take()
132    }
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// Future that completes successfully; shared by every stub hook below.
    fn ready() -> StarterFuture {
        Box::pin(async { Ok(()) })
    }

    /// Stub gateway hook that ignores its arguments.
    fn gateway_starter() -> GatewayStarter {
        Box::new(|_, _, _, _, _, _| ready())
    }

    /// Stub channels hook that ignores its arguments.
    fn channels_starter() -> ChannelsStarter {
        Box::new(|_, _| ready())
    }

    /// Stub RPC hook that ignores its arguments; used for socket and WSS.
    fn rpc_starter() -> RpcStarter {
        Box::new(|_, _, _| ready())
    }

    /// Stub MQTT hook that ignores its argument.
    fn mqtt_starter() -> MqttStarter {
        Box::new(|_| ready())
    }

    /// Registry with all five hooks set through the chained `register_*` calls.
    fn fully_wired() -> DaemonRegistry {
        let mut registry = DaemonRegistry::new();
        registry
            .register_gateway(gateway_starter())
            .register_channels(channels_starter())
            .register_socket(rpc_starter())
            .register_wss(rpc_starter())
            .register_mqtt(mqtt_starter());
        registry
    }

    /// Asserts that none of the five slots currently holds a starter.
    #[track_caller]
    fn assert_all_slots_empty(registry: &DaemonRegistry) {
        assert!(!registry.has_gateway_start());
        assert!(!registry.has_channels_start());
        assert!(!registry.has_socket_start());
        assert!(!registry.has_wss_start());
        assert!(!registry.has_mqtt_start());
    }

    #[test]
    fn new_registry_has_no_start_hooks() {
        assert_all_slots_empty(&DaemonRegistry::new());
    }

    #[test]
    fn builder_records_typed_start_hooks() {
        let registry = fully_wired();

        assert!(registry.has_gateway_start());
        assert!(registry.has_channels_start());
        assert!(registry.has_socket_start());
        assert!(registry.has_wss_start());
        assert!(registry.has_mqtt_start());
    }

    #[test]
    fn taking_start_hooks_consumes_slots() {
        let mut registry = fully_wired();

        // Each take must hand back the hook that was registered...
        assert!(registry.take_gateway_start().is_some());
        assert!(registry.take_channels_start().is_some());
        assert!(registry.take_socket_start().is_some());
        assert!(registry.take_wss_start().is_some());
        assert!(registry.take_mqtt_start().is_some());

        // ...and leave its slot empty afterwards.
        assert_all_slots_empty(&registry);
    }
}