Skip to main content

zeroclaw_channels/
mochat.rs

1use async_trait::async_trait;
2use serde_json::json;
3use std::collections::HashSet;
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use uuid::Uuid;
7use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
8
/// Maximum number of message IDs kept in the deduplication set.
///
/// When a new ID is about to be inserted and the set already holds this many
/// entries, half of them (`DEDUP_CAPACITY / 2`) are evicted first.
const DEDUP_CAPACITY: usize = 10_000;
11
/// Mochat customer service channel.
///
/// Integrates with the Mochat open-source customer service platform API
/// for receiving and sending messages through its HTTP endpoints.
pub struct MochatChannel {
    /// Base URL of the Mochat API. The constructor stores it with trailing
    /// `/` characters removed so endpoint paths can be appended directly.
    api_url: String,
    /// Token sent as `Authorization: Bearer <token>` on every request.
    api_token: String,
    /// The alias key under `[channels.mochat.<alias>]` this handle is
    /// bound to. Used to scope peer-group writes and resolver lookups.
    alias: String,
    /// Resolves inbound external peers from canonical state at message-time.
    /// No cache (see AGENTS.md "ABSOLUTE RULE — SINGLE SOURCE OF TRUTH").
    peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
    /// Seconds the listener sleeps between polls of the receive endpoint.
    poll_interval_secs: u64,
    /// Message deduplication set: IDs of inbound messages already seen,
    /// bounded by `DEDUP_CAPACITY`.
    dedup: Arc<RwLock<HashSet<String>>>,
}
29
30impl MochatChannel {
31    pub fn new(
32        api_url: String,
33        api_token: String,
34        alias: impl Into<String>,
35        peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
36        poll_interval_secs: u64,
37    ) -> Self {
38        Self {
39            api_url: api_url.trim_end_matches('/').to_string(),
40            api_token,
41            alias: alias.into(),
42            peer_resolver,
43            poll_interval_secs,
44            dedup: Arc::new(RwLock::new(HashSet::new())),
45        }
46    }
47
48    /// Return the alias under `[channels.mochat.<alias>]` that this
49    /// channel handle is bound to.
50    pub fn alias(&self) -> &str {
51        &self.alias
52    }
53
54    fn http_client(&self) -> reqwest::Client {
55        zeroclaw_config::schema::build_runtime_proxy_client("channel.mochat")
56    }
57
58    fn is_user_allowed(&self, user_id: &str) -> bool {
59        let peers = (self.peer_resolver)();
60        crate::allowlist::is_user_allowed(&peers, user_id, crate::allowlist::Match::Sensitive)
61    }
62
63    /// Check and insert message ID for deduplication.
64    async fn is_duplicate(&self, msg_id: &str) -> bool {
65        if msg_id.is_empty() {
66            return false;
67        }
68
69        let mut dedup = self.dedup.write().await;
70
71        if dedup.contains(msg_id) {
72            return true;
73        }
74
75        if dedup.len() >= DEDUP_CAPACITY {
76            let to_remove: Vec<String> = dedup.iter().take(DEDUP_CAPACITY / 2).cloned().collect();
77            for key in to_remove {
78                dedup.remove(&key);
79            }
80        }
81
82        dedup.insert(msg_id.to_string());
83        false
84    }
85}
86
87impl ::zeroclaw_api::attribution::Attributable for MochatChannel {
88    fn role(&self) -> ::zeroclaw_api::attribution::Role {
89        ::zeroclaw_api::attribution::Role::Channel(::zeroclaw_api::attribution::ChannelKind::MoChat)
90    }
91    fn alias(&self) -> &str {
92        &self.alias
93    }
94}
95
96#[async_trait]
97impl Channel for MochatChannel {
98    fn name(&self) -> &str {
99        "mochat"
100    }
101
102    async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
103        let body = json!({
104            "toUserId": message.recipient,
105            "msgType": "text",
106            "content": {
107                "text": message.content,
108            }
109        });
110
111        let resp = self
112            .http_client()
113            .post(format!("{}/api/message/send", self.api_url))
114            .header("Authorization", format!("Bearer {}", self.api_token))
115            .json(&body)
116            .send()
117            .await?;
118
119        if !resp.status().is_success() {
120            let status = resp.status();
121            let err = resp.text().await.unwrap_or_default();
122            anyhow::bail!("Mochat send message failed ({status}): {err}");
123        }
124
125        let result: serde_json::Value = resp.json().await?;
126        let code = result.get("code").and_then(|v| v.as_i64()).unwrap_or(-1);
127        if code != 0 && code != 200 {
128            let msg = result
129                .get("msg")
130                .or_else(|| result.get("message"))
131                .and_then(|v| v.as_str())
132                .unwrap_or("unknown error");
133            anyhow::bail!("Mochat API error (code={code}): {msg}");
134        }
135
136        Ok(())
137    }
138
139    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
140        ::zeroclaw_log::record!(
141            INFO,
142            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
143            "starting message poller"
144        );
145
146        let poll_interval = std::time::Duration::from_secs(self.poll_interval_secs);
147        let mut last_message_id: Option<String> = None;
148
149        loop {
150            let mut url = format!("{}/api/message/receive", self.api_url);
151            if let Some(ref id) = last_message_id {
152                use std::fmt::Write;
153                let _ = write!(url, "?since_id={id}");
154            }
155
156            match self
157                .http_client()
158                .get(&url)
159                .header("Authorization", format!("Bearer {}", self.api_token))
160                .send()
161                .await
162            {
163                Ok(resp) if resp.status().is_success() => {
164                    let data: serde_json::Value = match resp.json().await {
165                        Ok(d) => d,
166                        Err(e) => {
167                            ::zeroclaw_log::record!(
168                                WARN,
169                                ::zeroclaw_log::Event::new(
170                                    module_path!(),
171                                    ::zeroclaw_log::Action::Note
172                                )
173                                .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
174                                .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
175                                "failed to parse response"
176                            );
177                            tokio::time::sleep(poll_interval).await;
178                            continue;
179                        }
180                    };
181
182                    let messages = data
183                        .get("data")
184                        .or_else(|| data.get("messages"))
185                        .and_then(|d| d.as_array());
186
187                    if let Some(messages) = messages {
188                        for msg in messages {
189                            let msg_id = msg
190                                .get("messageId")
191                                .or_else(|| msg.get("id"))
192                                .and_then(|i| i.as_str())
193                                .unwrap_or("");
194
195                            if self.is_duplicate(msg_id).await {
196                                continue;
197                            }
198
199                            let sender = msg
200                                .get("fromUserId")
201                                .or_else(|| msg.get("sender"))
202                                .and_then(|s| s.as_str())
203                                .unwrap_or("unknown");
204
205                            if !self.is_user_allowed(sender) {
206                                ::zeroclaw_log::record!(
207                                    DEBUG,
208                                    ::zeroclaw_log::Event::new(
209                                        module_path!(),
210                                        ::zeroclaw_log::Action::Note
211                                    )
212                                    .with_attrs(::serde_json::json!({"sender": sender})),
213                                    "ignoring message from unauthorized user"
214                                );
215                                continue;
216                            }
217
218                            let content = msg
219                                .get("content")
220                                .and_then(|c| {
221                                    c.get("text")
222                                        .and_then(|t| t.as_str())
223                                        .or_else(|| c.as_str())
224                                })
225                                .unwrap_or("")
226                                .trim();
227
228                            if content.is_empty() {
229                                continue;
230                            }
231
232                            let channel_msg = ChannelMessage {
233                                id: Uuid::new_v4().to_string(),
234                                sender: sender.to_string(),
235                                reply_target: sender.to_string(),
236                                content: content.to_string(),
237                                channel: "mochat".to_string(),
238                                channel_alias: Some(self.alias.clone()),
239                                timestamp: std::time::SystemTime::now()
240                                    .duration_since(std::time::UNIX_EPOCH)
241                                    .unwrap_or_default()
242                                    .as_secs(),
243                                thread_ts: None,
244                                interruption_scope_id: None,
245                                attachments: vec![],
246                                subject: None,
247                            };
248
249                            if tx.send(channel_msg).await.is_err() {
250                                ::zeroclaw_log::record!(
251                                    WARN,
252                                    ::zeroclaw_log::Event::new(
253                                        module_path!(),
254                                        ::zeroclaw_log::Action::Note
255                                    )
256                                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
257                                    "message channel closed"
258                                );
259                                return Ok(());
260                            }
261
262                            if !msg_id.is_empty() {
263                                last_message_id = Some(msg_id.to_string());
264                            }
265                        }
266                    }
267                }
268                Ok(resp) => {
269                    let status = resp.status();
270                    let err = resp.text().await.unwrap_or_default();
271                    ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"error": format!("{}", err), "status": status.to_string()})), "poll request failed");
272                }
273                Err(e) => {
274                    ::zeroclaw_log::record!(
275                        WARN,
276                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
277                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
278                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
279                        "poll request error"
280                    );
281                }
282            }
283
284            tokio::time::sleep(poll_interval).await;
285        }
286    }
287
288    async fn health_check(&self) -> bool {
289        let resp = self
290            .http_client()
291            .get(format!("{}/api/health", self.api_url))
292            .header("Authorization", format!("Bearer {}", self.api_token))
293            .send()
294            .await;
295
296        match resp {
297            Ok(r) => r.status().is_success(),
298            Err(_) => false,
299        }
300    }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    /// Alias shared by every channel built in these tests.
    const TEST_ALIAS: &str = "mochat_test_alias";

    /// Build a channel for `api_url` whose peer resolver always yields `peers`.
    fn channel(api_url: &str, peers: &[&str]) -> MochatChannel {
        let peers: Vec<String> = peers.iter().map(|p| (*p).to_string()).collect();
        MochatChannel::new(
            api_url.to_string(),
            "tok".to_string(),
            TEST_ALIAS,
            Arc::new(move || peers.clone()),
            5,
        )
    }

    #[test]
    fn test_name() {
        let ch = channel("https://mochat.example.com", &[]);
        assert_eq!(ch.name(), "mochat");
    }

    #[test]
    fn test_api_url_trailing_slash_stripped() {
        let ch = channel("https://mochat.example.com/", &[]);
        assert_eq!(ch.api_url, "https://mochat.example.com");
    }

    #[test]
    fn test_user_allowed_wildcard() {
        let ch = channel("https://m.test", &["*"]);
        assert!(ch.is_user_allowed("anyone"));
    }

    #[test]
    fn test_user_allowed_specific() {
        let ch = channel("https://m.test", &["user123"]);
        assert!(ch.is_user_allowed("user123"));
        assert!(!ch.is_user_allowed("other"));
    }

    #[test]
    fn test_user_denied_empty() {
        let ch = channel("https://m.test", &[]);
        assert!(!ch.is_user_allowed("anyone"));
    }

    #[tokio::test]
    async fn test_dedup() {
        let ch = channel("https://m.test", &[]);
        // First sighting is recorded, the repeat is flagged, a new ID is not.
        assert!(!ch.is_duplicate("msg1").await);
        assert!(ch.is_duplicate("msg1").await);
        assert!(!ch.is_duplicate("msg2").await);
    }

    #[tokio::test]
    async fn test_dedup_empty_id() {
        let ch = channel("https://m.test", &[]);
        // Empty IDs are never recorded, so they never count as duplicates.
        assert!(!ch.is_duplicate("").await);
        assert!(!ch.is_duplicate("").await);
    }

    #[test]
    fn v2_allowed_users_fold_into_peer_groups() {
        // A V2 `[channels.mochat]` block with `allowed_users` must come out of
        // migration as a channel under the bridge alias `default` plus a
        // synthesized `[peer_groups.mochat_default]` carrying the allow-list.
        let v2_toml = r#"
schema_version = 2

[channels.mochat]
enabled = true
api_url = "https://mochat.example.com"
api_token = "secret"
allowed_users = ["user1"]
"#;
        let migrated = zeroclaw_config::migration::migrate_to_current(v2_toml)
            .expect("V2 mochat config migrates to V3");

        let channel_cfg = migrated
            .channels
            .mochat
            .get("default")
            .expect("V2 mochat folds under alias `default`");
        assert_eq!(channel_cfg.api_url, "https://mochat.example.com");
        assert_eq!(channel_cfg.api_token, "secret");

        let group = migrated
            .peer_groups
            .get("mochat_default")
            .expect("mochat allow-list synthesizes [peer_groups.mochat_default]");
        assert_eq!(group.channel, "mochat");
        let peers: Vec<&str> = group.external_peers.iter().map(|p| p.as_str()).collect();
        assert_eq!(peers, vec!["user1"]);
    }

    #[test]
    fn v2_no_allowed_users_synthesizes_no_peer_group() {
        // Without `allowed_users` the migration must not invent a peer group,
        // and the default `poll_interval_secs` must be left in place.
        let v2_toml = r#"
schema_version = 2

[channels.mochat]
enabled = true
api_url = "https://mochat.example.com"
api_token = "secret"
"#;
        let migrated = zeroclaw_config::migration::migrate_to_current(v2_toml)
            .expect("V2 mochat config without allowed_users migrates");

        assert!(
            !migrated.peer_groups.contains_key("mochat_default"),
            "no peer group synthesized when allowed_users is absent"
        );

        let channel_cfg = migrated
            .channels
            .mochat
            .get("default")
            .expect("V2 mochat folds under alias `default`");
        assert_eq!(channel_cfg.poll_interval_secs, 5);
    }
}