Skip to main content

zeroclaw_channels/
qq.rs

1use async_trait::async_trait;
2use base64::Engine as _;
3use futures_util::{SinkExt, StreamExt};
4use serde::Deserialize;
5use serde_json::json;
6use sha2::{Digest, Sha256};
7use std::collections::{HashMap, HashSet};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use tokio_tungstenite::tungstenite::Message;
12use uuid::Uuid;
13use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
14
15const QQ_API_BASE: &str = "https://api.sgroup.qq.com";
16const QQ_AUTH_URL: &str = "https://bots.qq.com/app/getAppAccessToken";
17
18/// Maximum upload size for QQ media files (10 MB).
19const QQ_MAX_UPLOAD_BYTES: u64 = 10 * 1024 * 1024;
20
21/// Maximum entries in the upload cache before eviction.
22const UPLOAD_CACHE_CAPACITY: usize = 500;
23
24/// QQ API media file types.
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26enum QQMediaFileType {
27    /// Image (png, jpg, gif, etc.)
28    Image = 1,
29    /// Video (mp4, mov, etc.)
30    Video = 2,
31    /// Voice — only natively supported formats (.wav, .mp3, .silk).
32    /// Non-native audio formats degrade to `File` instead.
33    /// Note: The TS openclaw-qqbot uses silk-wasm + ffmpeg for full format
34    /// transcoding; Rust version avoids heavyweight dependencies and only
35    /// passes through natively supported formats.
36    Voice = 3,
37    /// File (pdf, zip, or any non-native audio format)
38    File = 4,
39}
40
41/// A parsed media attachment from `[TYPE:target]` markers.
42#[derive(Debug, Clone, PartialEq, Eq)]
43struct QQMediaAttachment {
44    kind: QQMediaFileType,
45    target: String,
46}
47
48/// Response from QQ media upload API.
49#[derive(Debug, Deserialize)]
50struct QQUploadResponse {
51    file_info: String,
52    ttl: Option<u64>,
53}
54
55/// Cached upload entry to avoid re-uploading the same file within TTL.
56struct UploadCacheEntry {
57    file_info: String,
58    expires_at: u64,
59}
60
61fn ensure_https(url: &str) -> anyhow::Result<()> {
62    if !url.starts_with("https://") {
63        anyhow::bail!(
64            "Refusing to transmit sensitive data over non-HTTPS URL: URL scheme must be https"
65        );
66    }
67    Ok(())
68}
69
70/// Check whether a file extension is a natively supported QQ voice format.
71fn is_native_voice_ext(ext: &str) -> bool {
72    matches!(ext.to_ascii_lowercase().as_str(), "wav" | "mp3" | "silk")
73}
74
75/// Map a `[TYPE:target]` marker kind string to `QQMediaFileType`.
76///
77/// For AUDIO/VOICE types, the target's extension determines whether it's
78/// sent as `Voice` (native formats only) or degrades to `File`.
79fn marker_kind_to_qq_file_type(marker: &str, target: &str) -> Option<QQMediaFileType> {
80    match marker.trim().to_ascii_uppercase().as_str() {
81        "IMAGE" | "PHOTO" => Some(QQMediaFileType::Image),
82        "DOCUMENT" | "FILE" => Some(QQMediaFileType::File),
83        "VIDEO" => Some(QQMediaFileType::Video),
84        "AUDIO" | "VOICE" => {
85            let ext = Path::new(target.split('?').next().unwrap_or(target))
86                .extension()
87                .and_then(|e| e.to_str())
88                .unwrap_or("");
89            if is_native_voice_ext(ext) {
90                Some(QQMediaFileType::Voice)
91            } else {
92                Some(QQMediaFileType::File)
93            }
94        }
95        _ => None,
96    }
97}
98
99/// Find the matching closing bracket, handling nested brackets.
100fn find_matching_close(s: &str) -> Option<usize> {
101    let mut depth = 1usize;
102    for (i, ch) in s.char_indices() {
103        match ch {
104            '[' => depth += 1,
105            ']' => {
106                depth -= 1;
107                if depth == 0 {
108                    return Some(i);
109                }
110            }
111            _ => {}
112        }
113    }
114    None
115}
116
117/// Parse `[TYPE:target]` attachment markers from message content.
118///
119/// Returns the cleaned text (markers removed) and a list of parsed attachments.
120/// Uses the same bracket-matching logic as `telegram.rs::parse_attachment_markers`.
121fn parse_qq_attachment_markers(content: &str) -> (String, Vec<QQMediaAttachment>) {
122    let mut cleaned = String::with_capacity(content.len());
123    let mut attachments = Vec::new();
124    let mut cursor = 0;
125
126    while cursor < content.len() {
127        let Some(open_rel) = content[cursor..].find('[') else {
128            cleaned.push_str(&content[cursor..]);
129            break;
130        };
131
132        let open = cursor + open_rel;
133        cleaned.push_str(&content[cursor..open]);
134
135        let Some(close_rel) = find_matching_close(&content[open + 1..]) else {
136            cleaned.push_str(&content[open..]);
137            break;
138        };
139
140        let close = open + 1 + close_rel;
141        let marker = &content[open + 1..close];
142
143        let parsed = marker.split_once(':').and_then(|(kind, target)| {
144            let target = target.trim();
145            if target.is_empty() {
146                return None;
147            }
148            let file_type = marker_kind_to_qq_file_type(kind, target)?;
149            Some(QQMediaAttachment {
150                kind: file_type,
151                target: target.to_string(),
152            })
153        });
154
155        if let Some(attachment) = parsed {
156            attachments.push(attachment);
157        } else {
158            cleaned.push_str(&content[open..=close]);
159        }
160
161        cursor = close + 1;
162    }
163
164    (cleaned.trim().to_string(), attachments)
165}
166
167/// Infer attachment type marker from content_type or filename.
168fn infer_attachment_marker(content_type: &str, filename: &str) -> &'static str {
169    let ct = content_type.to_ascii_lowercase();
170    if ct.starts_with("image/") {
171        return "IMAGE";
172    }
173    if ct.starts_with("audio/") || ct.contains("voice") {
174        return "VOICE";
175    }
176    if ct.starts_with("video/") {
177        return "VIDEO";
178    }
179
180    // Fallback to extension
181    let lower = filename.to_ascii_lowercase();
182    if lower.ends_with(".png")
183        || lower.ends_with(".jpg")
184        || lower.ends_with(".jpeg")
185        || lower.ends_with(".gif")
186        || lower.ends_with(".webp")
187        || lower.ends_with(".bmp")
188        || lower.ends_with(".heic")
189        || lower.ends_with(".heif")
190        || lower.ends_with(".svg")
191    {
192        return "IMAGE";
193    }
194    if lower.ends_with(".mp3")
195        || lower.ends_with(".wav")
196        || lower.ends_with(".silk")
197        || lower.ends_with(".ogg")
198        || lower.ends_with(".flac")
199        || lower.ends_with(".m4a")
200    {
201        return "VOICE";
202    }
203    if lower.ends_with(".mp4")
204        || lower.ends_with(".mov")
205        || lower.ends_with(".mkv")
206        || lower.ends_with(".avi")
207        || lower.ends_with(".webm")
208    {
209        return "VIDEO";
210    }
211    "DOCUMENT"
212}
213
214/// Fix QQ attachment URLs that start with `//` (missing scheme).
215fn fix_qq_url(url: &str) -> String {
216    let trimmed = url.trim();
217    if trimmed.starts_with("//") {
218        format!("https:{trimmed}")
219    } else {
220        trimmed.to_string()
221    }
222}
223
224/// Generate a message sequence number for QQ API requests.
225/// Based on timestamp low bits XOR random, range 0~65535.
226fn next_msg_seq() -> u32 {
227    #[allow(clippy::cast_possible_truncation)]
228    let time_part = (std::time::SystemTime::now()
229        .duration_since(std::time::UNIX_EPOCH)
230        .unwrap_or_default()
231        .as_millis() as u32)
232        % 100_000_000;
233    let random = u32::from(rand::random::<u16>());
234    (time_part ^ random) % 65536
235}
236
237/// Get current unix timestamp in seconds.
238fn now_secs() -> u64 {
239    std::time::SystemTime::now()
240        .duration_since(std::time::UNIX_EPOCH)
241        .unwrap_or_default()
242        .as_secs()
243}
244
245/// Deduplication set capacity — evict half of entries when full.
246const DEDUP_CAPACITY: usize = 10_000;
247
248/// Maximum number of retry attempts when fetching the access token.
249const AUTH_RETRY_MAX_ATTEMPTS: u32 = 4;
250
251/// Initial backoff delay for auth token retry (in milliseconds).
252const AUTH_RETRY_INITIAL_BACKOFF_MS: u64 = 500;
253
254/// Maximum backoff delay for auth token retry (in milliseconds).
255const AUTH_RETRY_MAX_BACKOFF_MS: u64 = 8_000;
256
257/// QQ Official Bot channel — uses Tencent's official QQ Bot API with
258/// OAuth2 authentication and a Discord-like WebSocket gateway protocol.
259pub struct QQChannel {
260    app_id: String,
261    app_secret: String,
262    /// The alias key under `[channels.qq.<alias>]` this handle is
263    /// bound to. Used to scope peer-group writes and resolver lookups.
264    alias: String,
265    /// Resolves inbound external peers from canonical state at message-time.
266    /// No cache (see AGENTS.md "ABSOLUTE RULE — SINGLE SOURCE OF TRUTH").
267    peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
268    /// Cached access token + expiry timestamp.
269    token_cache: Arc<RwLock<Option<(String, u64)>>>,
270    /// Message deduplication set.
271    dedup: Arc<RwLock<HashSet<String>>>,
272    /// Workspace directory for saving downloaded attachments.
273    workspace_dir: Option<PathBuf>,
274    /// Upload cache: avoids re-uploading the same file within TTL.
275    upload_cache: Arc<RwLock<HashMap<String, UploadCacheEntry>>>,
276    /// Per-channel proxy URL override.
277    proxy_url: Option<String>,
278    /// Session ID from the last READY event, used for gateway resume (opcode 6).
279    session_id: Arc<RwLock<Option<String>>>,
280    /// Last sequence number received, used for gateway resume (opcode 6).
281    last_sequence: Arc<RwLock<Option<i64>>>,
282}
283
284impl QQChannel {
285    pub fn new(
286        app_id: String,
287        app_secret: String,
288        alias: impl Into<String>,
289        peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
290    ) -> Self {
291        Self {
292            app_id,
293            app_secret,
294            alias: alias.into(),
295            peer_resolver,
296            token_cache: Arc::new(RwLock::new(None)),
297            dedup: Arc::new(RwLock::new(HashSet::new())),
298            workspace_dir: None,
299            upload_cache: Arc::new(RwLock::new(HashMap::new())),
300            proxy_url: None,
301            session_id: Arc::new(RwLock::new(None)),
302            last_sequence: Arc::new(RwLock::new(None)),
303        }
304    }
305
306    /// Return the alias under `[channels.qq.<alias>]` that this
307    /// channel handle is bound to.
308    pub fn alias(&self) -> &str {
309        &self.alias
310    }
311
312    /// Configure workspace directory for saving downloaded attachments.
313    pub fn with_workspace_dir(mut self, dir: PathBuf) -> Self {
314        self.workspace_dir = Some(dir);
315        self
316    }
317
318    /// Set a per-channel proxy URL that overrides the global proxy config.
319    pub fn with_proxy_url(mut self, proxy_url: Option<String>) -> Self {
320        self.proxy_url = proxy_url;
321        self
322    }
323
324    fn http_client(&self) -> reqwest::Client {
325        zeroclaw_config::schema::build_channel_proxy_client("channel.qq", self.proxy_url.as_deref())
326    }
327
328    fn is_user_allowed(&self, user_id: &str) -> bool {
329        let peers = (self.peer_resolver)();
330        crate::allowlist::is_user_allowed(&peers, user_id, crate::allowlist::Match::Sensitive)
331    }
332
333    /// Fetch an access token from QQ's OAuth2 endpoint.
334    async fn fetch_access_token(&self) -> anyhow::Result<(String, u64)> {
335        let body = json!({
336            "appId": self.app_id,
337            "clientSecret": self.app_secret,
338        });
339
340        let resp = self
341            .http_client()
342            .post(QQ_AUTH_URL)
343            .json(&body)
344            .send()
345            .await?;
346
347        if !resp.status().is_success() {
348            let status = resp.status();
349            let err = resp.text().await.unwrap_or_default();
350            anyhow::bail!("QQ token request failed ({status}): {err}");
351        }
352
353        let data: serde_json::Value = resp.json().await?;
354        let token = data
355            .get("access_token")
356            .and_then(|t| t.as_str())
357            .ok_or_else(|| {
358                ::zeroclaw_log::record!(
359                    WARN,
360                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
361                        .with_outcome(::zeroclaw_log::EventOutcome::Failure),
362                    "Missing access_token in QQ response"
363                );
364                anyhow::Error::msg("Missing access_token in QQ response")
365            })?
366            .to_string();
367
368        let expires_in = data
369            .get("expires_in")
370            .and_then(|e| e.as_str())
371            .and_then(|e| e.parse::<u64>().ok())
372            .unwrap_or(7200);
373
374        let now = std::time::SystemTime::now()
375            .duration_since(std::time::UNIX_EPOCH)
376            .unwrap_or_default()
377            .as_secs();
378
379        // Expire 60 seconds early to avoid edge cases
380        let expiry = now + expires_in.saturating_sub(60);
381
382        Ok((token, expiry))
383    }
384
385    /// Fetch an access token with retry and exponential backoff.
386    ///
387    /// Transient failures (network errors, 5xx responses) during reconnection
388    /// can cause the entire recovery loop to fail. This method retries up to
389    /// `AUTH_RETRY_MAX_ATTEMPTS` times with exponential backoff + jitter so
390    /// that a single transient error doesn't permanently break the reconnect
391    /// flow.
392    async fn fetch_access_token_with_retry(&self) -> anyhow::Result<(String, u64)> {
393        let mut backoff_ms = AUTH_RETRY_INITIAL_BACKOFF_MS;
394        let mut last_err = None;
395
396        for attempt in 1..=AUTH_RETRY_MAX_ATTEMPTS {
397            match self.fetch_access_token().await {
398                Ok(result) => {
399                    if attempt > 1 {
400                        ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"attempt": attempt, "AUTH_RETRY_MAX_ATTEMPTS": AUTH_RETRY_MAX_ATTEMPTS})), "getAppAccessToken succeeded on attempt /");
401                    }
402                    return Ok(result);
403                }
404                Err(e) => {
405                    ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"attempt": attempt, "AUTH_RETRY_MAX_ATTEMPTS": AUTH_RETRY_MAX_ATTEMPTS, "e": e.to_string()})), "getAppAccessToken failed (attempt /)");
406                    last_err = Some(e);
407
408                    if attempt < AUTH_RETRY_MAX_ATTEMPTS {
409                        // Add jitter: 75%-125% of base backoff
410                        let jitter_factor = 0.75 + (rand::random::<f64>() * 0.5);
411                        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
412                        let sleep_ms = (backoff_ms as f64 * jitter_factor) as u64;
413                        tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
414                        backoff_ms = (backoff_ms * 2).min(AUTH_RETRY_MAX_BACKOFF_MS);
415                    }
416                }
417            }
418        }
419
420        Err(last_err.unwrap_or_else(|| {
421            ::zeroclaw_log::record!(
422                ERROR,
423                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
424                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
425                    .with_attrs(::serde_json::json!({
426                        "phase": "getAppAccessToken",
427                        "max_attempts": AUTH_RETRY_MAX_ATTEMPTS,
428                    })),
429                "qq: getAppAccessToken exhausted retries"
430            );
431            anyhow::Error::msg(format!(
432                "getAppAccessToken failed after {AUTH_RETRY_MAX_ATTEMPTS} attempts"
433            ))
434        }))
435    }
436
437    /// Get a valid access token, refreshing if expired.
438    async fn get_token(&self) -> anyhow::Result<String> {
439        let now = std::time::SystemTime::now()
440            .duration_since(std::time::UNIX_EPOCH)
441            .unwrap_or_default()
442            .as_secs();
443
444        {
445            let cache = self.token_cache.read().await;
446            if let Some((ref token, expiry)) = *cache
447                && now < expiry
448            {
449                return Ok(token.clone());
450            }
451        }
452
453        let (token, expiry) = self.fetch_access_token_with_retry().await?;
454        {
455            let mut cache = self.token_cache.write().await;
456            *cache = Some((token.clone(), expiry));
457        }
458        Ok(token)
459    }
460
461    /// Get the WebSocket gateway URL.
462    async fn get_gateway_url(&self, token: &str) -> anyhow::Result<String> {
463        let resp = self
464            .http_client()
465            .get(format!("{QQ_API_BASE}/gateway"))
466            .header("Authorization", format!("QQBot {token}"))
467            .send()
468            .await?;
469
470        if !resp.status().is_success() {
471            let status = resp.status();
472            let err = resp.text().await.unwrap_or_default();
473            anyhow::bail!("QQ gateway request failed ({status}): {err}");
474        }
475
476        let data: serde_json::Value = resp.json().await?;
477        let url = data
478            .get("url")
479            .and_then(|u| u.as_str())
480            .ok_or_else(|| {
481                ::zeroclaw_log::record!(
482                    WARN,
483                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
484                        .with_outcome(::zeroclaw_log::EventOutcome::Failure),
485                    "Missing gateway URL in QQ response"
486                );
487                anyhow::Error::msg("Missing gateway URL in QQ response")
488            })?
489            .to_string();
490
491        Ok(url)
492    }
493
494    /// Check and insert message ID for deduplication.
495    async fn is_duplicate(&self, msg_id: &str) -> bool {
496        if msg_id.is_empty() {
497            return false;
498        }
499
500        let mut dedup = self.dedup.write().await;
501
502        if dedup.contains(msg_id) {
503            return true;
504        }
505
506        // Evict oldest half when at capacity
507        if dedup.len() >= DEDUP_CAPACITY {
508            let to_remove: Vec<String> = dedup.iter().take(DEDUP_CAPACITY / 2).cloned().collect();
509            for key in to_remove {
510                dedup.remove(&key);
511            }
512        }
513
514        dedup.insert(msg_id.to_string());
515        false
516    }
517
518    /// Build upload cache key from file content hash.
519    fn upload_cache_key(
520        file_data: &[u8],
521        scope: &str,
522        target_id: &str,
523        file_type: QQMediaFileType,
524    ) -> String {
525        let mut hasher = Sha256::new();
526        hasher.update(file_data);
527        let hash = format!("{:x}", hasher.finalize());
528        format!("{hash}:{scope}:{target_id}:{}", file_type as u8)
529    }
530
531    /// Look up a cached file_info, returning it if still valid.
532    async fn get_cached_upload(&self, cache_key: &str) -> Option<String> {
533        let cache = self.upload_cache.read().await;
534        if let Some(entry) = cache.get(cache_key) {
535            // TTL safety margin: expire 60s early (same as TS version)
536            if now_secs() + 60 < entry.expires_at {
537                return Some(entry.file_info.clone());
538            }
539        }
540        None
541    }
542
543    /// Store a file_info in the upload cache with TTL.
544    async fn set_cached_upload(&self, cache_key: String, file_info: String, ttl: u64) {
545        let mut cache = self.upload_cache.write().await;
546
547        // Evict expired entries if at capacity
548        if cache.len() >= UPLOAD_CACHE_CAPACITY {
549            let now = now_secs();
550            cache.retain(|_, v| v.expires_at > now);
551
552            // If still at capacity, evict half
553            if cache.len() >= UPLOAD_CACHE_CAPACITY {
554                let keys_to_remove: Vec<String> = cache
555                    .keys()
556                    .take(UPLOAD_CACHE_CAPACITY / 2)
557                    .cloned()
558                    .collect();
559                for key in keys_to_remove {
560                    cache.remove(&key);
561                }
562            }
563        }
564
565        cache.insert(
566            cache_key,
567            UploadCacheEntry {
568                file_info,
569                expires_at: now_secs() + ttl,
570            },
571        );
572    }
573
574    /// Resolve the API endpoint path components from a recipient string.
575    /// Returns (scope, id) where scope is "groups" or "users".
576    fn resolve_recipient(recipient: &str) -> (&str, String) {
577        if let Some(group_id) = recipient.strip_prefix("group:") {
578            ("groups", group_id.to_string())
579        } else {
580            let raw_uid = recipient.strip_prefix("user:").unwrap_or(recipient);
581            let user_id: String = raw_uid
582                .chars()
583                .filter(|c| c.is_alphanumeric() || *c == '_')
584                .collect();
585            ("users", user_id)
586        }
587    }
588
589    /// Upload media to QQ API and return file_info for sending.
590    ///
591    /// Supports two modes:
592    /// - URL upload: pass `url = Some(...)`, `file_data = None`
593    /// - Base64 upload: pass `file_data = Some(...)`, `url = None`
594    async fn upload_media(
595        &self,
596        recipient: &str,
597        file_type: QQMediaFileType,
598        url: Option<&str>,
599        file_data: Option<&str>,
600        file_name: Option<&str>,
601    ) -> anyhow::Result<(String, Option<u64>)> {
602        let token = self.get_token().await?;
603        let (scope, id) = Self::resolve_recipient(recipient);
604
605        let api_url = format!("{QQ_API_BASE}/v2/{scope}/{id}/files");
606        ensure_https(&api_url)?;
607
608        let mut body = json!({
609            "file_type": file_type as u8,
610            "srv_send_msg": false,
611        });
612
613        if let Some(u) = url {
614            body["url"] = json!(u);
615        }
616        if let Some(d) = file_data {
617            body["file_data"] = json!(d);
618        }
619        // QQ API uses file_name for File type to display the filename in chat
620        if file_type == QQMediaFileType::File
621            && let Some(name) = file_name
622        {
623            body["file_name"] = json!(name);
624        }
625
626        let resp = self
627            .http_client()
628            .post(&api_url)
629            .header("Authorization", format!("QQBot {token}"))
630            .json(&body)
631            .send()
632            .await?;
633
634        if !resp.status().is_success() {
635            let status = resp.status();
636            let err = resp.text().await.unwrap_or_default();
637            anyhow::bail!("QQ upload media failed ({status}): {err}");
638        }
639
640        let upload_resp: QQUploadResponse = resp.json().await?;
641        Ok((upload_resp.file_info, upload_resp.ttl))
642    }
643
644    /// Send a media message (msg_type=7) with an already-uploaded file_info.
645    async fn send_media_message(&self, recipient: &str, file_info: &str) -> anyhow::Result<()> {
646        let token = self.get_token().await?;
647        let (scope, id) = Self::resolve_recipient(recipient);
648
649        let url = format!("{QQ_API_BASE}/v2/{scope}/{id}/messages");
650        ensure_https(&url)?;
651
652        let body = json!({
653            "msg_type": 7,
654            "media": {
655                "file_info": file_info,
656            },
657            "msg_seq": next_msg_seq(),
658        });
659
660        let resp = self
661            .http_client()
662            .post(&url)
663            .header("Authorization", format!("QQBot {token}"))
664            .json(&body)
665            .send()
666            .await?;
667
668        if !resp.status().is_success() {
669            let status = resp.status();
670            let err = resp.text().await.unwrap_or_default();
671            anyhow::bail!("QQ send media message failed ({status}): {err}");
672        }
673
674        Ok(())
675    }
676
677    /// Send a single attachment: resolve local/URL, upload, then send.
678    async fn send_attachment(
679        &self,
680        recipient: &str,
681        attachment: &QQMediaAttachment,
682    ) -> anyhow::Result<()> {
683        let target = attachment.target.trim();
684
685        // Extract filename from target path/URL for File type display
686        let file_name = Path::new(target.split('?').next().unwrap_or(target))
687            .file_name()
688            .and_then(|n| n.to_str())
689            .map(|s| s.to_string());
690
691        if target.starts_with("http://") || target.starts_with("https://") {
692            // URL upload — no caching (remote content may change)
693            let (file_info, _ttl) = self
694                .upload_media(
695                    recipient,
696                    attachment.kind,
697                    Some(target),
698                    None,
699                    file_name.as_deref(),
700                )
701                .await?;
702            self.send_media_message(recipient, &file_info).await?;
703        } else {
704            // Local file upload
705            let path = Path::new(target);
706            if !path.exists() {
707                anyhow::bail!("QQ attachment path not found: {target}");
708            }
709
710            let metadata = tokio::fs::metadata(path).await?;
711            if metadata.len() > QQ_MAX_UPLOAD_BYTES {
712                anyhow::bail!(
713                    "QQ attachment too large ({} bytes, max {}): {target}",
714                    metadata.len(),
715                    QQ_MAX_UPLOAD_BYTES
716                );
717            }
718
719            let file_bytes = tokio::fs::read(path).await?;
720            let (scope_label, target_id) = Self::resolve_recipient(recipient);
721            let scope = if scope_label == "groups" {
722                "group"
723            } else {
724                "c2c"
725            };
726            let cache_key = Self::upload_cache_key(&file_bytes, scope, &target_id, attachment.kind);
727
728            // Check upload cache
729            if let Some(cached_file_info) = self.get_cached_upload(&cache_key).await {
730                ::zeroclaw_log::record!(
731                    DEBUG,
732                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
733                        .with_attrs(::serde_json::json!({"target": target})),
734                    "using cached upload for"
735                );
736                self.send_media_message(recipient, &cached_file_info)
737                    .await?;
738                return Ok(());
739            }
740
741            let b64 = base64::engine::general_purpose::STANDARD.encode(&file_bytes);
742            let (file_info, ttl) = self
743                .upload_media(
744                    recipient,
745                    attachment.kind,
746                    None,
747                    Some(&b64),
748                    file_name.as_deref(),
749                )
750                .await?;
751
752            // Cache the upload result
753            if let Some(ttl_secs) = ttl {
754                self.set_cached_upload(cache_key, file_info.clone(), ttl_secs)
755                    .await;
756            }
757
758            self.send_media_message(recipient, &file_info).await?;
759        }
760
761        Ok(())
762    }
763
764    /// Compose message content from an incoming QQ event payload.
765    ///
766    /// Handles all attachment types (not just images), downloads to workspace
767    /// if configured, and generates appropriate `[TYPE:path]` markers.
768    async fn compose_message_content(&self, payload: &serde_json::Value) -> Option<String> {
769        let text = payload
770            .get("content")
771            .and_then(|c| c.as_str())
772            .unwrap_or("")
773            .trim();
774
775        let mut markers: Vec<String> = Vec::new();
776        let mut voice_transcripts: Vec<String> = Vec::new();
777
778        if let Some(attachments) = payload.get("attachments").and_then(|a| a.as_array()) {
779            for att in attachments {
780                let url = match att.get("url").and_then(|u| u.as_str()) {
781                    Some(u) if !u.trim().is_empty() => fix_qq_url(u),
782                    _ => continue,
783                };
784
785                let content_type = att
786                    .get("content_type")
787                    .and_then(|ct| ct.as_str())
788                    .unwrap_or("");
789                let filename = att
790                    .get("filename")
791                    .and_then(|f| f.as_str())
792                    .unwrap_or("attachment");
793
794                let marker_type = infer_attachment_marker(content_type, filename);
795
796                // For voice attachments, prefer voice_wav_url (WAV format) over
797                // the default url (AMR/SILK). QQ provides this for direct use
798                // without transcoding. (aligned with openclaw-qqbot behavior)
799                let is_voice = content_type == "voice"
800                    || content_type.starts_with("audio/")
801                    || marker_type == "VOICE";
802                let (download_url, save_filename) = if is_voice {
803                    if let Some(wav_url) = att
804                        .get("voice_wav_url")
805                        .and_then(|u| u.as_str())
806                        .filter(|u| !u.trim().is_empty())
807                    {
808                        let fixed = fix_qq_url(wav_url);
809                        // Extract filename from WAV URL path
810                        let wav_name = Path::new(fixed.split('?').next().unwrap_or(&fixed))
811                            .file_name()
812                            .and_then(|n| n.to_str())
813                            .unwrap_or("voice.wav")
814                            .to_string();
815                        (fixed, wav_name)
816                    } else {
817                        (url.clone(), filename.to_string())
818                    }
819                } else {
820                    (url.clone(), filename.to_string())
821                };
822
823                // Try to download to workspace
824                let location = if let Some(ref ws) = self.workspace_dir {
825                    let dir = ws.join("qq_files");
826                    match self
827                        .download_attachment(&download_url, &dir, &save_filename)
828                        .await
829                    {
830                        Ok(local_path) => local_path.display().to_string(),
831                        Err(e) => {
832                            ::zeroclaw_log::record!(
833                                WARN,
834                                ::zeroclaw_log::Event::new(
835                                    module_path!(),
836                                    ::zeroclaw_log::Action::Note
837                                )
838                                .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
839                                .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
840                                "failed to download attachment"
841                            );
842                            url.clone()
843                        }
844                    }
845                } else {
846                    url.clone()
847                };
848
849                if is_voice {
850                    // For voice: include ASR transcription text (aligned with
851                    // openclaw-qqbot format: "[语音消息] transcribed text")
852                    // Also keep the file path marker for future multimodal support
853                    markers.push(format!("[{marker_type}:{location}]"));
854                    if let Some(asr_text) = att
855                        .get("asr_refer_text")
856                        .and_then(|t| t.as_str())
857                        .map(|t| t.trim())
858                        .filter(|t| !t.is_empty())
859                    {
860                        voice_transcripts.push(asr_text.to_string());
861                    }
862                } else {
863                    markers.push(format!("[{marker_type}:{location}]"));
864                }
865            }
866        }
867
868        // Voice ASR transcription uses angle brackets to distinguish from
869        // [TYPE:target] media markers (which use square brackets)
870        let voice_text = match voice_transcripts.len() {
871            0 => String::new(),
872            1 => format!(
873                "<VOICE_TRANSCRIPTION>{}</VOICE_TRANSCRIPTION>",
874                voice_transcripts[0]
875            ),
876            _ => voice_transcripts
877                .iter()
878                .enumerate()
879                .map(|(i, t)| format!("<VOICE_TRANSCRIPTION_{i}>{t}</VOICE_TRANSCRIPTION_{i}>"))
880                .collect::<Vec<_>>()
881                .join("\n"),
882        };
883
884        let mut parts: Vec<&str> = Vec::new();
885        if !text.is_empty() {
886            parts.push(text);
887        }
888        if !voice_text.is_empty() {
889            parts.push(&voice_text);
890        }
891        let markers_joined = markers.join("\n");
892        if !markers_joined.is_empty() {
893            parts.push(&markers_joined);
894        }
895
896        if parts.is_empty() {
897            return None;
898        }
899
900        Some(parts.join("\n"))
901    }
902
903    /// Download an attachment to the local workspace directory.
904    async fn download_attachment(
905        &self,
906        url: &str,
907        dir: &Path,
908        filename: &str,
909    ) -> anyhow::Result<PathBuf> {
910        tokio::fs::create_dir_all(dir).await?;
911
912        // Generate a unique filename to avoid collisions
913        let stem = Path::new(filename)
914            .file_stem()
915            .and_then(|s| s.to_str())
916            .unwrap_or("file");
917        let ext = Path::new(filename)
918            .extension()
919            .and_then(|e| e.to_str())
920            .unwrap_or("");
921        let unique = &Uuid::new_v4().to_string()[..8];
922        let safe_name = if ext.is_empty() {
923            format!("{stem}_{unique}")
924        } else {
925            format!("{stem}_{unique}.{ext}")
926        };
927
928        let dest = dir.join(&safe_name);
929
930        // QQ multimedia URLs carry rkey auth in query params — no Authorization header needed
931        // (consistent with openclaw-qqbot's downloadFile implementation)
932        let resp = self.http_client().get(url).send().await?;
933        if !resp.status().is_success() {
934            anyhow::bail!("Download failed ({}): {url}", resp.status());
935        }
936
937        let bytes = resp.bytes().await?;
938        tokio::fs::write(&dest, &bytes).await?;
939
940        Ok(dest)
941    }
942
943    /// Send a markdown text message (msg_type=2).
944    async fn send_text_markdown(&self, recipient: &str, content: &str) -> anyhow::Result<()> {
945        let token = self.get_token().await?;
946        let (scope, id) = Self::resolve_recipient(recipient);
947
948        let url = format!("{QQ_API_BASE}/v2/{scope}/{id}/messages");
949        ensure_https(&url)?;
950
951        let body = json!({
952            "markdown": {
953                "content": content,
954            },
955            "msg_type": 2,
956            "msg_seq": next_msg_seq(),
957        });
958
959        let resp = self
960            .http_client()
961            .post(&url)
962            .header("Authorization", format!("QQBot {token}"))
963            .json(&body)
964            .send()
965            .await?;
966
967        if !resp.status().is_success() {
968            let status = resp.status();
969            let err = resp.text().await.unwrap_or_default();
970            anyhow::bail!("QQ send message failed ({status}): {err}");
971        }
972
973        Ok(())
974    }
975}
976
977impl ::zeroclaw_api::attribution::Attributable for QQChannel {
978    fn role(&self) -> ::zeroclaw_api::attribution::Role {
979        ::zeroclaw_api::attribution::Role::Channel(::zeroclaw_api::attribution::ChannelKind::Qq)
980    }
981    fn alias(&self) -> &str {
982        &self.alias
983    }
984}
985
986#[async_trait]
987impl Channel for QQChannel {
988    fn name(&self) -> &str {
989        "qq"
990    }
991
992    async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
993        let (cleaned_text, attachments) = parse_qq_attachment_markers(&message.content);
994
995        if attachments.is_empty() {
996            // No media markers — send as markdown (original path)
997            return self
998                .send_text_markdown(&message.recipient, &message.content)
999                .await;
1000        }
1001
1002        // Send cleaned text first (if non-empty)
1003        if !cleaned_text.is_empty() {
1004            self.send_text_markdown(&message.recipient, &cleaned_text)
1005                .await?;
1006        }
1007
1008        // Send each media attachment
1009        for attachment in &attachments {
1010            if let Err(e) = self.send_attachment(&message.recipient, attachment).await {
1011                ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"target": attachment.target, "error": format!("{}", e)})), "failed to send media attachment; falling back to text");
1012                // Degrade to text fallback
1013                let fallback = format!(
1014                    "{}: {}",
1015                    match attachment.kind {
1016                        QQMediaFileType::Image => "Image",
1017                        QQMediaFileType::Video => "Video",
1018                        QQMediaFileType::Voice => "Voice",
1019                        QQMediaFileType::File => "File",
1020                    },
1021                    attachment.target
1022                );
1023                self.send_text_markdown(&message.recipient, &fallback)
1024                    .await?;
1025            }
1026        }
1027
1028        Ok(())
1029    }
1030
1031    #[allow(clippy::too_many_lines)]
1032    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
1033        ::zeroclaw_log::record!(
1034            INFO,
1035            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1036            "authenticating..."
1037        );
1038        let token = self.get_token().await?;
1039
1040        ::zeroclaw_log::record!(
1041            INFO,
1042            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1043            "fetching gateway URL..."
1044        );
1045        let gw_url = self.get_gateway_url(&token).await?;
1046
1047        ::zeroclaw_log::record!(
1048            INFO,
1049            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1050            "connecting to gateway WebSocket..."
1051        );
1052        let (ws_stream, _) = zeroclaw_config::schema::ws_connect_with_proxy(
1053            &gw_url,
1054            "channel.qq",
1055            self.proxy_url.as_deref(),
1056        )
1057        .await?;
1058        let (mut write, mut read) = ws_stream.split();
1059
1060        // Read Hello (opcode 10)
1061        let hello = read.next().await.ok_or_else(|| {
1062            ::zeroclaw_log::record!(
1063                ERROR,
1064                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1065                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1066                    .with_attrs(::serde_json::json!({"phase": "gateway_hello"})),
1067                "qq: gateway closed before Hello frame"
1068            );
1069            anyhow::Error::msg("no hello frame")
1070        })??;
1071        let hello_data: serde_json::Value = serde_json::from_str(&hello.to_string())?;
1072        let heartbeat_interval = hello_data
1073            .get("d")
1074            .and_then(|d| d.get("heartbeat_interval"))
1075            .and_then(serde_json::Value::as_u64)
1076            .unwrap_or(41250);
1077
1078        // Check if we can resume a previous session
1079        let stored_session = self.session_id.read().await.clone();
1080        let stored_seq = *self.last_sequence.read().await;
1081
1082        if let (Some(sid), Some(seq)) = (&stored_session, stored_seq) {
1083            // Attempt Resume (opcode 6)
1084            ::zeroclaw_log::record!(
1085                INFO,
1086                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1087                    .with_attrs(::serde_json::json!({"sid": sid, "seq": seq})),
1088                "attempting session resume (session_id=, seq=)"
1089            );
1090            let resume = json!({
1091                "op": 6,
1092                "d": {
1093                    "token": format!("QQBot {token}"),
1094                    "session_id": sid,
1095                    "seq": seq,
1096                }
1097            });
1098            write.send(Message::Text(resume.to_string().into())).await?;
1099        } else {
1100            // Send Identify (opcode 2)
1101            // Intents: PUBLIC_GUILD_MESSAGES (1<<30) | C2C_MESSAGE_CREATE & GROUP_AT_MESSAGE_CREATE (1<<25)
1102            let intents: u64 = (1 << 25) | (1 << 30);
1103            let identify = json!({
1104                "op": 2,
1105                "d": {
1106                    "token": format!("QQBot {token}"),
1107                    "intents": intents,
1108                    "properties": {
1109                        "os": "linux",
1110                        "browser": "zeroclaw",
1111                        "device": "zeroclaw",
1112                    }
1113                }
1114            });
1115            write
1116                .send(Message::Text(identify.to_string().into()))
1117                .await?;
1118            ::zeroclaw_log::record!(
1119                INFO,
1120                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1121                "connected and sent Identify"
1122            );
1123        }
1124
1125        let mut sequence: i64 = stored_seq.unwrap_or(-1);
1126
1127        // Track consecutive missed heartbeat ACKs.  The previous logic
1128        // killed the connection on the *first* missed ACK which is overly
1129        // aggressive -- transient network hiccups or brief server-side GC
1130        // pauses can cause a single ACK to be delayed.  We now allow up to
1131        // `MAX_MISSED_ACKS` consecutive misses before declaring the
1132        // connection dead.
1133        const MAX_MISSED_ACKS: u32 = 3;
1134        let mut missed_ack_count: u32 = 0;
1135
1136        // Spawn heartbeat timer.
1137        //
1138        // We add a small grace period (10% of the server-provided interval,
1139        // capped at 5s) so that a slightly-delayed ACK does not immediately
1140        // count as missed.
1141        let hb_interval = heartbeat_interval;
1142        let grace_ms: u64 = (hb_interval / 10).min(5_000);
1143        let effective_interval = hb_interval.saturating_add(grace_ms);
1144
1145        let (hb_tx, mut hb_rx) = tokio::sync::mpsc::channel::<()>(1);
1146        tokio::spawn(async move {
1147            let mut interval =
1148                tokio::time::interval(std::time::Duration::from_millis(effective_interval));
1149            loop {
1150                interval.tick().await;
1151                if hb_tx.send(()).await.is_err() {
1152                    break;
1153                }
1154            }
1155        });
1156
1157        // Reason the loop exited — used to decide error type
1158        enum ExitReason {
1159            Reconnect,
1160            InvalidSession,
1161            Close(Option<tokio_tungstenite::tungstenite::protocol::CloseFrame>),
1162            StreamEnded,
1163            HeartbeatTimeout,
1164            WriteFailed,
1165            ChannelClosed,
1166        }
1167
1168        let exit_reason;
1169
1170        'outer: loop {
1171            tokio::select! {
1172                _ = hb_rx.recv() => {
1173                    // Increment the missed-ACK counter.  Only declare the
1174                    // connection dead after MAX_MISSED_ACKS consecutive
1175                    // heartbeats go un-acknowledged.
1176                    if missed_ack_count > 0 {
1177                        if missed_ack_count >= MAX_MISSED_ACKS {
1178                            ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"missed_ack_count": missed_ack_count, "hb_interval": hb_interval, "grace_ms": grace_ms})), "consecutive heartbeat ACKs missed (interval ms + ms grace); connection appears zombied");
1179                            exit_reason = ExitReason::HeartbeatTimeout;
1180                            break;
1181                        }
1182                        ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"missed_ack_count": missed_ack_count, "MAX_MISSED_ACKS": MAX_MISSED_ACKS})), "heartbeat ACK missed (/); tolerating transient delay");
1183                    }
1184                    let d = if sequence >= 0 { json!(sequence) } else { json!(null) };
1185                    let hb = json!({"op": 1, "d": d});
1186                    if write
1187                        .send(Message::Text(hb.to_string().into()))
1188                        .await
1189                        .is_err()
1190                    {
1191                        exit_reason = ExitReason::WriteFailed;
1192                        break;
1193                    }
1194                    missed_ack_count += 1;
1195                }
1196                msg = read.next() => {
1197                    let msg = match msg {
1198                        Some(Ok(Message::Text(t))) => t,
1199                        Some(Ok(Message::Ping(payload))) => {
1200                            if write.send(Message::Pong(payload)).await.is_err() {
1201                                exit_reason = ExitReason::WriteFailed;
1202                                break;
1203                            }
1204                            continue;
1205                        }
1206                        Some(Ok(Message::Close(frame))) => {
1207                            exit_reason = ExitReason::Close(frame);
1208                            break;
1209                        }
1210                        None => {
1211                            exit_reason = ExitReason::StreamEnded;
1212                            break;
1213                        }
1214                        _ => continue,
1215                    };
1216
1217                    let event: serde_json::Value = match serde_json::from_str(msg.as_ref()) {
1218                        Ok(e) => e,
1219                        Err(_) => continue,
1220                    };
1221
1222                    // Track sequence number
1223                    if let Some(s) = event.get("s").and_then(serde_json::Value::as_i64) {
1224                        sequence = s;
1225                    }
1226
1227                    let op = event.get("op").and_then(serde_json::Value::as_u64).unwrap_or(0);
1228
1229                    match op {
1230                        // Server requests immediate heartbeat
1231                        1 => {
1232                            let d = if sequence >= 0 { json!(sequence) } else { json!(null) };
1233                            let hb = json!({"op": 1, "d": d});
1234                            if write
1235                                .send(Message::Text(hb.to_string().into()))
1236                                .await
1237                                .is_err()
1238                            {
1239                                exit_reason = ExitReason::WriteFailed;
1240                                break;
1241                            }
1242                            missed_ack_count += 1;
1243                            continue;
1244                        }
1245                        // Reconnect
1246                        7 => {
1247                            ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), "received Reconnect (op 7); will resume");
1248                            exit_reason = ExitReason::Reconnect;
1249                            break;
1250                        }
1251                        // Invalid Session
1252                        9 => {
1253                            ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), "received Invalid Session (op 9); clearing session for fresh auth");
1254                            exit_reason = ExitReason::InvalidSession;
1255                            break;
1256                        }
1257                        // Heartbeat ACK
1258                        11 => {
1259                            missed_ack_count = 0;
1260                            continue;
1261                        }
1262                        _ => {}
1263                    }
1264
1265                    // Only process dispatch events (op 0)
1266                    if op != 0 {
1267                        continue;
1268                    }
1269
1270                    let event_type = event.get("t").and_then(|t| t.as_str()).unwrap_or("");
1271                    let d = match event.get("d") {
1272                        Some(d) => d,
1273                        None => continue,
1274                    };
1275
1276                    // Capture session_id from READY event for future resume
1277                    if event_type == "READY" || event_type == "RESUMED" {
1278                        if let Some(sid) = d.get("session_id").and_then(|s| s.as_str()) {
1279                            *self.session_id.write().await = Some(sid.to_string());
1280                            ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"sid": sid, "event_type": event_type})), "session established (session_id=, event=)");
1281                        }
1282                        continue;
1283                    }
1284
1285                    ::zeroclaw_log::record!(DEBUG, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"event_type": event_type, "d": d})), "event_type= payload=");
1286
1287                    match event_type {
1288                        "C2C_MESSAGE_CREATE" => {
1289                            let msg_id = d.get("id").and_then(|i| i.as_str()).unwrap_or("");
1290                            if self.is_duplicate(msg_id).await {
1291                                continue;
1292                            }
1293
1294                            let Some(content) = self.compose_message_content(d).await else {
1295                                continue;
1296                            };
1297
1298                            let author_id = d.get("author").and_then(|a| a.get("id")).and_then(|i| i.as_str()).unwrap_or("unknown");
1299                            // For QQ, user_openid is the identifier
1300                            let user_openid = d.get("author").and_then(|a| a.get("user_openid")).and_then(|u| u.as_str()).unwrap_or(author_id);
1301
1302                            if !self.is_user_allowed(user_openid) {
1303                                ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"user_openid": user_openid})), "ignoring C2C message from unauthorized user");
1304                                continue;
1305                            }
1306
1307                            let chat_id = format!("user:{user_openid}");
1308
1309                            let channel_msg = ChannelMessage {
1310                                id: Uuid::new_v4().to_string(),
1311                                sender: user_openid.to_string(),
1312                                reply_target: chat_id,
1313                                content,
1314                                channel: "qq".to_string(),
1315                channel_alias: Some(self.alias.clone()),
1316                                timestamp: std::time::SystemTime::now()
1317                                    .duration_since(std::time::UNIX_EPOCH)
1318                                    .unwrap_or_default()
1319                                    .as_secs(),
1320                                thread_ts: None,
1321                                interruption_scope_id: None,
1322                    attachments: vec![],
1323                                subject: None,
1324                            };
1325
1326                            if tx.send(channel_msg).await.is_err() {
1327                                ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), "message channel closed");
1328                                exit_reason = ExitReason::ChannelClosed;
1329                                break 'outer;
1330                            }
1331                        }
1332                        "GROUP_AT_MESSAGE_CREATE" => {
1333                            let msg_id = d.get("id").and_then(|i| i.as_str()).unwrap_or("");
1334                            if self.is_duplicate(msg_id).await {
1335                                continue;
1336                            }
1337
1338                            let Some(content) = self.compose_message_content(d).await else {
1339                                continue;
1340                            };
1341
1342                            let author_id = d.get("author").and_then(|a| a.get("member_openid")).and_then(|m| m.as_str()).unwrap_or("unknown");
1343
1344                            if !self.is_user_allowed(author_id) {
1345                                ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"author_id": author_id})), "ignoring group message from unauthorized user");
1346                                continue;
1347                            }
1348
1349                            let group_openid = d.get("group_openid").and_then(|g| g.as_str()).unwrap_or("unknown");
1350                            let chat_id = format!("group:{group_openid}");
1351
1352                            let channel_msg = ChannelMessage {
1353                                id: Uuid::new_v4().to_string(),
1354                                sender: author_id.to_string(),
1355                                reply_target: chat_id,
1356                                content,
1357                                channel: "qq".to_string(),
1358                channel_alias: Some(self.alias.clone()),
1359                                timestamp: std::time::SystemTime::now()
1360                                    .duration_since(std::time::UNIX_EPOCH)
1361                                    .unwrap_or_default()
1362                                    .as_secs(),
1363                                thread_ts: None,
1364                                interruption_scope_id: None,
1365                    attachments: vec![],
1366                                subject: None,
1367                            };
1368
1369                            if tx.send(channel_msg).await.is_err() {
1370                                ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), "message channel closed");
1371                                exit_reason = ExitReason::ChannelClosed;
1372                                break 'outer;
1373                            }
1374                        }
1375                        _ => {}
1376                    }
1377                }
1378            }
1379        }
1380
1381        // Persist sequence number for potential resume on next reconnect
1382        *self.last_sequence.write().await = if sequence >= 0 { Some(sequence) } else { None };
1383
1384        match exit_reason {
1385            ExitReason::InvalidSession => {
1386                // Clear stored session so next reconnect does a fresh Identify
1387                *self.session_id.write().await = None;
1388                *self.last_sequence.write().await = None;
1389                anyhow::bail!(
1390                    "QQ WebSocket connection closed: invalid session (fresh auth required)"
1391                )
1392            }
1393            ExitReason::Reconnect => {
1394                // Session state preserved — supervisor will reconnect and we'll attempt Resume
1395                anyhow::bail!(
1396                    "QQ WebSocket connection closed: server requested reconnect (resume will be attempted)"
1397                )
1398            }
1399            ExitReason::Close(ref frame) => {
1400                let (code, reason) = frame
1401                    .as_ref()
1402                    .map(|f| (f.code.to_string(), f.reason.to_string()))
1403                    .unwrap_or_else(|| ("unknown".into(), "none".into()));
1404                ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"code": code.to_string(), "reason": reason.to_string()})), "WebSocket closed with code=, reason=\"\"; resume will be attempted on reconnect");
1405                anyhow::bail!(
1406                    "QQ WebSocket connection closed: close_code={code}, reason=\"{reason}\""
1407                )
1408            }
1409            ExitReason::StreamEnded => {
1410                ::zeroclaw_log::record!(
1411                    WARN,
1412                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1413                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1414                    "WebSocket stream ended unexpectedly; resume will be attempted on reconnect"
1415                );
1416                anyhow::bail!("QQ WebSocket connection closed: stream ended unexpectedly")
1417            }
1418            ExitReason::HeartbeatTimeout => {
1419                ::zeroclaw_log::record!(
1420                    WARN,
1421                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1422                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1423                        .with_attrs(::serde_json::json!({"MAX_MISSED_ACKS": MAX_MISSED_ACKS})),
1424                    "heartbeat timeout after consecutive missed ACKs; resume will be attempted on reconnect"
1425                );
1426                anyhow::bail!(
1427                    "QQ WebSocket connection closed: heartbeat ACK timeout \
1428                     ({MAX_MISSED_ACKS} consecutive missed ACKs)"
1429                )
1430            }
1431            ExitReason::WriteFailed => {
1432                ::zeroclaw_log::record!(
1433                    WARN,
1434                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1435                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1436                    "WebSocket write failed; resume will be attempted on reconnect"
1437                );
1438                anyhow::bail!("QQ WebSocket connection closed: write failed")
1439            }
1440            ExitReason::ChannelClosed => {
1441                anyhow::bail!("QQ WebSocket connection closed: internal message channel closed")
1442            }
1443        }
1444    }
1445
1446    async fn health_check(&self) -> bool {
1447        self.fetch_access_token_with_retry().await.is_ok()
1448    }
1449}
1450
1451#[cfg(test)]
1452mod tests {
1453    use super::*;
1454    use serde_json::json;
1455
1456    #[test]
1457    fn test_name() {
1458        let ch = QQChannel::new(
1459            "id".into(),
1460            "secret".into(),
1461            "qq_test_alias",
1462            Arc::new(Vec::new),
1463        );
1464        assert_eq!(ch.name(), "qq");
1465    }
1466
1467    #[test]
1468    fn test_user_allowed_wildcard() {
1469        let ch = QQChannel::new(
1470            "id".into(),
1471            "secret".into(),
1472            "qq_test_alias",
1473            Arc::new(|| vec!["*".into()]),
1474        );
1475        assert!(ch.is_user_allowed("anyone"));
1476    }
1477
1478    #[test]
1479    fn test_user_allowed_specific() {
1480        let ch = QQChannel::new(
1481            "id".into(),
1482            "secret".into(),
1483            "qq_test_alias",
1484            Arc::new(|| vec!["user123".into()]),
1485        );
1486        assert!(ch.is_user_allowed("user123"));
1487        assert!(!ch.is_user_allowed("other"));
1488    }
1489
1490    #[test]
1491    fn test_user_denied_empty() {
1492        let ch = QQChannel::new(
1493            "id".into(),
1494            "secret".into(),
1495            "qq_test_alias",
1496            Arc::new(Vec::new),
1497        );
1498        assert!(!ch.is_user_allowed("anyone"));
1499    }
1500
1501    #[tokio::test]
1502    async fn test_dedup() {
1503        let ch = QQChannel::new(
1504            "id".into(),
1505            "secret".into(),
1506            "qq_test_alias",
1507            Arc::new(Vec::new),
1508        );
1509        assert!(!ch.is_duplicate("msg1").await);
1510        assert!(ch.is_duplicate("msg1").await);
1511        assert!(!ch.is_duplicate("msg2").await);
1512    }
1513
1514    #[tokio::test]
1515    async fn test_dedup_empty_id() {
1516        let ch = QQChannel::new(
1517            "id".into(),
1518            "secret".into(),
1519            "qq_test_alias",
1520            Arc::new(Vec::new),
1521        );
1522        assert!(!ch.is_duplicate("").await);
1523        assert!(!ch.is_duplicate("").await);
1524    }
1525
1526    #[test]
1527    fn v2_allowed_users_fold_into_peer_groups() {
1528        // V2 `[channels.qq].allowed_users` migrates into a synthesized
1529        // `[peer_groups.qq_default]` block in V3, while the channel block
1530        // itself survives under the bridge alias `default`.
1531        let v2_toml = r#"
1532schema_version = 2
1533
1534[channels.qq]
1535enabled = true
1536app_id = "12345"
1537app_secret = "secret_abc"
1538allowed_users = ["user1"]
1539"#;
1540        let cfg = zeroclaw_config::migration::migrate_to_current(v2_toml)
1541            .expect("V2 qq config migrates to V3");
1542        let qq = cfg
1543            .channels
1544            .qq
1545            .get("default")
1546            .expect("V2 qq folds under alias `default`");
1547        assert_eq!(qq.app_id, "12345");
1548        assert_eq!(qq.app_secret, "secret_abc");
1549
1550        let group = cfg
1551            .peer_groups
1552            .get("qq_default")
1553            .expect("qq allow-list synthesizes [peer_groups.qq_default]");
1554        assert_eq!(group.channel, "qq");
1555        let peers: Vec<&str> = group.external_peers.iter().map(|p| p.as_str()).collect();
1556        assert_eq!(peers, vec!["user1"]);
1557    }
1558
1559    // --- Marker parsing tests ---
1560
1561    #[test]
1562    fn test_parse_qq_markers_single_image() {
1563        let (text, atts) = parse_qq_attachment_markers("Hello [IMAGE:/tmp/a.png] world");
1564        assert_eq!(text, "Hello  world");
1565        assert_eq!(atts.len(), 1);
1566        assert_eq!(atts[0].kind, QQMediaFileType::Image);
1567        assert_eq!(atts[0].target, "/tmp/a.png");
1568    }
1569
1570    #[test]
1571    fn test_parse_qq_markers_multiple() {
1572        let (text, atts) =
1573            parse_qq_attachment_markers("[IMAGE:/a.png] text [VIDEO:https://example.com/v.mp4]");
1574        assert_eq!(text, "text");
1575        assert_eq!(atts.len(), 2);
1576        assert_eq!(atts[0].kind, QQMediaFileType::Image);
1577        assert_eq!(atts[1].kind, QQMediaFileType::Video);
1578    }
1579
1580    #[test]
1581    fn test_parse_qq_markers_no_markers() {
1582        let (text, atts) = parse_qq_attachment_markers("Just plain text");
1583        assert_eq!(text, "Just plain text");
1584        assert!(atts.is_empty());
1585    }
1586
1587    #[test]
1588    fn test_parse_qq_markers_case_insensitive() {
1589        let (_, atts) = parse_qq_attachment_markers("[image:/a.png]");
1590        assert_eq!(atts.len(), 1);
1591        assert_eq!(atts[0].kind, QQMediaFileType::Image);
1592
1593        let (_, atts) = parse_qq_attachment_markers("[Image:/a.png]");
1594        assert_eq!(atts.len(), 1);
1595        assert_eq!(atts[0].kind, QQMediaFileType::Image);
1596    }
1597
1598    #[test]
1599    fn test_parse_qq_markers_invalid_preserved() {
1600        let (text, atts) = parse_qq_attachment_markers("Keep [UNKNOWN:foo] here");
1601        assert_eq!(text, "Keep [UNKNOWN:foo] here");
1602        assert!(atts.is_empty());
1603    }
1604
1605    #[test]
1606    fn test_parse_qq_markers_mixed_text_and_markers() {
1607        let (text, atts) =
1608            parse_qq_attachment_markers("Before [DOCUMENT:/doc.pdf] middle [PHOTO:/p.jpg] after");
1609        assert_eq!(text, "Before  middle  after");
1610        assert_eq!(atts.len(), 2);
1611        assert_eq!(atts[0].kind, QQMediaFileType::File);
1612        assert_eq!(atts[0].target, "/doc.pdf");
1613        assert_eq!(atts[1].kind, QQMediaFileType::Image);
1614        assert_eq!(atts[1].target, "/p.jpg");
1615    }
1616
1617    // --- marker_kind_to_qq_file_type tests ---
1618
1619    #[test]
1620    fn test_marker_kind_image() {
1621        assert_eq!(
1622            marker_kind_to_qq_file_type("IMAGE", "/a.png"),
1623            Some(QQMediaFileType::Image)
1624        );
1625        assert_eq!(
1626            marker_kind_to_qq_file_type("PHOTO", "/a.png"),
1627            Some(QQMediaFileType::Image)
1628        );
1629    }
1630
1631    #[test]
1632    fn test_marker_kind_document() {
1633        assert_eq!(
1634            marker_kind_to_qq_file_type("DOCUMENT", "/a.pdf"),
1635            Some(QQMediaFileType::File)
1636        );
1637        assert_eq!(
1638            marker_kind_to_qq_file_type("FILE", "/a.zip"),
1639            Some(QQMediaFileType::File)
1640        );
1641    }
1642
1643    #[test]
1644    fn test_marker_kind_video() {
1645        assert_eq!(
1646            marker_kind_to_qq_file_type("VIDEO", "/v.mp4"),
1647            Some(QQMediaFileType::Video)
1648        );
1649    }
1650
1651    #[test]
1652    fn test_marker_kind_voice_native() {
1653        assert_eq!(
1654            marker_kind_to_qq_file_type("VOICE", "/a.mp3"),
1655            Some(QQMediaFileType::Voice)
1656        );
1657        assert_eq!(
1658            marker_kind_to_qq_file_type("AUDIO", "/a.wav"),
1659            Some(QQMediaFileType::Voice)
1660        );
1661        assert_eq!(
1662            marker_kind_to_qq_file_type("VOICE", "/a.silk"),
1663            Some(QQMediaFileType::Voice)
1664        );
1665    }
1666
1667    #[test]
1668    fn test_marker_kind_voice_non_native_degrades() {
1669        // .ogg is not a natively supported QQ voice format — degrades to File
1670        assert_eq!(
1671            marker_kind_to_qq_file_type("VOICE", "/a.ogg"),
1672            Some(QQMediaFileType::File)
1673        );
1674        assert_eq!(
1675            marker_kind_to_qq_file_type("AUDIO", "/a.flac"),
1676            Some(QQMediaFileType::File)
1677        );
1678    }
1679
1680    // --- Upload/send body construction tests ---
1681
1682    #[test]
1683    fn test_upload_body_url() {
1684        let body = json!({
1685            "file_type": QQMediaFileType::Image as u8,
1686            "srv_send_msg": false,
1687            "url": "https://example.com/a.jpg",
1688        });
1689        assert_eq!(body["file_type"], 1);
1690        assert_eq!(body["srv_send_msg"], false);
1691        assert_eq!(body["url"], "https://example.com/a.jpg");
1692        assert!(body.get("file_data").is_none());
1693    }
1694
1695    #[test]
1696    fn test_upload_body_base64() {
1697        let body = json!({
1698            "file_type": QQMediaFileType::File as u8,
1699            "srv_send_msg": false,
1700            "file_data": "dGVzdA==",
1701        });
1702        assert_eq!(body["file_type"], 4);
1703        assert_eq!(body["file_data"], "dGVzdA==");
1704        assert!(body.get("url").is_none());
1705    }
1706
1707    #[test]
1708    fn test_send_media_body_msg_type_7() {
1709        let file_info = "some_file_info_string";
1710        let body = json!({
1711            "msg_type": 7,
1712            "media": {
1713                "file_info": file_info,
1714            },
1715            "msg_seq": 1,
1716        });
1717        assert_eq!(body["msg_type"], 7);
1718        assert_eq!(body["media"]["file_info"], file_info);
1719    }
1720
1721    // --- compose_message_content tests (now async) ---
1722
1723    #[tokio::test]
1724    async fn test_compose_message_content_text_only() {
1725        let ch = QQChannel::new(
1726            "id".into(),
1727            "secret".into(),
1728            "qq_test_alias",
1729            Arc::new(Vec::new),
1730        );
1731        let payload = json!({ "content": "  hello world  " });
1732        assert_eq!(
1733            ch.compose_message_content(&payload).await,
1734            Some("hello world".to_string())
1735        );
1736    }
1737
1738    #[tokio::test]
1739    async fn test_compose_message_content_image_attachment() {
1740        let ch = QQChannel::new(
1741            "id".into(),
1742            "secret".into(),
1743            "qq_test_alias",
1744            Arc::new(Vec::new),
1745        );
1746        let payload = json!({
1747            "content": "   ",
1748            "attachments": [{
1749                "content_type": "image/jpg",
1750                "url": "https://cdn.example.com/a.jpg"
1751            }]
1752        });
1753        assert_eq!(
1754            ch.compose_message_content(&payload).await,
1755            Some("[IMAGE:https://cdn.example.com/a.jpg]".to_string())
1756        );
1757    }
1758
1759    #[tokio::test]
1760    async fn test_compose_message_content_text_and_attachments() {
1761        let ch = QQChannel::new(
1762            "id".into(),
1763            "secret".into(),
1764            "qq_test_alias",
1765            Arc::new(Vec::new),
1766        );
1767        let payload = json!({
1768            "content": "Here is an image",
1769            "attachments": [
1770                { "content_type": "image/png", "url": "https://cdn.example.com/a.png" },
1771                { "filename": "b.jpeg", "url": "https://cdn.example.com/b.jpeg" }
1772            ]
1773        });
1774        assert_eq!(
1775            ch.compose_message_content(&payload).await,
1776            Some(
1777                "Here is an image\n[IMAGE:https://cdn.example.com/a.png]\n[IMAGE:https://cdn.example.com/b.jpeg]"
1778                    .to_string()
1779            )
1780        );
1781    }
1782
1783    #[tokio::test]
1784    async fn test_compose_all_attachment_types() {
1785        let ch = QQChannel::new(
1786            "id".into(),
1787            "secret".into(),
1788            "qq_test_alias",
1789            Arc::new(Vec::new),
1790        );
1791        let payload = json!({
1792            "content": "",
1793            "attachments": [
1794                { "content_type": "image/png", "url": "https://cdn.example.com/a.png" },
1795                { "content_type": "audio/mpeg", "url": "https://cdn.example.com/b.mp3" },
1796                { "content_type": "video/mp4", "url": "https://cdn.example.com/c.mp4" },
1797                { "content_type": "application/pdf", "url": "https://cdn.example.com/d.pdf" }
1798            ]
1799        });
1800        let result = ch.compose_message_content(&payload).await.unwrap();
1801        assert!(result.contains("[IMAGE:"));
1802        assert!(result.contains("[VOICE:"));
1803        assert!(result.contains("[VIDEO:"));
1804        assert!(result.contains("[DOCUMENT:"));
1805    }
1806
1807    #[tokio::test]
1808    async fn test_compose_fixes_double_slash_url() {
1809        let ch = QQChannel::new(
1810            "id".into(),
1811            "secret".into(),
1812            "qq_test_alias",
1813            Arc::new(Vec::new),
1814        );
1815        let payload = json!({
1816            "content": "",
1817            "attachments": [{
1818                "content_type": "image/png",
1819                "url": "//cdn.example.com/a.png"
1820            }]
1821        });
1822        let result = ch.compose_message_content(&payload).await.unwrap();
1823        assert!(result.contains("https://cdn.example.com/a.png"));
1824        // Ensure the raw `//` prefix was replaced with `https:`
1825        assert!(!result.starts_with("[IMAGE://"));
1826    }
1827
1828    #[tokio::test]
1829    async fn test_compose_fallback_no_workspace() {
1830        // Without workspace_dir, attachments use URLs directly
1831        let ch = QQChannel::new(
1832            "id".into(),
1833            "secret".into(),
1834            "qq_test_alias",
1835            Arc::new(Vec::new),
1836        );
1837        let payload = json!({
1838            "content": "text",
1839            "attachments": [{
1840                "content_type": "application/pdf",
1841                "filename": "report.pdf",
1842                "url": "https://cdn.example.com/report.pdf"
1843            }]
1844        });
1845        let result = ch.compose_message_content(&payload).await.unwrap();
1846        assert!(result.contains("[DOCUMENT:https://cdn.example.com/report.pdf]"));
1847    }
1848
1849    #[tokio::test]
1850    async fn test_compose_drops_empty_url() {
1851        let ch = QQChannel::new(
1852            "id".into(),
1853            "secret".into(),
1854            "qq_test_alias",
1855            Arc::new(Vec::new),
1856        );
1857        let payload = json!({
1858            "content": "   ",
1859            "attachments": [{
1860                "content_type": "image/png",
1861                "url": "   "
1862            }]
1863        });
1864        assert_eq!(ch.compose_message_content(&payload).await, None);
1865    }
1866
1867    // --- Markdown send body test ---
1868
1869    #[test]
1870    fn test_send_body_uses_markdown_msg_type() {
1871        let content = "**bold** and `code`";
1872        let body = json!({
1873            "markdown": { "content": content },
1874            "msg_type": 2,
1875        });
1876        assert_eq!(body["msg_type"], 2);
1877        assert_eq!(body["markdown"]["content"], content);
1878        assert!(
1879            body.get("content").is_none(),
1880            "top-level 'content' must not be present"
1881        );
1882    }
1883
1884    // --- Helper function tests ---
1885
1886    #[test]
1887    fn test_fix_qq_url() {
1888        assert_eq!(
1889            fix_qq_url("//cdn.example.com/a.png"),
1890            "https://cdn.example.com/a.png"
1891        );
1892        assert_eq!(
1893            fix_qq_url("https://cdn.example.com/a.png"),
1894            "https://cdn.example.com/a.png"
1895        );
1896    }
1897
1898    #[test]
1899    fn test_next_msg_seq_range() {
1900        for _ in 0..100 {
1901            let seq = next_msg_seq();
1902            assert!(seq < 65536);
1903        }
1904    }
1905
1906    #[test]
1907    fn test_resolve_recipient_group() {
1908        let (scope, id) = QQChannel::resolve_recipient("group:abc123");
1909        assert_eq!(scope, "groups");
1910        assert_eq!(id, "abc123");
1911    }
1912
1913    #[test]
1914    fn test_resolve_recipient_user() {
1915        let (scope, id) = QQChannel::resolve_recipient("user:xyz789");
1916        assert_eq!(scope, "users");
1917        assert_eq!(id, "xyz789");
1918    }
1919
1920    #[test]
1921    fn test_resolve_recipient_bare_id() {
1922        let (scope, id) = QQChannel::resolve_recipient("raw_id_123");
1923        assert_eq!(scope, "users");
1924        assert_eq!(id, "raw_id_123");
1925    }
1926
1927    #[test]
1928    fn test_infer_attachment_marker() {
1929        assert_eq!(infer_attachment_marker("image/png", "a.png"), "IMAGE");
1930        assert_eq!(infer_attachment_marker("audio/mpeg", "a.mp3"), "VOICE");
1931        assert_eq!(infer_attachment_marker("video/mp4", "a.mp4"), "VIDEO");
1932        assert_eq!(
1933            infer_attachment_marker("application/pdf", "doc.pdf"),
1934            "DOCUMENT"
1935        );
1936        assert_eq!(infer_attachment_marker("", "photo.jpg"), "IMAGE");
1937        assert_eq!(infer_attachment_marker("", "song.mp3"), "VOICE");
1938        assert_eq!(infer_attachment_marker("", "clip.mp4"), "VIDEO");
1939        assert_eq!(infer_attachment_marker("", "unknown.xyz"), "DOCUMENT");
1940    }
1941
1942    // --- Upload cache tests ---
1943
1944    #[tokio::test]
1945    async fn test_upload_cache_hit_and_miss() {
1946        let ch = QQChannel::new(
1947            "id".into(),
1948            "secret".into(),
1949            "qq_test_alias",
1950            Arc::new(Vec::new),
1951        );
1952        let key = QQChannel::upload_cache_key(b"test_data", "c2c", "user1", QQMediaFileType::Image);
1953
1954        // Miss
1955        assert!(ch.get_cached_upload(&key).await.is_none());
1956
1957        // Set with long TTL
1958        ch.set_cached_upload(key.clone(), "cached_file_info".into(), 3600)
1959            .await;
1960
1961        // Hit
1962        assert_eq!(
1963            ch.get_cached_upload(&key).await,
1964            Some("cached_file_info".to_string())
1965        );
1966    }
1967
1968    #[tokio::test]
1969    async fn test_upload_cache_expired() {
1970        let ch = QQChannel::new(
1971            "id".into(),
1972            "secret".into(),
1973            "qq_test_alias",
1974            Arc::new(Vec::new),
1975        );
1976        let key = QQChannel::upload_cache_key(b"test_data", "group", "g1", QQMediaFileType::Video);
1977
1978        // Set with 0 TTL (already expired considering 60s safety margin)
1979        ch.set_cached_upload(key.clone(), "old_info".into(), 0)
1980            .await;
1981
1982        // Should miss due to expiry
1983        assert!(ch.get_cached_upload(&key).await.is_none());
1984    }
1985
1986    // --- Auth retry tests ---
1987
1988    #[test]
1989    fn test_auth_retry_constants_are_sensible() {
1990        const {
1991            assert!(AUTH_RETRY_MAX_ATTEMPTS >= 2, "should retry at least once");
1992            assert!(
1993                AUTH_RETRY_INITIAL_BACKOFF_MS > 0,
1994                "initial backoff must be positive"
1995            );
1996            assert!(
1997                AUTH_RETRY_MAX_BACKOFF_MS >= AUTH_RETRY_INITIAL_BACKOFF_MS,
1998                "max backoff must be >= initial"
1999            );
2000        }
2001    }
2002
2003    #[test]
2004    fn test_auth_retry_backoff_stays_within_bounds() {
2005        // Simulate the backoff progression and verify it caps at max
2006        let mut backoff = AUTH_RETRY_INITIAL_BACKOFF_MS;
2007        for _ in 1..AUTH_RETRY_MAX_ATTEMPTS {
2008            backoff = (backoff * 2).min(AUTH_RETRY_MAX_BACKOFF_MS);
2009        }
2010        assert!(
2011            backoff <= AUTH_RETRY_MAX_BACKOFF_MS,
2012            "backoff must never exceed the configured maximum"
2013        );
2014    }
2015
2016    #[tokio::test]
2017    async fn test_get_token_returns_cached_token_without_fetch() {
2018        let ch = QQChannel::new(
2019            "id".into(),
2020            "secret".into(),
2021            "qq_test_alias",
2022            Arc::new(Vec::new),
2023        );
2024        // Pre-populate the token cache with a token that expires far in the future
2025        let future_expiry = now_secs() + 3600;
2026        *ch.token_cache.write().await = Some(("cached_tok".to_string(), future_expiry));
2027
2028        // get_token should return the cached value without hitting the network
2029        let tok = ch.get_token().await.unwrap();
2030        assert_eq!(tok, "cached_tok");
2031    }
2032
2033    #[tokio::test]
2034    async fn test_get_token_refreshes_expired_cache() {
2035        let ch = QQChannel::new(
2036            "id".into(),
2037            "secret".into(),
2038            "qq_test_alias",
2039            Arc::new(Vec::new),
2040        );
2041        // Pre-populate with an already-expired token
2042        *ch.token_cache.write().await = Some(("old_tok".to_string(), 0));
2043
2044        // get_token should try to refresh -- will fail because there's no real
2045        // server, but the important thing is it doesn't return the stale token.
2046        let result = ch.get_token().await;
2047        assert!(
2048            result.is_err(),
2049            "should fail when token expired and no server available"
2050        );
2051    }
2052
2053    // --- Heartbeat stability tests ---
2054
2055    #[test]
2056    fn test_heartbeat_grace_period_calculation() {
2057        // The grace period is 10% of the server interval, capped at 5000ms.
2058        let cases: Vec<(u64, u64)> = vec![
2059            (41_250, 4_125),  // default QQ interval
2060            (30_000, 3_000),  // smaller interval
2061            (60_000, 5_000),  // larger interval, capped at 5s
2062            (100_000, 5_000), // very large, still capped
2063            (5_000, 500),     // small interval
2064            (0, 0),           // degenerate zero
2065        ];
2066        for (interval, expected_grace) in cases {
2067            let grace: u64 = (interval / 10).min(5_000);
2068            assert_eq!(
2069                grace, expected_grace,
2070                "grace for interval {interval} should be {expected_grace}"
2071            );
2072            let effective = interval.saturating_add(grace);
2073            assert!(effective >= interval);
2074        }
2075    }
2076
2077    #[test]
2078    fn test_missed_ack_counter_logic() {
2079        let max_missed: u32 = 3;
2080        let mut missed: u32 = 0;
2081
2082        // First tick: counter is 0, send heartbeat
2083        assert!(missed < max_missed);
2084        missed += 1;
2085        assert_eq!(missed, 1, "counter should be 1 after first heartbeat");
2086
2087        // ACK received: reset
2088        missed = 0;
2089        assert_eq!(missed, 0, "counter should reset on ACK");
2090
2091        // 3 consecutive misses without ACK
2092        for _ in 0..max_missed {
2093            assert!(
2094                missed < max_missed,
2095                "should not reach zombie state before {max_missed} misses"
2096            );
2097            missed += 1;
2098        }
2099        assert!(
2100            missed >= max_missed,
2101            "should declare zombie after {max_missed} missed ACKs"
2102        );
2103    }
2104
2105    #[test]
2106    fn test_missed_ack_counter_reset_on_ack() {
2107        let max_missed: u32 = 3;
2108        let mut missed: u32 = 0;
2109
2110        missed += 1;
2111        missed += 1;
2112        assert_eq!(missed, 2);
2113
2114        // ACK arrives: reset
2115        missed = 0;
2116        assert_eq!(missed, 0);
2117
2118        // One more miss, still under threshold
2119        missed += 1;
2120        assert!(missed < max_missed);
2121    }
2122
2123    #[test]
2124    fn test_effective_interval_never_overflows() {
2125        let interval = u64::MAX;
2126        let grace: u64 = (interval / 10).min(5_000);
2127        let effective = interval.saturating_add(grace);
2128        assert_eq!(effective, u64::MAX);
2129    }
2130}