1use async_trait::async_trait;
2use serde_json::json;
3use std::collections::HashSet;
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use uuid::Uuid;
7use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
8
9const DEDUP_CAPACITY: usize = 10_000;
11
12pub struct MochatChannel {
17 api_url: String,
18 api_token: String,
19 alias: String,
22 peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
25 poll_interval_secs: u64,
26 dedup: Arc<RwLock<HashSet<String>>>,
28}
29
30impl MochatChannel {
31 pub fn new(
32 api_url: String,
33 api_token: String,
34 alias: impl Into<String>,
35 peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
36 poll_interval_secs: u64,
37 ) -> Self {
38 Self {
39 api_url: api_url.trim_end_matches('/').to_string(),
40 api_token,
41 alias: alias.into(),
42 peer_resolver,
43 poll_interval_secs,
44 dedup: Arc::new(RwLock::new(HashSet::new())),
45 }
46 }
47
48 pub fn alias(&self) -> &str {
51 &self.alias
52 }
53
54 fn http_client(&self) -> reqwest::Client {
55 zeroclaw_config::schema::build_runtime_proxy_client("channel.mochat")
56 }
57
58 fn is_user_allowed(&self, user_id: &str) -> bool {
59 let peers = (self.peer_resolver)();
60 crate::allowlist::is_user_allowed(&peers, user_id, crate::allowlist::Match::Sensitive)
61 }
62
63 async fn is_duplicate(&self, msg_id: &str) -> bool {
65 if msg_id.is_empty() {
66 return false;
67 }
68
69 let mut dedup = self.dedup.write().await;
70
71 if dedup.contains(msg_id) {
72 return true;
73 }
74
75 if dedup.len() >= DEDUP_CAPACITY {
76 let to_remove: Vec<String> = dedup.iter().take(DEDUP_CAPACITY / 2).cloned().collect();
77 for key in to_remove {
78 dedup.remove(&key);
79 }
80 }
81
82 dedup.insert(msg_id.to_string());
83 false
84 }
85}
86
87impl ::zeroclaw_api::attribution::Attributable for MochatChannel {
88 fn role(&self) -> ::zeroclaw_api::attribution::Role {
89 ::zeroclaw_api::attribution::Role::Channel(::zeroclaw_api::attribution::ChannelKind::MoChat)
90 }
91 fn alias(&self) -> &str {
92 &self.alias
93 }
94}
95
96#[async_trait]
97impl Channel for MochatChannel {
98 fn name(&self) -> &str {
99 "mochat"
100 }
101
102 async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
103 let body = json!({
104 "toUserId": message.recipient,
105 "msgType": "text",
106 "content": {
107 "text": message.content,
108 }
109 });
110
111 let resp = self
112 .http_client()
113 .post(format!("{}/api/message/send", self.api_url))
114 .header("Authorization", format!("Bearer {}", self.api_token))
115 .json(&body)
116 .send()
117 .await?;
118
119 if !resp.status().is_success() {
120 let status = resp.status();
121 let err = resp.text().await.unwrap_or_default();
122 anyhow::bail!("Mochat send message failed ({status}): {err}");
123 }
124
125 let result: serde_json::Value = resp.json().await?;
126 let code = result.get("code").and_then(|v| v.as_i64()).unwrap_or(-1);
127 if code != 0 && code != 200 {
128 let msg = result
129 .get("msg")
130 .or_else(|| result.get("message"))
131 .and_then(|v| v.as_str())
132 .unwrap_or("unknown error");
133 anyhow::bail!("Mochat API error (code={code}): {msg}");
134 }
135
136 Ok(())
137 }
138
139 async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
140 ::zeroclaw_log::record!(
141 INFO,
142 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
143 "starting message poller"
144 );
145
146 let poll_interval = std::time::Duration::from_secs(self.poll_interval_secs);
147 let mut last_message_id: Option<String> = None;
148
149 loop {
150 let mut url = format!("{}/api/message/receive", self.api_url);
151 if let Some(ref id) = last_message_id {
152 use std::fmt::Write;
153 let _ = write!(url, "?since_id={id}");
154 }
155
156 match self
157 .http_client()
158 .get(&url)
159 .header("Authorization", format!("Bearer {}", self.api_token))
160 .send()
161 .await
162 {
163 Ok(resp) if resp.status().is_success() => {
164 let data: serde_json::Value = match resp.json().await {
165 Ok(d) => d,
166 Err(e) => {
167 ::zeroclaw_log::record!(
168 WARN,
169 ::zeroclaw_log::Event::new(
170 module_path!(),
171 ::zeroclaw_log::Action::Note
172 )
173 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
174 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
175 "failed to parse response"
176 );
177 tokio::time::sleep(poll_interval).await;
178 continue;
179 }
180 };
181
182 let messages = data
183 .get("data")
184 .or_else(|| data.get("messages"))
185 .and_then(|d| d.as_array());
186
187 if let Some(messages) = messages {
188 for msg in messages {
189 let msg_id = msg
190 .get("messageId")
191 .or_else(|| msg.get("id"))
192 .and_then(|i| i.as_str())
193 .unwrap_or("");
194
195 if self.is_duplicate(msg_id).await {
196 continue;
197 }
198
199 let sender = msg
200 .get("fromUserId")
201 .or_else(|| msg.get("sender"))
202 .and_then(|s| s.as_str())
203 .unwrap_or("unknown");
204
205 if !self.is_user_allowed(sender) {
206 ::zeroclaw_log::record!(
207 DEBUG,
208 ::zeroclaw_log::Event::new(
209 module_path!(),
210 ::zeroclaw_log::Action::Note
211 )
212 .with_attrs(::serde_json::json!({"sender": sender})),
213 "ignoring message from unauthorized user"
214 );
215 continue;
216 }
217
218 let content = msg
219 .get("content")
220 .and_then(|c| {
221 c.get("text")
222 .and_then(|t| t.as_str())
223 .or_else(|| c.as_str())
224 })
225 .unwrap_or("")
226 .trim();
227
228 if content.is_empty() {
229 continue;
230 }
231
232 let channel_msg = ChannelMessage {
233 id: Uuid::new_v4().to_string(),
234 sender: sender.to_string(),
235 reply_target: sender.to_string(),
236 content: content.to_string(),
237 channel: "mochat".to_string(),
238 channel_alias: Some(self.alias.clone()),
239 timestamp: std::time::SystemTime::now()
240 .duration_since(std::time::UNIX_EPOCH)
241 .unwrap_or_default()
242 .as_secs(),
243 thread_ts: None,
244 interruption_scope_id: None,
245 attachments: vec![],
246 subject: None,
247 };
248
249 if tx.send(channel_msg).await.is_err() {
250 ::zeroclaw_log::record!(
251 WARN,
252 ::zeroclaw_log::Event::new(
253 module_path!(),
254 ::zeroclaw_log::Action::Note
255 )
256 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
257 "message channel closed"
258 );
259 return Ok(());
260 }
261
262 if !msg_id.is_empty() {
263 last_message_id = Some(msg_id.to_string());
264 }
265 }
266 }
267 }
268 Ok(resp) => {
269 let status = resp.status();
270 let err = resp.text().await.unwrap_or_default();
271 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"error": format!("{}", err), "status": status.to_string()})), "poll request failed");
272 }
273 Err(e) => {
274 ::zeroclaw_log::record!(
275 WARN,
276 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
277 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
278 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
279 "poll request error"
280 );
281 }
282 }
283
284 tokio::time::sleep(poll_interval).await;
285 }
286 }
287
288 async fn health_check(&self) -> bool {
289 let resp = self
290 .http_client()
291 .get(format!("{}/api/health", self.api_url))
292 .header("Authorization", format!("Bearer {}", self.api_token))
293 .send()
294 .await;
295
296 match resp {
297 Ok(r) => r.status().is_success(),
298 Err(_) => false,
299 }
300 }
301}
302
303#[cfg(test)]
304mod tests {
305 use super::*;
306
307 #[test]
308 fn test_name() {
309 let ch = MochatChannel::new(
310 "https://mochat.example.com".into(),
311 "tok".into(),
312 "mochat_test_alias",
313 Arc::new(Vec::new),
314 5,
315 );
316 assert_eq!(ch.name(), "mochat");
317 }
318
319 #[test]
320 fn test_api_url_trailing_slash_stripped() {
321 let ch = MochatChannel::new(
322 "https://mochat.example.com/".into(),
323 "tok".into(),
324 "mochat_test_alias",
325 Arc::new(Vec::new),
326 5,
327 );
328 assert_eq!(ch.api_url, "https://mochat.example.com");
329 }
330
331 #[test]
332 fn test_user_allowed_wildcard() {
333 let ch = MochatChannel::new(
334 "https://m.test".into(),
335 "tok".into(),
336 "mochat_test_alias",
337 Arc::new(|| vec!["*".into()]),
338 5,
339 );
340 assert!(ch.is_user_allowed("anyone"));
341 }
342
343 #[test]
344 fn test_user_allowed_specific() {
345 let ch = MochatChannel::new(
346 "https://m.test".into(),
347 "tok".into(),
348 "mochat_test_alias",
349 Arc::new(|| vec!["user123".into()]),
350 5,
351 );
352 assert!(ch.is_user_allowed("user123"));
353 assert!(!ch.is_user_allowed("other"));
354 }
355
356 #[test]
357 fn test_user_denied_empty() {
358 let ch = MochatChannel::new(
359 "https://m.test".into(),
360 "tok".into(),
361 "mochat_test_alias",
362 Arc::new(Vec::new),
363 5,
364 );
365 assert!(!ch.is_user_allowed("anyone"));
366 }
367
368 #[tokio::test]
369 async fn test_dedup() {
370 let ch = MochatChannel::new(
371 "https://m.test".into(),
372 "tok".into(),
373 "mochat_test_alias",
374 Arc::new(Vec::new),
375 5,
376 );
377 assert!(!ch.is_duplicate("msg1").await);
378 assert!(ch.is_duplicate("msg1").await);
379 assert!(!ch.is_duplicate("msg2").await);
380 }
381
382 #[tokio::test]
383 async fn test_dedup_empty_id() {
384 let ch = MochatChannel::new(
385 "https://m.test".into(),
386 "tok".into(),
387 "mochat_test_alias",
388 Arc::new(Vec::new),
389 5,
390 );
391 assert!(!ch.is_duplicate("").await);
392 assert!(!ch.is_duplicate("").await);
393 }
394
395 #[test]
396 fn v2_allowed_users_fold_into_peer_groups() {
397 let v2_toml = r#"
401schema_version = 2
402
403[channels.mochat]
404enabled = true
405api_url = "https://mochat.example.com"
406api_token = "secret"
407allowed_users = ["user1"]
408"#;
409 let cfg = zeroclaw_config::migration::migrate_to_current(v2_toml)
410 .expect("V2 mochat config migrates to V3");
411 let mochat = cfg
412 .channels
413 .mochat
414 .get("default")
415 .expect("V2 mochat folds under alias `default`");
416 assert_eq!(mochat.api_url, "https://mochat.example.com");
417 assert_eq!(mochat.api_token, "secret");
418
419 let group = cfg
420 .peer_groups
421 .get("mochat_default")
422 .expect("mochat allow-list synthesizes [peer_groups.mochat_default]");
423 assert_eq!(group.channel, "mochat");
424 let peers: Vec<&str> = group.external_peers.iter().map(|p| p.as_str()).collect();
425 assert_eq!(peers, vec!["user1"]);
426 }
427
428 #[test]
429 fn v2_no_allowed_users_synthesizes_no_peer_group() {
430 let v2_toml = r#"
433schema_version = 2
434
435[channels.mochat]
436enabled = true
437api_url = "https://mochat.example.com"
438api_token = "secret"
439"#;
440 let cfg = zeroclaw_config::migration::migrate_to_current(v2_toml)
441 .expect("V2 mochat config without allowed_users migrates");
442 assert!(
443 !cfg.peer_groups.contains_key("mochat_default"),
444 "no peer group synthesized when allowed_users is absent"
445 );
446 let mochat = cfg
447 .channels
448 .mochat
449 .get("default")
450 .expect("V2 mochat folds under alias `default`");
451 assert_eq!(mochat.poll_interval_secs, 5);
452 }
453}