Skip to main content

zeroclaw_channels/orchestrator/
mqtt.rs

1//! MQTT → SOP event fan-in listener.
2//!
3//! This is NOT a `Channel` trait implementor — it routes MQTT messages
4//! to the SOP engine via `dispatch_sop_event`, not to the chat loop.
5
6use std::sync::{Arc, Mutex};
7
8use anyhow::Result;
9use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS, Transport};
10
11use zeroclaw_config::schema::MqttConfig;
12use zeroclaw_runtime::sop::audit::SopAuditLogger;
13use zeroclaw_runtime::sop::dispatch::{dispatch_sop_event, process_headless_results};
14use zeroclaw_runtime::sop::engine::{SopEngine, now_iso8601};
15use zeroclaw_runtime::sop::types::{SopEvent, SopTriggerSource};
16
17/// Run the MQTT SOP listener loop.
18///
19/// Subscribes to configured topics and dispatches incoming publishes
20/// to the SOP engine. Blocks until disconnected or cancelled.
21pub async fn run_mqtt_sop_listener(
22    config: &MqttConfig,
23    engine: Arc<Mutex<SopEngine>>,
24    audit: Arc<SopAuditLogger>,
25) -> Result<()> {
26    config.validate()?;
27
28    let mut mqtt_options = MqttOptions::new(
29        &config.client_id,
30        broker_host(&config.broker_url),
31        broker_port(&config.broker_url),
32    );
33    mqtt_options.set_keep_alive(std::time::Duration::from_secs(config.keep_alive_secs));
34
35    if let (Some(user), Some(pass)) = (&config.username, &config.password) {
36        mqtt_options.set_credentials(user, pass);
37    }
38
39    // Configure TLS transport when mqtts:// scheme is used
40    if config.use_tls {
41        mqtt_options.set_transport(Transport::tls_with_default_config());
42        ::zeroclaw_log::record!(
43            INFO,
44            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
45            "MQTT SOP listener: TLS transport enabled"
46        );
47    }
48
49    let (client, mut eventloop) = AsyncClient::new(mqtt_options, 64);
50
51    let qos = match config.qos {
52        0 => QoS::AtMostOnce,
53        1 => QoS::AtLeastOnce,
54        _ => QoS::ExactlyOnce,
55    };
56
57    // Subscribe to all configured topics
58    for topic in &config.topics {
59        client.subscribe(topic, qos).await?;
60        ::zeroclaw_log::record!(
61            INFO,
62            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
63                .with_attrs(::serde_json::json!({"topic": topic})),
64            "MQTT SOP listener: subscribed to ''"
65        );
66    }
67
68    zeroclaw_runtime::health::mark_component_ok("mqtt");
69
70    loop {
71        match eventloop.poll().await {
72            Ok(Event::Incoming(Packet::Publish(msg))) => {
73                let topic = msg.topic.clone();
74                let payload = String::from_utf8_lossy(&msg.payload).to_string();
75
76                let event = SopEvent {
77                    source: SopTriggerSource::Mqtt,
78                    topic: Some(topic),
79                    payload: Some(payload),
80                    timestamp: now_iso8601(),
81                };
82
83                let results = dispatch_sop_event(&engine, &audit, event).await;
84                process_headless_results(&results);
85            }
86            Ok(Event::Incoming(Packet::ConnAck(_))) => {
87                zeroclaw_runtime::health::mark_component_ok("mqtt");
88                ::zeroclaw_log::record!(
89                    INFO,
90                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
91                    "MQTT SOP listener: connected to broker"
92                );
93            }
94            Ok(_) => {
95                // Other events (PingResp, SubAck, etc.) — ignore
96            }
97            Err(e) => {
98                zeroclaw_runtime::health::mark_component_error("mqtt", e.to_string());
99                ::zeroclaw_log::record!(
100                    WARN,
101                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
102                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
103                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
104                    "MQTT SOP listener: connection error"
105                );
106                // rumqttc handles auto-reconnect; loop continues
107            }
108        }
109    }
110}
111
/// Extract the host from a broker URL such as `mqtt://host:port`.
///
/// The `mqtt://` / `mqtts://` scheme is optional. Anything from the first
/// `/`, `?` or `#` onwards is ignored. A bracketed IPv6 literal
/// (`mqtt://[::1]:1883`) is returned without its brackets. An empty host
/// (for example `mqtt://:1883`) falls back to `"localhost"`.
fn broker_host(url: &str) -> String {
    let rest = url
        .strip_prefix("mqtt://")
        .or_else(|| url.strip_prefix("mqtts://"))
        .unwrap_or(url);

    // Keep only the authority part; a trailing path/query is not part of the host.
    let authority = rest
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(rest);

    let host = match authority.strip_prefix('[') {
        // IPv6 literal: the host is everything up to the closing bracket, so
        // the colons inside the address are not mistaken for a port separator.
        Some(bracketed) => bracketed
            .split_once(']')
            .map_or(bracketed, |(address, _)| address),
        None => authority.split(':').next().unwrap_or(authority),
    };

    if host.is_empty() {
        "localhost".to_string()
    } else {
        host.to_string()
    }
}
124
/// Extract the port from a broker URL, defaulting to 1883 for `mqtt://`
/// (or no scheme) and 8883 for `mqtts://`.
///
/// Only text after an explicit `:` separator is treated as a port, so a
/// numeric host without a port (`mqtt://1234`) yields the default. Anything
/// from the first `/`, `?` or `#` onwards is ignored, and for a bracketed
/// IPv6 literal only a `:port` following the closing `]` is considered.
/// A missing or unparsable port yields the scheme default.
fn broker_port(url: &str) -> u16 {
    let (rest, default_port): (&str, u16) = match url.strip_prefix("mqtts://") {
        Some(rest) => (rest, 8883),
        None => (url.strip_prefix("mqtt://").unwrap_or(url), 1883),
    };

    // Keep only the authority part so a trailing path cannot corrupt the port.
    let authority = rest
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(rest);

    let port_text = match authority.strip_prefix('[') {
        // IPv6 literal: colons inside the brackets belong to the address.
        Some(bracketed) => bracketed
            .split_once(']')
            .and_then(|(_, tail)| tail.strip_prefix(':')),
        None => authority.rsplit_once(':').map(|(_, port)| port),
    };

    port_text
        .and_then(|port| port.parse().ok())
        .unwrap_or(default_port)
}
139
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline configuration shared by the validation tests; each test
    /// overrides only the field(s) it exercises via struct-update syntax.
    fn base_config() -> MqttConfig {
        MqttConfig {
            enabled: true,
            broker_url: "mqtt://localhost:1883".into(),
            client_id: "zeroclaw".into(),
            topics: vec!["test".into()],
            qos: 1,
            username: None,
            password: None,
            use_tls: false,
            keep_alive_secs: 30,
            excluded_tools: vec![],
        }
    }

    /// Validate `config`, expecting failure, and return the error text.
    fn validation_error(config: &MqttConfig) -> String {
        config.validate().unwrap_err().to_string()
    }

    #[test]
    fn mqtt_config_validation_rejects_bad_qos() {
        let cfg = MqttConfig {
            qos: 3,
            ..base_config()
        };
        assert!(validation_error(&cfg).contains("qos must be 0, 1, or 2"));
    }

    #[test]
    fn mqtt_config_validation_rejects_bad_url() {
        let cfg = MqttConfig {
            broker_url: "http://localhost:1883".into(),
            ..base_config()
        };
        assert!(validation_error(&cfg).contains("mqtt://"));
    }

    #[test]
    fn mqtt_config_validation_rejects_empty_topics() {
        let cfg = MqttConfig {
            topics: vec![],
            ..base_config()
        };
        assert!(validation_error(&cfg).contains("at least one topic"));
    }

    #[test]
    fn mqtt_config_validation_rejects_empty_client_id() {
        let cfg = MqttConfig {
            client_id: String::new(),
            ..base_config()
        };
        assert!(validation_error(&cfg).contains("client_id must not be empty"));
    }

    #[test]
    fn mqtt_config_validation_accepts_valid() {
        let cfg = MqttConfig {
            topics: vec!["sensors/#".into()],
            ..base_config()
        };
        assert!(cfg.validate().is_ok());
    }

    #[test]
    fn mqtt_tls_flag_rejects_mqtt_scheme_with_use_tls() {
        let cfg = MqttConfig {
            use_tls: true,
            ..base_config()
        };
        assert!(validation_error(&cfg).contains("use_tls is true"));
    }

    #[test]
    fn mqtt_tls_flag_rejects_mqtts_scheme_without_use_tls() {
        let cfg = MqttConfig {
            broker_url: "mqtts://localhost:8883".into(),
            ..base_config()
        };
        assert!(validation_error(&cfg).contains("mqtts://"));
    }

    #[test]
    fn mqtt_tls_flag_accepts_mqtts_with_use_tls() {
        let cfg = MqttConfig {
            broker_url: "mqtts://localhost:8883".into(),
            use_tls: true,
            ..base_config()
        };
        assert!(cfg.validate().is_ok());
    }

    #[test]
    fn broker_host_extracts_host() {
        let cases = [
            ("mqtt://myhost:1883", "myhost"),
            ("mqtts://secure.example.com:8883", "secure.example.com"),
        ];
        for (url, expected) in cases {
            assert_eq!(broker_host(url), expected);
        }
    }

    #[test]
    fn broker_port_extracts_port() {
        let cases = [("mqtt://localhost:1883", 1883), ("mqtts://host:8883", 8883)];
        for (url, expected) in cases {
            assert_eq!(broker_port(url), expected);
        }
    }

    #[test]
    fn broker_port_defaults_1883_for_mqtt() {
        let port = broker_port("mqtt://localhost");
        assert_eq!(port, 1883);
    }

    #[test]
    fn broker_port_defaults_8883_for_mqtts() {
        let port = broker_port("mqtts://secure.example.com");
        assert_eq!(port, 8883);
    }
}