Skip to main content

zeroclaw_channels/
dingtalk.rs

1use async_trait::async_trait;
2use futures_util::{SinkExt, StreamExt};
3use std::collections::HashMap;
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use tokio_tungstenite::tungstenite::Message;
7use uuid::Uuid;
8use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
9
/// Topic string sent in the `CALLBACK` subscription when registering the
/// gateway connection.
const DINGTALK_BOT_CALLBACK_TOPIC: &str = "/v1.0/im/bot/messages/get";
11
/// DingTalk channel — connects via Stream Mode WebSocket for real-time messages.
/// Replies are sent through per-message session webhook URLs.
pub struct DingTalkChannel {
    /// Sent as `clientId` in the gateway registration request.
    client_id: String,
    /// Sent as `clientSecret` in the gateway registration request.
    client_secret: String,
    /// The alias key under `[channels.dingtalk.<alias>]` this handle is
    /// bound to. Used to scope peer-group writes and resolver lookups.
    alias: String,
    /// Resolves inbound external peers from canonical state at message-time.
    /// Invoked on every allow-list check; the result is never stored.
    /// No cache (see AGENTS.md "ABSOLUTE RULE — SINGLE SOURCE OF TRUTH").
    peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
    /// Per-chat session webhooks for sending replies (chatID -> webhook URL).
    /// DingTalk provides a unique webhook URL with each incoming message.
    /// Each webhook is stored under both the resolved chat ID and the sender ID.
    session_webhooks: Arc<RwLock<HashMap<String, String>>>,
    /// Per-channel proxy URL override. `None` unless set through
    /// `with_proxy_url`.
    proxy_url: Option<String>,
}
29
/// Response from DingTalk gateway connection registration.
///
/// The two fields are combined into the WebSocket URL as
/// `{endpoint}?ticket={ticket}`.
#[derive(serde::Deserialize)]
struct GatewayResponse {
    /// Base WebSocket URL to connect to.
    endpoint: String,
    /// Value passed as the `ticket` query parameter on `endpoint`.
    ticket: String,
}
36
37impl DingTalkChannel {
38    pub fn new(
39        client_id: String,
40        client_secret: String,
41        alias: impl Into<String>,
42        peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
43    ) -> Self {
44        Self {
45            client_id,
46            client_secret,
47            alias: alias.into(),
48            peer_resolver,
49            session_webhooks: Arc::new(RwLock::new(HashMap::new())),
50            proxy_url: None,
51        }
52    }
53
54    /// Return the alias under `[channels.dingtalk.<alias>]` that this
55    /// channel handle is bound to.
56    pub fn alias(&self) -> &str {
57        &self.alias
58    }
59
60    /// Set a per-channel proxy URL that overrides the global proxy config.
61    pub fn with_proxy_url(mut self, proxy_url: Option<String>) -> Self {
62        self.proxy_url = proxy_url;
63        self
64    }
65
66    fn http_client(&self) -> reqwest::Client {
67        zeroclaw_config::schema::build_channel_proxy_client(
68            "channel.dingtalk",
69            self.proxy_url.as_deref(),
70        )
71    }
72
73    fn is_user_allowed(&self, user_id: &str) -> bool {
74        let peers = (self.peer_resolver)();
75        crate::allowlist::is_user_allowed(&peers, user_id, crate::allowlist::Match::Sensitive)
76    }
77
78    fn parse_stream_data(frame: &serde_json::Value) -> Option<serde_json::Value> {
79        match frame.get("data") {
80            Some(serde_json::Value::String(raw)) => serde_json::from_str(raw).ok(),
81            Some(serde_json::Value::Object(_)) => frame.get("data").cloned(),
82            _ => None,
83        }
84    }
85
86    fn resolve_chat_id(data: &serde_json::Value, sender_id: &str) -> String {
87        let is_private_chat = data
88            .get("conversationType")
89            .and_then(|value| {
90                value
91                    .as_str()
92                    .map(|v| v == "1")
93                    .or_else(|| value.as_i64().map(|v| v == 1))
94            })
95            .unwrap_or(true);
96
97        if is_private_chat {
98            sender_id.to_string()
99        } else {
100            data.get("conversationId")
101                .and_then(|c| c.as_str())
102                .unwrap_or(sender_id)
103                .to_string()
104        }
105    }
106
107    /// Register a connection with DingTalk's gateway to get a WebSocket endpoint.
108    async fn register_connection(&self) -> anyhow::Result<GatewayResponse> {
109        let body = serde_json::json!({
110            "clientId": self.client_id,
111            "clientSecret": self.client_secret,
112            "subscriptions": [
113                {
114                    "type": "CALLBACK",
115                    "topic": DINGTALK_BOT_CALLBACK_TOPIC,
116                }
117            ],
118        });
119
120        let resp = self
121            .http_client()
122            .post("https://api.dingtalk.com/v1.0/gateway/connections/open")
123            .json(&body)
124            .send()
125            .await?;
126
127        if !resp.status().is_success() {
128            let status = resp.status();
129            let err = resp.text().await.unwrap_or_default();
130            anyhow::bail!("gateway registration failed ({status}): {err}");
131        }
132
133        let gw: GatewayResponse = resp.json().await?;
134        Ok(gw)
135    }
136}
137
138impl ::zeroclaw_api::attribution::Attributable for DingTalkChannel {
139    fn role(&self) -> ::zeroclaw_api::attribution::Role {
140        ::zeroclaw_api::attribution::Role::Channel(
141            ::zeroclaw_api::attribution::ChannelKind::DingTalk,
142        )
143    }
144    fn alias(&self) -> &str {
145        &self.alias
146    }
147}
148
149#[async_trait]
150impl Channel for DingTalkChannel {
    /// Channel name reported by this implementation: always `"dingtalk"`.
    fn name(&self) -> &str {
        "dingtalk"
    }
154
155    async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
156        let webhooks = self.session_webhooks.read().await;
157        let webhook_url = webhooks.get(&message.recipient).ok_or_else(|| {
158            ::zeroclaw_log::record!(
159                WARN,
160                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
161                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
162                    .with_attrs(::serde_json::json!({
163                        "recipient": message.recipient,
164                        "reason": "no_session_webhook",
165                    })),
166                "dingtalk: no session webhook for recipient"
167            );
168            anyhow::Error::msg(format!(
169                "No session webhook found for chat {}. \
170                 The user must send a message first to establish a session.",
171                message.recipient
172            ))
173        })?;
174
175        let title = message.subject.as_deref().unwrap_or("ZeroClaw");
176        let body = serde_json::json!({
177            "msgtype": "markdown",
178            "markdown": {
179                "title": title,
180                "text": message.content,
181            }
182        });
183
184        let resp = self
185            .http_client()
186            .post(webhook_url)
187            .json(&body)
188            .send()
189            .await?;
190
191        if !resp.status().is_success() {
192            let status = resp.status();
193            let err = resp.text().await.unwrap_or_default();
194            anyhow::bail!("webhook reply failed ({status}): {err}");
195        }
196
197        Ok(())
198    }
199
200    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
201        ::zeroclaw_log::record!(
202            INFO,
203            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
204            "registering gateway connection..."
205        );
206
207        let gw = self.register_connection().await?;
208        let ws_url = format!("{}?ticket={}", gw.endpoint, gw.ticket);
209
210        ::zeroclaw_log::record!(
211            INFO,
212            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
213            "connecting to stream WebSocket..."
214        );
215        let (ws_stream, _) = zeroclaw_config::schema::ws_connect_with_proxy(
216            &ws_url,
217            "channel.dingtalk",
218            self.proxy_url.as_deref(),
219        )
220        .await?;
221        let (mut write, mut read) = ws_stream.split();
222
223        ::zeroclaw_log::record!(
224            INFO,
225            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
226            "connected and listening for messages..."
227        );
228
229        while let Some(msg) = read.next().await {
230            let msg = match msg {
231                Ok(Message::Text(t)) => t,
232                Ok(Message::Close(_)) => break,
233                Err(e) => {
234                    ::zeroclaw_log::record!(
235                        WARN,
236                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
237                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
238                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
239                        "WebSocket error"
240                    );
241                    break;
242                }
243                _ => continue,
244            };
245
246            let frame: serde_json::Value = match serde_json::from_str(msg.as_ref()) {
247                Ok(v) => v,
248                Err(_) => continue,
249            };
250
251            let frame_type = frame.get("type").and_then(|t| t.as_str()).unwrap_or("");
252
253            match frame_type {
254                "SYSTEM" => {
255                    // Respond to system pings to keep the connection alive
256                    let message_id = frame
257                        .get("headers")
258                        .and_then(|h| h.get("messageId"))
259                        .and_then(|m| m.as_str())
260                        .unwrap_or("");
261
262                    let pong = serde_json::json!({
263                        "code": 200,
264                        "headers": {
265                            "contentType": "application/json",
266                            "messageId": message_id,
267                        },
268                        "message": "OK",
269                        "data": "",
270                    });
271
272                    if let Err(e) = write.send(Message::Text(pong.to_string().into())).await {
273                        ::zeroclaw_log::record!(
274                            WARN,
275                            ::zeroclaw_log::Event::new(
276                                module_path!(),
277                                ::zeroclaw_log::Action::Note
278                            )
279                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
280                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
281                            "failed to send pong"
282                        );
283                        break;
284                    }
285                }
286                "EVENT" | "CALLBACK" => {
287                    // Parse the chatbot callback data from the frame.
288                    let data = match Self::parse_stream_data(&frame) {
289                        Some(v) => v,
290                        None => {
291                            ::zeroclaw_log::record!(
292                                DEBUG,
293                                ::zeroclaw_log::Event::new(
294                                    module_path!(),
295                                    ::zeroclaw_log::Action::Note
296                                ),
297                                "frame has no parseable data payload"
298                            );
299                            continue;
300                        }
301                    };
302
303                    // Extract message content
304                    let content = data
305                        .get("text")
306                        .and_then(|t| t.get("content"))
307                        .and_then(|c| c.as_str())
308                        .unwrap_or("")
309                        .trim();
310
311                    if content.is_empty() {
312                        continue;
313                    }
314
315                    let sender_id = data
316                        .get("senderStaffId")
317                        .and_then(|s| s.as_str())
318                        .unwrap_or("unknown");
319
320                    if !self.is_user_allowed(sender_id) {
321                        ::zeroclaw_log::record!(
322                            WARN,
323                            ::zeroclaw_log::Event::new(
324                                module_path!(),
325                                ::zeroclaw_log::Action::Note
326                            )
327                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
328                            .with_attrs(::serde_json::json!({"sender_id": sender_id})),
329                            "ignoring message from unauthorized user"
330                        );
331                        continue;
332                    }
333
334                    // Private chat uses sender ID, group chat uses conversation ID.
335                    let chat_id = Self::resolve_chat_id(&data, sender_id);
336
337                    // Store session webhook for later replies
338                    if let Some(webhook) = data.get("sessionWebhook").and_then(|w| w.as_str()) {
339                        let webhook = webhook.to_string();
340                        let mut webhooks = self.session_webhooks.write().await;
341                        // Use both keys so reply routing works for both group and private flows.
342                        webhooks.insert(chat_id.clone(), webhook.clone());
343                        webhooks.insert(sender_id.to_string(), webhook);
344                    }
345
346                    // Acknowledge the event
347                    let message_id = frame
348                        .get("headers")
349                        .and_then(|h| h.get("messageId"))
350                        .and_then(|m| m.as_str())
351                        .unwrap_or("");
352
353                    let ack = serde_json::json!({
354                        "code": 200,
355                        "headers": {
356                            "contentType": "application/json",
357                            "messageId": message_id,
358                        },
359                        "message": "OK",
360                        "data": "",
361                    });
362                    let _ = write.send(Message::Text(ack.to_string().into())).await;
363
364                    let channel_msg = ChannelMessage {
365                        id: Uuid::new_v4().to_string(),
366                        sender: sender_id.to_string(),
367                        reply_target: chat_id,
368                        content: content.to_string(),
369                        channel: "dingtalk".to_string(),
370                        channel_alias: Some(self.alias.clone()),
371                        timestamp: std::time::SystemTime::now()
372                            .duration_since(std::time::UNIX_EPOCH)
373                            .unwrap_or_default()
374                            .as_secs(),
375                        thread_ts: None,
376                        interruption_scope_id: None,
377                        attachments: vec![],
378                        subject: None,
379                    };
380
381                    if tx.send(channel_msg).await.is_err() {
382                        ::zeroclaw_log::record!(
383                            WARN,
384                            ::zeroclaw_log::Event::new(
385                                module_path!(),
386                                ::zeroclaw_log::Action::Note
387                            )
388                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
389                            "message channel closed"
390                        );
391                        break;
392                    }
393                }
394                _ => {}
395            }
396        }
397
398        anyhow::bail!("WebSocket stream ended")
399    }
400
401    async fn health_check(&self) -> bool {
402        self.register_connection().await.is_ok()
403    }
404}
405
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a test channel whose resolver always returns `peers`.
    fn channel_with_peers(peers: &[&str]) -> DingTalkChannel {
        let peers: Vec<String> = peers.iter().map(|p| (*p).to_string()).collect();
        DingTalkChannel::new(
            "id".into(),
            "secret".into(),
            "dingtalk_test_alias",
            Arc::new(move || peers.clone()),
        )
    }

    #[test]
    fn test_name() {
        let channel = channel_with_peers(&[]);
        assert_eq!(channel.name(), "dingtalk");
    }

    #[test]
    fn test_user_allowed_wildcard() {
        let channel = channel_with_peers(&["*"]);
        assert!(channel.is_user_allowed("anyone"));
    }

    #[test]
    fn test_user_allowed_specific() {
        let channel = channel_with_peers(&["user123"]);
        assert!(channel.is_user_allowed("user123"));
        assert!(!channel.is_user_allowed("other"));
    }

    #[test]
    fn test_user_denied_empty() {
        let channel = channel_with_peers(&[]);
        assert!(!channel.is_user_allowed("anyone"));
    }

    #[test]
    fn v2_allowed_users_fold_into_peer_groups() {
        // A V2 `[channels.dingtalk].allowed_users` list is migrated into a
        // synthesized `[peer_groups.dingtalk_default]` block in V3. The `*`
        // wildcard is dropped during synthesis, leaving only concrete
        // usernames as external peers.
        let v2_toml = r#"
schema_version = 2

[channels.dingtalk]
enabled = true
client_id = "app_id_123"
client_secret = "secret_456"
allowed_users = ["user1", "*"]
"#;
        let migrated = zeroclaw_config::migration::migrate_to_current(v2_toml)
            .expect("V2 dingtalk config migrates to V3");

        let channel_cfg = migrated
            .channels
            .dingtalk
            .get("default")
            .expect("V2 dingtalk folds under alias `default`");
        assert_eq!(channel_cfg.client_id, "app_id_123");
        assert_eq!(channel_cfg.client_secret, "secret_456");

        let peer_group = migrated
            .peer_groups
            .get("dingtalk_default")
            .expect("dingtalk allow-list synthesizes [peer_groups.dingtalk_default]");
        assert_eq!(peer_group.channel, "dingtalk");

        let external: Vec<&str> = peer_group
            .external_peers
            .iter()
            .map(|p| p.as_str())
            .collect();
        assert_eq!(external, vec!["user1"]);
    }

    #[test]
    fn v2_no_allowed_users_synthesizes_no_peer_group() {
        // Without `allowed_users` in the V2 config, migration must not emit a
        // peer group at all (not even an empty one).
        let v2_toml = r#"
schema_version = 2

[channels.dingtalk]
enabled = true
client_id = "id"
client_secret = "secret"
"#;
        let migrated = zeroclaw_config::migration::migrate_to_current(v2_toml)
            .expect("V2 dingtalk config without allowed_users migrates");
        let synthesized = migrated.peer_groups.contains_key("dingtalk_default");
        assert!(
            !synthesized,
            "no peer group synthesized when allowed_users is absent"
        );
    }

    #[test]
    fn parse_stream_data_supports_string_payload() {
        // `data` arrives as a JSON document encoded inside a string.
        let frame = serde_json::json!({
            "data": "{\"text\":{\"content\":\"hello\"}}"
        });
        let payload = DingTalkChannel::parse_stream_data(&frame).unwrap();
        assert_eq!(payload["text"]["content"], "hello");
    }

    #[test]
    fn parse_stream_data_supports_object_payload() {
        // `data` arrives as an already-structured JSON object.
        let frame = serde_json::json!({
            "data": {"text": {"content": "hello"}}
        });
        let payload = DingTalkChannel::parse_stream_data(&frame).unwrap();
        assert_eq!(payload["text"]["content"], "hello");
    }

    #[test]
    fn resolve_chat_id_handles_numeric_group_conversation_type() {
        // A numeric (non-string) conversation type other than 1 is a group chat.
        let payload = serde_json::json!({
            "conversationType": 2,
            "conversationId": "cid-group",
        });
        let resolved = DingTalkChannel::resolve_chat_id(&payload, "staff-1");
        assert_eq!(resolved, "cid-group");
    }
}