zeroclaw_channels/orchestrator/
mqtt.rs1use std::sync::{Arc, Mutex};
7
8use anyhow::Result;
9use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS, Transport};
10
11use zeroclaw_config::schema::MqttConfig;
12use zeroclaw_runtime::sop::audit::SopAuditLogger;
13use zeroclaw_runtime::sop::dispatch::{dispatch_sop_event, process_headless_results};
14use zeroclaw_runtime::sop::engine::{SopEngine, now_iso8601};
15use zeroclaw_runtime::sop::types::{SopEvent, SopTriggerSource};
16
17pub async fn run_mqtt_sop_listener(
22 config: &MqttConfig,
23 engine: Arc<Mutex<SopEngine>>,
24 audit: Arc<SopAuditLogger>,
25) -> Result<()> {
26 config.validate()?;
27
28 let mut mqtt_options = MqttOptions::new(
29 &config.client_id,
30 broker_host(&config.broker_url),
31 broker_port(&config.broker_url),
32 );
33 mqtt_options.set_keep_alive(std::time::Duration::from_secs(config.keep_alive_secs));
34
35 if let (Some(user), Some(pass)) = (&config.username, &config.password) {
36 mqtt_options.set_credentials(user, pass);
37 }
38
39 if config.use_tls {
41 mqtt_options.set_transport(Transport::tls_with_default_config());
42 ::zeroclaw_log::record!(
43 INFO,
44 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
45 "MQTT SOP listener: TLS transport enabled"
46 );
47 }
48
49 let (client, mut eventloop) = AsyncClient::new(mqtt_options, 64);
50
51 let qos = match config.qos {
52 0 => QoS::AtMostOnce,
53 1 => QoS::AtLeastOnce,
54 _ => QoS::ExactlyOnce,
55 };
56
57 for topic in &config.topics {
59 client.subscribe(topic, qos).await?;
60 ::zeroclaw_log::record!(
61 INFO,
62 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
63 .with_attrs(::serde_json::json!({"topic": topic})),
64 "MQTT SOP listener: subscribed to ''"
65 );
66 }
67
68 zeroclaw_runtime::health::mark_component_ok("mqtt");
69
70 loop {
71 match eventloop.poll().await {
72 Ok(Event::Incoming(Packet::Publish(msg))) => {
73 let topic = msg.topic.clone();
74 let payload = String::from_utf8_lossy(&msg.payload).to_string();
75
76 let event = SopEvent {
77 source: SopTriggerSource::Mqtt,
78 topic: Some(topic),
79 payload: Some(payload),
80 timestamp: now_iso8601(),
81 };
82
83 let results = dispatch_sop_event(&engine, &audit, event).await;
84 process_headless_results(&results);
85 }
86 Ok(Event::Incoming(Packet::ConnAck(_))) => {
87 zeroclaw_runtime::health::mark_component_ok("mqtt");
88 ::zeroclaw_log::record!(
89 INFO,
90 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
91 "MQTT SOP listener: connected to broker"
92 );
93 }
94 Ok(_) => {
95 }
97 Err(e) => {
98 zeroclaw_runtime::health::mark_component_error("mqtt", e.to_string());
99 ::zeroclaw_log::record!(
100 WARN,
101 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
102 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
103 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
104 "MQTT SOP listener: connection error"
105 );
106 }
108 }
109 }
110}
111
/// Extracts the host component from a broker URL.
///
/// An optional `mqtt://` or `mqtts://` scheme is removed, then anything from
/// the first `/`, `?` or `#` onward (path, query, fragment) and any
/// `userinfo@` prefix are discarded. The host is the text before the port
/// separator. A bracketed IPv6 literal such as `[::1]` is returned with its
/// brackets so the colons inside it are not mistaken for the port separator.
///
/// Returns `"localhost"` when the URL contains no host at all.
fn broker_host(url: &str) -> String {
    let without_scheme = url
        .strip_prefix("mqtt://")
        .or_else(|| url.strip_prefix("mqtts://"))
        .unwrap_or(url);

    // Keep only the authority part: `[userinfo@]host[:port]`.
    let authority = without_scheme
        .split(['/', '?', '#'])
        .next()
        .unwrap_or(without_scheme);
    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_, rest)| rest);

    let host = if host_port.starts_with('[') {
        // IPv6 literal: everything up to and including the closing bracket.
        host_port.split_inclusive(']').next().unwrap_or(host_port)
    } else {
        host_port.split(':').next().unwrap_or(host_port)
    };

    // `split` always yields a first item, so an absent host shows up as an
    // empty string rather than `None`; apply the fallback explicitly.
    if host.is_empty() {
        "localhost".to_string()
    } else {
        host.to_string()
    }
}
124
/// Extracts the port from a broker URL, falling back to the scheme default.
///
/// The default is 8883 for `mqtts://` URLs and 1883 otherwise. A port is only
/// recognised when it follows an explicit `:` after the host, so a host
/// without a port never gets misread as one, even if it is all digits. Any
/// path, query or fragment after the authority and any `userinfo@` prefix are
/// ignored, as are the colons inside a bracketed IPv6 literal. A port that
/// does not parse as `u16` also yields the default.
fn broker_port(url: &str) -> u16 {
    let is_tls = url.starts_with("mqtts://");
    let default_port: u16 = if is_tls { 8883 } else { 1883 };

    let without_scheme = url
        .strip_prefix("mqtt://")
        .or_else(|| url.strip_prefix("mqtts://"))
        .unwrap_or(url);

    // Keep only the authority part: `[userinfo@]host[:port]`.
    let authority = without_scheme
        .split(['/', '?', '#'])
        .next()
        .unwrap_or(without_scheme);
    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_, rest)| rest);

    // For `[v6-literal]:port`, look for the port only after the closing bracket.
    let after_host = host_port
        .rsplit_once(']')
        .map_or(host_port, |(_, tail)| tail);

    after_host
        .rsplit_once(':')
        .and_then(|(_, port)| port.parse().ok())
        .unwrap_or(default_port)
}
139
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline configuration shared by the validation tests; each test
    /// overrides only the fields it is exercising.
    fn base_config() -> MqttConfig {
        MqttConfig {
            enabled: true,
            broker_url: "mqtt://localhost:1883".into(),
            client_id: "zeroclaw".into(),
            topics: vec!["test".into()],
            qos: 1,
            username: None,
            password: None,
            use_tls: false,
            keep_alive_secs: 30,
            excluded_tools: vec![],
        }
    }

    /// Runs validation, expects it to fail, and returns the error text.
    fn validation_error(config: &MqttConfig) -> String {
        config.validate().unwrap_err().to_string()
    }

    #[test]
    fn mqtt_config_validation_rejects_bad_qos() {
        let config = MqttConfig {
            qos: 3,
            ..base_config()
        };
        assert!(validation_error(&config).contains("qos must be 0, 1, or 2"));
    }

    #[test]
    fn mqtt_config_validation_rejects_bad_url() {
        let config = MqttConfig {
            broker_url: "http://localhost:1883".into(),
            ..base_config()
        };
        assert!(validation_error(&config).contains("mqtt://"));
    }

    #[test]
    fn mqtt_config_validation_rejects_empty_topics() {
        let config = MqttConfig {
            topics: vec![],
            ..base_config()
        };
        assert!(validation_error(&config).contains("at least one topic"));
    }

    #[test]
    fn mqtt_config_validation_rejects_empty_client_id() {
        let config = MqttConfig {
            client_id: String::new(),
            ..base_config()
        };
        assert!(validation_error(&config).contains("client_id must not be empty"));
    }

    #[test]
    fn mqtt_config_validation_accepts_valid() {
        let config = MqttConfig {
            topics: vec!["sensors/#".into()],
            ..base_config()
        };
        assert!(config.validate().is_ok());
    }

    #[test]
    fn mqtt_tls_flag_rejects_mqtt_scheme_with_use_tls() {
        let config = MqttConfig {
            use_tls: true,
            ..base_config()
        };
        assert!(validation_error(&config).contains("use_tls is true"));
    }

    #[test]
    fn mqtt_tls_flag_rejects_mqtts_scheme_without_use_tls() {
        let config = MqttConfig {
            broker_url: "mqtts://localhost:8883".into(),
            ..base_config()
        };
        assert!(validation_error(&config).contains("mqtts://"));
    }

    #[test]
    fn mqtt_tls_flag_accepts_mqtts_with_use_tls() {
        let config = MqttConfig {
            broker_url: "mqtts://localhost:8883".into(),
            use_tls: true,
            ..base_config()
        };
        assert!(config.validate().is_ok());
    }

    #[test]
    fn broker_host_extracts_host() {
        let plain = broker_host("mqtt://myhost:1883");
        let secure = broker_host("mqtts://secure.example.com:8883");
        assert_eq!(plain, "myhost");
        assert_eq!(secure, "secure.example.com");
    }

    #[test]
    fn broker_port_extracts_port() {
        let cases = [("mqtt://localhost:1883", 1883), ("mqtts://host:8883", 8883)];
        for (url, expected) in cases {
            assert_eq!(broker_port(url), expected);
        }
    }

    #[test]
    fn broker_port_defaults_1883_for_mqtt() {
        let port = broker_port("mqtt://localhost");
        assert_eq!(port, 1883);
    }

    #[test]
    fn broker_port_defaults_8883_for_mqtts() {
        let port = broker_port("mqtts://secure.example.com");
        assert_eq!(port, 8883);
    }
}