1use async_trait::async_trait;
2use base64::Engine as _;
3use futures_util::{SinkExt, StreamExt};
4use serde::Deserialize;
5use serde_json::json;
6use sha2::{Digest, Sha256};
7use std::collections::{HashMap, HashSet};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use tokio_tungstenite::tungstenite::Message;
12use uuid::Uuid;
13use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
14
15const QQ_API_BASE: &str = "https://api.sgroup.qq.com";
16const QQ_AUTH_URL: &str = "https://bots.qq.com/app/getAppAccessToken";
17
18const QQ_MAX_UPLOAD_BYTES: u64 = 10 * 1024 * 1024;
20
21const UPLOAD_CACHE_CAPACITY: usize = 500;
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26enum QQMediaFileType {
27 Image = 1,
29 Video = 2,
31 Voice = 3,
37 File = 4,
39}
40
41#[derive(Debug, Clone, PartialEq, Eq)]
43struct QQMediaAttachment {
44 kind: QQMediaFileType,
45 target: String,
46}
47
48#[derive(Debug, Deserialize)]
50struct QQUploadResponse {
51 file_info: String,
52 ttl: Option<u64>,
53}
54
55struct UploadCacheEntry {
57 file_info: String,
58 expires_at: u64,
59}
60
61fn ensure_https(url: &str) -> anyhow::Result<()> {
62 if !url.starts_with("https://") {
63 anyhow::bail!(
64 "Refusing to transmit sensitive data over non-HTTPS URL: URL scheme must be https"
65 );
66 }
67 Ok(())
68}
69
70fn is_native_voice_ext(ext: &str) -> bool {
72 matches!(ext.to_ascii_lowercase().as_str(), "wav" | "mp3" | "silk")
73}
74
75fn marker_kind_to_qq_file_type(marker: &str, target: &str) -> Option<QQMediaFileType> {
80 match marker.trim().to_ascii_uppercase().as_str() {
81 "IMAGE" | "PHOTO" => Some(QQMediaFileType::Image),
82 "DOCUMENT" | "FILE" => Some(QQMediaFileType::File),
83 "VIDEO" => Some(QQMediaFileType::Video),
84 "AUDIO" | "VOICE" => {
85 let ext = Path::new(target.split('?').next().unwrap_or(target))
86 .extension()
87 .and_then(|e| e.to_str())
88 .unwrap_or("");
89 if is_native_voice_ext(ext) {
90 Some(QQMediaFileType::Voice)
91 } else {
92 Some(QQMediaFileType::File)
93 }
94 }
95 _ => None,
96 }
97}
98
99fn find_matching_close(s: &str) -> Option<usize> {
101 let mut depth = 1usize;
102 for (i, ch) in s.char_indices() {
103 match ch {
104 '[' => depth += 1,
105 ']' => {
106 depth -= 1;
107 if depth == 0 {
108 return Some(i);
109 }
110 }
111 _ => {}
112 }
113 }
114 None
115}
116
117fn parse_qq_attachment_markers(content: &str) -> (String, Vec<QQMediaAttachment>) {
122 let mut cleaned = String::with_capacity(content.len());
123 let mut attachments = Vec::new();
124 let mut cursor = 0;
125
126 while cursor < content.len() {
127 let Some(open_rel) = content[cursor..].find('[') else {
128 cleaned.push_str(&content[cursor..]);
129 break;
130 };
131
132 let open = cursor + open_rel;
133 cleaned.push_str(&content[cursor..open]);
134
135 let Some(close_rel) = find_matching_close(&content[open + 1..]) else {
136 cleaned.push_str(&content[open..]);
137 break;
138 };
139
140 let close = open + 1 + close_rel;
141 let marker = &content[open + 1..close];
142
143 let parsed = marker.split_once(':').and_then(|(kind, target)| {
144 let target = target.trim();
145 if target.is_empty() {
146 return None;
147 }
148 let file_type = marker_kind_to_qq_file_type(kind, target)?;
149 Some(QQMediaAttachment {
150 kind: file_type,
151 target: target.to_string(),
152 })
153 });
154
155 if let Some(attachment) = parsed {
156 attachments.push(attachment);
157 } else {
158 cleaned.push_str(&content[open..=close]);
159 }
160
161 cursor = close + 1;
162 }
163
164 (cleaned.trim().to_string(), attachments)
165}
166
167fn infer_attachment_marker(content_type: &str, filename: &str) -> &'static str {
169 let ct = content_type.to_ascii_lowercase();
170 if ct.starts_with("image/") {
171 return "IMAGE";
172 }
173 if ct.starts_with("audio/") || ct.contains("voice") {
174 return "VOICE";
175 }
176 if ct.starts_with("video/") {
177 return "VIDEO";
178 }
179
180 let lower = filename.to_ascii_lowercase();
182 if lower.ends_with(".png")
183 || lower.ends_with(".jpg")
184 || lower.ends_with(".jpeg")
185 || lower.ends_with(".gif")
186 || lower.ends_with(".webp")
187 || lower.ends_with(".bmp")
188 || lower.ends_with(".heic")
189 || lower.ends_with(".heif")
190 || lower.ends_with(".svg")
191 {
192 return "IMAGE";
193 }
194 if lower.ends_with(".mp3")
195 || lower.ends_with(".wav")
196 || lower.ends_with(".silk")
197 || lower.ends_with(".ogg")
198 || lower.ends_with(".flac")
199 || lower.ends_with(".m4a")
200 {
201 return "VOICE";
202 }
203 if lower.ends_with(".mp4")
204 || lower.ends_with(".mov")
205 || lower.ends_with(".mkv")
206 || lower.ends_with(".avi")
207 || lower.ends_with(".webm")
208 {
209 return "VIDEO";
210 }
211 "DOCUMENT"
212}
213
214fn fix_qq_url(url: &str) -> String {
216 let trimmed = url.trim();
217 if trimmed.starts_with("//") {
218 format!("https:{trimmed}")
219 } else {
220 trimmed.to_string()
221 }
222}
223
224fn next_msg_seq() -> u32 {
227 #[allow(clippy::cast_possible_truncation)]
228 let time_part = (std::time::SystemTime::now()
229 .duration_since(std::time::UNIX_EPOCH)
230 .unwrap_or_default()
231 .as_millis() as u32)
232 % 100_000_000;
233 let random = u32::from(rand::random::<u16>());
234 (time_part ^ random) % 65536
235}
236
237fn now_secs() -> u64 {
239 std::time::SystemTime::now()
240 .duration_since(std::time::UNIX_EPOCH)
241 .unwrap_or_default()
242 .as_secs()
243}
244
245const DEDUP_CAPACITY: usize = 10_000;
247
248const AUTH_RETRY_MAX_ATTEMPTS: u32 = 4;
250
251const AUTH_RETRY_INITIAL_BACKOFF_MS: u64 = 500;
253
254const AUTH_RETRY_MAX_BACKOFF_MS: u64 = 8_000;
256
257pub struct QQChannel {
260 app_id: String,
261 app_secret: String,
262 alias: String,
265 peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
268 token_cache: Arc<RwLock<Option<(String, u64)>>>,
270 dedup: Arc<RwLock<HashSet<String>>>,
272 workspace_dir: Option<PathBuf>,
274 upload_cache: Arc<RwLock<HashMap<String, UploadCacheEntry>>>,
276 proxy_url: Option<String>,
278 session_id: Arc<RwLock<Option<String>>>,
280 last_sequence: Arc<RwLock<Option<i64>>>,
282}
283
284impl QQChannel {
285 pub fn new(
286 app_id: String,
287 app_secret: String,
288 alias: impl Into<String>,
289 peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
290 ) -> Self {
291 Self {
292 app_id,
293 app_secret,
294 alias: alias.into(),
295 peer_resolver,
296 token_cache: Arc::new(RwLock::new(None)),
297 dedup: Arc::new(RwLock::new(HashSet::new())),
298 workspace_dir: None,
299 upload_cache: Arc::new(RwLock::new(HashMap::new())),
300 proxy_url: None,
301 session_id: Arc::new(RwLock::new(None)),
302 last_sequence: Arc::new(RwLock::new(None)),
303 }
304 }
305
306 pub fn alias(&self) -> &str {
309 &self.alias
310 }
311
312 pub fn with_workspace_dir(mut self, dir: PathBuf) -> Self {
314 self.workspace_dir = Some(dir);
315 self
316 }
317
318 pub fn with_proxy_url(mut self, proxy_url: Option<String>) -> Self {
320 self.proxy_url = proxy_url;
321 self
322 }
323
324 fn http_client(&self) -> reqwest::Client {
325 zeroclaw_config::schema::build_channel_proxy_client("channel.qq", self.proxy_url.as_deref())
326 }
327
328 fn is_user_allowed(&self, user_id: &str) -> bool {
329 let peers = (self.peer_resolver)();
330 crate::allowlist::is_user_allowed(&peers, user_id, crate::allowlist::Match::Sensitive)
331 }
332
333 async fn fetch_access_token(&self) -> anyhow::Result<(String, u64)> {
335 let body = json!({
336 "appId": self.app_id,
337 "clientSecret": self.app_secret,
338 });
339
340 let resp = self
341 .http_client()
342 .post(QQ_AUTH_URL)
343 .json(&body)
344 .send()
345 .await?;
346
347 if !resp.status().is_success() {
348 let status = resp.status();
349 let err = resp.text().await.unwrap_or_default();
350 anyhow::bail!("QQ token request failed ({status}): {err}");
351 }
352
353 let data: serde_json::Value = resp.json().await?;
354 let token = data
355 .get("access_token")
356 .and_then(|t| t.as_str())
357 .ok_or_else(|| {
358 ::zeroclaw_log::record!(
359 WARN,
360 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
361 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
362 "Missing access_token in QQ response"
363 );
364 anyhow::Error::msg("Missing access_token in QQ response")
365 })?
366 .to_string();
367
368 let expires_in = data
369 .get("expires_in")
370 .and_then(|e| e.as_str())
371 .and_then(|e| e.parse::<u64>().ok())
372 .unwrap_or(7200);
373
374 let now = std::time::SystemTime::now()
375 .duration_since(std::time::UNIX_EPOCH)
376 .unwrap_or_default()
377 .as_secs();
378
379 let expiry = now + expires_in.saturating_sub(60);
381
382 Ok((token, expiry))
383 }
384
385 async fn fetch_access_token_with_retry(&self) -> anyhow::Result<(String, u64)> {
393 let mut backoff_ms = AUTH_RETRY_INITIAL_BACKOFF_MS;
394 let mut last_err = None;
395
396 for attempt in 1..=AUTH_RETRY_MAX_ATTEMPTS {
397 match self.fetch_access_token().await {
398 Ok(result) => {
399 if attempt > 1 {
400 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"attempt": attempt, "AUTH_RETRY_MAX_ATTEMPTS": AUTH_RETRY_MAX_ATTEMPTS})), "getAppAccessToken succeeded on attempt /");
401 }
402 return Ok(result);
403 }
404 Err(e) => {
405 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"attempt": attempt, "AUTH_RETRY_MAX_ATTEMPTS": AUTH_RETRY_MAX_ATTEMPTS, "e": e.to_string()})), "getAppAccessToken failed (attempt /)");
406 last_err = Some(e);
407
408 if attempt < AUTH_RETRY_MAX_ATTEMPTS {
409 let jitter_factor = 0.75 + (rand::random::<f64>() * 0.5);
411 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
412 let sleep_ms = (backoff_ms as f64 * jitter_factor) as u64;
413 tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
414 backoff_ms = (backoff_ms * 2).min(AUTH_RETRY_MAX_BACKOFF_MS);
415 }
416 }
417 }
418 }
419
420 Err(last_err.unwrap_or_else(|| {
421 ::zeroclaw_log::record!(
422 ERROR,
423 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
424 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
425 .with_attrs(::serde_json::json!({
426 "phase": "getAppAccessToken",
427 "max_attempts": AUTH_RETRY_MAX_ATTEMPTS,
428 })),
429 "qq: getAppAccessToken exhausted retries"
430 );
431 anyhow::Error::msg(format!(
432 "getAppAccessToken failed after {AUTH_RETRY_MAX_ATTEMPTS} attempts"
433 ))
434 }))
435 }
436
437 async fn get_token(&self) -> anyhow::Result<String> {
439 let now = std::time::SystemTime::now()
440 .duration_since(std::time::UNIX_EPOCH)
441 .unwrap_or_default()
442 .as_secs();
443
444 {
445 let cache = self.token_cache.read().await;
446 if let Some((ref token, expiry)) = *cache
447 && now < expiry
448 {
449 return Ok(token.clone());
450 }
451 }
452
453 let (token, expiry) = self.fetch_access_token_with_retry().await?;
454 {
455 let mut cache = self.token_cache.write().await;
456 *cache = Some((token.clone(), expiry));
457 }
458 Ok(token)
459 }
460
461 async fn get_gateway_url(&self, token: &str) -> anyhow::Result<String> {
463 let resp = self
464 .http_client()
465 .get(format!("{QQ_API_BASE}/gateway"))
466 .header("Authorization", format!("QQBot {token}"))
467 .send()
468 .await?;
469
470 if !resp.status().is_success() {
471 let status = resp.status();
472 let err = resp.text().await.unwrap_or_default();
473 anyhow::bail!("QQ gateway request failed ({status}): {err}");
474 }
475
476 let data: serde_json::Value = resp.json().await?;
477 let url = data
478 .get("url")
479 .and_then(|u| u.as_str())
480 .ok_or_else(|| {
481 ::zeroclaw_log::record!(
482 WARN,
483 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
484 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
485 "Missing gateway URL in QQ response"
486 );
487 anyhow::Error::msg("Missing gateway URL in QQ response")
488 })?
489 .to_string();
490
491 Ok(url)
492 }
493
494 async fn is_duplicate(&self, msg_id: &str) -> bool {
496 if msg_id.is_empty() {
497 return false;
498 }
499
500 let mut dedup = self.dedup.write().await;
501
502 if dedup.contains(msg_id) {
503 return true;
504 }
505
506 if dedup.len() >= DEDUP_CAPACITY {
508 let to_remove: Vec<String> = dedup.iter().take(DEDUP_CAPACITY / 2).cloned().collect();
509 for key in to_remove {
510 dedup.remove(&key);
511 }
512 }
513
514 dedup.insert(msg_id.to_string());
515 false
516 }
517
518 fn upload_cache_key(
520 file_data: &[u8],
521 scope: &str,
522 target_id: &str,
523 file_type: QQMediaFileType,
524 ) -> String {
525 let mut hasher = Sha256::new();
526 hasher.update(file_data);
527 let hash = format!("{:x}", hasher.finalize());
528 format!("{hash}:{scope}:{target_id}:{}", file_type as u8)
529 }
530
531 async fn get_cached_upload(&self, cache_key: &str) -> Option<String> {
533 let cache = self.upload_cache.read().await;
534 if let Some(entry) = cache.get(cache_key) {
535 if now_secs() + 60 < entry.expires_at {
537 return Some(entry.file_info.clone());
538 }
539 }
540 None
541 }
542
543 async fn set_cached_upload(&self, cache_key: String, file_info: String, ttl: u64) {
545 let mut cache = self.upload_cache.write().await;
546
547 if cache.len() >= UPLOAD_CACHE_CAPACITY {
549 let now = now_secs();
550 cache.retain(|_, v| v.expires_at > now);
551
552 if cache.len() >= UPLOAD_CACHE_CAPACITY {
554 let keys_to_remove: Vec<String> = cache
555 .keys()
556 .take(UPLOAD_CACHE_CAPACITY / 2)
557 .cloned()
558 .collect();
559 for key in keys_to_remove {
560 cache.remove(&key);
561 }
562 }
563 }
564
565 cache.insert(
566 cache_key,
567 UploadCacheEntry {
568 file_info,
569 expires_at: now_secs() + ttl,
570 },
571 );
572 }
573
574 fn resolve_recipient(recipient: &str) -> (&str, String) {
577 if let Some(group_id) = recipient.strip_prefix("group:") {
578 ("groups", group_id.to_string())
579 } else {
580 let raw_uid = recipient.strip_prefix("user:").unwrap_or(recipient);
581 let user_id: String = raw_uid
582 .chars()
583 .filter(|c| c.is_alphanumeric() || *c == '_')
584 .collect();
585 ("users", user_id)
586 }
587 }
588
589 async fn upload_media(
595 &self,
596 recipient: &str,
597 file_type: QQMediaFileType,
598 url: Option<&str>,
599 file_data: Option<&str>,
600 file_name: Option<&str>,
601 ) -> anyhow::Result<(String, Option<u64>)> {
602 let token = self.get_token().await?;
603 let (scope, id) = Self::resolve_recipient(recipient);
604
605 let api_url = format!("{QQ_API_BASE}/v2/{scope}/{id}/files");
606 ensure_https(&api_url)?;
607
608 let mut body = json!({
609 "file_type": file_type as u8,
610 "srv_send_msg": false,
611 });
612
613 if let Some(u) = url {
614 body["url"] = json!(u);
615 }
616 if let Some(d) = file_data {
617 body["file_data"] = json!(d);
618 }
619 if file_type == QQMediaFileType::File
621 && let Some(name) = file_name
622 {
623 body["file_name"] = json!(name);
624 }
625
626 let resp = self
627 .http_client()
628 .post(&api_url)
629 .header("Authorization", format!("QQBot {token}"))
630 .json(&body)
631 .send()
632 .await?;
633
634 if !resp.status().is_success() {
635 let status = resp.status();
636 let err = resp.text().await.unwrap_or_default();
637 anyhow::bail!("QQ upload media failed ({status}): {err}");
638 }
639
640 let upload_resp: QQUploadResponse = resp.json().await?;
641 Ok((upload_resp.file_info, upload_resp.ttl))
642 }
643
644 async fn send_media_message(&self, recipient: &str, file_info: &str) -> anyhow::Result<()> {
646 let token = self.get_token().await?;
647 let (scope, id) = Self::resolve_recipient(recipient);
648
649 let url = format!("{QQ_API_BASE}/v2/{scope}/{id}/messages");
650 ensure_https(&url)?;
651
652 let body = json!({
653 "msg_type": 7,
654 "media": {
655 "file_info": file_info,
656 },
657 "msg_seq": next_msg_seq(),
658 });
659
660 let resp = self
661 .http_client()
662 .post(&url)
663 .header("Authorization", format!("QQBot {token}"))
664 .json(&body)
665 .send()
666 .await?;
667
668 if !resp.status().is_success() {
669 let status = resp.status();
670 let err = resp.text().await.unwrap_or_default();
671 anyhow::bail!("QQ send media message failed ({status}): {err}");
672 }
673
674 Ok(())
675 }
676
677 async fn send_attachment(
679 &self,
680 recipient: &str,
681 attachment: &QQMediaAttachment,
682 ) -> anyhow::Result<()> {
683 let target = attachment.target.trim();
684
685 let file_name = Path::new(target.split('?').next().unwrap_or(target))
687 .file_name()
688 .and_then(|n| n.to_str())
689 .map(|s| s.to_string());
690
691 if target.starts_with("http://") || target.starts_with("https://") {
692 let (file_info, _ttl) = self
694 .upload_media(
695 recipient,
696 attachment.kind,
697 Some(target),
698 None,
699 file_name.as_deref(),
700 )
701 .await?;
702 self.send_media_message(recipient, &file_info).await?;
703 } else {
704 let path = Path::new(target);
706 if !path.exists() {
707 anyhow::bail!("QQ attachment path not found: {target}");
708 }
709
710 let metadata = tokio::fs::metadata(path).await?;
711 if metadata.len() > QQ_MAX_UPLOAD_BYTES {
712 anyhow::bail!(
713 "QQ attachment too large ({} bytes, max {}): {target}",
714 metadata.len(),
715 QQ_MAX_UPLOAD_BYTES
716 );
717 }
718
719 let file_bytes = tokio::fs::read(path).await?;
720 let (scope_label, target_id) = Self::resolve_recipient(recipient);
721 let scope = if scope_label == "groups" {
722 "group"
723 } else {
724 "c2c"
725 };
726 let cache_key = Self::upload_cache_key(&file_bytes, scope, &target_id, attachment.kind);
727
728 if let Some(cached_file_info) = self.get_cached_upload(&cache_key).await {
730 ::zeroclaw_log::record!(
731 DEBUG,
732 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
733 .with_attrs(::serde_json::json!({"target": target})),
734 "using cached upload for"
735 );
736 self.send_media_message(recipient, &cached_file_info)
737 .await?;
738 return Ok(());
739 }
740
741 let b64 = base64::engine::general_purpose::STANDARD.encode(&file_bytes);
742 let (file_info, ttl) = self
743 .upload_media(
744 recipient,
745 attachment.kind,
746 None,
747 Some(&b64),
748 file_name.as_deref(),
749 )
750 .await?;
751
752 if let Some(ttl_secs) = ttl {
754 self.set_cached_upload(cache_key, file_info.clone(), ttl_secs)
755 .await;
756 }
757
758 self.send_media_message(recipient, &file_info).await?;
759 }
760
761 Ok(())
762 }
763
764 async fn compose_message_content(&self, payload: &serde_json::Value) -> Option<String> {
769 let text = payload
770 .get("content")
771 .and_then(|c| c.as_str())
772 .unwrap_or("")
773 .trim();
774
775 let mut markers: Vec<String> = Vec::new();
776 let mut voice_transcripts: Vec<String> = Vec::new();
777
778 if let Some(attachments) = payload.get("attachments").and_then(|a| a.as_array()) {
779 for att in attachments {
780 let url = match att.get("url").and_then(|u| u.as_str()) {
781 Some(u) if !u.trim().is_empty() => fix_qq_url(u),
782 _ => continue,
783 };
784
785 let content_type = att
786 .get("content_type")
787 .and_then(|ct| ct.as_str())
788 .unwrap_or("");
789 let filename = att
790 .get("filename")
791 .and_then(|f| f.as_str())
792 .unwrap_or("attachment");
793
794 let marker_type = infer_attachment_marker(content_type, filename);
795
796 let is_voice = content_type == "voice"
800 || content_type.starts_with("audio/")
801 || marker_type == "VOICE";
802 let (download_url, save_filename) = if is_voice {
803 if let Some(wav_url) = att
804 .get("voice_wav_url")
805 .and_then(|u| u.as_str())
806 .filter(|u| !u.trim().is_empty())
807 {
808 let fixed = fix_qq_url(wav_url);
809 let wav_name = Path::new(fixed.split('?').next().unwrap_or(&fixed))
811 .file_name()
812 .and_then(|n| n.to_str())
813 .unwrap_or("voice.wav")
814 .to_string();
815 (fixed, wav_name)
816 } else {
817 (url.clone(), filename.to_string())
818 }
819 } else {
820 (url.clone(), filename.to_string())
821 };
822
823 let location = if let Some(ref ws) = self.workspace_dir {
825 let dir = ws.join("qq_files");
826 match self
827 .download_attachment(&download_url, &dir, &save_filename)
828 .await
829 {
830 Ok(local_path) => local_path.display().to_string(),
831 Err(e) => {
832 ::zeroclaw_log::record!(
833 WARN,
834 ::zeroclaw_log::Event::new(
835 module_path!(),
836 ::zeroclaw_log::Action::Note
837 )
838 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
839 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
840 "failed to download attachment"
841 );
842 url.clone()
843 }
844 }
845 } else {
846 url.clone()
847 };
848
849 if is_voice {
850 markers.push(format!("[{marker_type}:{location}]"));
854 if let Some(asr_text) = att
855 .get("asr_refer_text")
856 .and_then(|t| t.as_str())
857 .map(|t| t.trim())
858 .filter(|t| !t.is_empty())
859 {
860 voice_transcripts.push(asr_text.to_string());
861 }
862 } else {
863 markers.push(format!("[{marker_type}:{location}]"));
864 }
865 }
866 }
867
868 let voice_text = match voice_transcripts.len() {
871 0 => String::new(),
872 1 => format!(
873 "<VOICE_TRANSCRIPTION>{}</VOICE_TRANSCRIPTION>",
874 voice_transcripts[0]
875 ),
876 _ => voice_transcripts
877 .iter()
878 .enumerate()
879 .map(|(i, t)| format!("<VOICE_TRANSCRIPTION_{i}>{t}</VOICE_TRANSCRIPTION_{i}>"))
880 .collect::<Vec<_>>()
881 .join("\n"),
882 };
883
884 let mut parts: Vec<&str> = Vec::new();
885 if !text.is_empty() {
886 parts.push(text);
887 }
888 if !voice_text.is_empty() {
889 parts.push(&voice_text);
890 }
891 let markers_joined = markers.join("\n");
892 if !markers_joined.is_empty() {
893 parts.push(&markers_joined);
894 }
895
896 if parts.is_empty() {
897 return None;
898 }
899
900 Some(parts.join("\n"))
901 }
902
903 async fn download_attachment(
905 &self,
906 url: &str,
907 dir: &Path,
908 filename: &str,
909 ) -> anyhow::Result<PathBuf> {
910 tokio::fs::create_dir_all(dir).await?;
911
912 let stem = Path::new(filename)
914 .file_stem()
915 .and_then(|s| s.to_str())
916 .unwrap_or("file");
917 let ext = Path::new(filename)
918 .extension()
919 .and_then(|e| e.to_str())
920 .unwrap_or("");
921 let unique = &Uuid::new_v4().to_string()[..8];
922 let safe_name = if ext.is_empty() {
923 format!("{stem}_{unique}")
924 } else {
925 format!("{stem}_{unique}.{ext}")
926 };
927
928 let dest = dir.join(&safe_name);
929
930 let resp = self.http_client().get(url).send().await?;
933 if !resp.status().is_success() {
934 anyhow::bail!("Download failed ({}): {url}", resp.status());
935 }
936
937 let bytes = resp.bytes().await?;
938 tokio::fs::write(&dest, &bytes).await?;
939
940 Ok(dest)
941 }
942
943 async fn send_text_markdown(&self, recipient: &str, content: &str) -> anyhow::Result<()> {
945 let token = self.get_token().await?;
946 let (scope, id) = Self::resolve_recipient(recipient);
947
948 let url = format!("{QQ_API_BASE}/v2/{scope}/{id}/messages");
949 ensure_https(&url)?;
950
951 let body = json!({
952 "markdown": {
953 "content": content,
954 },
955 "msg_type": 2,
956 "msg_seq": next_msg_seq(),
957 });
958
959 let resp = self
960 .http_client()
961 .post(&url)
962 .header("Authorization", format!("QQBot {token}"))
963 .json(&body)
964 .send()
965 .await?;
966
967 if !resp.status().is_success() {
968 let status = resp.status();
969 let err = resp.text().await.unwrap_or_default();
970 anyhow::bail!("QQ send message failed ({status}): {err}");
971 }
972
973 Ok(())
974 }
975}
976
977impl ::zeroclaw_api::attribution::Attributable for QQChannel {
978 fn role(&self) -> ::zeroclaw_api::attribution::Role {
979 ::zeroclaw_api::attribution::Role::Channel(::zeroclaw_api::attribution::ChannelKind::Qq)
980 }
981 fn alias(&self) -> &str {
982 &self.alias
983 }
984}
985
986#[async_trait]
987impl Channel for QQChannel {
988 fn name(&self) -> &str {
989 "qq"
990 }
991
992 async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
993 let (cleaned_text, attachments) = parse_qq_attachment_markers(&message.content);
994
995 if attachments.is_empty() {
996 return self
998 .send_text_markdown(&message.recipient, &message.content)
999 .await;
1000 }
1001
1002 if !cleaned_text.is_empty() {
1004 self.send_text_markdown(&message.recipient, &cleaned_text)
1005 .await?;
1006 }
1007
1008 for attachment in &attachments {
1010 if let Err(e) = self.send_attachment(&message.recipient, attachment).await {
1011 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"target": attachment.target, "error": format!("{}", e)})), "failed to send media attachment; falling back to text");
1012 let fallback = format!(
1014 "{}: {}",
1015 match attachment.kind {
1016 QQMediaFileType::Image => "Image",
1017 QQMediaFileType::Video => "Video",
1018 QQMediaFileType::Voice => "Voice",
1019 QQMediaFileType::File => "File",
1020 },
1021 attachment.target
1022 );
1023 self.send_text_markdown(&message.recipient, &fallback)
1024 .await?;
1025 }
1026 }
1027
1028 Ok(())
1029 }
1030
1031 #[allow(clippy::too_many_lines)]
1032 async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
1033 ::zeroclaw_log::record!(
1034 INFO,
1035 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1036 "authenticating..."
1037 );
1038 let token = self.get_token().await?;
1039
1040 ::zeroclaw_log::record!(
1041 INFO,
1042 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1043 "fetching gateway URL..."
1044 );
1045 let gw_url = self.get_gateway_url(&token).await?;
1046
1047 ::zeroclaw_log::record!(
1048 INFO,
1049 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1050 "connecting to gateway WebSocket..."
1051 );
1052 let (ws_stream, _) = zeroclaw_config::schema::ws_connect_with_proxy(
1053 &gw_url,
1054 "channel.qq",
1055 self.proxy_url.as_deref(),
1056 )
1057 .await?;
1058 let (mut write, mut read) = ws_stream.split();
1059
1060 let hello = read.next().await.ok_or_else(|| {
1062 ::zeroclaw_log::record!(
1063 ERROR,
1064 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1065 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1066 .with_attrs(::serde_json::json!({"phase": "gateway_hello"})),
1067 "qq: gateway closed before Hello frame"
1068 );
1069 anyhow::Error::msg("no hello frame")
1070 })??;
1071 let hello_data: serde_json::Value = serde_json::from_str(&hello.to_string())?;
1072 let heartbeat_interval = hello_data
1073 .get("d")
1074 .and_then(|d| d.get("heartbeat_interval"))
1075 .and_then(serde_json::Value::as_u64)
1076 .unwrap_or(41250);
1077
1078 let stored_session = self.session_id.read().await.clone();
1080 let stored_seq = *self.last_sequence.read().await;
1081
1082 if let (Some(sid), Some(seq)) = (&stored_session, stored_seq) {
1083 ::zeroclaw_log::record!(
1085 INFO,
1086 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1087 .with_attrs(::serde_json::json!({"sid": sid, "seq": seq})),
1088 "attempting session resume (session_id=, seq=)"
1089 );
1090 let resume = json!({
1091 "op": 6,
1092 "d": {
1093 "token": format!("QQBot {token}"),
1094 "session_id": sid,
1095 "seq": seq,
1096 }
1097 });
1098 write.send(Message::Text(resume.to_string().into())).await?;
1099 } else {
1100 let intents: u64 = (1 << 25) | (1 << 30);
1103 let identify = json!({
1104 "op": 2,
1105 "d": {
1106 "token": format!("QQBot {token}"),
1107 "intents": intents,
1108 "properties": {
1109 "os": "linux",
1110 "browser": "zeroclaw",
1111 "device": "zeroclaw",
1112 }
1113 }
1114 });
1115 write
1116 .send(Message::Text(identify.to_string().into()))
1117 .await?;
1118 ::zeroclaw_log::record!(
1119 INFO,
1120 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1121 "connected and sent Identify"
1122 );
1123 }
1124
1125 let mut sequence: i64 = stored_seq.unwrap_or(-1);
1126
1127 const MAX_MISSED_ACKS: u32 = 3;
1134 let mut missed_ack_count: u32 = 0;
1135
1136 let hb_interval = heartbeat_interval;
1142 let grace_ms: u64 = (hb_interval / 10).min(5_000);
1143 let effective_interval = hb_interval.saturating_add(grace_ms);
1144
1145 let (hb_tx, mut hb_rx) = tokio::sync::mpsc::channel::<()>(1);
1146 tokio::spawn(async move {
1147 let mut interval =
1148 tokio::time::interval(std::time::Duration::from_millis(effective_interval));
1149 loop {
1150 interval.tick().await;
1151 if hb_tx.send(()).await.is_err() {
1152 break;
1153 }
1154 }
1155 });
1156
1157 enum ExitReason {
1159 Reconnect,
1160 InvalidSession,
1161 Close(Option<tokio_tungstenite::tungstenite::protocol::CloseFrame>),
1162 StreamEnded,
1163 HeartbeatTimeout,
1164 WriteFailed,
1165 ChannelClosed,
1166 }
1167
1168 let exit_reason;
1169
1170 'outer: loop {
1171 tokio::select! {
1172 _ = hb_rx.recv() => {
1173 if missed_ack_count > 0 {
1177 if missed_ack_count >= MAX_MISSED_ACKS {
1178 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"missed_ack_count": missed_ack_count, "hb_interval": hb_interval, "grace_ms": grace_ms})), "consecutive heartbeat ACKs missed (interval ms + ms grace); connection appears zombied");
1179 exit_reason = ExitReason::HeartbeatTimeout;
1180 break;
1181 }
1182 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"missed_ack_count": missed_ack_count, "MAX_MISSED_ACKS": MAX_MISSED_ACKS})), "heartbeat ACK missed (/); tolerating transient delay");
1183 }
1184 let d = if sequence >= 0 { json!(sequence) } else { json!(null) };
1185 let hb = json!({"op": 1, "d": d});
1186 if write
1187 .send(Message::Text(hb.to_string().into()))
1188 .await
1189 .is_err()
1190 {
1191 exit_reason = ExitReason::WriteFailed;
1192 break;
1193 }
1194 missed_ack_count += 1;
1195 }
1196 msg = read.next() => {
1197 let msg = match msg {
1198 Some(Ok(Message::Text(t))) => t,
1199 Some(Ok(Message::Ping(payload))) => {
1200 if write.send(Message::Pong(payload)).await.is_err() {
1201 exit_reason = ExitReason::WriteFailed;
1202 break;
1203 }
1204 continue;
1205 }
1206 Some(Ok(Message::Close(frame))) => {
1207 exit_reason = ExitReason::Close(frame);
1208 break;
1209 }
1210 None => {
1211 exit_reason = ExitReason::StreamEnded;
1212 break;
1213 }
1214 _ => continue,
1215 };
1216
1217 let event: serde_json::Value = match serde_json::from_str(msg.as_ref()) {
1218 Ok(e) => e,
1219 Err(_) => continue,
1220 };
1221
1222 if let Some(s) = event.get("s").and_then(serde_json::Value::as_i64) {
1224 sequence = s;
1225 }
1226
1227 let op = event.get("op").and_then(serde_json::Value::as_u64).unwrap_or(0);
1228
1229 match op {
1230 1 => {
1232 let d = if sequence >= 0 { json!(sequence) } else { json!(null) };
1233 let hb = json!({"op": 1, "d": d});
1234 if write
1235 .send(Message::Text(hb.to_string().into()))
1236 .await
1237 .is_err()
1238 {
1239 exit_reason = ExitReason::WriteFailed;
1240 break;
1241 }
1242 missed_ack_count += 1;
1243 continue;
1244 }
1245 7 => {
1247 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), "received Reconnect (op 7); will resume");
1248 exit_reason = ExitReason::Reconnect;
1249 break;
1250 }
1251 9 => {
1253 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), "received Invalid Session (op 9); clearing session for fresh auth");
1254 exit_reason = ExitReason::InvalidSession;
1255 break;
1256 }
1257 11 => {
1259 missed_ack_count = 0;
1260 continue;
1261 }
1262 _ => {}
1263 }
1264
1265 if op != 0 {
1267 continue;
1268 }
1269
1270 let event_type = event.get("t").and_then(|t| t.as_str()).unwrap_or("");
1271 let d = match event.get("d") {
1272 Some(d) => d,
1273 None => continue,
1274 };
1275
1276 if event_type == "READY" || event_type == "RESUMED" {
1278 if let Some(sid) = d.get("session_id").and_then(|s| s.as_str()) {
1279 *self.session_id.write().await = Some(sid.to_string());
1280 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"sid": sid, "event_type": event_type})), "session established (session_id=, event=)");
1281 }
1282 continue;
1283 }
1284
1285 ::zeroclaw_log::record!(DEBUG, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"event_type": event_type, "d": d})), "event_type= payload=");
1286
1287 match event_type {
1288 "C2C_MESSAGE_CREATE" => {
1289 let msg_id = d.get("id").and_then(|i| i.as_str()).unwrap_or("");
1290 if self.is_duplicate(msg_id).await {
1291 continue;
1292 }
1293
1294 let Some(content) = self.compose_message_content(d).await else {
1295 continue;
1296 };
1297
1298 let author_id = d.get("author").and_then(|a| a.get("id")).and_then(|i| i.as_str()).unwrap_or("unknown");
1299 let user_openid = d.get("author").and_then(|a| a.get("user_openid")).and_then(|u| u.as_str()).unwrap_or(author_id);
1301
1302 if !self.is_user_allowed(user_openid) {
1303 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"user_openid": user_openid})), "ignoring C2C message from unauthorized user");
1304 continue;
1305 }
1306
1307 let chat_id = format!("user:{user_openid}");
1308
1309 let channel_msg = ChannelMessage {
1310 id: Uuid::new_v4().to_string(),
1311 sender: user_openid.to_string(),
1312 reply_target: chat_id,
1313 content,
1314 channel: "qq".to_string(),
1315 channel_alias: Some(self.alias.clone()),
1316 timestamp: std::time::SystemTime::now()
1317 .duration_since(std::time::UNIX_EPOCH)
1318 .unwrap_or_default()
1319 .as_secs(),
1320 thread_ts: None,
1321 interruption_scope_id: None,
1322 attachments: vec![],
1323 subject: None,
1324 };
1325
1326 if tx.send(channel_msg).await.is_err() {
1327 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), "message channel closed");
1328 exit_reason = ExitReason::ChannelClosed;
1329 break 'outer;
1330 }
1331 }
1332 "GROUP_AT_MESSAGE_CREATE" => {
1333 let msg_id = d.get("id").and_then(|i| i.as_str()).unwrap_or("");
1334 if self.is_duplicate(msg_id).await {
1335 continue;
1336 }
1337
1338 let Some(content) = self.compose_message_content(d).await else {
1339 continue;
1340 };
1341
1342 let author_id = d.get("author").and_then(|a| a.get("member_openid")).and_then(|m| m.as_str()).unwrap_or("unknown");
1343
1344 if !self.is_user_allowed(author_id) {
1345 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"author_id": author_id})), "ignoring group message from unauthorized user");
1346 continue;
1347 }
1348
1349 let group_openid = d.get("group_openid").and_then(|g| g.as_str()).unwrap_or("unknown");
1350 let chat_id = format!("group:{group_openid}");
1351
1352 let channel_msg = ChannelMessage {
1353 id: Uuid::new_v4().to_string(),
1354 sender: author_id.to_string(),
1355 reply_target: chat_id,
1356 content,
1357 channel: "qq".to_string(),
1358 channel_alias: Some(self.alias.clone()),
1359 timestamp: std::time::SystemTime::now()
1360 .duration_since(std::time::UNIX_EPOCH)
1361 .unwrap_or_default()
1362 .as_secs(),
1363 thread_ts: None,
1364 interruption_scope_id: None,
1365 attachments: vec![],
1366 subject: None,
1367 };
1368
1369 if tx.send(channel_msg).await.is_err() {
1370 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), "message channel closed");
1371 exit_reason = ExitReason::ChannelClosed;
1372 break 'outer;
1373 }
1374 }
1375 _ => {}
1376 }
1377 }
1378 }
1379 }
1380
1381 *self.last_sequence.write().await = if sequence >= 0 { Some(sequence) } else { None };
1383
1384 match exit_reason {
1385 ExitReason::InvalidSession => {
1386 *self.session_id.write().await = None;
1388 *self.last_sequence.write().await = None;
1389 anyhow::bail!(
1390 "QQ WebSocket connection closed: invalid session (fresh auth required)"
1391 )
1392 }
1393 ExitReason::Reconnect => {
1394 anyhow::bail!(
1396 "QQ WebSocket connection closed: server requested reconnect (resume will be attempted)"
1397 )
1398 }
1399 ExitReason::Close(ref frame) => {
1400 let (code, reason) = frame
1401 .as_ref()
1402 .map(|f| (f.code.to_string(), f.reason.to_string()))
1403 .unwrap_or_else(|| ("unknown".into(), "none".into()));
1404 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"code": code.to_string(), "reason": reason.to_string()})), "WebSocket closed with code=, reason=\"\"; resume will be attempted on reconnect");
1405 anyhow::bail!(
1406 "QQ WebSocket connection closed: close_code={code}, reason=\"{reason}\""
1407 )
1408 }
1409 ExitReason::StreamEnded => {
1410 ::zeroclaw_log::record!(
1411 WARN,
1412 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1413 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1414 "WebSocket stream ended unexpectedly; resume will be attempted on reconnect"
1415 );
1416 anyhow::bail!("QQ WebSocket connection closed: stream ended unexpectedly")
1417 }
1418 ExitReason::HeartbeatTimeout => {
1419 ::zeroclaw_log::record!(
1420 WARN,
1421 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1422 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1423 .with_attrs(::serde_json::json!({"MAX_MISSED_ACKS": MAX_MISSED_ACKS})),
1424 "heartbeat timeout after consecutive missed ACKs; resume will be attempted on reconnect"
1425 );
1426 anyhow::bail!(
1427 "QQ WebSocket connection closed: heartbeat ACK timeout \
1428 ({MAX_MISSED_ACKS} consecutive missed ACKs)"
1429 )
1430 }
1431 ExitReason::WriteFailed => {
1432 ::zeroclaw_log::record!(
1433 WARN,
1434 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1435 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1436 "WebSocket write failed; resume will be attempted on reconnect"
1437 );
1438 anyhow::bail!("QQ WebSocket connection closed: write failed")
1439 }
1440 ExitReason::ChannelClosed => {
1441 anyhow::bail!("QQ WebSocket connection closed: internal message channel closed")
1442 }
1443 }
1444 }
1445
1446 async fn health_check(&self) -> bool {
1447 self.fetch_access_token_with_retry().await.is_ok()
1448 }
1449}
1450
1451#[cfg(test)]
1452mod tests {
1453 use super::*;
1454 use serde_json::json;
1455
1456 #[test]
1457 fn test_name() {
1458 let ch = QQChannel::new(
1459 "id".into(),
1460 "secret".into(),
1461 "qq_test_alias",
1462 Arc::new(Vec::new),
1463 );
1464 assert_eq!(ch.name(), "qq");
1465 }
1466
1467 #[test]
1468 fn test_user_allowed_wildcard() {
1469 let ch = QQChannel::new(
1470 "id".into(),
1471 "secret".into(),
1472 "qq_test_alias",
1473 Arc::new(|| vec!["*".into()]),
1474 );
1475 assert!(ch.is_user_allowed("anyone"));
1476 }
1477
1478 #[test]
1479 fn test_user_allowed_specific() {
1480 let ch = QQChannel::new(
1481 "id".into(),
1482 "secret".into(),
1483 "qq_test_alias",
1484 Arc::new(|| vec!["user123".into()]),
1485 );
1486 assert!(ch.is_user_allowed("user123"));
1487 assert!(!ch.is_user_allowed("other"));
1488 }
1489
1490 #[test]
1491 fn test_user_denied_empty() {
1492 let ch = QQChannel::new(
1493 "id".into(),
1494 "secret".into(),
1495 "qq_test_alias",
1496 Arc::new(Vec::new),
1497 );
1498 assert!(!ch.is_user_allowed("anyone"));
1499 }
1500
1501 #[tokio::test]
1502 async fn test_dedup() {
1503 let ch = QQChannel::new(
1504 "id".into(),
1505 "secret".into(),
1506 "qq_test_alias",
1507 Arc::new(Vec::new),
1508 );
1509 assert!(!ch.is_duplicate("msg1").await);
1510 assert!(ch.is_duplicate("msg1").await);
1511 assert!(!ch.is_duplicate("msg2").await);
1512 }
1513
1514 #[tokio::test]
1515 async fn test_dedup_empty_id() {
1516 let ch = QQChannel::new(
1517 "id".into(),
1518 "secret".into(),
1519 "qq_test_alias",
1520 Arc::new(Vec::new),
1521 );
1522 assert!(!ch.is_duplicate("").await);
1523 assert!(!ch.is_duplicate("").await);
1524 }
1525
1526 #[test]
1527 fn v2_allowed_users_fold_into_peer_groups() {
1528 let v2_toml = r#"
1532schema_version = 2
1533
1534[channels.qq]
1535enabled = true
1536app_id = "12345"
1537app_secret = "secret_abc"
1538allowed_users = ["user1"]
1539"#;
1540 let cfg = zeroclaw_config::migration::migrate_to_current(v2_toml)
1541 .expect("V2 qq config migrates to V3");
1542 let qq = cfg
1543 .channels
1544 .qq
1545 .get("default")
1546 .expect("V2 qq folds under alias `default`");
1547 assert_eq!(qq.app_id, "12345");
1548 assert_eq!(qq.app_secret, "secret_abc");
1549
1550 let group = cfg
1551 .peer_groups
1552 .get("qq_default")
1553 .expect("qq allow-list synthesizes [peer_groups.qq_default]");
1554 assert_eq!(group.channel, "qq");
1555 let peers: Vec<&str> = group.external_peers.iter().map(|p| p.as_str()).collect();
1556 assert_eq!(peers, vec!["user1"]);
1557 }
1558
1559 #[test]
1562 fn test_parse_qq_markers_single_image() {
1563 let (text, atts) = parse_qq_attachment_markers("Hello [IMAGE:/tmp/a.png] world");
1564 assert_eq!(text, "Hello world");
1565 assert_eq!(atts.len(), 1);
1566 assert_eq!(atts[0].kind, QQMediaFileType::Image);
1567 assert_eq!(atts[0].target, "/tmp/a.png");
1568 }
1569
1570 #[test]
1571 fn test_parse_qq_markers_multiple() {
1572 let (text, atts) =
1573 parse_qq_attachment_markers("[IMAGE:/a.png] text [VIDEO:https://example.com/v.mp4]");
1574 assert_eq!(text, "text");
1575 assert_eq!(atts.len(), 2);
1576 assert_eq!(atts[0].kind, QQMediaFileType::Image);
1577 assert_eq!(atts[1].kind, QQMediaFileType::Video);
1578 }
1579
1580 #[test]
1581 fn test_parse_qq_markers_no_markers() {
1582 let (text, atts) = parse_qq_attachment_markers("Just plain text");
1583 assert_eq!(text, "Just plain text");
1584 assert!(atts.is_empty());
1585 }
1586
1587 #[test]
1588 fn test_parse_qq_markers_case_insensitive() {
1589 let (_, atts) = parse_qq_attachment_markers("[image:/a.png]");
1590 assert_eq!(atts.len(), 1);
1591 assert_eq!(atts[0].kind, QQMediaFileType::Image);
1592
1593 let (_, atts) = parse_qq_attachment_markers("[Image:/a.png]");
1594 assert_eq!(atts.len(), 1);
1595 assert_eq!(atts[0].kind, QQMediaFileType::Image);
1596 }
1597
1598 #[test]
1599 fn test_parse_qq_markers_invalid_preserved() {
1600 let (text, atts) = parse_qq_attachment_markers("Keep [UNKNOWN:foo] here");
1601 assert_eq!(text, "Keep [UNKNOWN:foo] here");
1602 assert!(atts.is_empty());
1603 }
1604
1605 #[test]
1606 fn test_parse_qq_markers_mixed_text_and_markers() {
1607 let (text, atts) =
1608 parse_qq_attachment_markers("Before [DOCUMENT:/doc.pdf] middle [PHOTO:/p.jpg] after");
1609 assert_eq!(text, "Before middle after");
1610 assert_eq!(atts.len(), 2);
1611 assert_eq!(atts[0].kind, QQMediaFileType::File);
1612 assert_eq!(atts[0].target, "/doc.pdf");
1613 assert_eq!(atts[1].kind, QQMediaFileType::Image);
1614 assert_eq!(atts[1].target, "/p.jpg");
1615 }
1616
1617 #[test]
1620 fn test_marker_kind_image() {
1621 assert_eq!(
1622 marker_kind_to_qq_file_type("IMAGE", "/a.png"),
1623 Some(QQMediaFileType::Image)
1624 );
1625 assert_eq!(
1626 marker_kind_to_qq_file_type("PHOTO", "/a.png"),
1627 Some(QQMediaFileType::Image)
1628 );
1629 }
1630
1631 #[test]
1632 fn test_marker_kind_document() {
1633 assert_eq!(
1634 marker_kind_to_qq_file_type("DOCUMENT", "/a.pdf"),
1635 Some(QQMediaFileType::File)
1636 );
1637 assert_eq!(
1638 marker_kind_to_qq_file_type("FILE", "/a.zip"),
1639 Some(QQMediaFileType::File)
1640 );
1641 }
1642
1643 #[test]
1644 fn test_marker_kind_video() {
1645 assert_eq!(
1646 marker_kind_to_qq_file_type("VIDEO", "/v.mp4"),
1647 Some(QQMediaFileType::Video)
1648 );
1649 }
1650
1651 #[test]
1652 fn test_marker_kind_voice_native() {
1653 assert_eq!(
1654 marker_kind_to_qq_file_type("VOICE", "/a.mp3"),
1655 Some(QQMediaFileType::Voice)
1656 );
1657 assert_eq!(
1658 marker_kind_to_qq_file_type("AUDIO", "/a.wav"),
1659 Some(QQMediaFileType::Voice)
1660 );
1661 assert_eq!(
1662 marker_kind_to_qq_file_type("VOICE", "/a.silk"),
1663 Some(QQMediaFileType::Voice)
1664 );
1665 }
1666
1667 #[test]
1668 fn test_marker_kind_voice_non_native_degrades() {
1669 assert_eq!(
1671 marker_kind_to_qq_file_type("VOICE", "/a.ogg"),
1672 Some(QQMediaFileType::File)
1673 );
1674 assert_eq!(
1675 marker_kind_to_qq_file_type("AUDIO", "/a.flac"),
1676 Some(QQMediaFileType::File)
1677 );
1678 }
1679
1680 #[test]
1683 fn test_upload_body_url() {
1684 let body = json!({
1685 "file_type": QQMediaFileType::Image as u8,
1686 "srv_send_msg": false,
1687 "url": "https://example.com/a.jpg",
1688 });
1689 assert_eq!(body["file_type"], 1);
1690 assert_eq!(body["srv_send_msg"], false);
1691 assert_eq!(body["url"], "https://example.com/a.jpg");
1692 assert!(body.get("file_data").is_none());
1693 }
1694
1695 #[test]
1696 fn test_upload_body_base64() {
1697 let body = json!({
1698 "file_type": QQMediaFileType::File as u8,
1699 "srv_send_msg": false,
1700 "file_data": "dGVzdA==",
1701 });
1702 assert_eq!(body["file_type"], 4);
1703 assert_eq!(body["file_data"], "dGVzdA==");
1704 assert!(body.get("url").is_none());
1705 }
1706
1707 #[test]
1708 fn test_send_media_body_msg_type_7() {
1709 let file_info = "some_file_info_string";
1710 let body = json!({
1711 "msg_type": 7,
1712 "media": {
1713 "file_info": file_info,
1714 },
1715 "msg_seq": 1,
1716 });
1717 assert_eq!(body["msg_type"], 7);
1718 assert_eq!(body["media"]["file_info"], file_info);
1719 }
1720
1721 #[tokio::test]
1724 async fn test_compose_message_content_text_only() {
1725 let ch = QQChannel::new(
1726 "id".into(),
1727 "secret".into(),
1728 "qq_test_alias",
1729 Arc::new(Vec::new),
1730 );
1731 let payload = json!({ "content": " hello world " });
1732 assert_eq!(
1733 ch.compose_message_content(&payload).await,
1734 Some("hello world".to_string())
1735 );
1736 }
1737
1738 #[tokio::test]
1739 async fn test_compose_message_content_image_attachment() {
1740 let ch = QQChannel::new(
1741 "id".into(),
1742 "secret".into(),
1743 "qq_test_alias",
1744 Arc::new(Vec::new),
1745 );
1746 let payload = json!({
1747 "content": " ",
1748 "attachments": [{
1749 "content_type": "image/jpg",
1750 "url": "https://cdn.example.com/a.jpg"
1751 }]
1752 });
1753 assert_eq!(
1754 ch.compose_message_content(&payload).await,
1755 Some("[IMAGE:https://cdn.example.com/a.jpg]".to_string())
1756 );
1757 }
1758
1759 #[tokio::test]
1760 async fn test_compose_message_content_text_and_attachments() {
1761 let ch = QQChannel::new(
1762 "id".into(),
1763 "secret".into(),
1764 "qq_test_alias",
1765 Arc::new(Vec::new),
1766 );
1767 let payload = json!({
1768 "content": "Here is an image",
1769 "attachments": [
1770 { "content_type": "image/png", "url": "https://cdn.example.com/a.png" },
1771 { "filename": "b.jpeg", "url": "https://cdn.example.com/b.jpeg" }
1772 ]
1773 });
1774 assert_eq!(
1775 ch.compose_message_content(&payload).await,
1776 Some(
1777 "Here is an image\n[IMAGE:https://cdn.example.com/a.png]\n[IMAGE:https://cdn.example.com/b.jpeg]"
1778 .to_string()
1779 )
1780 );
1781 }
1782
1783 #[tokio::test]
1784 async fn test_compose_all_attachment_types() {
1785 let ch = QQChannel::new(
1786 "id".into(),
1787 "secret".into(),
1788 "qq_test_alias",
1789 Arc::new(Vec::new),
1790 );
1791 let payload = json!({
1792 "content": "",
1793 "attachments": [
1794 { "content_type": "image/png", "url": "https://cdn.example.com/a.png" },
1795 { "content_type": "audio/mpeg", "url": "https://cdn.example.com/b.mp3" },
1796 { "content_type": "video/mp4", "url": "https://cdn.example.com/c.mp4" },
1797 { "content_type": "application/pdf", "url": "https://cdn.example.com/d.pdf" }
1798 ]
1799 });
1800 let result = ch.compose_message_content(&payload).await.unwrap();
1801 assert!(result.contains("[IMAGE:"));
1802 assert!(result.contains("[VOICE:"));
1803 assert!(result.contains("[VIDEO:"));
1804 assert!(result.contains("[DOCUMENT:"));
1805 }
1806
1807 #[tokio::test]
1808 async fn test_compose_fixes_double_slash_url() {
1809 let ch = QQChannel::new(
1810 "id".into(),
1811 "secret".into(),
1812 "qq_test_alias",
1813 Arc::new(Vec::new),
1814 );
1815 let payload = json!({
1816 "content": "",
1817 "attachments": [{
1818 "content_type": "image/png",
1819 "url": "//cdn.example.com/a.png"
1820 }]
1821 });
1822 let result = ch.compose_message_content(&payload).await.unwrap();
1823 assert!(result.contains("https://cdn.example.com/a.png"));
1824 assert!(!result.starts_with("[IMAGE://"));
1826 }
1827
1828 #[tokio::test]
1829 async fn test_compose_fallback_no_workspace() {
1830 let ch = QQChannel::new(
1832 "id".into(),
1833 "secret".into(),
1834 "qq_test_alias",
1835 Arc::new(Vec::new),
1836 );
1837 let payload = json!({
1838 "content": "text",
1839 "attachments": [{
1840 "content_type": "application/pdf",
1841 "filename": "report.pdf",
1842 "url": "https://cdn.example.com/report.pdf"
1843 }]
1844 });
1845 let result = ch.compose_message_content(&payload).await.unwrap();
1846 assert!(result.contains("[DOCUMENT:https://cdn.example.com/report.pdf]"));
1847 }
1848
1849 #[tokio::test]
1850 async fn test_compose_drops_empty_url() {
1851 let ch = QQChannel::new(
1852 "id".into(),
1853 "secret".into(),
1854 "qq_test_alias",
1855 Arc::new(Vec::new),
1856 );
1857 let payload = json!({
1858 "content": " ",
1859 "attachments": [{
1860 "content_type": "image/png",
1861 "url": " "
1862 }]
1863 });
1864 assert_eq!(ch.compose_message_content(&payload).await, None);
1865 }
1866
1867 #[test]
1870 fn test_send_body_uses_markdown_msg_type() {
1871 let content = "**bold** and `code`";
1872 let body = json!({
1873 "markdown": { "content": content },
1874 "msg_type": 2,
1875 });
1876 assert_eq!(body["msg_type"], 2);
1877 assert_eq!(body["markdown"]["content"], content);
1878 assert!(
1879 body.get("content").is_none(),
1880 "top-level 'content' must not be present"
1881 );
1882 }
1883
1884 #[test]
1887 fn test_fix_qq_url() {
1888 assert_eq!(
1889 fix_qq_url("//cdn.example.com/a.png"),
1890 "https://cdn.example.com/a.png"
1891 );
1892 assert_eq!(
1893 fix_qq_url("https://cdn.example.com/a.png"),
1894 "https://cdn.example.com/a.png"
1895 );
1896 }
1897
1898 #[test]
1899 fn test_next_msg_seq_range() {
1900 for _ in 0..100 {
1901 let seq = next_msg_seq();
1902 assert!(seq < 65536);
1903 }
1904 }
1905
1906 #[test]
1907 fn test_resolve_recipient_group() {
1908 let (scope, id) = QQChannel::resolve_recipient("group:abc123");
1909 assert_eq!(scope, "groups");
1910 assert_eq!(id, "abc123");
1911 }
1912
1913 #[test]
1914 fn test_resolve_recipient_user() {
1915 let (scope, id) = QQChannel::resolve_recipient("user:xyz789");
1916 assert_eq!(scope, "users");
1917 assert_eq!(id, "xyz789");
1918 }
1919
1920 #[test]
1921 fn test_resolve_recipient_bare_id() {
1922 let (scope, id) = QQChannel::resolve_recipient("raw_id_123");
1923 assert_eq!(scope, "users");
1924 assert_eq!(id, "raw_id_123");
1925 }
1926
1927 #[test]
1928 fn test_infer_attachment_marker() {
1929 assert_eq!(infer_attachment_marker("image/png", "a.png"), "IMAGE");
1930 assert_eq!(infer_attachment_marker("audio/mpeg", "a.mp3"), "VOICE");
1931 assert_eq!(infer_attachment_marker("video/mp4", "a.mp4"), "VIDEO");
1932 assert_eq!(
1933 infer_attachment_marker("application/pdf", "doc.pdf"),
1934 "DOCUMENT"
1935 );
1936 assert_eq!(infer_attachment_marker("", "photo.jpg"), "IMAGE");
1937 assert_eq!(infer_attachment_marker("", "song.mp3"), "VOICE");
1938 assert_eq!(infer_attachment_marker("", "clip.mp4"), "VIDEO");
1939 assert_eq!(infer_attachment_marker("", "unknown.xyz"), "DOCUMENT");
1940 }
1941
1942 #[tokio::test]
1945 async fn test_upload_cache_hit_and_miss() {
1946 let ch = QQChannel::new(
1947 "id".into(),
1948 "secret".into(),
1949 "qq_test_alias",
1950 Arc::new(Vec::new),
1951 );
1952 let key = QQChannel::upload_cache_key(b"test_data", "c2c", "user1", QQMediaFileType::Image);
1953
1954 assert!(ch.get_cached_upload(&key).await.is_none());
1956
1957 ch.set_cached_upload(key.clone(), "cached_file_info".into(), 3600)
1959 .await;
1960
1961 assert_eq!(
1963 ch.get_cached_upload(&key).await,
1964 Some("cached_file_info".to_string())
1965 );
1966 }
1967
1968 #[tokio::test]
1969 async fn test_upload_cache_expired() {
1970 let ch = QQChannel::new(
1971 "id".into(),
1972 "secret".into(),
1973 "qq_test_alias",
1974 Arc::new(Vec::new),
1975 );
1976 let key = QQChannel::upload_cache_key(b"test_data", "group", "g1", QQMediaFileType::Video);
1977
1978 ch.set_cached_upload(key.clone(), "old_info".into(), 0)
1980 .await;
1981
1982 assert!(ch.get_cached_upload(&key).await.is_none());
1984 }
1985
1986 #[test]
1989 fn test_auth_retry_constants_are_sensible() {
1990 const {
1991 assert!(AUTH_RETRY_MAX_ATTEMPTS >= 2, "should retry at least once");
1992 assert!(
1993 AUTH_RETRY_INITIAL_BACKOFF_MS > 0,
1994 "initial backoff must be positive"
1995 );
1996 assert!(
1997 AUTH_RETRY_MAX_BACKOFF_MS >= AUTH_RETRY_INITIAL_BACKOFF_MS,
1998 "max backoff must be >= initial"
1999 );
2000 }
2001 }
2002
2003 #[test]
2004 fn test_auth_retry_backoff_stays_within_bounds() {
2005 let mut backoff = AUTH_RETRY_INITIAL_BACKOFF_MS;
2007 for _ in 1..AUTH_RETRY_MAX_ATTEMPTS {
2008 backoff = (backoff * 2).min(AUTH_RETRY_MAX_BACKOFF_MS);
2009 }
2010 assert!(
2011 backoff <= AUTH_RETRY_MAX_BACKOFF_MS,
2012 "backoff must never exceed the configured maximum"
2013 );
2014 }
2015
2016 #[tokio::test]
2017 async fn test_get_token_returns_cached_token_without_fetch() {
2018 let ch = QQChannel::new(
2019 "id".into(),
2020 "secret".into(),
2021 "qq_test_alias",
2022 Arc::new(Vec::new),
2023 );
2024 let future_expiry = now_secs() + 3600;
2026 *ch.token_cache.write().await = Some(("cached_tok".to_string(), future_expiry));
2027
2028 let tok = ch.get_token().await.unwrap();
2030 assert_eq!(tok, "cached_tok");
2031 }
2032
2033 #[tokio::test]
2034 async fn test_get_token_refreshes_expired_cache() {
2035 let ch = QQChannel::new(
2036 "id".into(),
2037 "secret".into(),
2038 "qq_test_alias",
2039 Arc::new(Vec::new),
2040 );
2041 *ch.token_cache.write().await = Some(("old_tok".to_string(), 0));
2043
2044 let result = ch.get_token().await;
2047 assert!(
2048 result.is_err(),
2049 "should fail when token expired and no server available"
2050 );
2051 }
2052
2053 #[test]
2056 fn test_heartbeat_grace_period_calculation() {
2057 let cases: Vec<(u64, u64)> = vec![
2059 (41_250, 4_125), (30_000, 3_000), (60_000, 5_000), (100_000, 5_000), (5_000, 500), (0, 0), ];
2066 for (interval, expected_grace) in cases {
2067 let grace: u64 = (interval / 10).min(5_000);
2068 assert_eq!(
2069 grace, expected_grace,
2070 "grace for interval {interval} should be {expected_grace}"
2071 );
2072 let effective = interval.saturating_add(grace);
2073 assert!(effective >= interval);
2074 }
2075 }
2076
2077 #[test]
2078 fn test_missed_ack_counter_logic() {
2079 let max_missed: u32 = 3;
2080 let mut missed: u32 = 0;
2081
2082 assert!(missed < max_missed);
2084 missed += 1;
2085 assert_eq!(missed, 1, "counter should be 1 after first heartbeat");
2086
2087 missed = 0;
2089 assert_eq!(missed, 0, "counter should reset on ACK");
2090
2091 for _ in 0..max_missed {
2093 assert!(
2094 missed < max_missed,
2095 "should not reach zombie state before {max_missed} misses"
2096 );
2097 missed += 1;
2098 }
2099 assert!(
2100 missed >= max_missed,
2101 "should declare zombie after {max_missed} missed ACKs"
2102 );
2103 }
2104
2105 #[test]
2106 fn test_missed_ack_counter_reset_on_ack() {
2107 let max_missed: u32 = 3;
2108 let mut missed: u32 = 0;
2109
2110 missed += 1;
2111 missed += 1;
2112 assert_eq!(missed, 2);
2113
2114 missed = 0;
2116 assert_eq!(missed, 0);
2117
2118 missed += 1;
2120 assert!(missed < max_missed);
2121 }
2122
2123 #[test]
2124 fn test_effective_interval_never_overflows() {
2125 let interval = u64::MAX;
2126 let grace: u64 = (interval / 10).min(5_000);
2127 let effective = interval.saturating_add(grace);
2128 assert_eq!(effective, u64::MAX);
2129 }
2130}