Skip to main content

zeroclaw_channels/
wecom_ws.rs

1use aes::Aes256;
2use anyhow::{Context, Result};
3use async_trait::async_trait;
4use base64::Engine as _;
5use cbc::cipher::{BlockDecryptMut, KeyIvInit, block_padding::NoPadding};
6use futures_util::{SinkExt, StreamExt};
7use parking_lot::Mutex;
8use rand::RngExt;
9use serde_json::Value;
10use std::borrow::Cow;
11use std::collections::{HashMap, HashSet, VecDeque};
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
15use tokio::sync::mpsc;
16use tokio_tungstenite::tungstenite::Message as WsMessage;
17use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
18use zeroclaw_config::schema::{StreamMode, WeComWsConfig};
19use zeroclaw_runtime::i18n;
20
21// ── Constants ────────────────────────────────────────────────────────
22
/// WebSocket endpoint the channel connects to.
const WECOM_WS_URL: &str = "wss://openws.work.weixin.qq.com";
// NOTE(review): the backoff, ping and subscribe constants are consumed outside
// this excerpt — presumably by the connect/reconnect loop; confirm there.
const WECOM_BACKOFF_INITIAL_SECS: u64 = 5;
const WECOM_BACKOFF_MAX_SECS: u64 = 60;
const WECOM_PING_INTERVAL_SECS: u64 = 30;
const WECOM_SUBSCRIBE_TIMEOUT_SECS: u64 = 10;
/// Seconds to wait for the server's ack of a command frame before giving up.
const WECOM_COMMAND_TIMEOUT_SECS: u64 = 10;
/// Timeout values (seconds) handed to the proxy-aware HTTP client builder.
const WECOM_HTTP_TIMEOUT_SECS: u64 = 60;
const WECOM_CONNECT_TIMEOUT_SECS: u64 = 15;
/// How long an outbound send waits for the WebSocket sender to become
/// available, and how often it re-checks while waiting.
const WECOM_WS_READY_WAIT_SECS: u64 = 10;
const WECOM_WS_READY_POLL_MILLIS: u64 = 100;
/// Retry budget and base delay for stream replies that hit a data-version
/// conflict; the delay doubles on every retry.
const WECOM_STREAM_CONFLICT_MAX_RETRIES: usize = 3;
const WECOM_STREAM_CONFLICT_RETRY_BASE_MILLIS: u64 = 150;
/// Maximum number of keys the idempotency store remembers before evicting
/// the oldest ones.
const WECOM_IDEMPOTENCY_MAX_KEYS: usize = 4096;
// NOTE(review): presumably provider end-of-message markers stripped from
// outgoing text; the consumer is outside this excerpt — confirm.
const WECOM_PROVIDER_TRAILING_SENTINELS: &[&str] = &["<|eom|>"];

// NOTE(review): markdown size limits are used outside this excerpt; the names
// suggest a hard per-message cap and a chunking size — confirm.
const WECOM_MARKDOWN_MAX_BYTES: usize = 20_480;
const WECOM_MARKDOWN_CHUNK_BYTES: usize = 8_000;
// Emoji pool (U+1F642, U+1F604, U+1F91D, U+1F680, U+1F44C).
// NOTE(review): presumably sampled by `random_emoji()`; confirm.
const WECOM_EMOJIS: &[&str] = &[
    "\u{1F642}",
    "\u{1F604}",
    "\u{1F91D}",
    "\u{1F680}",
    "\u{1F44C}",
];
/// Minimum number of seconds between two attachment-directory cleanup runs.
const WECOM_FILE_CLEANUP_INTERVAL_SECS: u64 = 1800;
/// Records a DEBUG-level entry through `zeroclaw_log::record!`.
///
/// The event is tagged with the current `module_path!()` and the `Note`
/// action; the arguments are forwarded verbatim to `format!`.
macro_rules! wecom_log_debug {
    ($($arg:tt)*) => {
        ::zeroclaw_log::record!(
            DEBUG,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
            format!($($arg)*),
        )
    };
}
57
/// Returns the CLI string registered under `key`.
///
/// Thin wrapper that delegates to `i18n::get_required_cli_string`.
// NOTE(review): behavior for an unknown key is defined by the i18n helper — confirm there.
fn wecom_ws_cli_string(key: &str) -> String {
    i18n::get_required_cli_string(key)
}
61
/// Returns the CLI string registered under `key`, passing `args` as
/// `(name, value)` pairs to `i18n::get_required_cli_string_with_args`.
// NOTE(review): presumably the pairs fill named placeholders in the template — confirm in i18n.
fn wecom_ws_cli_string_with_args(key: &str, args: &[(&str, &str)]) -> String {
    i18n::get_required_cli_string_with_args(key, args)
}
65
/// Records an INFO-level entry through `zeroclaw_log::record!`.
///
/// The event is tagged with the current `module_path!()` and the `Note`
/// action; the arguments are forwarded verbatim to `format!`.
macro_rules! wecom_log_info {
    ($($arg:tt)*) => {
        ::zeroclaw_log::record!(
            INFO,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
            format!($($arg)*),
        )
    };
}
75
/// Records a WARN-level entry through `zeroclaw_log::record!`.
///
/// Like the debug/info variants, but the event additionally carries
/// `EventOutcome::Unknown`. Arguments are forwarded verbatim to `format!`.
macro_rules! wecom_log_warn {
    ($($arg:tt)*) => {
        ::zeroclaw_log::record!(
            WARN,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
            format!($($arg)*),
        )
    };
}
86
/// Records an ERROR-level entry through `zeroclaw_log::record!`.
///
/// The event uses the `Fail` action and carries `EventOutcome::Failure`.
/// Arguments are forwarded verbatim to `format!`.
macro_rules! wecom_log_error {
    ($($arg:tt)*) => {
        ::zeroclaw_log::record!(
            ERROR,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
                .with_outcome(::zeroclaw_log::EventOutcome::Failure),
            format!($($arg)*),
        )
    };
}
97
98// ── WebSocket outbound command ───────────────────────────────────────
99
/// Item type carried on the outbound mpsc channel that feeds the WebSocket.
enum WsOutbound {
    /// A JSON frame queued for sending.
    Frame(Value),
}
103
104// ── Internal types ───────────────────────────────────────────────────
105
/// Fields extracted from the `body` of an inbound `aibot_msg_callback` frame
/// (produced by `parse_inbound_payload`).
#[derive(Debug, Clone)]
struct ParsedInbound {
    // Message identifier; used as the de-duplication key when non-empty.
    msg_id: String,
    // Message type string; checked against the set of model-supported types.
    msg_type: String,
    // Chat type and optional chat id as reported by the payload.
    // NOTE(review): the exact value set (e.g. single vs. group) is not visible here — confirm.
    chat_type: String,
    chat_id: Option<String>,
    // User id of the sender.
    sender_userid: String,
    // Bot identifier reported by the payload (only logged in this excerpt).
    aibot_id: String,
    // The payload value retained for later lookups (e.g. the `quote` object).
    raw_payload: Value,
}
116
/// Result of `compute_scopes` for one inbound message.
#[derive(Debug, Clone)]
struct ScopeDecision {
    // Scope string passed as the third argument of `ChannelMessage::new` and
    // shown in logs as the place the message was received "in".
    conversation_scope: String,
}
121
/// Outcome of the allowlist check for an inbound message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AccessDecision {
    /// The message may be processed.
    Allowed,
    /// Rejected because no allowlist is configured.
    AllowlistMissing,
    /// Rejected by the configured allowlist.
    Denied,
}
128
/// Category of a downloadable attachment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AttachmentKind {
    Image,
    File,
}

