1use async_trait::async_trait;
2use futures_util::{SinkExt, StreamExt};
3use std::collections::HashMap;
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use tokio_tungstenite::tungstenite::Message;
7use uuid::Uuid;
8use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
9
10const DINGTALK_BOT_CALLBACK_TOPIC: &str = "/v1.0/im/bot/messages/get";
11
12pub struct DingTalkChannel {
15 client_id: String,
16 client_secret: String,
17 alias: String,
20 peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
23 session_webhooks: Arc<RwLock<HashMap<String, String>>>,
26 proxy_url: Option<String>,
28}
29
30#[derive(serde::Deserialize)]
32struct GatewayResponse {
33 endpoint: String,
34 ticket: String,
35}
36
37impl DingTalkChannel {
38 pub fn new(
39 client_id: String,
40 client_secret: String,
41 alias: impl Into<String>,
42 peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
43 ) -> Self {
44 Self {
45 client_id,
46 client_secret,
47 alias: alias.into(),
48 peer_resolver,
49 session_webhooks: Arc::new(RwLock::new(HashMap::new())),
50 proxy_url: None,
51 }
52 }
53
54 pub fn alias(&self) -> &str {
57 &self.alias
58 }
59
60 pub fn with_proxy_url(mut self, proxy_url: Option<String>) -> Self {
62 self.proxy_url = proxy_url;
63 self
64 }
65
66 fn http_client(&self) -> reqwest::Client {
67 zeroclaw_config::schema::build_channel_proxy_client(
68 "channel.dingtalk",
69 self.proxy_url.as_deref(),
70 )
71 }
72
73 fn is_user_allowed(&self, user_id: &str) -> bool {
74 let peers = (self.peer_resolver)();
75 crate::allowlist::is_user_allowed(&peers, user_id, crate::allowlist::Match::Sensitive)
76 }
77
78 fn parse_stream_data(frame: &serde_json::Value) -> Option<serde_json::Value> {
79 match frame.get("data") {
80 Some(serde_json::Value::String(raw)) => serde_json::from_str(raw).ok(),
81 Some(serde_json::Value::Object(_)) => frame.get("data").cloned(),
82 _ => None,
83 }
84 }
85
86 fn resolve_chat_id(data: &serde_json::Value, sender_id: &str) -> String {
87 let is_private_chat = data
88 .get("conversationType")
89 .and_then(|value| {
90 value
91 .as_str()
92 .map(|v| v == "1")
93 .or_else(|| value.as_i64().map(|v| v == 1))
94 })
95 .unwrap_or(true);
96
97 if is_private_chat {
98 sender_id.to_string()
99 } else {
100 data.get("conversationId")
101 .and_then(|c| c.as_str())
102 .unwrap_or(sender_id)
103 .to_string()
104 }
105 }
106
107 async fn register_connection(&self) -> anyhow::Result<GatewayResponse> {
109 let body = serde_json::json!({
110 "clientId": self.client_id,
111 "clientSecret": self.client_secret,
112 "subscriptions": [
113 {
114 "type": "CALLBACK",
115 "topic": DINGTALK_BOT_CALLBACK_TOPIC,
116 }
117 ],
118 });
119
120 let resp = self
121 .http_client()
122 .post("https://api.dingtalk.com/v1.0/gateway/connections/open")
123 .json(&body)
124 .send()
125 .await?;
126
127 if !resp.status().is_success() {
128 let status = resp.status();
129 let err = resp.text().await.unwrap_or_default();
130 anyhow::bail!("gateway registration failed ({status}): {err}");
131 }
132
133 let gw: GatewayResponse = resp.json().await?;
134 Ok(gw)
135 }
136}
137
138impl ::zeroclaw_api::attribution::Attributable for DingTalkChannel {
139 fn role(&self) -> ::zeroclaw_api::attribution::Role {
140 ::zeroclaw_api::attribution::Role::Channel(
141 ::zeroclaw_api::attribution::ChannelKind::DingTalk,
142 )
143 }
144 fn alias(&self) -> &str {
145 &self.alias
146 }
147}
148
149#[async_trait]
150impl Channel for DingTalkChannel {
151 fn name(&self) -> &str {
152 "dingtalk"
153 }
154
155 async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
156 let webhooks = self.session_webhooks.read().await;
157 let webhook_url = webhooks.get(&message.recipient).ok_or_else(|| {
158 ::zeroclaw_log::record!(
159 WARN,
160 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
161 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
162 .with_attrs(::serde_json::json!({
163 "recipient": message.recipient,
164 "reason": "no_session_webhook",
165 })),
166 "dingtalk: no session webhook for recipient"
167 );
168 anyhow::Error::msg(format!(
169 "No session webhook found for chat {}. \
170 The user must send a message first to establish a session.",
171 message.recipient
172 ))
173 })?;
174
175 let title = message.subject.as_deref().unwrap_or("ZeroClaw");
176 let body = serde_json::json!({
177 "msgtype": "markdown",
178 "markdown": {
179 "title": title,
180 "text": message.content,
181 }
182 });
183
184 let resp = self
185 .http_client()
186 .post(webhook_url)
187 .json(&body)
188 .send()
189 .await?;
190
191 if !resp.status().is_success() {
192 let status = resp.status();
193 let err = resp.text().await.unwrap_or_default();
194 anyhow::bail!("webhook reply failed ({status}): {err}");
195 }
196
197 Ok(())
198 }
199
200 async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
201 ::zeroclaw_log::record!(
202 INFO,
203 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
204 "registering gateway connection..."
205 );
206
207 let gw = self.register_connection().await?;
208 let ws_url = format!("{}?ticket={}", gw.endpoint, gw.ticket);
209
210 ::zeroclaw_log::record!(
211 INFO,
212 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
213 "connecting to stream WebSocket..."
214 );
215 let (ws_stream, _) = zeroclaw_config::schema::ws_connect_with_proxy(
216 &ws_url,
217 "channel.dingtalk",
218 self.proxy_url.as_deref(),
219 )
220 .await?;
221 let (mut write, mut read) = ws_stream.split();
222
223 ::zeroclaw_log::record!(
224 INFO,
225 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
226 "connected and listening for messages..."
227 );
228
229 while let Some(msg) = read.next().await {
230 let msg = match msg {
231 Ok(Message::Text(t)) => t,
232 Ok(Message::Close(_)) => break,
233 Err(e) => {
234 ::zeroclaw_log::record!(
235 WARN,
236 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
237 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
238 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
239 "WebSocket error"
240 );
241 break;
242 }
243 _ => continue,
244 };
245
246 let frame: serde_json::Value = match serde_json::from_str(msg.as_ref()) {
247 Ok(v) => v,
248 Err(_) => continue,
249 };
250
251 let frame_type = frame.get("type").and_then(|t| t.as_str()).unwrap_or("");
252
253 match frame_type {
254 "SYSTEM" => {
255 let message_id = frame
257 .get("headers")
258 .and_then(|h| h.get("messageId"))
259 .and_then(|m| m.as_str())
260 .unwrap_or("");
261
262 let pong = serde_json::json!({
263 "code": 200,
264 "headers": {
265 "contentType": "application/json",
266 "messageId": message_id,
267 },
268 "message": "OK",
269 "data": "",
270 });
271
272 if let Err(e) = write.send(Message::Text(pong.to_string().into())).await {
273 ::zeroclaw_log::record!(
274 WARN,
275 ::zeroclaw_log::Event::new(
276 module_path!(),
277 ::zeroclaw_log::Action::Note
278 )
279 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
280 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
281 "failed to send pong"
282 );
283 break;
284 }
285 }
286 "EVENT" | "CALLBACK" => {
287 let data = match Self::parse_stream_data(&frame) {
289 Some(v) => v,
290 None => {
291 ::zeroclaw_log::record!(
292 DEBUG,
293 ::zeroclaw_log::Event::new(
294 module_path!(),
295 ::zeroclaw_log::Action::Note
296 ),
297 "frame has no parseable data payload"
298 );
299 continue;
300 }
301 };
302
303 let content = data
305 .get("text")
306 .and_then(|t| t.get("content"))
307 .and_then(|c| c.as_str())
308 .unwrap_or("")
309 .trim();
310
311 if content.is_empty() {
312 continue;
313 }
314
315 let sender_id = data
316 .get("senderStaffId")
317 .and_then(|s| s.as_str())
318 .unwrap_or("unknown");
319
320 if !self.is_user_allowed(sender_id) {
321 ::zeroclaw_log::record!(
322 WARN,
323 ::zeroclaw_log::Event::new(
324 module_path!(),
325 ::zeroclaw_log::Action::Note
326 )
327 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
328 .with_attrs(::serde_json::json!({"sender_id": sender_id})),
329 "ignoring message from unauthorized user"
330 );
331 continue;
332 }
333
334 let chat_id = Self::resolve_chat_id(&data, sender_id);
336
337 if let Some(webhook) = data.get("sessionWebhook").and_then(|w| w.as_str()) {
339 let webhook = webhook.to_string();
340 let mut webhooks = self.session_webhooks.write().await;
341 webhooks.insert(chat_id.clone(), webhook.clone());
343 webhooks.insert(sender_id.to_string(), webhook);
344 }
345
346 let message_id = frame
348 .get("headers")
349 .and_then(|h| h.get("messageId"))
350 .and_then(|m| m.as_str())
351 .unwrap_or("");
352
353 let ack = serde_json::json!({
354 "code": 200,
355 "headers": {
356 "contentType": "application/json",
357 "messageId": message_id,
358 },
359 "message": "OK",
360 "data": "",
361 });
362 let _ = write.send(Message::Text(ack.to_string().into())).await;
363
364 let channel_msg = ChannelMessage {
365 id: Uuid::new_v4().to_string(),
366 sender: sender_id.to_string(),
367 reply_target: chat_id,
368 content: content.to_string(),
369 channel: "dingtalk".to_string(),
370 channel_alias: Some(self.alias.clone()),
371 timestamp: std::time::SystemTime::now()
372 .duration_since(std::time::UNIX_EPOCH)
373 .unwrap_or_default()
374 .as_secs(),
375 thread_ts: None,
376 interruption_scope_id: None,
377 attachments: vec![],
378 subject: None,
379 };
380
381 if tx.send(channel_msg).await.is_err() {
382 ::zeroclaw_log::record!(
383 WARN,
384 ::zeroclaw_log::Event::new(
385 module_path!(),
386 ::zeroclaw_log::Action::Note
387 )
388 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
389 "message channel closed"
390 );
391 break;
392 }
393 }
394 _ => {}
395 }
396 }
397
398 anyhow::bail!("WebSocket stream ended")
399 }
400
401 async fn health_check(&self) -> bool {
402 self.register_connection().await.is_ok()
403 }
404}
405
406#[cfg(test)]
407mod tests {
408 use super::*;
409
410 #[test]
411 fn test_name() {
412 let ch = DingTalkChannel::new(
413 "id".into(),
414 "secret".into(),
415 "dingtalk_test_alias",
416 Arc::new(Vec::new),
417 );
418 assert_eq!(ch.name(), "dingtalk");
419 }
420
421 #[test]
422 fn test_user_allowed_wildcard() {
423 let ch = DingTalkChannel::new(
424 "id".into(),
425 "secret".into(),
426 "dingtalk_test_alias",
427 Arc::new(|| vec!["*".into()]),
428 );
429 assert!(ch.is_user_allowed("anyone"));
430 }
431
432 #[test]
433 fn test_user_allowed_specific() {
434 let ch = DingTalkChannel::new(
435 "id".into(),
436 "secret".into(),
437 "dingtalk_test_alias",
438 Arc::new(|| vec!["user123".into()]),
439 );
440 assert!(ch.is_user_allowed("user123"));
441 assert!(!ch.is_user_allowed("other"));
442 }
443
444 #[test]
445 fn test_user_denied_empty() {
446 let ch = DingTalkChannel::new(
447 "id".into(),
448 "secret".into(),
449 "dingtalk_test_alias",
450 Arc::new(Vec::new),
451 );
452 assert!(!ch.is_user_allowed("anyone"));
453 }
454
455 #[test]
456 fn v2_allowed_users_fold_into_peer_groups() {
457 let v2_toml = r#"
462schema_version = 2
463
464[channels.dingtalk]
465enabled = true
466client_id = "app_id_123"
467client_secret = "secret_456"
468allowed_users = ["user1", "*"]
469"#;
470 let cfg = zeroclaw_config::migration::migrate_to_current(v2_toml)
471 .expect("V2 dingtalk config migrates to V3");
472 let dingtalk = cfg
473 .channels
474 .dingtalk
475 .get("default")
476 .expect("V2 dingtalk folds under alias `default`");
477 assert_eq!(dingtalk.client_id, "app_id_123");
478 assert_eq!(dingtalk.client_secret, "secret_456");
479
480 let group = cfg
481 .peer_groups
482 .get("dingtalk_default")
483 .expect("dingtalk allow-list synthesizes [peer_groups.dingtalk_default]");
484 assert_eq!(group.channel, "dingtalk");
485 let peers: Vec<&str> = group.external_peers.iter().map(|p| p.as_str()).collect();
486 assert_eq!(peers, vec!["user1"]);
487 }
488
489 #[test]
490 fn v2_no_allowed_users_synthesizes_no_peer_group() {
491 let v2_toml = r#"
494schema_version = 2
495
496[channels.dingtalk]
497enabled = true
498client_id = "id"
499client_secret = "secret"
500"#;
501 let cfg = zeroclaw_config::migration::migrate_to_current(v2_toml)
502 .expect("V2 dingtalk config without allowed_users migrates");
503 assert!(
504 !cfg.peer_groups.contains_key("dingtalk_default"),
505 "no peer group synthesized when allowed_users is absent"
506 );
507 }
508
509 #[test]
510 fn parse_stream_data_supports_string_payload() {
511 let frame = serde_json::json!({
512 "data": "{\"text\":{\"content\":\"hello\"}}"
513 });
514 let parsed = DingTalkChannel::parse_stream_data(&frame).unwrap();
515 assert_eq!(
516 parsed.get("text").and_then(|v| v.get("content")),
517 Some(&serde_json::json!("hello"))
518 );
519 }
520
521 #[test]
522 fn parse_stream_data_supports_object_payload() {
523 let frame = serde_json::json!({
524 "data": {"text": {"content": "hello"}}
525 });
526 let parsed = DingTalkChannel::parse_stream_data(&frame).unwrap();
527 assert_eq!(
528 parsed.get("text").and_then(|v| v.get("content")),
529 Some(&serde_json::json!("hello"))
530 );
531 }
532
533 #[test]
534 fn resolve_chat_id_handles_numeric_group_conversation_type() {
535 let data = serde_json::json!({
536 "conversationType": 2,
537 "conversationId": "cid-group",
538 });
539 let chat_id = DingTalkChannel::resolve_chat_id(&data, "staff-1");
540 assert_eq!(chat_id, "cid-group");
541 }
542}