impl AttachmentKind {
    /// Lower-case label for this kind: `"image"` or `"file"`.
    fn as_str(self) -> &'static str {
        match self {
            AttachmentKind::File => "file",
            AttachmentKind::Image => "image",
        }
    }
}
143
/// Result of normalizing an inbound message before it is forwarded.
#[derive(Debug)]
enum NormalizedMessage {
    /// Text content ready to be composed and forwarded to the framework.
    Ready(String),
    /// Voice message without a transcript; the caller replies with the
    /// "voice unavailable" text instead of forwarding.
    VoiceMissingTranscript,
    /// Content that cannot be handled; the caller replies with the
    /// "unsupported message" text instead of forwarding.
    Unsupported,
}
150
151struct SimpleIdempotencyStore {
152    seen: Mutex<HashSet<String>>,
153    order: Mutex<VecDeque<String>>,
154}
155
156impl SimpleIdempotencyStore {
157    fn new() -> Self {
158        Self {
159            seen: Mutex::new(HashSet::new()),
160            order: Mutex::new(VecDeque::new()),
161        }
162    }
163    fn record_if_new(&self, key: &str) -> bool {
164        let mut seen = self.seen.lock();
165        if !seen.insert(key.to_string()) {
166            return false;
167        }
168
169        let mut order = self.order.lock();
170        order.push_back(key.to_string());
171        while order.len() > WECOM_IDEMPOTENCY_MAX_KEYS {
172            if let Some(old_key) = order.pop_front() {
173                seen.remove(&old_key);
174            }
175        }
176        true
177    }
178}
179
/// Runtime settings derived from `WeComWsConfig` when the channel is built.
#[derive(Clone)]
struct WeComRuntimeConfig {
    // Workspace root; the `wecom_ws_files` subdirectory is what cleanup scans.
    workspace_dir: PathBuf,
    // Group allowlist after normalization.
    allowed_groups: Vec<String>,
    // Optional bot display name after normalization; passed as a hint when
    // composing content for the framework.
    bot_name: Option<String>,
    // Retention period, in days, handed to the file cleanup task.
    file_retention_days: u32,
    // Size limit in bytes (configured megabytes multiplied by 1024 * 1024, saturating).
    max_file_size_bytes: u64,
    // Streaming mode copied from the config (`MultiMessage` is rejected at construction).
    stream_mode: StreamMode,
    // Optional proxy URL copied from the config.
    proxy_url: Option<String>,
}
190
191// ── MediaDecryptor (per-attachment AES key) ──────────────────────────
192
193struct MediaDecryptor;
194
195impl MediaDecryptor {
196    /// Decrypt WeCom media attachment using per-message AES key.
197    /// AES-256-CBC, IV = first 16 bytes of key, WeCom-style PKCS padding.
198    fn decrypt(aeskey_b64: &str, encrypted: &[u8]) -> Result<Vec<u8>> {
199        let raw_key = base64::engine::general_purpose::STANDARD
200            .decode(aeskey_b64.trim())
201            .or_else(|_| base64::engine::general_purpose::STANDARD_NO_PAD.decode(aeskey_b64.trim()))
202            .or_else(|_| base64::engine::general_purpose::URL_SAFE.decode(aeskey_b64.trim()))
203            .context("failed to decode WeCom media aeskey")?;
204
205        if raw_key.len() < 32 {
206            anyhow::bail!(
207                "WeCom media aeskey too short: expected >= 32 bytes, got {}",
208                raw_key.len()
209            );
210        }
211
212        let key = &raw_key[..32];
213        let iv = &key[..16];
214
215        let mut buf = encrypted.to_vec();
216        let plaintext = cbc::Decryptor::<Aes256>::new(key.into(), iv.into())
217            .decrypt_padded_mut::<NoPadding>(&mut buf)
218            .map_err(|_| anyhow::Error::msg("failed to decrypt WeCom media attachment"))?;
219        Ok(strip_wecom_padding(plaintext)?.to_vec())
220    }
221}
222
223// ── WeComWsChannel struct ────────────────────────────────────────────
224
/// WeCom (WeChat Work) channel — WebSocket long-connection mode.
///
/// Connects to `wss://openws.work.weixin.qq.com`, subscribes with bot_id + secret.
/// Inbound messages arrive as plaintext JSON frames (no encryption).
/// Outbound replies are pushed directly via WS frames (streaming supported).
/// Media attachments are encrypted per-URL with individual AES keys.
///
/// The type is `Clone`; all mutable state lives behind `Arc`, so clones
/// (e.g. those moved into spawned tasks) share it.
#[derive(Clone)]
pub struct WeComWsChannel {
    // Credentials copied from the config.
    bot_id: String,
    secret: String,
    // Alias attached to every forwarded `ChannelMessage`.
    alias: String,
    // Called on every access check to obtain the current user allowlist.
    peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
    cfg: WeComRuntimeConfig,
    // HTTP client built with the channel proxy settings and timeouts.
    client: reqwest::Client,
    // Sender for outbound frames; outbound sends poll until this is `Some`.
    // NOTE(review): presumably set/cleared by the connection loop outside this excerpt — confirm.
    ws_tx: Arc<tokio::sync::Mutex<Option<mpsc::Sender<WsOutbound>>>>,
    // Waiters for command acks, keyed by req_id.
    pending_responses:
        Arc<tokio::sync::Mutex<HashMap<String, tokio::sync::oneshot::Sender<Result<()>>>>>,
    // One async lock per req_id, serializing acked stream replies for that request.
    respond_msg_locks: Arc<tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>>,
    // Time of the last attachment cleanup run.
    last_cleanup: Arc<Mutex<Instant>>,
    // De-duplication store for inbound message ids.
    idempotency: Arc<SimpleIdempotencyStore>,
    req_id_map: Arc<Mutex<HashMap<String, String>>>, // stream_id → req_id
}
247
248// ── Construction + WS helpers ────────────────────────────────────────
249
250impl WeComWsChannel {
251    pub fn new(config: &WeComWsConfig, workspace_dir: &Path) -> Result<Self> {
252        let allowed_users = normalize_wecom_allowlist(config.allowed_users.clone());
253        Self::new_with_alias(
254            config,
255            "default",
256            Arc::new(move || allowed_users.clone()),
257            workspace_dir,
258        )
259    }
260
261    pub fn new_with_alias(
262        config: &WeComWsConfig,
263        alias: impl Into<String>,
264        peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
265        workspace_dir: &Path,
266    ) -> Result<Self> {
267        if config.stream_mode == StreamMode::MultiMessage {
268            anyhow::bail!(
269                "WeCom WebSocket stream_mode=multi_message is not supported; use partial or off"
270            );
271        }
272
273        let client = zeroclaw_config::schema::build_channel_proxy_client_with_timeouts(
274            "channel.wecom_ws",
275            config.proxy_url.as_deref(),
276            WECOM_HTTP_TIMEOUT_SECS,
277            WECOM_CONNECT_TIMEOUT_SECS,
278        );
279
280        Ok(Self {
281            bot_id: config.bot_id.clone(),
282            secret: config.secret.clone(),
283            alias: alias.into(),
284            peer_resolver,
285            cfg: WeComRuntimeConfig {
286                workspace_dir: workspace_dir.to_path_buf(),
287                allowed_groups: normalize_wecom_allowlist(config.allowed_groups.clone()),
288                bot_name: normalize_optional_wecom_identity(config.bot_name.as_deref()),
289                file_retention_days: config.file_retention_days,
290                max_file_size_bytes: config.max_file_size_mb.saturating_mul(1024 * 1024),
291                stream_mode: config.stream_mode,
292                proxy_url: config.proxy_url.clone(),
293            },
294            client,
295            ws_tx: Arc::new(tokio::sync::Mutex::new(None)),
296            pending_responses: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
297            respond_msg_locks: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
298            last_cleanup: Arc::new(Mutex::new(Instant::now())),
299            idempotency: Arc::new(SimpleIdempotencyStore::new()),
300            req_id_map: Arc::new(Mutex::new(HashMap::new())),
301        })
302    }
303
304    async fn wait_for_ws_sender(&self) -> Result<mpsc::Sender<WsOutbound>> {
305        let deadline = Instant::now() + Duration::from_secs(WECOM_WS_READY_WAIT_SECS);
306
307        loop {
308            if let Some(tx) = self.ws_tx.lock().await.as_ref().cloned() {
309                return Ok(tx);
310            }
311
312            if Instant::now() >= deadline {
313                anyhow::bail!("WeCom WebSocket not connected");
314            }
315
316            tokio::time::sleep(Duration::from_millis(WECOM_WS_READY_POLL_MILLIS)).await;
317        }
318    }
319
320    /// Send a JSON frame through the WebSocket outbound channel.
321    async fn ws_send_frame(&self, frame: Value) -> Result<()> {
322        let tx = self.wait_for_ws_sender().await?;
323        tx.send(WsOutbound::Frame(frame))
324            .await
325            .map_err(|_| anyhow::Error::msg("WeCom WS outbound channel closed"))
326    }
327
328    async fn ws_send_frame_and_wait_for_response(
329        &self,
330        frame: Value,
331        req_id: &str,
332        command: &str,
333    ) -> Result<()> {
334        let (tx, rx) = tokio::sync::oneshot::channel();
335        self.pending_responses
336            .lock()
337            .await
338            .insert(req_id.to_string(), tx);
339
340        if let Err(err) = self.ws_send_frame(frame).await {
341            self.pending_responses.lock().await.remove(req_id);
342            return Err(err);
343        }
344
345        match tokio::time::timeout(Duration::from_secs(WECOM_COMMAND_TIMEOUT_SECS), rx).await {
346            Ok(Ok(result)) => result,
347            Ok(Err(_)) => anyhow::bail!(
348                "WeCom WS {command} response channel closed before ack (req_id={req_id})"
349            ),
350            Err(_) => {
351                self.pending_responses.lock().await.remove(req_id);
352                anyhow::bail!(
353                    "WeCom WS {command} ack timeout after {}s (req_id={req_id})",
354                    WECOM_COMMAND_TIMEOUT_SECS
355                );
356            }
357        }
358    }
359
360    async fn maybe_handle_command_response(&self, frame: &Value) -> bool {
361        let Some(req_id) = frame
362            .get("headers")
363            .and_then(|headers| headers.get("req_id"))
364            .and_then(Value::as_str)
365        else {
366            return false;
367        };
368
369        let Some(errcode) = frame.get("errcode").and_then(Value::as_i64) else {
370            return false;
371        };
372
373        let errmsg = frame
374            .get("errmsg")
375            .and_then(Value::as_str)
376            .unwrap_or("unknown");
377
378        if let Some(waiter) = self.pending_responses.lock().await.remove(req_id) {
379            let result = if errcode == 0 {
380                Ok(())
381            } else {
382                Err(anyhow::Error::msg(format!(
383                    "WeCom command failed: req_id={req_id} errcode={errcode} errmsg={errmsg}"
384                )))
385            };
386            let _ = waiter.send(result);
387            return true;
388        }
389
390        if errcode == 0 {
391            wecom_log_debug!(
392                "[wecom_ws] unsolicited command response req_id={req_id} errcode={errcode} errmsg={errmsg}"
393            );
394        } else {
395            wecom_log_warn!(
396                "[wecom_ws] command response failed without a waiter req_id={req_id} errcode={errcode} errmsg={errmsg}"
397            );
398        }
399
400        true
401    }
402
403    async fn respond_msg_lock_for_req_id(&self, req_id: &str) -> Arc<tokio::sync::Mutex<()>> {
404        self.respond_msg_locks
405            .lock()
406            .await
407            .entry(req_id.to_string())
408            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
409            .clone()
410    }
411
412    async fn cleanup_respond_msg_lock(&self, req_id: &str) {
413        self.respond_msg_locks.lock().await.remove(req_id);
414    }
415
416    async fn fail_pending_responses(&self, reason: &str) {
417        let pending = {
418            let mut guard = self.pending_responses.lock().await;
419            std::mem::take(&mut *guard)
420        };
421
422        for (req_id, waiter) in pending {
423            let _ = waiter.send(Err(anyhow::Error::msg(format!(
424                "WeCom WebSocket disconnected before response: req_id={req_id} reason={reason}"
425            ))));
426        }
427    }
428
429    fn access_decision(&self, inbound: &ParsedInbound) -> AccessDecision {
430        let allowed_users = normalize_wecom_allowlist((self.peer_resolver)());
431        evaluate_access_decision(&allowed_users, &self.cfg.allowed_groups, inbound)
432    }
433
434    fn compose_content_for_framework_with_bot_hint(
435        &self,
436        inbound: &ParsedInbound,
437        normalized: &str,
438    ) -> String {
439        compose_content_for_framework(inbound, normalized, self.cfg.bot_name.as_deref())
440    }
441
442    async fn respond_access_denied(
443        &self,
444        req_id: &str,
445        inbound: &ParsedInbound,
446        decision: AccessDecision,
447    ) {
448        let message = build_access_denied_message(inbound, decision, &self.alias);
449        let stream_id = next_stream_id();
450        if let Err(err) = self
451            .ws_queue_respond_msg(req_id, &stream_id, &message, true)
452            .await
453        {
454            wecom_log_warn!(
455                "[wecom_ws] failed to send access-denied response sender_userid={} chat_type={} chat_id={} error={err:#}",
456                inbound.sender_userid,
457                inbound.chat_type,
458                inbound.chat_id.as_deref().unwrap_or("-")
459            );
460        }
461    }
462
463    /// Send an `aibot_respond_msg` streaming frame.
464    fn build_respond_msg_frame(
465        req_id: &str,
466        stream_id: &str,
467        content: &str,
468        finish: bool,
469    ) -> Value {
470        let stream_obj = serde_json::json!({
471            "id": stream_id,
472            "finish": finish,
473            "content": normalize_stream_content(content),
474        });
475        serde_json::json!({
476            "cmd": "aibot_respond_msg",
477            "headers": { "req_id": req_id },
478            "body": {
479                "msgtype": "stream",
480                "stream": stream_obj,
481            },
482        })
483    }
484
485    async fn ws_queue_respond_msg(
486        &self,
487        req_id: &str,
488        stream_id: &str,
489        content: &str,
490        finish: bool,
491    ) -> Result<()> {
492        let frame = Self::build_respond_msg_frame(req_id, stream_id, content, finish);
493        self.ws_send_frame(frame).await
494    }
495
496    async fn ws_send_respond_msg(
497        &self,
498        req_id: &str,
499        stream_id: &str,
500        content: &str,
501        finish: bool,
502    ) -> Result<()> {
503        let frame = Self::build_respond_msg_frame(req_id, stream_id, content, finish);
504        if req_id.is_empty() {
505            return self.ws_send_frame(frame).await;
506        }
507
508        let stream_lock = self.respond_msg_lock_for_req_id(req_id).await;
509        let _guard = stream_lock.lock().await;
510        let mut attempt = 0usize;
511
512        let result = loop {
513            match self
514                .ws_send_frame_and_wait_for_response(frame.clone(), req_id, "aibot_respond_msg")
515                .await
516            {
517                Ok(()) => break Ok(()),
518                Err(err)
519                    if is_wecom_data_version_conflict_error(&err)
520                        && attempt < WECOM_STREAM_CONFLICT_MAX_RETRIES =>
521                {
522                    let retry_in_ms =
523                        WECOM_STREAM_CONFLICT_RETRY_BASE_MILLIS.saturating_mul(1u64 << attempt);
524                    attempt += 1;
525                    wecom_log_warn!(
526                        "WeCom stream reply hit data-version conflict; retrying req_id={req_id} stream_id={stream_id} attempt={attempt} retry_in_ms={retry_in_ms}"
527                    );
528                    tokio::time::sleep(Duration::from_millis(retry_in_ms)).await;
529                }
530                Err(err) => break Err(err),
531            }
532        };
533
534        if finish {
535            self.cleanup_respond_msg_lock(req_id).await;
536        }
537
538        result
539    }
540
541    // ── file cleanup ─────────────────────────────────────────────────
542
543    fn maybe_cleanup_files(&self) {
544        let now = Instant::now();
545        {
546            let mut last = self.last_cleanup.lock();
547            if now.duration_since(*last) < Duration::from_secs(WECOM_FILE_CLEANUP_INTERVAL_SECS) {
548                return;
549            }
550            *last = now;
551        }
552
553        let retention = Duration::from_secs(u64::from(self.cfg.file_retention_days) * 86_400);
554        let root = self.cfg.workspace_dir.join("wecom_ws_files");
555        tokio::spawn(async move {
556            cleanup_inbox_files(root, retention).await;
557        });
558    }
559
560    // ── WS message dispatch ──────────────────────────────────────────
561
562    /// Returns `true` if the caller should trigger reconnection.
563    async fn handle_ws_message(&self, frame: Value, tx: &mpsc::Sender<ChannelMessage>) -> bool {
564        if self.maybe_handle_command_response(&frame).await {
565            return false;
566        }
567
568        let cmd = frame.get("cmd").and_then(Value::as_str).unwrap_or("");
569
570        match cmd {
571            "aibot_msg_callback" => {
572                let channel = self.clone();
573                let tx = tx.clone();
574                tokio::spawn(async move {
575                    channel.handle_msg_callback(frame, &tx).await;
576                });
577                false
578            }
579            "aibot_event_callback" => self.handle_event_callback(frame).await,
580            _ => {
581                wecom_log_debug!("[wecom_ws] ignoring WS frame cmd={cmd}");
582                false
583            }
584        }
585    }
586
587    // ── Message callback handling ────────────────────────────────────
588
    /// Processes one `aibot_msg_callback` frame end to end.
    ///
    /// Steps, in order:
    /// 1. Read `headers.req_id` (empty string if absent) and parse `body`;
    ///    a missing or unparsable body is logged and dropped.
    /// 2. Drop duplicates: a non-empty `msg_id` that was already recorded
    ///    returns silently.
    /// 3. Log the inbound message and enforce the allowlist; rejected
    ///    messages get an access-denied reply.
    /// 4. Possibly kick off attachment cleanup.
    /// 5. Handle built-in commands (clear session, stop, runtime command),
    ///    voice without transcript, and unsupported message types.
    /// 6. Otherwise normalize the message on a spawned task and forward it
    ///    to the framework through `tx`.
    ///
    /// Replies and forwards are best effort: their send results are ignored.
    async fn handle_msg_callback(&self, frame: Value, tx: &mpsc::Sender<ChannelMessage>) {
        let req_id = frame
            .get("headers")
            .and_then(|h| h.get("req_id"))
            .and_then(Value::as_str)
            .unwrap_or("")
            .to_string();

        let body = match frame.get("body") {
            Some(b) => b.clone(),
            None => {
                wecom_log_warn!("[wecom_ws] msg_callback missing body");
                return;
            }
        };

        let parsed = match parse_inbound_payload(body) {
            Ok(p) => p,
            Err(err) => {
                wecom_log_warn!("[wecom_ws] msg_callback parse failed: {err:#}");
                return;
            }
        };

        // Idempotency check: messages without a msg_id cannot be de-duplicated
        // and are always processed.
        if !parsed.msg_id.is_empty() {
            let key = format!("wecom_ws_msg_{}", parsed.msg_id);
            if !self.idempotency.record_if_new(&key) {
                return;
            }
        }

        let scopes = compute_scopes(&parsed);

        // Log inbound info (content preview truncated to 80 via truncate_with_ellipsis).
        let preview = crate::util::truncate_with_ellipsis(&inbound_content_preview(&parsed), 80);
        let msg_id_str = if parsed.msg_id.trim().is_empty() {
            "-"
        } else {
            parsed.msg_id.as_str()
        };
        wecom_log_info!(
            "[wecom_ws] from {} in {}: {} (msg_type={}, msg_id={}, aibot_id={})",
            parsed.sender_userid,
            scopes.conversation_scope,
            preview,
            parsed.msg_type,
            msg_id_str,
            parsed.aibot_id
        );

        // Access control: anything but `Allowed` is answered with a denial and dropped.
        match self.access_decision(&parsed) {
            AccessDecision::Allowed => {}
            AccessDecision::AllowlistMissing => {
                wecom_log_warn!(
                    "[wecom_ws] inbound denied because allowlist is not configured sender_userid={} chat_type={} chat_id={}",
                    parsed.sender_userid,
                    parsed.chat_type,
                    parsed.chat_id.as_deref().unwrap_or("-")
                );
                self.respond_access_denied(&req_id, &parsed, AccessDecision::AllowlistMissing)
                    .await;
                return;
            }
            AccessDecision::Denied => {
                wecom_log_warn!(
                    "[wecom_ws] inbound denied by allowlist sender_userid={} chat_type={} chat_id={}",
                    parsed.sender_userid,
                    parsed.chat_type,
                    parsed.chat_id.as_deref().unwrap_or("-")
                );
                self.respond_access_denied(&req_id, &parsed, AccessDecision::Denied)
                    .await;
                return;
            }
        }

        // Rate-limited internally; at most spawns a background cleanup task.
        self.maybe_cleanup_files();

        // ── Command detection ────────────────────────────────────────

        // Text inspected for commands; empty when the message yields none.
        let stop_text = extract_stop_signal_text(&parsed).unwrap_or_default();

        // Clear session: forwarded to the framework as "/new".
        if is_clear_session_command(&stop_text) {
            wecom_log_info!(
                "WeCom session cleared: scope={} msg_id={}",
                scopes.conversation_scope,
                parsed.msg_id
            );
            let _ = tx
                .send(ChannelMessage {
                    channel_alias: Some(self.alias.clone()),
                    thread_ts: Some(req_id),
                    ..ChannelMessage::new(
                        parsed.msg_id.clone(),
                        parsed.sender_userid.clone(),
                        scopes.conversation_scope.clone(),
                        "/new",
                        "wecom_ws",
                        bytes_timestamp_now(),
                    )
                })
                .await;
            return;
        }

        // Stop command: acknowledged directly to the user, then forwarded as
        // "/stop". Unlike the other forwards, no `thread_ts` is attached here.
        if contains_stop_command(&stop_text) {
            let msg = wecom_ws_cli_string("channel-wecom-ws-stop-ack");
            let stream_id = next_stream_id();
            let _ = self
                .ws_queue_respond_msg(&req_id, &stream_id, &msg, true)
                .await;
            let _ = tx
                .send(ChannelMessage {
                    channel_alias: Some(self.alias.clone()),
                    ..ChannelMessage::new(
                        parsed.msg_id.clone(),
                        parsed.sender_userid.clone(),
                        scopes.conversation_scope.clone(),
                        "/stop",
                        "wecom_ws",
                        bytes_timestamp_now(),
                    )
                })
                .await;
            return;
        }

        // Runtime command recognized by `extract_runtime_model_switch_command`:
        // forwarded to the framework as returned by that helper.
        if let Some(runtime_command) = extract_runtime_model_switch_command(&stop_text) {
            wecom_log_info!(
                "WeCom runtime command forwarded: scope={} msg_id={} command={}",
                scopes.conversation_scope,
                parsed.msg_id,
                runtime_command
            );
            let _ = tx
                .send(ChannelMessage {
                    channel_alias: Some(self.alias.clone()),
                    thread_ts: Some(req_id),
                    ..ChannelMessage::new(
                        parsed.msg_id.clone(),
                        parsed.sender_userid.clone(),
                        scopes.conversation_scope.clone(),
                        runtime_command,
                        "wecom_ws",
                        bytes_timestamp_now(),
                    )
                })
                .await;
            return;
        }

        // Voice without transcript: reply with the "voice unavailable" text.
        if is_voice_without_transcript(&parsed) {
            let msg = wecom_ws_cli_string_with_args(
                "channel-wecom-ws-voice-unavailable",
                &[("emoji", random_emoji())],
            );
            let stream_id = next_stream_id();
            let _ = self
                .ws_queue_respond_msg(&req_id, &stream_id, &msg, true)
                .await;
            return;
        }

        // Unsupported message type: logged and dropped without a reply.
        if !is_model_supported_msgtype(&parsed.msg_type) {
            wecom_log_info!(
                "WeCom unsupported message ignored: msg_type={} msg_id={}",
                parsed.msg_type,
                parsed.msg_id
            );
            return;
        }

        // ── Forward normal message to framework ──────────────────────

        // Attachment materialization and normalization run on their own task;
        // `parsed`, `req_id` and `scopes` are moved into it.
        let channel_self = self.clone();
        let tx = tx.clone();
        tokio::spawn(async move {
            let mut inbound = parsed;
            channel_self
                .materialize_quote_attachments(&mut inbound)
                .await;
            let normalized = channel_self.normalize_message(&inbound).await;

            let content = match normalized {
                NormalizedMessage::VoiceMissingTranscript => {
                    let msg = wecom_ws_cli_string_with_args(
                        "channel-wecom-ws-voice-unavailable",
                        &[("emoji", random_emoji())],
                    );
                    let stream_id = next_stream_id();
                    let _ = channel_self
                        .ws_queue_respond_msg(&req_id, &stream_id, &msg, true)
                        .await;
                    return;
                }
                NormalizedMessage::Unsupported => {
                    let msg = wecom_ws_cli_string("channel-wecom-ws-unsupported-message");
                    let stream_id = next_stream_id();
                    let _ = channel_self
                        .ws_queue_respond_msg(&req_id, &stream_id, &msg, true)
                        .await;
                    return;
                }
                NormalizedMessage::Ready(content) => content,
            };

            let composed =
                channel_self.compose_content_for_framework_with_bot_hint(&inbound, &content);

            wecom_log_info!(
                "WeCom: forwarding to framework: msg_id={} req_id={} scope={}",
                inbound.msg_id,
                req_id,
                scopes.conversation_scope
            );

            // The inbound req_id travels as `thread_ts` on the forwarded message.
            // NOTE(review): presumably so later replies can address the same request — confirm with the send path.
            let _ = tx
                .send(ChannelMessage {
                    channel_alias: Some(channel_self.alias.clone()),
                    thread_ts: Some(req_id),
                    ..ChannelMessage::new(
                        inbound.msg_id.clone(),
                        inbound.sender_userid.clone(),
                        scopes.conversation_scope.clone(),
                        composed,
                        "wecom_ws",
                        bytes_timestamp_now(),
                    )
                })
                .await;
        });
    }
826
827    // ── Event callback handling ──────────────────────────────────────
828
829    /// Returns `true` if the caller should trigger reconnection.
830    async fn handle_event_callback(&self, frame: Value) -> bool {
831        let req_id = frame
832            .get("headers")
833            .and_then(|h| h.get("req_id"))
834            .and_then(Value::as_str)
835            .unwrap_or("")
836            .to_string();
837
838        let body = frame.get("body").cloned().unwrap_or(Value::Null);
839        let event_type = parse_event_type(&body).unwrap_or_else(|| "unknown".to_string());
840
841        match event_type.as_str() {
842            "enter_chat" => {
843                let content = wecom_ws_cli_string_with_args(
844                    "channel-wecom-ws-welcome",
845                    &[("emoji", random_emoji())],
846                );
847                let welcome = serde_json::json!({
848                    "cmd": "aibot_respond_welcome_msg",
849                    "headers": { "req_id": req_id },
850                    "body": {
851                        "msgtype": "text",
852                        "text": { "content": content }
853                    }
854                });
855                let _ = self.ws_send_frame(welcome).await;
856                false
857            }
858            "template_card_event" => {
859                let event_key =
860                    extract_template_card_event_key(&body).unwrap_or_else(|| "-".to_string());
861                wecom_log_info!("WeCom template_card_event received: event_key={event_key}");
862                false
863            }
864            "feedback_event" => {
865                let summary = extract_feedback_event_summary(&body)
866                    .unwrap_or_else(|| "feedback=invalid-payload".to_string());
867                wecom_log_info!("WeCom feedback_event received: {summary}");
868                false
869            }
870            "disconnected_event" => {
871                wecom_log_warn!("[wecom_ws] received disconnected_event, triggering reconnect");
872                true
873            }
874            other => {
875                wecom_log_debug!("[wecom_ws] ignoring event_type={other}");
876                false
877            }
878        }
879    }
880
881    // ── Attachment handling ──────────────────────────────────────────
882
883    async fn materialize_quote_attachments(&self, inbound: &mut ParsedInbound) {
884        let quote_type = inbound
885            .raw_payload
886            .get("quote")
887            .and_then(|v| v.get("msgtype"))
888            .and_then(Value::as_str)
889            .map(str::trim)
890            .unwrap_or("");
891
892        if quote_type == "image" {
893            let quote_obj = inbound
894                .raw_payload
895                .get("quote")
896                .and_then(|v| v.get("image"));
897            let quote_url = quote_obj
898                .and_then(|v| v.get("url"))
899                .and_then(Value::as_str)
900                .map(str::trim)
901                .filter(|v| !v.is_empty())
902                .map(ToOwned::to_owned);
903            let aeskey = quote_obj
904                .and_then(|v| v.get("aeskey"))
905                .and_then(Value::as_str);
906            if let Some(url) = quote_url {
907                let marker = match self
908                    .download_and_store_attachment(&url, AttachmentKind::Image, inbound, aeskey)
909                    .await
910                {
911                    Ok(value) => value,
912                    Err(err) => {
913                        log_attachment_processing_failure(
914                            "WeCom quote image processing failed",
915                            &err,
916                            inbound,
917                            AttachmentKind::Image,
918                            &url,
919                        );
920                        "[\u{5f15}\u{7528}\u{56fe}\u{7247}\u{4e0b}\u{8f7d}\u{5931}\u{8d25}]"
921                            .to_string()
922                    }
923                };
924                if let Some(quote) = inbound.raw_payload.get_mut("quote") {
925                    quote["image"] = serde_json::json!({ "local_path": marker });
926                }
927            }
928            return;
929        }
930
931        if quote_type == "file" {
932            let quote_obj = inbound.raw_payload.get("quote").and_then(|v| v.get("file"));
933            let quote_url = quote_obj
934                .and_then(|v| v.get("url"))
935                .and_then(Value::as_str)
936                .map(str::trim)
937                .filter(|v| !v.is_empty())
938                .map(ToOwned::to_owned);
939            let aeskey = quote_obj
940                .and_then(|v| v.get("aeskey"))
941                .and_then(Value::as_str);
942            if let Some(url) = quote_url {
943                let marker = match self
944                    .download_and_store_attachment(&url, AttachmentKind::File, inbound, aeskey)
945                    .await
946                {
947                    Ok(value) => value,
948                    Err(err) => {
949                        log_attachment_processing_failure(
950                            "WeCom quote file processing failed",
951                            &err,
952                            inbound,
953                            AttachmentKind::File,
954                            &url,
955                        );
956                        "[\u{5f15}\u{7528}\u{6587}\u{4ef6}\u{4e0b}\u{8f7d}\u{5931}\u{8d25}]"
957                            .to_string()
958                    }
959                };
960                if let Some(quote) = inbound.raw_payload.get_mut("quote") {
961                    quote["file"] = serde_json::json!({ "local_path": marker });
962                }
963            }
964            return;
965        }
966
967        if quote_type == "mixed" {
968            let quote_images: Vec<(usize, String, Option<String>)> = inbound
969                .raw_payload
970                .get("quote")
971                .and_then(|v| v.get("mixed"))
972                .and_then(|v| v.get("msg_item"))
973                .and_then(Value::as_array)
974                .map(|items| {
975                    items
976                        .iter()
977                        .enumerate()
978                        .filter_map(|(idx, item)| {
979                            let item_type = item
980                                .get("msgtype")
981                                .and_then(Value::as_str)
982                                .unwrap_or_default();
983                            if item_type != "image" {
984                                return None;
985                            }
986                            let img = item.get("image")?;
987                            let url = img
988                                .get("url")
989                                .and_then(Value::as_str)
990                                .map(str::trim)
991                                .filter(|v| !v.is_empty())?;
992                            let aeskey = img
993                                .get("aeskey")
994                                .and_then(Value::as_str)
995                                .map(ToOwned::to_owned);
996                            Some((idx, url.to_string(), aeskey))
997                        })
998                        .collect()
999                })
1000                .unwrap_or_default();
1001
1002            if quote_images.is_empty() {
1003                return;
1004            }
1005
1006            let mut results: Vec<(usize, String)> = Vec::with_capacity(quote_images.len());
1007            for (idx, url, aeskey) in &quote_images {
1008                let marker = match self
1009                    .download_and_store_attachment(
1010                        url,
1011                        AttachmentKind::Image,
1012                        inbound,
1013                        aeskey.as_deref(),
1014                    )
1015                    .await
1016                {
1017                    Ok(value) => value,
1018                    Err(err) => {
1019                        log_attachment_processing_failure(
1020                            "WeCom quote mixed image processing failed",
1021                            &err,
1022                            inbound,
1023                            AttachmentKind::Image,
1024                            url,
1025                        );
1026                        "[\u{5f15}\u{7528}\u{56fe}\u{7247}\u{4e0b}\u{8f7d}\u{5931}\u{8d25}]"
1027                            .to_string()
1028                    }
1029                };
1030                results.push((*idx, marker));
1031            }
1032
1033            if let Some(items) = inbound
1034                .raw_payload
1035                .get_mut("quote")
1036                .and_then(|v| v.get_mut("mixed"))
1037                .and_then(|v| v.get_mut("msg_item"))
1038                .and_then(Value::as_array_mut)
1039            {
1040                for (idx, marker) in results {
1041                    if let Some(item) = items.get_mut(idx) {
1042                        item["image"] = serde_json::json!({ "local_path": marker });
1043                    }
1044                }
1045            }
1046        }
1047    }
1048
1049    async fn normalize_message(&self, inbound: &ParsedInbound) -> NormalizedMessage {
1050        match inbound.msg_type.as_str() {
1051            "text" => {
1052                let content = inbound
1053                    .raw_payload
1054                    .get("text")
1055                    .and_then(|v| v.get("content"))
1056                    .and_then(Value::as_str)
1057                    .unwrap_or("")
1058                    .trim()
1059                    .to_string();
1060
1061                if content.is_empty() {
1062                    NormalizedMessage::Unsupported
1063                } else {
1064                    NormalizedMessage::Ready(content)
1065                }
1066            }
1067            "voice" => {
1068                let content = inbound
1069                    .raw_payload
1070                    .get("voice")
1071                    .and_then(|v| v.get("content"))
1072                    .and_then(Value::as_str)
1073                    .unwrap_or("")
1074                    .trim()
1075                    .to_string();
1076
1077                if content.is_empty() {
1078                    NormalizedMessage::VoiceMissingTranscript
1079                } else {
1080                    NormalizedMessage::Ready(format!("[Voice transcript]\n{content}"))
1081                }
1082            }
1083            "image" => {
1084                let image_obj = inbound.raw_payload.get("image");
1085                let url = image_obj
1086                    .and_then(|v| v.get("url"))
1087                    .and_then(Value::as_str)
1088                    .unwrap_or("")
1089                    .trim();
1090                let aeskey = image_obj
1091                    .and_then(|v| v.get("aeskey"))
1092                    .and_then(Value::as_str);
1093
1094                if url.is_empty() {
1095                    return NormalizedMessage::Unsupported;
1096                }
1097
1098                match self
1099                    .download_and_store_attachment(url, AttachmentKind::Image, inbound, aeskey)
1100                    .await
1101                {
1102                    Ok(marker) => NormalizedMessage::Ready(marker),
1103                    Err(err) => {
1104                        log_attachment_processing_failure(
1105                            "WeCom image processing failed",
1106                            &err,
1107                            inbound,
1108                            AttachmentKind::Image,
1109                            url,
1110                        );
1111                        NormalizedMessage::Ready(
1112                            "[Image attachment processing failed; please continue without this image.]"
1113                                .to_string(),
1114                        )
1115                    }
1116                }
1117            }
1118            "file" => {
1119                let file_obj = inbound.raw_payload.get("file");
1120                let url = file_obj
1121                    .and_then(|v| v.get("url"))
1122                    .and_then(Value::as_str)
1123                    .unwrap_or("")
1124                    .trim();
1125                let aeskey = file_obj
1126                    .and_then(|v| v.get("aeskey"))
1127                    .and_then(Value::as_str);
1128
1129                if url.is_empty() {
1130                    return NormalizedMessage::Unsupported;
1131                }
1132
1133                match self
1134                    .download_and_store_attachment(url, AttachmentKind::File, inbound, aeskey)
1135                    .await
1136                {
1137                    Ok(marker) => NormalizedMessage::Ready(marker),
1138                    Err(err) => {
1139                        log_attachment_processing_failure(
1140                            "WeCom file processing failed",
1141                            &err,
1142                            inbound,
1143                            AttachmentKind::File,
1144                            url,
1145                        );
1146                        NormalizedMessage::Ready(
1147                            "[File attachment processing failed; please continue without this file.]"
1148                                .to_string(),
1149                        )
1150                    }
1151                }
1152            }
1153            "mixed" => {
1154                let mut text_parts = Vec::new();
1155                if let Some(items) = inbound
1156                    .raw_payload
1157                    .get("mixed")
1158                    .and_then(|v| v.get("msg_item"))
1159                    .and_then(Value::as_array)
1160                {
1161                    for item in items {
1162                        let item_type = item
1163                            .get("msgtype")
1164                            .and_then(Value::as_str)
1165                            .unwrap_or_default();
1166                        if item_type == "text" {
1167                            if let Some(text) = item
1168                                .get("text")
1169                                .and_then(|v| v.get("content"))
1170                                .and_then(Value::as_str)
1171                            {
1172                                let trimmed = text.trim();
1173                                if !trimmed.is_empty() {
1174                                    text_parts.push(trimmed.to_string());
1175                                }
1176                            }
1177                        } else if item_type == "image" {
1178                            let img = item.get("image");
1179                            let url = img.and_then(|v| v.get("url")).and_then(Value::as_str);
1180                            let aeskey = img.and_then(|v| v.get("aeskey")).and_then(Value::as_str);
1181                            if let Some(url) = url {
1182                                match self
1183                                    .download_and_store_attachment(
1184                                        url,
1185                                        AttachmentKind::Image,
1186                                        inbound,
1187                                        aeskey,
1188                                    )
1189                                    .await
1190                                {
1191                                    Ok(marker) => text_parts.push(marker),
1192                                    Err(err) => {
1193                                        log_attachment_processing_failure(
1194                                            "WeCom mixed image processing failed",
1195                                            &err,
1196                                            inbound,
1197                                            AttachmentKind::Image,
1198                                            url,
1199                                        );
1200                                        text_parts.push(
1201                                            "[Image attachment processing failed in mixed message.]"
1202                                                .to_string(),
1203                                        );
1204                                    }
1205                                }
1206                            }
1207                        }
1208                    }
1209                }
1210
1211                if text_parts.is_empty() {
1212                    NormalizedMessage::Unsupported
1213                } else {
1214                    NormalizedMessage::Ready(text_parts.join("\n\n"))
1215                }
1216            }
1217            other => {
1218                wecom_log_info!(
1219                    "[wecom_ws] unsupported msg_type={other}, raw_payload={}",
1220                    inbound.raw_payload
1221                );
1222                NormalizedMessage::Unsupported
1223            }
1224        }
1225    }
1226
1227    async fn download_and_store_attachment(
1228        &self,
1229        url: &str,
1230        kind: AttachmentKind,
1231        inbound: &ParsedInbound,
1232        aeskey: Option<&str>,
1233    ) -> Result<String> {
1234        if self.cfg.max_file_size_bytes == 0 {
1235            anyhow::bail!("WeCom max_file_size_bytes is zero");
1236        }
1237
1238        let started = Instant::now();
1239        let chat_id = inbound.chat_id.as_deref().unwrap_or("single");
1240        let url_target = summarize_attachment_url_for_log(url);
1241        wecom_log_info!(
1242            "WeCom attachment download started msg_id={} msg_type={} chat_type={} chat_id={} sender_userid={} attachment_kind={} url_target={} has_aeskey={} timeout_secs={}",
1243            inbound.msg_id,
1244            inbound.msg_type,
1245            inbound.chat_type,
1246            chat_id,
1247            inbound.sender_userid,
1248            kind.as_str(),
1249            url_target,
1250            aeskey.is_some(),
1251            WECOM_HTTP_TIMEOUT_SECS
1252        );
1253
1254        let response = self
1255            .client
1256            .get(url)
1257            .send()
1258            .await
1259            .with_context(|| {
1260                format!(
1261                    "failed to download WeCom attachment: kind={} msg_id={} url_target={} elapsed_ms={}",
1262                    kind.as_str(),
1263                    inbound.msg_id,
1264                    url_target,
1265                    started.elapsed().as_millis(),
1266                )
1267            })?;
1268        let status = response.status();
1269        if !status.is_success() {
1270            let body = response.text().await.unwrap_or_default();
1271            let body_preview = truncate_for_log(&body, 512);
1272            anyhow::bail!(
1273                "WeCom attachment download failed: kind={} msg_id={} url_target={} status={} body_preview={}",
1274                kind.as_str(),
1275                inbound.msg_id,
1276                url_target,
1277                status,
1278                body_preview
1279            );
1280        }
1281
1282        if let Some(len) = response.content_length()
1283            && len > self.cfg.max_file_size_bytes
1284        {
1285            wecom_log_warn!(
1286                "WeCom attachment skipped: declared size exceeds configured limit msg_id={} attachment_kind={} declared_bytes={} max_file_size_bytes={}",
1287                inbound.msg_id,
1288                kind.as_str(),
1289                len,
1290                self.cfg.max_file_size_bytes
1291            );
1292            return Ok(format!(
1293                "[AttachmentTooLarge kind={:?} size={}B limit={}B]",
1294                kind, len, self.cfg.max_file_size_bytes
1295            ));
1296        }
1297
1298        let bytes = response
1299            .bytes()
1300            .await
1301            .with_context(|| {
1302                format!(
1303                    "failed to read WeCom attachment bytes: kind={} msg_id={} url_target={} elapsed_ms={}",
1304                    kind.as_str(),
1305                    inbound.msg_id,
1306                    url_target,
1307                    started.elapsed().as_millis(),
1308                )
1309            })?;
1310
1311        if bytes.len() as u64 > self.cfg.max_file_size_bytes {
1312            wecom_log_warn!(
1313                "WeCom attachment skipped: payload exceeds configured limit msg_id={} attachment_kind={} actual_bytes={} max_file_size_bytes={}",
1314                inbound.msg_id,
1315                kind.as_str(),
1316                bytes.len(),
1317                self.cfg.max_file_size_bytes
1318            );
1319            return Ok(format!(
1320                "[AttachmentTooLarge kind={:?} size={}B limit={}B]",
1321                kind,
1322                bytes.len(),
1323                self.cfg.max_file_size_bytes
1324            ));
1325        }
1326
1327        // Decrypt if aeskey is present; otherwise write the downloaded bytes directly.
1328        let stored_bytes: Cow<'_, [u8]> = match aeskey {
1329            Some(key) => Cow::Owned(MediaDecryptor::decrypt(key, &bytes).with_context(|| {
1330                format!(
1331                    "failed to decrypt WeCom attachment: kind={} msg_id={} url_target={} encrypted_bytes={}",
1332                    kind.as_str(),
1333                    inbound.msg_id,
1334                    url_target,
1335                    bytes.len(),
1336                )
1337            })?),
1338            None => Cow::Borrowed(bytes.as_ref()),
1339        };
1340        let stored_len = stored_bytes.len();
1341
1342        let ext = match kind {
1343            AttachmentKind::Image => image_file_extension(stored_bytes.as_ref()),
1344            AttachmentKind::File => "bin",
1345        };
1346        let safe_scope = normalize_scope_component(&format!(
1347            "{}_{}",
1348            inbound.chat_id.as_deref().unwrap_or("single"),
1349            inbound.sender_userid
1350        ));
1351        let safe_msg_id = normalize_scope_component(&inbound.msg_id);
1352        let ts = bytes_timestamp_now();
1353        let file_name = format!(
1354            "{safe_scope}_{ts}_{safe_msg_id}_{}.{}",
1355            random_ascii_token(6),
1356            ext
1357        );
1358
1359        let dir = self.cfg.workspace_dir.join("wecom_ws_files");
1360        tokio::fs::create_dir_all(&dir).await.with_context(|| {
1361            format!(
1362                "failed to create WeCom inbox directory: msg_id={} path={}",
1363                inbound.msg_id,
1364                dir.display()
1365            )
1366        })?;
1367        let path = dir.join(file_name);
1368
1369        tokio::fs::write(&path, stored_bytes.as_ref())
1370            .await
1371            .with_context(|| {
1372                format!(
1373                    "failed to persist WeCom attachment: kind={} msg_id={} path={}",
1374                    kind.as_str(),
1375                    inbound.msg_id,
1376                    path.display()
1377                )
1378            })?;
1379
1380        self.maybe_cleanup_files();
1381
1382        let abs = path.canonicalize().unwrap_or(path);
1383        wecom_log_info!(
1384            "WeCom attachment download completed msg_id={} attachment_kind={} url_target={} encrypted_bytes={} stored_bytes={} local_path={} elapsed_ms={}",
1385            inbound.msg_id,
1386            kind.as_str(),
1387            url_target,
1388            bytes.len(),
1389            stored_len,
1390            abs.display(),
1391            started.elapsed().as_millis()
1392        );
1393        match kind {
1394            AttachmentKind::Image => Ok(format!("[IMAGE:{}]", abs.display())),
1395            AttachmentKind::File => Ok(format!("[Document: {}]", abs.display())),
1396        }
1397    }
1398
1399    async fn send_markdown_chunks_to_scope(&self, scope: &str, content: &str) -> Result<()> {
1400        let (chat_type, chatid) = parse_scope(scope)?;
1401        let chunks = split_markdown_chunks(content);
1402
1403        wecom_log_info!(
1404            "WeCom: sending message to scope={}, len={}, chunks={}",
1405            scope,
1406            content.len(),
1407            chunks.len()
1408        );
1409
1410        let total_chunks = chunks.len();
1411        for (idx, chunk) in chunks.into_iter().enumerate() {
1412            let req_id = random_ascii_token(16);
1413            let chunk_len = chunk.len();
1414            let frame = serde_json::json!({
1415                "cmd": "aibot_send_msg",
1416                "headers": { "req_id": req_id },
1417                "body": {
1418                    "chatid": chatid,
1419                    "chat_type": chat_type,
1420                    "msgtype": "markdown",
1421                    "markdown": { "content": chunk }
1422                }
1423            });
1424            self.ws_send_frame_and_wait_for_response(frame, &req_id, "aibot_send_msg")
1425                .await?;
1426            wecom_log_info!(
1427                "WeCom send ack received scope={scope} req_id={req_id} chunk_index={} chunk_count={total_chunks} chunk_len={chunk_len}",
1428                idx + 1
1429            );
1430        }
1431
1432        Ok(())
1433    }
1434}
1435
1436// ── Channel trait impl ───────────────────────────────────────────────
1437
1438impl ::zeroclaw_api::attribution::Attributable for WeComWsChannel {
1439    fn role(&self) -> ::zeroclaw_api::attribution::Role {
1440        ::zeroclaw_api::attribution::Role::Channel(
1441            ::zeroclaw_api::attribution::ChannelKind::WeComWs,
1442        )
1443    }
1444
1445    fn alias(&self) -> &str {
1446        &self.alias
1447    }
1448}
1449
1450#[async_trait]
1451impl Channel for WeComWsChannel {
1452    fn name(&self) -> &str {
1453        "wecom_ws"
1454    }
1455
1456    async fn send(&self, message: &SendMessage) -> Result<()> {
1457        if let Some(req_id) = message
1458            .thread_ts
1459            .as_deref()
1460            .filter(|req_id| !req_id.is_empty())
1461        {
1462            let stream_id = next_stream_id();
1463            let (stream_content, overflow) = split_stream_content_and_overflow(&message.content);
1464
1465            self.ws_send_respond_msg(req_id, &stream_id, &stream_content, true)
1466                .await?;
1467
1468            if let Some(extra) = overflow {
1469                let extra_msg = wecom_ws_cli_string_with_args(
1470                    "channel-wecom-ws-supplemental-message",
1471                    &[("extra", &extra)],
1472                );
1473                self.send_markdown_chunks_to_scope(&message.recipient, &extra_msg)
1474                    .await?;
1475            }
1476
1477            return Ok(());
1478        }
1479
1480        self.send_markdown_chunks_to_scope(&message.recipient, &message.content)
1481            .await
1482    }
1483
1484    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> Result<()> {
1485        wecom_log_info!(
1486            "[wecom_ws] starting WebSocket listener (bot_id={})",
1487            self.bot_id
1488        );
1489
1490        let mut backoff = WECOM_BACKOFF_INITIAL_SECS;
1491
1492        loop {
1493            wecom_log_info!("[wecom_ws] connecting to {WECOM_WS_URL}");
1494
1495            let ws_stream = match zeroclaw_config::schema::ws_connect_with_proxy(
1496                WECOM_WS_URL,
1497                "channel.wecom_ws",
1498                self.cfg.proxy_url.as_deref(),
1499            )
1500            .await
1501            {
1502                Ok((stream, _)) => {
1503                    wecom_log_info!("[wecom_ws] WebSocket connected");
1504                    stream
1505                }
1506                Err(err) => {
1507                    wecom_log_warn!(
1508                        "[wecom_ws] WebSocket connect failed: {err:#}, retrying in {backoff}s"
1509                    );
1510                    tokio::time::sleep(Duration::from_secs(backoff)).await;
1511                    backoff = (backoff * 2).min(WECOM_BACKOFF_MAX_SECS);
1512                    continue;
1513                }
1514            };
1515
1516            let (mut ws_write, mut ws_read) = ws_stream.split();
1517
1518            // Send subscribe
1519            let subscribe_req_id = random_ascii_token(16);
1520            let subscribe = serde_json::json!({
1521                "cmd": "aibot_subscribe",
1522                "headers": { "req_id": subscribe_req_id },
1523                "body": {
1524                    "bot_id": self.bot_id,
1525                    "secret": self.secret,
1526                },
1527            });
1528            if let Err(err) = ws_write
1529                .send(WsMessage::Text(subscribe.to_string().into()))
1530                .await
1531            {
1532                wecom_log_warn!(
1533                    "[wecom_ws] subscribe send failed: {err:#}, retrying in {backoff}s"
1534                );
1535                tokio::time::sleep(Duration::from_secs(backoff)).await;
1536                backoff = (backoff * 2).min(WECOM_BACKOFF_MAX_SECS);
1537                continue;
1538            }
1539
1540            // Wait for subscribe response
1541            let subscribe_ok = match tokio::time::timeout(
1542                Duration::from_secs(WECOM_SUBSCRIBE_TIMEOUT_SECS),
1543                ws_read.next(),
1544            )
1545            .await
1546            {
1547                Ok(Some(Ok(WsMessage::Text(text)))) => match serde_json::from_str::<Value>(&text) {
1548                    Ok(val) => {
1549                        if let Some(resp_req_id) = val
1550                            .get("headers")
1551                            .and_then(|h| h.get("req_id"))
1552                            .and_then(Value::as_str)
1553                            && resp_req_id != subscribe_req_id
1554                        {
1555                            wecom_log_warn!(
1556                                "[wecom_ws] subscribe response req_id mismatch expected_req_id={subscribe_req_id} got_req_id={resp_req_id}"
1557                            );
1558                        }
1559                        let errcode = val.get("errcode").and_then(Value::as_i64).unwrap_or(-1);
1560                        if errcode == 0 {
1561                            wecom_log_info!("[wecom_ws] subscribe succeeded");
1562                            true
1563                        } else {
1564                            let errmsg = val
1565                                .get("errmsg")
1566                                .and_then(Value::as_str)
1567                                .unwrap_or("unknown");
1568                            wecom_log_error!(
1569                                "[wecom_ws] subscribe rejected: errcode={errcode} errmsg={errmsg}"
1570                            );
1571                            false
1572                        }
1573                    }
1574                    Err(err) => {
1575                        wecom_log_warn!("[wecom_ws] subscribe response parse failed: {err:#}");
1576                        false
1577                    }
1578                },
1579                Ok(Some(Ok(_))) => {
1580                    wecom_log_warn!("[wecom_ws] unexpected subscribe response frame type");
1581                    false
1582                }
1583                Ok(Some(Err(err))) => {
1584                    wecom_log_warn!("[wecom_ws] subscribe response read error: {err:#}");
1585                    false
1586                }
1587                Ok(None) => {
1588                    wecom_log_warn!("[wecom_ws] WebSocket closed before subscribe response");
1589                    false
1590                }
1591                Err(_) => {
1592                    wecom_log_warn!("[wecom_ws] subscribe response timeout");
1593                    false
1594                }
1595            };
1596
1597            if !subscribe_ok {
1598                tokio::time::sleep(Duration::from_secs(backoff)).await;
1599                backoff = (backoff * 2).min(WECOM_BACKOFF_MAX_SECS);
1600                continue;
1601            }
1602
1603            // Create mpsc channel for outbound frames
1604            let (out_tx, mut out_rx) = mpsc::channel::<WsOutbound>(64);
1605            *self.ws_tx.lock().await = Some(out_tx);
1606            backoff = WECOM_BACKOFF_INITIAL_SECS; // reset on successful connect
1607
1608            let mut ping_interval =
1609                tokio::time::interval(Duration::from_secs(WECOM_PING_INTERVAL_SECS));
1610            ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1611
1612            let mut should_reconnect = false;
1613
1614            // Inner loop: process WS frames
1615            loop {
1616                tokio::select! {
1617                    _ = ping_interval.tick() => {
1618                        let ping = serde_json::json!({
1619                            "cmd": "ping",
1620                            "headers": { "req_id": random_ascii_token(16) },
1621                        });
1622                        if let Err(err) = ws_write
1623                            .send(WsMessage::Text(ping.to_string().into()))
1624                            .await
1625                        {
1626                            wecom_log_warn!("[wecom_ws] ping send failed: {err:#}");
1627                            break;
1628                        }
1629                    }
1630                    Some(outbound) = out_rx.recv() => {
1631                        match outbound {
1632                            WsOutbound::Frame(value) => {
1633                                if let Err(err) = ws_write
1634                                    .send(WsMessage::Text(value.to_string().into()))
1635                                    .await
1636                                {
1637                                    wecom_log_warn!(
1638                                        "[wecom_ws] outbound frame send failed: {err:#}"
1639                                    );
1640                                    break;
1641                                }
1642                            }
1643                        }
1644                    }
1645                    msg = ws_read.next() => {
1646                        match msg {
1647                            Some(Ok(WsMessage::Text(text))) => {
1648                                match serde_json::from_str::<Value>(&text) {
1649                                    Ok(frame) => {
1650                                        should_reconnect =
1651                                            self.handle_ws_message(frame, &tx).await;
1652                                        if should_reconnect {
1653                                            break;
1654                                        }
1655                                    }
1656                                    Err(err) => {
1657                                        wecom_log_warn!(
1658                                            "[wecom_ws] WS frame parse error: {err:#}"
1659                                        );
1660                                    }
1661                                }
1662                            }
1663                            Some(Ok(WsMessage::Close(_))) => {
1664                                wecom_log_info!("[wecom_ws] WebSocket closed by server");
1665                                break;
1666                            }
1667                            Some(Ok(WsMessage::Pong(_) | _)) => {}
1668                            Some(Err(err)) => {
1669                                wecom_log_warn!("[wecom_ws] WS read error: {err:#}");
1670                                break;
1671                            }
1672                            None => {
1673                                wecom_log_info!("[wecom_ws] WebSocket stream ended");
1674                                break;
1675                            }
1676                        }
1677                    }
1678                }
1679            }
1680
1681            // Disconnect cleanup
1682            *self.ws_tx.lock().await = None;
1683            self.fail_pending_responses("socket disconnected").await;
1684
1685            if should_reconnect {
1686                // Server-initiated disconnect — reconnect quickly
1687                wecom_log_info!("[wecom_ws] disconnected (server event), reconnecting immediately");
1688                backoff = WECOM_BACKOFF_INITIAL_SECS;
1689            } else {
1690                wecom_log_info!("[wecom_ws] disconnected, will reconnect in {backoff}s");
1691                tokio::time::sleep(Duration::from_secs(backoff)).await;
1692                backoff = (backoff * 2).min(WECOM_BACKOFF_MAX_SECS);
1693            }
1694        }
1695    }
1696
1697    async fn health_check(&self) -> bool {
1698        self.ws_tx.lock().await.is_some()
1699    }
1700
1701    fn supports_draft_updates(&self) -> bool {
1702        self.cfg.stream_mode != StreamMode::Off
1703    }
1704
1705    async fn send_draft(&self, message: &SendMessage) -> Result<Option<String>> {
1706        if self.cfg.stream_mode == StreamMode::Off {
1707            return Ok(None);
1708        }
1709
1710        // thread_ts carries the req_id from handle_msg_callback
1711        let req_id = message.thread_ts.as_deref().unwrap_or("");
1712        if req_id.is_empty() {
1713            return Ok(None);
1714        }
1715        let stream_id = next_stream_id();
1716
1717        let bootstrap = wecom_ws_cli_string("channel-wecom-ws-stream-bootstrap");
1718        self.ws_send_respond_msg(req_id, &stream_id, &bootstrap, false)
1719            .await?;
1720        self.req_id_map
1721            .lock()
1722            .insert(stream_id.clone(), req_id.to_string());
1723        Ok(Some(stream_id))
1724    }
1725
1726    async fn update_draft(&self, _recipient: &str, message_id: &str, content: &str) -> Result<()> {
1727        let req_id = self
1728            .req_id_map
1729            .lock()
1730            .get(message_id)
1731            .cloned()
1732            .unwrap_or_default();
1733        if req_id.is_empty() {
1734            return Ok(());
1735        }
1736        self.ws_send_respond_msg(&req_id, message_id, content, false)
1737            .await?;
1738        Ok(())
1739    }
1740
1741    async fn finalize_draft(&self, recipient: &str, message_id: &str, content: &str) -> Result<()> {
1742        let req_id = self
1743            .req_id_map
1744            .lock()
1745            .remove(message_id)
1746            .unwrap_or_default();
1747
1748        let (stream_content, overflow) = split_stream_content_and_overflow(content);
1749
1750        if !req_id.is_empty() {
1751            self.ws_send_respond_msg(&req_id, message_id, &stream_content, true)
1752                .await?;
1753        }
1754
1755        // Send overflow via aibot_send_msg
1756        if let Some(extra) = overflow {
1757            let extra_msg = format!("[\u{8865}\u{5145}\u{6d88}\u{606f}]\n{extra}");
1758            if let Ok((chat_type, chatid)) = parse_scope(recipient) {
1759                for chunk in split_markdown_chunks(&extra_msg) {
1760                    let frame = serde_json::json!({
1761                        "cmd": "aibot_send_msg",
1762                        "headers": { "req_id": random_ascii_token(16) },
1763                        "body": {
1764                            "chatid": chatid,
1765                            "chat_type": chat_type,
1766                            "msgtype": "markdown",
1767                            "markdown": { "content": chunk }
1768                        }
1769                    });
1770                    let _ = self.ws_send_frame(frame).await;
1771                }
1772            }
1773        }
1774
1775        Ok(())
1776    }
1777
1778    async fn cancel_draft(&self, _recipient: &str, message_id: &str) -> Result<()> {
1779        let req_id = self
1780            .req_id_map
1781            .lock()
1782            .remove(message_id)
1783            .unwrap_or_default();
1784        if !req_id.is_empty() {
1785            self.ws_send_respond_msg(&req_id, message_id, "", true)
1786                .await?;
1787        }
1788        Ok(())
1789    }
1790}
1791
1792// ── Helper functions ─────────────────────────────────────────────────
1793
1794fn strip_wecom_padding(input: &[u8]) -> Result<&[u8]> {
1795    let Some(last) = input.last() else {
1796        anyhow::bail!("invalid WeCom padding: empty payload");
1797    };
1798    let pad_len = *last as usize;
1799    if pad_len == 0 || pad_len > 32 || pad_len > input.len() {
1800        anyhow::bail!("invalid WeCom padding length");
1801    }
1802    Ok(&input[..input.len() - pad_len])
1803}
1804
1805fn is_wecom_data_version_conflict_error(err: &anyhow::Error) -> bool {
1806    let msg = err.to_string();
1807    msg.contains("errcode=6000") || msg.contains("data version conflict")
1808}
1809
1810fn parse_inbound_payload(payload: Value) -> Result<ParsedInbound> {
1811    let msg_type = payload
1812        .get("msgtype")
1813        .and_then(Value::as_str)
1814        .unwrap_or("")
1815        .to_string();
1816    if msg_type.is_empty() {
1817        anyhow::bail!("missing msgtype");
1818    }
1819
1820    let msg_id = payload
1821        .get("msgid")
1822        .and_then(Value::as_str)
1823        .unwrap_or("")
1824        .to_string();
1825
1826    let chat_type = payload
1827        .get("chattype")
1828        .and_then(Value::as_str)
1829        .unwrap_or("single")
1830        .to_string();
1831
1832    let chat_id = payload
1833        .get("chatid")
1834        .and_then(Value::as_str)
1835        .map(ToOwned::to_owned);
1836
1837    let sender_userid = payload
1838        .get("from")
1839        .and_then(|v| v.get("userid"))
1840        .and_then(Value::as_str)
1841        .unwrap_or("unknown")
1842        .to_string();
1843
1844    let aibot_id = payload
1845        .get("aibotid")
1846        .and_then(Value::as_str)
1847        .unwrap_or("unknown")
1848        .to_string();
1849
1850    Ok(ParsedInbound {
1851        msg_id,
1852        msg_type,
1853        chat_type,
1854        chat_id,
1855        sender_userid,
1856        aibot_id,
1857        raw_payload: payload,
1858    })
1859}
1860
1861fn compute_scopes(inbound: &ParsedInbound) -> ScopeDecision {
1862    let chat_type = inbound.chat_type.to_ascii_lowercase();
1863    if chat_type == "group" {
1864        let chat_id = inbound
1865            .chat_id
1866            .clone()
1867            .unwrap_or_else(|| "unknown".to_string());
1868        let scope = format!("group--{chat_id}");
1869        return ScopeDecision {
1870            conversation_scope: scope,
1871        };
1872    }
1873
1874    let scope = format!("user--{}", inbound.sender_userid);
1875    ScopeDecision {
1876        conversation_scope: scope,
1877    }
1878}
1879
/// Canonical form of a user/chat identifier: surrounding whitespace removed.
fn normalize_wecom_identity(value: &str) -> String {
    String::from(value.trim())
}
1883
1884fn normalize_optional_wecom_identity(value: Option<&str>) -> Option<String> {
1885    value
1886        .map(normalize_wecom_identity)
1887        .filter(|value| !value.is_empty())
1888}
1889
1890fn normalize_wecom_allowlist(entries: Vec<String>) -> Vec<String> {
1891    entries
1892        .into_iter()
1893        .map(|entry| normalize_wecom_identity(&entry))
1894        .filter(|entry| !entry.is_empty())
1895        .collect()
1896}
1897
1898fn allowlist_matches(allowlist: &[String], candidate: &str) -> bool {
1899    let candidate = normalize_wecom_identity(candidate);
1900    !candidate.is_empty()
1901        && allowlist
1902            .iter()
1903            .any(|entry| entry == "*" || entry == &candidate)
1904}
1905
1906fn evaluate_access_decision(
1907    allowed_users: &[String],
1908    allowed_groups: &[String],
1909    inbound: &ParsedInbound,
1910) -> AccessDecision {
1911    if allowed_users.is_empty() && allowed_groups.is_empty() {
1912        return AccessDecision::AllowlistMissing;
1913    }
1914
1915    if allowlist_matches(allowed_users, &inbound.sender_userid) {
1916        return AccessDecision::Allowed;
1917    }
1918
1919    if inbound.chat_type.eq_ignore_ascii_case("group")
1920        && inbound
1921            .chat_id
1922            .as_deref()
1923            .is_some_and(|chat_id| allowlist_matches(allowed_groups, chat_id))
1924    {
1925        return AccessDecision::Allowed;
1926    }
1927
1928    AccessDecision::Denied
1929}
1930
1931fn build_access_denied_message(
1932    inbound: &ParsedInbound,
1933    decision: AccessDecision,
1934    alias: &str,
1935) -> String {
1936    let userid = normalize_wecom_identity(&inbound.sender_userid);
1937    let userid = if userid.is_empty() {
1938        "unknown"
1939    } else {
1940        userid.as_str()
1941    };
1942
1943    if inbound.chat_type.eq_ignore_ascii_case("group") {
1944        let chatid = inbound
1945            .chat_id
1946            .as_deref()
1947            .map(normalize_wecom_identity)
1948            .filter(|chatid| !chatid.is_empty())
1949            .unwrap_or_else(|| "unknown".to_string());
1950        let allowed_groups_path = format!("channels.wecom_ws.{alias}.allowed_groups");
1951        let allowed_users_path = format!("channels.wecom_ws.{alias}.allowed_users");
1952        return match decision {
1953            AccessDecision::AllowlistMissing => wecom_ws_cli_string_with_args(
1954                "channel-wecom-ws-group-allowlist-missing",
1955                &[
1956                    ("chatid", &chatid),
1957                    ("userid", userid),
1958                    ("allowed_groups_path", &allowed_groups_path),
1959                    ("allowed_users_path", &allowed_users_path),
1960                ],
1961            ),
1962            AccessDecision::Denied => wecom_ws_cli_string_with_args(
1963                "channel-wecom-ws-group-access-denied",
1964                &[
1965                    ("chatid", &chatid),
1966                    ("userid", userid),
1967                    ("allowed_groups_path", &allowed_groups_path),
1968                    ("allowed_users_path", &allowed_users_path),
1969                ],
1970            ),
1971            AccessDecision::Allowed => String::new(),
1972        };
1973    }
1974
1975    let allowed_users_path = format!("channels.wecom_ws.{alias}.allowed_users");
1976    match decision {
1977        AccessDecision::AllowlistMissing => wecom_ws_cli_string_with_args(
1978            "channel-wecom-ws-dm-allowlist-missing",
1979            &[
1980                ("userid", userid),
1981                ("allowed_users_path", &allowed_users_path),
1982            ],
1983        ),
1984        AccessDecision::Denied => wecom_ws_cli_string_with_args(
1985            "channel-wecom-ws-dm-access-denied",
1986            &[
1987                ("userid", userid),
1988                ("allowed_users_path", &allowed_users_path),
1989            ],
1990        ),
1991        AccessDecision::Allowed => String::new(),
1992    }
1993}
1994
1995/// Compose content for framework: quote context (if any) + normalized user text.
1996/// Sender prefix and static context are handled by the framework (mod.rs).
1997fn compose_content_for_framework(
1998    inbound: &ParsedInbound,
1999    normalized: &str,
2000    bot_name: Option<&str>,
2001) -> String {
2002    let quote_context = extract_quote_context(&inbound.raw_payload);
2003    let mention_hint = build_group_bot_mention_hint(inbound, normalized, bot_name);
2004    let body = match mention_hint {
2005        Some(hint) => format!("{hint}\n{normalized}"),
2006        None => normalized.to_string(),
2007    };
2008
2009    match quote_context {
2010        Some(quote) => format!("{quote}\n\n{body}"),
2011        None => body,
2012    }
2013}
2014
2015fn build_group_bot_mention_hint(
2016    inbound: &ParsedInbound,
2017    normalized: &str,
2018    bot_name: Option<&str>,
2019) -> Option<String> {
2020    if !inbound.chat_type.eq_ignore_ascii_case("group") {
2021        return None;
2022    }
2023
2024    let bot_name = bot_name.map(str::trim).filter(|name| !name.is_empty())?;
2025    if !text_mentions_bot_name(normalized, bot_name) {
2026        return None;
2027    }
2028
2029    Some(format!(
2030        "[WeCom group message addressed to this bot via @{bot_name}]"
2031    ))
2032}
2033
/// Returns true when `text` contains `@<bot_name>` as a whole mention, i.e.
/// followed by end of text, whitespace, or ASCII punctuation.
/// A blank bot name never matches.
fn text_mentions_bot_name(text: &str, bot_name: &str) -> bool {
    let name = bot_name.trim();
    if name.is_empty() {
        return false;
    }

    let mention = format!("@{}", name);
    for (pos, hit) in text.match_indices(mention.as_str()) {
        // Look at the character right after the match to rule out longer names.
        match text[pos + hit.len()..].chars().next() {
            None => return true,
            Some(next) if next.is_whitespace() || next.is_ascii_punctuation() => return true,
            Some(_) => {}
        }
    }
    false
}
2048
/// Replaces every character other than ASCII letters, digits, `-` and `_`
/// with `_`, one replacement per character.
fn normalize_scope_component(raw: &str) -> String {
    let mut sanitized = String::with_capacity(raw.len());
    for ch in raw.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        sanitized.push(if keep { ch } else { '_' });
    }
    sanitized
}
2060
/// Guesses a file extension from the leading magic bytes: PNG, JPEG, GIF
/// (87a/89a) and WebP (RIFF container) are recognized; anything else is `bin`.
fn image_file_extension(bytes: &[u8]) -> &'static str {
    match bytes {
        [0x89, b'P', b'N', b'G', b'\r', b'\n', 0x1a, b'\n', ..] => "png",
        [0xff, 0xd8, 0xff, ..] => "jpg",
        [b'G', b'I', b'F', b'8', b'7' | b'9', b'a', ..] => "gif",
        // Bytes 4..8 of a RIFF header hold the chunk size and are ignored.
        [b'R', b'I', b'F', b'F', _, _, _, _, b'W', b'E', b'B', b'P', ..] => "webp",
        _ => "bin",
    }
}
2074
2075/// Parse scope string into (chat_type, chatid) for aibot_send_msg.
2076/// `user--{userid}` → (1, userid), `group--{chatid}` → (2, chatid)
2077fn parse_scope(scope: &str) -> Result<(u32, &str)> {
2078    if let Some(userid) = scope.strip_prefix("user--") {
2079        Ok((1, userid))
2080    } else if let Some(chatid) = scope.strip_prefix("group--") {
2081        Ok((2, chatid))
2082    } else {
2083        anyhow::bail!("WeCom: invalid scope format: {scope}")
2084    }
2085}
2086
2087fn summarize_attachment_url_for_log(url: &str) -> String {
2088    let trimmed = url.trim();
2089    if trimmed.is_empty() {
2090        return "empty-url".to_string();
2091    }
2092    match reqwest::Url::parse(trimmed) {
2093        Ok(parsed) => {
2094            let host = parsed.host_str().unwrap_or("unknown-host");
2095            let query_state = if parsed.query().is_some() {
2096                "query=present"
2097            } else {
2098                "query=none"
2099            };
2100            format!(
2101                "{}://{}{} ({query_state})",
2102                parsed.scheme(),
2103                host,
2104                parsed.path()
2105            )
2106        }
2107        Err(_) => format!("invalid-url(len={})", trimmed.len()),
2108    }
2109}
2110
/// Limits `input` to at most `max_chars` characters for logging.
///
/// Inputs within the limit are returned unchanged; longer ones are cut after
/// `max_chars` characters and suffixed with `...(truncated)`.
fn truncate_for_log(input: &str, max_chars: usize) -> String {
    // The byte offset of the character at index `max_chars` is exactly where
    // the kept prefix ends; its absence means the input already fits. This
    // scans at most `max_chars + 1` characters instead of the whole input.
    match input.char_indices().nth(max_chars) {
        None => input.to_string(),
        Some((cut, _)) => {
            let prefix = &input[..cut];
            format!("{prefix}...(truncated)")
        }
    }
}
2118
2119fn log_attachment_processing_failure(
2120    stage: &str,
2121    err: &anyhow::Error,
2122    inbound: &ParsedInbound,
2123    kind: AttachmentKind,
2124    url: &str,
2125) {
2126    wecom_log_warn!(
2127        "{stage} msg_id={} msg_type={} chat_type={} chat_id={} sender_userid={} attachment_kind={} url_target={} error={err:#}",
2128        inbound.msg_id,
2129        inbound.msg_type,
2130        inbound.chat_type,
2131        inbound.chat_id.as_deref().unwrap_or("single"),
2132        inbound.sender_userid,
2133        kind.as_str(),
2134        summarize_attachment_url_for_log(url)
2135    );
2136}
2137
2138fn random_emoji() -> &'static str {
2139    let idx = rand::rng().random_range(0..WECOM_EMOJIS.len());
2140    WECOM_EMOJIS[idx]
2141}
2142
2143fn random_ascii_token(len: usize) -> String {
2144    const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
2145    let mut out = String::with_capacity(len);
2146    let mut rng = rand::rng();
2147    for _ in 0..len {
2148        let idx = rng.random_range(0..CHARSET.len());
2149        out.push(CHARSET[idx] as char);
2150    }
2151    out
2152}
2153
2154fn next_stream_id() -> String {
2155    format!("zs_{}", random_ascii_token(20))
2156}
2157
2158fn contains_stop_command(text: &str) -> bool {
2159    let stripped = strip_edge_mentions(text);
2160    if stripped.contains("\u{505c}\u{6b62}") {
2161        return true;
2162    }
2163    stripped.split_whitespace().any(|word| {
2164        let token = word
2165            .trim_matches(|ch: char| !ch.is_ascii_alphanumeric() && ch != '/')
2166            .to_ascii_lowercase();
2167        token == "stop" || token == "/stop"
2168    })
2169}
2170
2171fn is_clear_session_command(text: &str) -> bool {
2172    let stripped = strip_edge_mentions(text);
2173    stripped.eq_ignore_ascii_case("/clear") || stripped.eq_ignore_ascii_case("/new")
2174}
2175
2176fn extract_runtime_model_switch_command(text: &str) -> Option<String> {
2177    let stripped = strip_edge_mentions(text);
2178    if stripped.is_empty() || !stripped.starts_with('/') {
2179        return None;
2180    }
2181
2182    let command_token = stripped.split_whitespace().next()?;
2183    let base_command = command_token.split('@').next().unwrap_or(command_token);
2184    if base_command.eq_ignore_ascii_case("/model") || base_command.eq_ignore_ascii_case("/models") {
2185        Some(stripped)
2186    } else {
2187        None
2188    }
2189}
2190
/// Removes `@...` mention tokens from both ends of the text.
///
/// Leading side: repeatedly skips ASCII whitespace and any token that starts
/// with `@` (up to the next ASCII whitespace). Trailing side: repeatedly cuts
/// the text at the last `@` when no ASCII whitespace follows it. The
/// remainder is returned trimmed.
fn strip_edge_mentions(text: &str) -> String {
    let trimmed = text.trim();
    if trimmed.is_empty() {
        return String::new();
    }
    let raw = trimmed.as_bytes();

    // Leading mentions. `head` only ever stops on an ASCII byte or at the end,
    // so it is always a valid char boundary.
    let mut head = 0usize;
    loop {
        head += raw[head..]
            .iter()
            .take_while(|byte| byte.is_ascii_whitespace())
            .count();
        if raw.get(head) != Some(&b'@') {
            break;
        }
        head += 1;
        head += raw[head..]
            .iter()
            .take_while(|byte| !byte.is_ascii_whitespace())
            .count();
    }

    // Trailing mentions, scanning backwards but never past `head`.
    let mut tail = raw.len();
    loop {
        tail -= raw[head..tail]
            .iter()
            .rev()
            .take_while(|byte| byte.is_ascii_whitespace())
            .count();
        if tail <= head {
            break;
        }
        let token_len = raw[head..tail]
            .iter()
            .rev()
            .take_while(|&&byte| !byte.is_ascii_whitespace() && byte != b'@')
            .count();
        let probe = tail - token_len;
        if probe > head && raw[probe - 1] == b'@' {
            tail = probe - 1;
        } else {
            break;
        }
    }

    trimmed[head..tail].trim().to_string()
}
2234
2235fn extract_stop_signal_text(inbound: &ParsedInbound) -> Option<String> {
2236    match inbound.msg_type.as_str() {
2237        "text" => inbound
2238            .raw_payload
2239            .get("text")
2240            .and_then(|v| v.get("content"))
2241            .and_then(Value::as_str)
2242            .map(str::trim)
2243            .filter(|v| !v.is_empty())
2244            .map(ToOwned::to_owned),
2245        "voice" => inbound
2246            .raw_payload
2247            .get("voice")
2248            .and_then(|v| v.get("content"))
2249            .and_then(Value::as_str)
2250            .map(str::trim)
2251            .filter(|v| !v.is_empty())
2252            .map(ToOwned::to_owned),
2253        "mixed" => {
2254            let mut texts = Vec::new();
2255            let items = inbound
2256                .raw_payload
2257                .get("mixed")
2258                .and_then(|v| v.get("msg_item"))
2259                .and_then(Value::as_array)?;
2260            for item in items {
2261                if item
2262                    .get("msgtype")
2263                    .and_then(Value::as_str)
2264                    .is_some_and(|v| v == "text")
2265                    && let Some(content) = item
2266                        .get("text")
2267                        .and_then(|v| v.get("content"))
2268                        .and_then(Value::as_str)
2269                        .map(str::trim)
2270                        .filter(|v| !v.is_empty())
2271                {
2272                    texts.push(content.to_string());
2273                }
2274            }
2275            if texts.is_empty() {
2276                None
2277            } else {
2278                Some(texts.join("\n"))
2279            }
2280        }
2281        _ => None,
2282    }
2283}
2284
2285fn inbound_content_preview(inbound: &ParsedInbound) -> String {
2286    if let Some(text) = extract_stop_signal_text(inbound) {
2287        return text;
2288    }
2289
2290    match inbound.msg_type.as_str() {
2291        "image" => "[Image message]".to_string(),
2292        "file" => inbound
2293            .raw_payload
2294            .get("file")
2295            .and_then(|v| v.get("filename"))
2296            .and_then(Value::as_str)
2297            .map(|name| format!("[File message: {name}]"))
2298            .unwrap_or_else(|| "[File message]".to_string()),
2299        "event" => "[Event callback]".to_string(),
2300        other => format!("[{other} message]"),
2301    }
2302}
2303
/// Returns the longest prefix of `input` that fits in `max_bytes` bytes
/// without splitting a UTF-8 character.
fn trim_utf8_to_max_bytes(input: &str, max_bytes: usize) -> String {
    if input.len() <= max_bytes {
        return input.to_string();
    }

    // Walk back from the byte limit to the nearest character boundary. Offset
    // 0 is always a boundary, so the loop terminates. One slice copy replaces
    // the previous per-character rebuild.
    let mut cut = max_bytes;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    input[..cut].to_string()
}
2317
2318fn normalize_stream_content(input: &str) -> String {
2319    let sanitized = strip_trailing_provider_sentinels(input);
2320    trim_utf8_to_max_bytes(&sanitized, WECOM_MARKDOWN_MAX_BYTES)
2321}
2322
2323fn split_stream_content_and_overflow(input: &str) -> (String, Option<String>) {
2324    let input = strip_trailing_provider_sentinels(input);
2325    if input.len() <= WECOM_MARKDOWN_MAX_BYTES {
2326        return (input, None);
2327    }
2328
2329    let mut head = String::new();
2330    let mut tail = String::new();
2331    let mut overflow = false;
2332    for ch in input.chars() {
2333        if !overflow && head.len() + ch.len_utf8() <= WECOM_MARKDOWN_MAX_BYTES {
2334            head.push(ch);
2335        } else {
2336            overflow = true;
2337            tail.push(ch);
2338        }
2339    }
2340
2341    if tail.is_empty() {
2342        (head, None)
2343    } else {
2344        (head, Some(tail))
2345    }
2346}
2347
2348fn strip_trailing_provider_sentinels(input: &str) -> String {
2349    let mut trimmed = input.trim_end();
2350
2351    while let Some(sentinel) = WECOM_PROVIDER_TRAILING_SENTINELS
2352        .iter()
2353        .find(|sentinel| trimmed.ends_with(**sentinel))
2354    {
2355        trimmed = trimmed[..trimmed.len() - sentinel.len()].trim_end();
2356    }
2357
2358    trimmed.to_string()
2359}
2360
2361fn parse_event_type(payload: &Value) -> Option<String> {
2362    payload
2363        .get("event")
2364        .and_then(|v| v.get("eventtype"))
2365        .and_then(Value::as_str)
2366        .map(str::trim)
2367        .filter(|v| !v.is_empty())
2368        .map(ToOwned::to_owned)
2369}
2370
2371fn extract_template_card_event_key(payload: &Value) -> Option<String> {
2372    payload
2373        .get("event")
2374        .and_then(|v| v.get("template_card_event"))
2375        .and_then(|v| {
2376            v.get("event_key")
2377                .or_else(|| v.get("eventkey"))
2378                .and_then(Value::as_str)
2379        })
2380        .map(str::trim)
2381        .filter(|v| !v.is_empty())
2382        .map(ToOwned::to_owned)
2383}
2384
2385fn extract_feedback_event_summary(payload: &Value) -> Option<String> {
2386    let feedback = payload.get("event")?.get("feedback_event")?;
2387    let feedback_id = feedback
2388        .get("id")
2389        .and_then(Value::as_str)
2390        .map(str::trim)
2391        .filter(|v| !v.is_empty())
2392        .unwrap_or("-");
2393    let feedback_type = feedback
2394        .get("type")
2395        .and_then(Value::as_i64)
2396        .map(|v| v.to_string())
2397        .unwrap_or_else(|| "-".to_string());
2398    let content = feedback
2399        .get("content")
2400        .and_then(Value::as_str)
2401        .map(str::trim)
2402        .filter(|v| !v.is_empty())
2403        .unwrap_or("-");
2404    Some(format!(
2405        "feedback_id={feedback_id} feedback_type={feedback_type} content={content}"
2406    ))
2407}
2408
2409fn extract_quote_context(payload: &Value) -> Option<String> {
2410    let quote = payload.get("quote")?;
2411    let quote_type = quote
2412        .get("msgtype")
2413        .and_then(Value::as_str)
2414        .map(str::trim)
2415        .filter(|v| !v.is_empty())?;
2416
2417    let content = match quote_type {
2418        "text" => quote
2419            .get("text")
2420            .and_then(|v| v.get("content"))
2421            .and_then(Value::as_str)
2422            .map(str::trim)
2423            .filter(|v| !v.is_empty())
2424            .map(ToOwned::to_owned)
2425            .unwrap_or_else(|| "[\u{5f15}\u{7528}\u{6587}\u{672c}\u{4e3a}\u{7a7a}]".to_string()),
2426        "voice" => quote
2427            .get("voice")
2428            .and_then(|v| v.get("content"))
2429            .and_then(Value::as_str)
2430            .map(str::trim)
2431            .filter(|v| !v.is_empty())
2432            .map(|v| format!("[\u{5f15}\u{7528}\u{8bed}\u{97f3}\u{8f6c}\u{5199}] {v}"))
2433            .unwrap_or_else(|| {
2434                "[\u{5f15}\u{7528}\u{8bed}\u{97f3}\u{65e0}\u{8f6c}\u{5199}]".to_string()
2435            }),
2436        "image" => quote
2437            .get("image")
2438            .and_then(|v| v.get("local_path"))
2439            .and_then(Value::as_str)
2440            .map(str::trim)
2441            .filter(|v| !v.is_empty())
2442            .map(|v| format!("[\u{5f15}\u{7528}\u{56fe}\u{7247}] {v}"))
2443            .unwrap_or_else(|| "[\u{5f15}\u{7528}\u{56fe}\u{7247}]".to_string()),
2444        "file" => quote
2445            .get("file")
2446            .and_then(|v| v.get("local_path"))
2447            .and_then(Value::as_str)
2448            .map(str::trim)
2449            .filter(|v| !v.is_empty())
2450            .map(|v| format!("[\u{5f15}\u{7528}\u{6587}\u{4ef6}] {v}"))
2451            .unwrap_or_else(|| "[\u{5f15}\u{7528}\u{6587}\u{4ef6}]".to_string()),
2452        "mixed" => {
2453            let mut parts = Vec::new();
2454            if let Some(items) = quote
2455                .get("mixed")
2456                .and_then(|v| v.get("msg_item"))
2457                .and_then(Value::as_array)
2458            {
2459                for item in items {
2460                    let item_type = item
2461                        .get("msgtype")
2462                        .and_then(Value::as_str)
2463                        .unwrap_or_default();
2464                    if item_type == "text" {
2465                        if let Some(text) = item
2466                            .get("text")
2467                            .and_then(|v| v.get("content"))
2468                            .and_then(Value::as_str)
2469                            .map(str::trim)
2470                            .filter(|v| !v.is_empty())
2471                        {
2472                            parts.push(text.to_string());
2473                        }
2474                    } else if item_type == "image" {
2475                        if let Some(path) = item
2476                            .get("image")
2477                            .and_then(|v| v.get("local_path"))
2478                            .and_then(Value::as_str)
2479                            .map(str::trim)
2480                            .filter(|v| !v.is_empty())
2481                        {
2482                            parts.push(format!("[\u{5f15}\u{7528}\u{56fe}\u{7247}] {path}"));
2483                        } else {
2484                            parts.push("[\u{5f15}\u{7528}\u{56fe}\u{7247}]".to_string());
2485                        }
2486                    }
2487                }
2488            }
2489
2490            if parts.is_empty() {
2491                "[\u{5f15}\u{7528}\u{56fe}\u{6587}\u{6d88}\u{606f}]".to_string()
2492            } else {
2493                parts.join("\n")
2494            }
2495        }
2496        _ => format!("[\u{5f15}\u{7528}\u{6d88}\u{606f} type={quote_type}]"),
2497    };
2498
2499    let content = trim_utf8_to_max_bytes(&content, 4_096);
2500    Some(format!(
2501        "[WECOM_QUOTE]\nmsgtype={quote_type}\ncontent={content}\n[/WECOM_QUOTE]"
2502    ))
2503}
2504
/// Current Unix time in whole seconds; 0 if the system clock reads earlier
/// than the Unix epoch.
fn bytes_timestamp_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
2511
2512fn split_markdown_chunks(input: &str) -> Vec<String> {
2513    let input = strip_trailing_provider_sentinels(input);
2514    if input.is_empty() {
2515        return vec![String::new()];
2516    }
2517
2518    let mut chunks = Vec::new();
2519    let mut current = String::new();
2520
2521    for line in input.lines() {
2522        let candidate = if current.is_empty() {
2523            line.to_string()
2524        } else {
2525            format!("{current}\n{line}")
2526        };
2527
2528        if candidate.len() > WECOM_MARKDOWN_CHUNK_BYTES
2529            && !current.is_empty()
2530            && current.len() <= WECOM_MARKDOWN_MAX_BYTES
2531        {
2532            chunks.push(current);
2533            current = line.to_string();
2534            continue;
2535        }
2536
2537        current = candidate;
2538    }
2539
2540    if !current.is_empty() {
2541        if current.len() <= WECOM_MARKDOWN_MAX_BYTES {
2542            chunks.push(current);
2543        } else {
2544            let mut buf = String::new();
2545            for ch in current.chars() {
2546                if buf.len() + ch.len_utf8() > WECOM_MARKDOWN_CHUNK_BYTES {
2547                    chunks.push(buf);
2548                    buf = String::new();
2549                }
2550                buf.push(ch);
2551            }
2552            if !buf.is_empty() {
2553                chunks.push(buf);
2554            }
2555        }
2556    }
2557
2558    if chunks.is_empty() {
2559        chunks.push(String::new());
2560    }
2561
2562    chunks
2563}
2564
/// Returns `true` when `msg_type` is exactly one of `text`, `voice`, `image`,
/// `file`, or `mixed`. The comparison is case-sensitive.
fn is_model_supported_msgtype(msg_type: &str) -> bool {
    const SUPPORTED: [&str; 5] = ["text", "voice", "image", "file", "mixed"];
    SUPPORTED.contains(&msg_type)
}
2568
2569fn is_voice_without_transcript(inbound: &ParsedInbound) -> bool {
2570    if inbound.msg_type != "voice" {
2571        return false;
2572    }
2573    inbound
2574        .raw_payload
2575        .get("voice")
2576        .and_then(|v| v.get("content"))
2577        .and_then(Value::as_str)
2578        .map(str::trim)
2579        .unwrap_or("")
2580        .is_empty()
2581}
2582
2583async fn cleanup_inbox_files(root: PathBuf, retention: Duration) {
2584    if !root.exists() {
2585        return;
2586    }
2587
2588    let mut stack = vec![root];
2589    while let Some(dir) = stack.pop() {
2590        let Ok(mut rd) = tokio::fs::read_dir(&dir).await else {
2591            continue;
2592        };
2593
2594        while let Ok(Some(entry)) = rd.next_entry().await {
2595            let path = entry.path();
2596            let Ok(meta) = entry.metadata().await else {
2597                continue;
2598            };
2599
2600            if meta.is_dir() {
2601                stack.push(path);
2602                continue;
2603            }
2604
2605            let Ok(modified) = meta.modified() else {
2606                continue;
2607            };
2608
2609            let age = SystemTime::now()
2610                .duration_since(modified)
2611                .unwrap_or_else(|_| Duration::from_secs(0));
2612            if age > retention {
2613                let _ = tokio::fs::remove_file(&path).await;
2614            }
2615        }
2616    }
2617}
2618
2619#[cfg(test)]
2620mod tests {
2621    use super::*;
2622
2623    #[test]
2624    fn scope_uses_group_shared_mode_by_default_for_group_chat() {
2625        let inbound = ParsedInbound {
2626            msg_id: "m1".to_string(),
2627            msg_type: "text".to_string(),
2628            chat_type: "group".to_string(),
2629            chat_id: Some("g1".to_string()),
2630            sender_userid: "u1".to_string(),
2631            aibot_id: "b1".to_string(),
2632            raw_payload: serde_json::json!({}),
2633        };
2634
2635        let scopes = compute_scopes(&inbound);
2636        assert_eq!(scopes.conversation_scope, "group--g1");
2637    }
2638
2639    #[test]
2640    fn split_markdown_chunks_preserves_large_input() {
2641        let input = "a".repeat(WECOM_MARKDOWN_CHUNK_BYTES * 3 + 100);
2642        let chunks = split_markdown_chunks(&input);
2643        assert!(chunks.len() >= 3);
2644        for chunk in chunks {
2645            assert!(chunk.len() <= WECOM_MARKDOWN_MAX_BYTES);
2646        }
2647    }
2648
2649    #[test]
2650    fn split_markdown_chunks_small_input() {
2651        let input = "Hello WeCom!";
2652        let chunks = split_markdown_chunks(input);
2653        assert_eq!(chunks.len(), 1);
2654        assert_eq!(chunks[0], "Hello WeCom!");
2655    }
2656
2657    #[test]
2658    fn split_markdown_chunks_empty_input() {
2659        let chunks = split_markdown_chunks("");
2660        assert_eq!(chunks.len(), 1);
2661        assert_eq!(chunks[0], "");
2662    }
2663
2664    #[test]
2665    fn strip_trailing_provider_sentinels_removes_eom_token() {
2666        assert_eq!(
2667            strip_trailing_provider_sentinels("Hi there!<|eom|>"),
2668            "Hi there!"
2669        );
2670        assert_eq!(
2671            strip_trailing_provider_sentinels("Hi there!  <|eom|>\n\n"),
2672            "Hi there!"
2673        );
2674    }
2675
2676    #[test]
2677    fn strip_trailing_provider_sentinels_keeps_mid_message_token() {
2678        assert_eq!(
2679            strip_trailing_provider_sentinels("Literal <|eom|> marker in text."),
2680            "Literal <|eom|> marker in text."
2681        );
2682    }
2683
2684    #[test]
2685    fn outbound_stream_normalization_strips_trailing_provider_sentinel() {
2686        assert_eq!(normalize_stream_content("Hi there!<|eom|>"), "Hi there!");
2687        assert_eq!(
2688            split_stream_content_and_overflow("Hi there!<|eom|>"),
2689            ("Hi there!".to_string(), None)
2690        );
2691        assert_eq!(split_markdown_chunks("Hi there!<|eom|>"), vec!["Hi there!"]);
2692    }
2693
2694    #[test]
2695    fn group_bot_mention_hint_marks_addressed_wecom_message() {
2696        let inbound = test_inbound("group", Some("group-1"), "user-1");
2697        let composed = compose_content_for_framework(&inbound, "@danya say hi", Some("danya"));
2698
2699        assert!(composed.starts_with("[WeCom group message addressed to this bot via @danya]"));
2700        assert!(composed.ends_with("@danya say hi"));
2701    }
2702
2703    #[test]
2704    fn group_bot_mention_hint_omits_non_matching_messages() {
2705        let inbound = test_inbound("group", Some("group-1"), "user-1");
2706        assert_eq!(
2707            compose_content_for_framework(&inbound, "@otherbot say hi", Some("danya")),
2708            "@otherbot say hi"
2709        );
2710        assert_eq!(
2711            compose_content_for_framework(&inbound, "@danya say hi", None),
2712            "@danya say hi"
2713        );
2714
2715        let dm = test_inbound("single", None, "user-1");
2716        assert_eq!(
2717            compose_content_for_framework(&dm, "@danya say hi", Some("danya")),
2718            "@danya say hi"
2719        );
2720    }
2721
2722    #[test]
2723    fn text_mentions_bot_name_uses_simple_boundary_check() {
2724        assert!(text_mentions_bot_name("@danya say hi", "danya"));
2725        assert!(text_mentions_bot_name("hey @danya, say hi", "danya"));
2726        assert!(!text_mentions_bot_name("@danyabot say hi", "danya"));
2727    }
2728
2729    #[test]
2730    fn summarize_attachment_url_for_log_redacts_query_string() {
2731        let url = "https://wework.qpic.cn/wwpic/123456/0?auth=secret_token&expires=123";
2732        let summary = summarize_attachment_url_for_log(url);
2733        assert_eq!(
2734            summary,
2735            "https://wework.qpic.cn/wwpic/123456/0 (query=present)"
2736        );
2737        assert!(!summary.contains("secret_token"));
2738    }
2739
2740    #[test]
2741    fn summarize_attachment_url_for_log_handles_invalid_input() {
2742        let summary = summarize_attachment_url_for_log("not a url");
2743        assert_eq!(summary, "invalid-url(len=9)");
2744    }
2745
2746    #[test]
2747    fn stop_command_detection_supports_cn_and_en() {
2748        assert!(contains_stop_command("\u{505c}\u{6b62}"));
2749        assert!(contains_stop_command("Please STOP now"));
2750        assert!(contains_stop_command("@bot /stop"));
2751        assert!(!contains_stop_command("\u{7ee7}\u{7eed}\u{5904}\u{7406}"));
2752        assert!(!contains_stop_command("explain nonstop operation"));
2753        assert!(!contains_stop_command("what are stopwords?"));
2754    }
2755
2756    #[test]
2757    fn image_file_extension_uses_magic_bytes() {
2758        assert_eq!(image_file_extension(b"\x89PNG\r\n\x1a\nrest"), "png");
2759        assert_eq!(image_file_extension(&[0xff, 0xd8, 0xff, 0x00]), "jpg");
2760        assert_eq!(image_file_extension(b"GIF89a rest"), "gif");
2761        assert_eq!(
2762            image_file_extension(b"RIFF\x00\x00\x00\x00WEBPrest"),
2763            "webp"
2764        );
2765        assert_eq!(image_file_extension(b"not an image"), "bin");
2766    }
2767
2768    #[test]
2769    fn filename_scope_components_reject_path_separators() {
2770        assert_eq!(normalize_scope_component("../room/msg-1"), "___room_msg-1");
2771    }
2772
2773    #[test]
2774    fn idempotency_store_is_bounded() {
2775        let store = SimpleIdempotencyStore::new();
2776        for idx in 0..(WECOM_IDEMPOTENCY_MAX_KEYS + 1) {
2777            assert!(store.record_if_new(&format!("msg-{idx}")));
2778        }
2779        assert_eq!(store.seen.lock().len(), WECOM_IDEMPOTENCY_MAX_KEYS);
2780        assert_eq!(store.order.lock().len(), WECOM_IDEMPOTENCY_MAX_KEYS);
2781        assert!(store.record_if_new("msg-0"));
2782    }
2783
2784    #[test]
2785    fn parse_event_type_extracts_enter_chat() {
2786        let payload = serde_json::json!({
2787            "event": {
2788                "eventtype": "enter_chat"
2789            }
2790        });
2791        assert_eq!(parse_event_type(&payload).as_deref(), Some("enter_chat"));
2792    }
2793
2794    #[test]
2795    fn extract_quote_context_from_text_quote() {
2796        let payload = serde_json::json!({
2797            "quote": {
2798                "msgtype": "text",
2799                "text": {
2800                    "content": "  \u{5f15}\u{7528}\u{5185}\u{5bb9}  "
2801                }
2802            }
2803        });
2804
2805        let quote = extract_quote_context(&payload).expect("quote should be extracted");
2806        assert!(quote.contains("msgtype=text"));
2807        assert!(quote.contains("content=\u{5f15}\u{7528}\u{5185}\u{5bb9}"));
2808    }
2809
2810    #[test]
2811    fn extract_quote_context_from_mixed_quote() {
2812        let payload = serde_json::json!({
2813            "quote": {
2814                "msgtype": "mixed",
2815                "mixed": {
2816                    "msg_item": [
2817                        {
2818                            "msgtype": "text",
2819                            "text": {
2820                                "content": "\u{7b2c}\u{4e00}\u{6bb5}"
2821                            }
2822                        },
2823                        {
2824                            "msgtype": "image",
2825                            "image": {
2826                                "url": "https://example.com/image.png"
2827                            }
2828                        }
2829                    ]
2830                }
2831            }
2832        });
2833
2834        let quote = extract_quote_context(&payload).expect("quote should be extracted");
2835        assert!(quote.contains("\u{7b2c}\u{4e00}\u{6bb5}"));
2836        assert!(quote.contains("\u{5f15}\u{7528}\u{56fe}\u{7247}"));
2837    }
2838
2839    #[test]
2840    fn extract_quote_context_does_not_leak_remote_media_url() {
2841        let payload = serde_json::json!({
2842            "quote": {
2843                "msgtype": "image",
2844                "image": {
2845                    "url": "https://example.com/tmp-sign-url"
2846                }
2847            }
2848        });
2849
2850        let quote = extract_quote_context(&payload).expect("quote should be extracted");
2851        assert!(quote.contains("[\u{5f15}\u{7528}\u{56fe}\u{7247}]"));
2852        assert!(!quote.contains("example.com/tmp-sign-url"));
2853    }
2854
2855    #[test]
2856    fn extract_template_card_event_key_reads_event_key() {
2857        let payload = serde_json::json!({
2858            "event": {
2859                "eventtype": "template_card_event",
2860                "template_card_event": {
2861                    "event_key": "button_confirm"
2862                }
2863            }
2864        });
2865        assert_eq!(
2866            extract_template_card_event_key(&payload).as_deref(),
2867            Some("button_confirm")
2868        );
2869    }
2870
2871    #[test]
2872    fn extract_feedback_event_summary_reads_fields() {
2873        let payload = serde_json::json!({
2874            "event": {
2875                "eventtype": "feedback_event",
2876                "feedback_event": {
2877                    "id": "fb_1",
2878                    "type": 2,
2879                    "content": "not accurate"
2880                }
2881            }
2882        });
2883        let summary = extract_feedback_event_summary(&payload).expect("summary should exist");
2884        assert!(summary.contains("feedback_id=fb_1"));
2885        assert!(summary.contains("feedback_type=2"));
2886        assert!(summary.contains("content=not accurate"));
2887    }
2888
2889    #[test]
2890    fn clear_session_bare_commands() {
2891        assert!(is_clear_session_command("/clear"));
2892        assert!(is_clear_session_command("/new"));
2893        assert!(is_clear_session_command("/CLEAR"));
2894        assert!(is_clear_session_command("/New"));
2895        assert!(is_clear_session_command("  /clear  "));
2896    }
2897
2898    #[test]
2899    fn clear_session_with_mentions() {
2900        assert!(is_clear_session_command("@bot /clear"));
2901        assert!(is_clear_session_command("/clear @bot"));
2902        assert!(is_clear_session_command("@bot1 @bot2 /new"));
2903        assert!(is_clear_session_command("@bot /new @other"));
2904    }
2905
2906    #[test]
2907    fn clear_session_rejects_old_and_invalid() {
2908        assert!(!is_clear_session_command("\u{65b0}\u{4f1a}\u{8bdd}"));
2909        assert!(!is_clear_session_command("clear history"));
2910        assert!(!is_clear_session_command("/clear now"));
2911        assert!(!is_clear_session_command("please /new"));
2912        assert!(!is_clear_session_command(""));
2913        assert!(!is_clear_session_command("   "));
2914    }
2915
2916    #[test]
2917    fn runtime_model_switch_command_with_mentions() {
2918        assert_eq!(
2919            extract_runtime_model_switch_command("@bot /model gpt-5 @other"),
2920            Some("/model gpt-5".to_string())
2921        );
2922        assert_eq!(
2923            extract_runtime_model_switch_command("@bot /models openrouter"),
2924            Some("/models openrouter".to_string())
2925        );
2926        assert_eq!(
2927            extract_runtime_model_switch_command(" /MODEL@zeroclaw qwen-max "),
2928            Some("/MODEL@zeroclaw qwen-max".to_string())
2929        );
2930    }
2931
2932    #[test]
2933    fn runtime_model_switch_command_rejects_non_commands() {
2934        assert_eq!(extract_runtime_model_switch_command("/new"), None);
2935        assert_eq!(
2936            extract_runtime_model_switch_command("please /model gpt-5"),
2937            None
2938        );
2939        assert_eq!(extract_runtime_model_switch_command(""), None);
2940    }
2941
2942    #[test]
2943    fn parse_scope_user() {
2944        let (chat_type, chatid) = parse_scope("user--zeroclaw_user").unwrap();
2945        assert_eq!(chat_type, 1);
2946        assert_eq!(chatid, "zeroclaw_user");
2947    }
2948
2949    #[test]
2950    fn parse_scope_group() {
2951        let (chat_type, chatid) = parse_scope("group--zeroclaw_group").unwrap();
2952        assert_eq!(chat_type, 2);
2953        assert_eq!(chatid, "zeroclaw_group");
2954    }
2955
2956    #[test]
2957    fn parse_scope_invalid() {
2958        assert!(parse_scope("invalid_scope").is_err());
2959    }
2960
2961    fn test_inbound(chat_type: &str, chat_id: Option<&str>, sender_userid: &str) -> ParsedInbound {
2962        ParsedInbound {
2963            msg_id: "msg-1".to_string(),
2964            msg_type: "text".to_string(),
2965            chat_type: chat_type.to_string(),
2966            chat_id: chat_id.map(str::to_string),
2967            sender_userid: sender_userid.to_string(),
2968            aibot_id: "bot123".to_string(),
2969            raw_payload: serde_json::json!({
2970                "msgtype": "text",
2971                "msgid": "msg-1",
2972                "chattype": chat_type,
2973                "chatid": chat_id,
2974                "from": { "userid": sender_userid },
2975                "text": { "content": "@bot hello" }
2976            }),
2977        }
2978    }
2979
2980    fn test_wecom_ws_config() -> WeComWsConfig {
2981        WeComWsConfig {
2982            enabled: true,
2983            bot_id: "bot123".to_string(),
2984            secret: "secret456".to_string(),
2985            allowed_users: vec![],
2986            allowed_groups: vec![],
2987            bot_name: None,
2988            file_retention_days: 3,
2989            max_file_size_mb: 20,
2990            stream_mode: StreamMode::Partial,
2991            proxy_url: None,
2992            excluded_tools: vec![],
2993        }
2994    }
2995
2996    #[test]
2997    fn access_decision_denies_when_allowlists_missing() {
2998        let inbound = test_inbound("single", None, "zeroclaw_user");
2999        assert_eq!(
3000            evaluate_access_decision(&[], &[], &inbound),
3001            AccessDecision::AllowlistMissing
3002        );
3003    }
3004
3005    #[test]
3006    fn access_decision_allows_userid_in_single_chat() {
3007        let inbound = test_inbound("single", None, "zeroclaw_user");
3008        assert_eq!(
3009            evaluate_access_decision(&["zeroclaw_user".to_string()], &[], &inbound),
3010            AccessDecision::Allowed
3011        );
3012    }
3013
3014    #[test]
3015    fn access_decision_allows_group_chatid() {
3016        let inbound = test_inbound("group", Some("zeroclaw_group"), "zeroclaw_user");
3017        assert_eq!(
3018            evaluate_access_decision(&[], &["zeroclaw_group".to_string()], &inbound),
3019            AccessDecision::Allowed
3020        );
3021    }
3022
3023    #[test]
3024    fn access_decision_allows_wildcards() {
3025        let inbound = test_inbound("group", Some("zeroclaw_group"), "zeroclaw_user");
3026        assert_eq!(
3027            evaluate_access_decision(&["*".to_string()], &[], &inbound),
3028            AccessDecision::Allowed
3029        );
3030        assert_eq!(
3031            evaluate_access_decision(&[], &["*".to_string()], &inbound),
3032            AccessDecision::Allowed
3033        );
3034    }
3035
3036    #[test]
3037    fn denied_group_message_mentions_chatid_and_userid() {
3038        let inbound = test_inbound("group", Some("zeroclaw_group"), "zeroclaw_user");
3039        let text = build_access_denied_message(&inbound, AccessDecision::Denied, "primary");
3040        assert!(text.contains("zeroclaw_group"));
3041        assert!(text.contains("zeroclaw_user"));
3042        assert!(text.contains("allowed_groups"));
3043        assert!(text.contains("wecom_ws"));
3044    }
3045
3046    #[test]
3047    fn supports_draft_updates_respects_stream_mode() {
3048        let mut off_cfg = test_wecom_ws_config();
3049        off_cfg.stream_mode = StreamMode::Off;
3050        let off = WeComWsChannel::new(&off_cfg, Path::new("/tmp")).unwrap();
3051        assert!(!off.supports_draft_updates());
3052
3053        let partial = WeComWsChannel::new(&test_wecom_ws_config(), Path::new("/tmp")).unwrap();
3054        assert!(partial.supports_draft_updates());
3055    }
3056
3057    #[test]
3058    fn multi_message_stream_mode_is_rejected() {
3059        let mut cfg = test_wecom_ws_config();
3060        cfg.stream_mode = StreamMode::MultiMessage;
3061        let err = match WeComWsChannel::new(&cfg, Path::new("/tmp")) {
3062            Ok(_) => panic!("multi_message should be rejected"),
3063            Err(err) => err.to_string(),
3064        };
3065        assert!(err.contains("multi_message is not supported"));
3066    }
3067
3068    #[tokio::test]
3069    async fn send_draft_returns_none_when_stream_mode_off() {
3070        let mut cfg = test_wecom_ws_config();
3071        cfg.stream_mode = StreamMode::Off;
3072        let channel = WeComWsChannel::new(&cfg, Path::new("/tmp")).unwrap();
3073
3074        let id = channel
3075            .send_draft(&SendMessage::new("draft", "user--zeroclaw_user"))
3076            .await
3077            .unwrap();
3078
3079        assert!(id.is_none());
3080    }
3081
3082    #[tokio::test]
3083    async fn send_draft_failure_does_not_record_req_id_mapping() {
3084        let channel = WeComWsChannel::new(&test_wecom_ws_config(), Path::new("/tmp")).unwrap();
3085        let result = channel
3086            .send_draft(
3087                &SendMessage::new("draft", "user--zeroclaw_user")
3088                    .in_thread(Some("req-draft".to_string())),
3089            )
3090            .await;
3091
3092        assert!(result.is_err());
3093        assert!(channel.req_id_map.lock().is_empty());
3094    }
3095
3096    #[tokio::test]
3097    async fn finalize_draft_failure_cleans_req_id_mapping() {
3098        let channel = WeComWsChannel::new(&test_wecom_ws_config(), Path::new("/tmp")).unwrap();
3099        channel
3100            .req_id_map
3101            .lock()
3102            .insert("stream-1".to_string(), "req-finalize".to_string());
3103
3104        let result = channel
3105            .finalize_draft("user--zeroclaw_user", "stream-1", "final")
3106            .await;
3107
3108        assert!(result.is_err());
3109        assert!(channel.req_id_map.lock().is_empty());
3110    }
3111
3112    #[tokio::test]
3113    async fn send_with_req_id_uses_respond_msg_when_stream_mode_off() {
3114        let mut cfg = test_wecom_ws_config();
3115        cfg.stream_mode = StreamMode::Off;
3116        let channel = WeComWsChannel::new(&cfg, Path::new("/tmp")).unwrap();
3117
3118        let (ws_tx, mut ws_rx) = mpsc::channel::<WsOutbound>(4);
3119        *channel.ws_tx.lock().await = Some(ws_tx);
3120
3121        let responder_channel = channel.clone();
3122        let responder = tokio::spawn(async move {
3123            let Some(WsOutbound::Frame(frame)) = ws_rx.recv().await else {
3124                panic!("expected respond_msg frame");
3125            };
3126            let req_id = frame
3127                .get("headers")
3128                .and_then(|headers| headers.get("req_id"))
3129                .and_then(Value::as_str)
3130                .unwrap_or("")
3131                .to_string();
3132            responder_channel
3133                .maybe_handle_command_response(&serde_json::json!({
3134                    "headers": { "req_id": req_id },
3135                    "errcode": 0,
3136                    "errmsg": "ok"
3137                }))
3138                .await;
3139            frame
3140        });
3141
3142        channel
3143            .send(
3144                &SendMessage::new("runtime ok", "user--zeroclaw_user")
3145                    .in_thread(Some("req-runtime".to_string())),
3146            )
3147            .await
3148            .unwrap();
3149
3150        let frame = responder.await.unwrap();
3151        assert_eq!(
3152            frame.get("cmd").and_then(Value::as_str),
3153            Some("aibot_respond_msg")
3154        );
3155        assert_eq!(
3156            frame
3157                .get("headers")
3158                .and_then(|headers| headers.get("req_id"))
3159                .and_then(Value::as_str),
3160            Some("req-runtime")
3161        );
3162        assert_eq!(
3163            frame
3164                .pointer("/body/stream/content")
3165                .and_then(Value::as_str),
3166            Some("runtime ok")
3167        );
3168        assert_eq!(
3169            frame
3170                .pointer("/body/stream/finish")
3171                .and_then(Value::as_bool),
3172            Some(true)
3173        );
3174    }
3175
3176    #[tokio::test]
3177    async fn send_without_req_id_uses_send_msg() {
3178        let channel = WeComWsChannel::new(&test_wecom_ws_config(), Path::new("/tmp")).unwrap();
3179
3180        let (ws_tx, mut ws_rx) = mpsc::channel::<WsOutbound>(4);
3181        *channel.ws_tx.lock().await = Some(ws_tx);
3182
3183        let responder_channel = channel.clone();
3184        let responder = tokio::spawn(async move {
3185            let Some(WsOutbound::Frame(frame)) = ws_rx.recv().await else {
3186                panic!("expected send_msg frame");
3187            };
3188            let req_id = frame
3189                .get("headers")
3190                .and_then(|headers| headers.get("req_id"))
3191                .and_then(Value::as_str)
3192                .unwrap_or("")
3193                .to_string();
3194            responder_channel
3195                .maybe_handle_command_response(&serde_json::json!({
3196                    "headers": { "req_id": req_id },
3197                    "errcode": 0,
3198                    "errmsg": "ok"
3199                }))
3200                .await;
3201            frame
3202        });
3203
3204        channel
3205            .send(&SendMessage::new("hello proactive", "user--zeroclaw_user"))
3206            .await
3207            .unwrap();
3208
3209        let frame = responder.await.unwrap();
3210        assert_eq!(
3211            frame.get("cmd").and_then(Value::as_str),
3212            Some("aibot_send_msg")
3213        );
3214        assert_eq!(
3215            frame
3216                .pointer("/body/markdown/content")
3217                .and_then(Value::as_str),
3218            Some("hello proactive")
3219        );
3220    }
3221
3222    #[tokio::test]
3223    async fn command_response_resolves_waiter_successfully() {
3224        let config = test_wecom_ws_config();
3225        let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3226
3227        let (waiter, rx) = tokio::sync::oneshot::channel();
3228        channel
3229            .pending_responses
3230            .lock()
3231            .await
3232            .insert("req-ok".to_string(), waiter);
3233
3234        assert!(
3235            channel
3236                .maybe_handle_command_response(&serde_json::json!({
3237                    "headers": { "req_id": "req-ok" },
3238                    "errcode": 0,
3239                    "errmsg": "ok"
3240                }))
3241                .await
3242        );
3243        assert!(rx.await.unwrap().is_ok());
3244    }
3245
3246    #[tokio::test]
3247    async fn command_response_resolves_waiter_failure() {
3248        let config = test_wecom_ws_config();
3249        let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3250
3251        let (waiter, rx) = tokio::sync::oneshot::channel();
3252        channel
3253            .pending_responses
3254            .lock()
3255            .await
3256            .insert("req-fail".to_string(), waiter);
3257
3258        assert!(
3259            channel
3260                .maybe_handle_command_response(&serde_json::json!({
3261                    "headers": { "req_id": "req-fail" },
3262                    "errcode": 93001,
3263                    "errmsg": "session not allowed"
3264                }))
3265                .await
3266        );
3267        let err = rx.await.unwrap().unwrap_err().to_string();
3268        assert!(err.contains("errcode=93001"));
3269        assert!(err.contains("session not allowed"));
3270    }
3271
3272    #[tokio::test]
3273    async fn handle_ws_message_consumes_command_ack_without_forwarding() {
3274        let config = test_wecom_ws_config();
3275        let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3276
3277        let (waiter, ack_rx) = tokio::sync::oneshot::channel();
3278        channel
3279            .pending_responses
3280            .lock()
3281            .await
3282            .insert("req-ack".to_string(), waiter);
3283
3284        let (tx, mut rx) = mpsc::channel::<ChannelMessage>(1);
3285        let should_reconnect = channel
3286            .handle_ws_message(
3287                serde_json::json!({
3288                    "cmd": "aibot_respond_msg",
3289                    "headers": { "req_id": "req-ack" },
3290                    "errcode": 0,
3291                    "errmsg": "ok"
3292                }),
3293                &tx,
3294            )
3295            .await;
3296
3297        assert!(!should_reconnect);
3298        assert!(ack_rx.await.unwrap().is_ok());
3299        assert!(
3300            tokio::time::timeout(Duration::from_millis(100), rx.recv())
3301                .await
3302                .is_err(),
3303            "command ack must not be forwarded as an inbound channel message"
3304        );
3305    }
3306
3307    #[tokio::test]
3308    async fn clear_command_forwards_runtime_new_session_without_immediate_ws_reply() {
3309        let mut config = test_wecom_ws_config();
3310        config.allowed_users = vec!["zeroclaw_user".to_string()];
3311        let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3312
3313        let (ws_tx, mut ws_rx) = mpsc::channel::<WsOutbound>(1);
3314        *channel.ws_tx.lock().await = Some(ws_tx);
3315
3316        let (tx, mut rx) = mpsc::channel::<ChannelMessage>(1);
3317        channel
3318            .handle_msg_callback(
3319                serde_json::json!({
3320                    "headers": { "req_id": "req-clear" },
3321                    "body": {
3322                        "msgtype": "text",
3323                        "msgid": "msg-clear",
3324                        "chattype": "single",
3325                        "from": { "userid": "zeroclaw_user" },
3326                        "text": { "content": "/clear" }
3327                    }
3328                }),
3329                &tx,
3330            )
3331            .await;
3332
3333        let forwarded = tokio::time::timeout(Duration::from_millis(100), rx.recv())
3334            .await
3335            .expect("clear command should be forwarded promptly")
3336            .expect("clear command should produce a framework message");
3337        assert_eq!(forwarded.content, "/new");
3338        assert_eq!(forwarded.thread_ts.as_deref(), Some("req-clear"));
3339
3340        assert!(
3341            tokio::time::timeout(Duration::from_millis(100), ws_rx.recv())
3342                .await
3343                .is_err(),
3344            "clear command should not emit an immediate websocket reply"
3345        );
3346    }
3347
3348    #[tokio::test]
3349    async fn clear_command_ws_dispatch_does_not_block_when_framework_queue_is_full() {
3350        let mut config = test_wecom_ws_config();
3351        config.allowed_users = vec!["zeroclaw_user".to_string()];
3352        let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3353
3354        let (tx, mut rx) = mpsc::channel::<ChannelMessage>(1);
3355        tx.send(ChannelMessage::new(
3356            "prefill-clear",
3357            "tester",
3358            "user--zeroclaw_user",
3359            "prefill",
3360            "wecom_ws",
3361            bytes_timestamp_now(),
3362        ))
3363        .await
3364        .unwrap();
3365
3366        let should_reconnect = tokio::time::timeout(
3367            Duration::from_millis(100),
3368            channel.handle_ws_message(
3369                serde_json::json!({
3370                    "cmd": "aibot_msg_callback",
3371                    "headers": { "req_id": "req-clear-dispatch" },
3372                    "body": {
3373                        "msgtype": "text",
3374                        "msgid": "msg-clear-dispatch",
3375                        "chattype": "single",
3376                        "from": { "userid": "zeroclaw_user" },
3377                        "text": { "content": "/clear" }
3378                    }
3379                }),
3380                &tx,
3381            ),
3382        )
3383        .await
3384        .expect("clear dispatch should not block the websocket loop");
3385
3386        assert!(!should_reconnect);
3387
3388        let first = tokio::time::timeout(Duration::from_millis(100), rx.recv())
3389            .await
3390            .expect("prefilled framework message should be readable")
3391            .expect("prefilled framework message should exist");
3392        assert_eq!(first.id, "prefill-clear");
3393
3394        let forwarded = tokio::time::timeout(Duration::from_millis(100), rx.recv())
3395            .await
3396            .expect("clear command should forward once queue space is available")
3397            .expect("clear command should produce a framework message");
3398        assert_eq!(forwarded.content, "/new");
3399        assert_eq!(forwarded.thread_ts.as_deref(), Some("req-clear-dispatch"));
3400    }
3401
3402    #[tokio::test]
3403    async fn unauthorized_group_message_replies_with_chatid_and_does_not_forward() {
3404        let config = test_wecom_ws_config();
3405        let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3406
3407        let (ws_tx, mut ws_rx) = mpsc::channel::<WsOutbound>(4);
3408        *channel.ws_tx.lock().await = Some(ws_tx);
3409
3410        let responder_channel = channel.clone();
3411        let responder = tokio::spawn(async move {
3412            let Some(WsOutbound::Frame(frame)) = ws_rx.recv().await else {
3413                panic!("expected access-denied response frame");
3414            };
3415            let req_id = frame
3416                .get("headers")
3417                .and_then(|headers| headers.get("req_id"))
3418                .and_then(Value::as_str)
3419                .unwrap_or("")
3420                .to_string();
3421            let content = frame
3422                .pointer("/body/stream/content")
3423                .and_then(Value::as_str)
3424                .unwrap_or("")
3425                .to_string();
3426            responder_channel
3427                .maybe_handle_command_response(&serde_json::json!({
3428                    "headers": { "req_id": req_id },
3429                    "errcode": 0,
3430                    "errmsg": "ok"
3431                }))
3432                .await;
3433            content
3434        });
3435
3436        let (tx, mut rx) = mpsc::channel::<ChannelMessage>(1);
3437        channel
3438            .handle_msg_callback(
3439                serde_json::json!({
3440                    "headers": { "req_id": "req-denied" },
3441                    "body": {
3442                        "msgtype": "text",
3443                        "msgid": "msg-denied",
3444                        "chattype": "group",
3445                        "chatid": "zeroclaw_group",
3446                        "from": { "userid": "zeroclaw_user" },
3447                        "text": { "content": "@bot hello" }
3448                    }
3449                }),
3450                &tx,
3451            )
3452            .await;
3453
3454        assert!(
3455            tokio::time::timeout(Duration::from_millis(100), rx.recv())
3456                .await
3457                .is_err(),
3458            "unauthorized message must not reach framework"
3459        );
3460
3461        let denied = responder.await.unwrap();
3462        assert!(denied.contains("zeroclaw_group"));
3463        assert!(denied.contains("zeroclaw_user"));
3464        assert!(denied.contains("allowed_groups"));
3465    }
3466
3467    #[tokio::test]
3468    async fn unauthorized_message_ws_dispatch_returns_without_waiting_for_ack() {
3469        let config = test_wecom_ws_config();
3470        let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3471
3472        let (ws_tx, mut ws_rx) = mpsc::channel::<WsOutbound>(4);
3473        *channel.ws_tx.lock().await = Some(ws_tx);
3474
3475        let (tx, mut rx) = mpsc::channel::<ChannelMessage>(1);
3476        let should_reconnect = tokio::time::timeout(
3477            Duration::from_millis(100),
3478            channel.handle_ws_message(
3479                serde_json::json!({
3480                    "cmd": "aibot_msg_callback",
3481                    "headers": { "req_id": "req-denied-no-ack" },
3482                    "body": {
3483                        "msgtype": "text",
3484                        "msgid": "msg-denied-no-ack",
3485                        "chattype": "single",
3486                        "from": { "userid": "zeroclaw_user" },
3487                        "text": { "content": "@bot hello" }
3488                    }
3489                }),
3490                &tx,
3491            ),
3492        )
3493        .await
3494        .expect("access-denied dispatch should not block on websocket ack");
3495
3496        assert!(!should_reconnect);
3497
3498        assert!(
3499            tokio::time::timeout(Duration::from_millis(100), rx.recv())
3500                .await
3501                .is_err(),
3502            "unauthorized message must not reach framework"
3503        );
3504
3505        let Some(WsOutbound::Frame(frame)) =
3506            tokio::time::timeout(Duration::from_millis(100), ws_rx.recv())
3507                .await
3508                .expect("access-denied reply should be queued promptly")
3509        else {
3510            panic!("expected access-denied response frame");
3511        };
3512
3513        assert_eq!(
3514            frame.get("cmd").and_then(Value::as_str),
3515            Some("aibot_respond_msg")
3516        );
3517        assert_eq!(
3518            frame
3519                .get("headers")
3520                .and_then(|headers| headers.get("req_id"))
3521                .and_then(Value::as_str),
3522            Some("req-denied-no-ack")
3523        );
3524        assert!(
3525            frame
3526                .pointer("/body/stream/content")
3527                .and_then(Value::as_str)
3528                .is_some_and(|content| content.contains("allowed_users")),
3529            "access-denied reply should explain how to configure the allowlist"
3530        );
3531    }
3532
3533    #[tokio::test]
3534    async fn stream_reply_retries_data_version_conflict() {
3535        let config = test_wecom_ws_config();
3536        let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3537
3538        let (tx, mut rx) = mpsc::channel::<WsOutbound>(8);
3539        *channel.ws_tx.lock().await = Some(tx);
3540
3541        let attempts = Arc::new(std::sync::atomic::AtomicUsize::new(0));
3542        let responder_channel = channel.clone();
3543        let responder_attempts = Arc::clone(&attempts);
3544        let responder = tokio::spawn(async move {
3545            while let Some(WsOutbound::Frame(frame)) = rx.recv().await {
3546                let attempt = responder_attempts.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
3547                let req_id = frame
3548                    .get("headers")
3549                    .and_then(|headers| headers.get("req_id"))
3550                    .and_then(Value::as_str)
3551                    .unwrap_or("")
3552                    .to_string();
3553
3554                let errcode = if attempt == 0 { 6000 } else { 0 };
3555                let errmsg = if errcode == 0 {
3556                    "ok"
3557                } else {
3558                    "more than one callers at the same time, data version conflict"
3559                };
3560                responder_channel
3561                    .maybe_handle_command_response(&serde_json::json!({
3562                        "headers": { "req_id": req_id },
3563                        "errcode": errcode,
3564                        "errmsg": errmsg
3565                    }))
3566                    .await;
3567
3568                if errcode == 0 {
3569                    break;
3570                }
3571            }
3572        });
3573
3574        channel
3575            .ws_send_respond_msg("req-stream", "stream-1", "hello", false)
3576            .await
3577            .unwrap();
3578
3579        responder.await.unwrap();
3580        assert_eq!(attempts.load(std::sync::atomic::Ordering::SeqCst), 2);
3581    }
3582
3583    #[tokio::test]
3584    async fn stream_reply_serializes_same_req_id_updates() {
3585        let config = test_wecom_ws_config();
3586        let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3587
3588        let (tx, mut rx) = mpsc::channel::<WsOutbound>(8);
3589        *channel.ws_tx.lock().await = Some(tx);
3590
3591        let first_channel = channel.clone();
3592        let first = tokio::spawn(async move {
3593            first_channel
3594                .ws_send_respond_msg("req-serial", "stream-1", "first", false)
3595                .await
3596        });
3597
3598        let second_channel = channel.clone();
3599        let second = tokio::spawn(async move {
3600            second_channel
3601                .ws_send_respond_msg("req-serial", "stream-1", "second", false)
3602                .await
3603        });
3604
3605        let first_frame = tokio::time::timeout(Duration::from_millis(250), rx.recv())
3606            .await
3607            .expect("first frame should arrive")
3608            .expect("first frame should exist");
3609        let WsOutbound::Frame(first_frame) = first_frame;
3610        assert_eq!(
3611            first_frame
3612                .get("body")
3613                .and_then(|body| body.get("stream"))
3614                .and_then(|stream| stream.get("content"))
3615                .and_then(Value::as_str),
3616            Some("first")
3617        );
3618
3619        assert!(
3620            tokio::time::timeout(Duration::from_millis(75), rx.recv())
3621                .await
3622                .is_err(),
3623            "second frame should wait for the first ack"
3624        );
3625
3626        channel
3627            .maybe_handle_command_response(&serde_json::json!({
3628                "headers": { "req_id": "req-serial" },
3629                "errcode": 0,
3630                "errmsg": "ok"
3631            }))
3632            .await;
3633        first.await.unwrap().unwrap();
3634
3635        let second_frame = tokio::time::timeout(Duration::from_millis(250), rx.recv())
3636            .await
3637            .expect("second frame should arrive after first ack")
3638            .expect("second frame should exist");
3639        let WsOutbound::Frame(second_frame) = second_frame;
3640        assert_eq!(
3641            second_frame
3642                .get("body")
3643                .and_then(|body| body.get("stream"))
3644                .and_then(|stream| stream.get("content"))
3645                .and_then(Value::as_str),
3646            Some("second")
3647        );
3648
3649        channel
3650            .maybe_handle_command_response(&serde_json::json!({
3651                "headers": { "req_id": "req-serial" },
3652                "errcode": 0,
3653                "errmsg": "ok"
3654            }))
3655            .await;
3656        second.await.unwrap().unwrap();
3657    }
3658}