1use aes::Aes256;
2use anyhow::{Context, Result};
3use async_trait::async_trait;
4use base64::Engine as _;
5use cbc::cipher::{BlockDecryptMut, KeyIvInit, block_padding::NoPadding};
6use futures_util::{SinkExt, StreamExt};
7use parking_lot::Mutex;
8use rand::RngExt;
9use serde_json::Value;
10use std::borrow::Cow;
11use std::collections::{HashMap, HashSet, VecDeque};
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
15use tokio::sync::mpsc;
16use tokio_tungstenite::tungstenite::Message as WsMessage;
17use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
18use zeroclaw_config::schema::{StreamMode, WeComWsConfig};
19use zeroclaw_runtime::i18n;
20
// WebSocket endpoint URL. NOTE(review): the connect logic that uses it is not
// visible in this part of the file.
const WECOM_WS_URL: &str = "wss://openws.work.weixin.qq.com";
// Presumably the reconnect backoff bounds, in seconds — TODO confirm against
// the reconnect loop.
const WECOM_BACKOFF_INITIAL_SECS: u64 = 5;
const WECOM_BACKOFF_MAX_SECS: u64 = 60;
// Presumably the keep-alive ping period and the subscribe handshake timeout,
// in seconds — TODO confirm at their use sites.
const WECOM_PING_INTERVAL_SECS: u64 = 30;
const WECOM_SUBSCRIBE_TIMEOUT_SECS: u64 = 10;
// How long a sent command waits for its ack frame before timing out.
const WECOM_COMMAND_TIMEOUT_SECS: u64 = 10;
// Timeouts handed to the HTTP client builder used for attachment downloads.
const WECOM_HTTP_TIMEOUT_SECS: u64 = 60;
const WECOM_CONNECT_TIMEOUT_SECS: u64 = 15;
// Outbound sends wait up to WAIT_SECS for a live WebSocket sender, polling
// every POLL_MILLIS.
const WECOM_WS_READY_WAIT_SECS: u64 = 10;
const WECOM_WS_READY_POLL_MILLIS: u64 = 100;
// Retry budget and base delay (doubled per attempt) for stream replies that
// hit a data-version conflict.
const WECOM_STREAM_CONFLICT_MAX_RETRIES: usize = 3;
const WECOM_STREAM_CONFLICT_RETRY_BASE_MILLIS: u64 = 150;
// Maximum number of message keys kept by the in-memory idempotency store.
const WECOM_IDEMPOTENCY_MAX_KEYS: usize = 4096;
// NOTE(review): looks like provider end-of-message markers to strip from
// replies — confirm at the use site.
const WECOM_PROVIDER_TRAILING_SENTINELS: &[&str] = &["<|eom|>"];

// NOTE(review): presumably the markdown payload size limit and the chunk size
// used when splitting long replies — confirm at the use sites.
const WECOM_MARKDOWN_MAX_BYTES: usize = 20_480;
const WECOM_MARKDOWN_CHUNK_BYTES: usize = 8_000;
// Emoji pool; presumably the source for `random_emoji()` — verify.
const WECOM_EMOJIS: &[&str] = &[
    "\u{1F642}",
    "\u{1F604}",
    "\u{1F91D}",
    "\u{1F680}",
    "\u{1F44C}",
];
// Minimum spacing, in seconds, between two attachment-inbox cleanup runs.
const WECOM_FILE_CLEANUP_INTERVAL_SECS: u64 = 1800;
/// Records a `DEBUG` note event through `zeroclaw_log`, tagged with the current
/// module path. Arguments are forwarded verbatim to `format!`.
macro_rules! wecom_log_debug {
    ($($arg:tt)*) => {
        ::zeroclaw_log::record!(
            DEBUG,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
            format!($($arg)*),
        )
    };
}
57
/// Looks up the localized CLI string for `key` via
/// `i18n::get_required_cli_string`.
fn wecom_ws_cli_string(key: &str) -> String {
    i18n::get_required_cli_string(key)
}
61
/// Looks up the localized CLI string for `key`, passing `args` (name/value
/// pairs) through to `i18n::get_required_cli_string_with_args`.
fn wecom_ws_cli_string_with_args(key: &str, args: &[(&str, &str)]) -> String {
    i18n::get_required_cli_string_with_args(key, args)
}
65
/// Records an `INFO` note event through `zeroclaw_log`, tagged with the current
/// module path. Arguments are forwarded verbatim to `format!`.
macro_rules! wecom_log_info {
    ($($arg:tt)*) => {
        ::zeroclaw_log::record!(
            INFO,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
            format!($($arg)*),
        )
    };
}
75
/// Records a `WARN` note event through `zeroclaw_log` with outcome `Unknown`,
/// tagged with the current module path. Arguments are forwarded to `format!`.
macro_rules! wecom_log_warn {
    ($($arg:tt)*) => {
        ::zeroclaw_log::record!(
            WARN,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
            format!($($arg)*),
        )
    };
}
86
/// Records an `ERROR` event through `zeroclaw_log` with action `Fail` and
/// outcome `Failure`, tagged with the current module path. Arguments are
/// forwarded to `format!`.
macro_rules! wecom_log_error {
    ($($arg:tt)*) => {
        ::zeroclaw_log::record!(
            ERROR,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
                .with_outcome(::zeroclaw_log::EventOutcome::Failure),
            format!($($arg)*),
        )
    };
}
97
/// Item queued on the outbound channel stored in `WeComWsChannel::ws_tx`.
enum WsOutbound {
    /// A JSON frame to send over the WebSocket.
    Frame(Value),
}
103
/// An inbound message callback after parsing by `parse_inbound_payload`.
#[derive(Debug, Clone)]
struct ParsedInbound {
    /// Message id; used to build the de-duplication key and forwarded as the
    /// framework message id. May be empty.
    msg_id: String,
    /// Message type string, e.g. `"text"`, `"voice"`, `"image"`, `"file"`,
    /// `"mixed"` (the values `normalize_message` dispatches on).
    msg_type: String,
    /// Chat type as reported by the payload (logged and used for access checks).
    chat_type: String,
    /// Chat id when present; absent values are logged as `"-"`.
    chat_id: Option<String>,
    /// User id of the sender.
    sender_userid: String,
    /// Bot id carried by the payload (only logged in the visible code).
    aibot_id: String,
    /// The message body JSON, read later for per-type fields such as
    /// `text.content`, `image.url` and `quote`.
    raw_payload: Value,
}
116
/// Result of `compute_scopes` for an inbound message.
#[derive(Debug, Clone)]
struct ScopeDecision {
    /// Conversation scope identifier; forwarded to the framework as the
    /// conversation key and included in log lines.
    conversation_scope: String,
}
121
/// Outcome of the allowlist check for an inbound message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AccessDecision {
    /// The message may be processed.
    Allowed,
    /// Denied because no allowlist is configured.
    AllowlistMissing,
    /// Denied by the configured allowlist.
    Denied,
}
128
/// Category of a downloadable attachment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AttachmentKind {
    /// An image attachment.
    Image,
    /// A generic file attachment.
    File,
}

impl AttachmentKind {
    /// Returns the lowercase label for this kind: `"image"` or `"file"`.
    fn as_str(self) -> &'static str {
        match self {
            AttachmentKind::File => "file",
            AttachmentKind::Image => "image",
        }
    }
}
143
/// Result of turning an inbound payload into text for the framework.
#[derive(Debug)]
enum NormalizedMessage {
    /// Text content ready to be forwarded.
    Ready(String),
    /// A voice message whose transcript is empty or missing.
    VoiceMissingTranscript,
    /// A message that cannot be converted (unknown type or no usable content).
    Unsupported,
}
150
/// Bounded in-memory set of already-seen keys with oldest-first eviction.
struct SimpleIdempotencyStore {
    /// Keys currently remembered.
    seen: Mutex<HashSet<String>>,
    /// The same keys in insertion order; the front is evicted first.
    order: Mutex<VecDeque<String>>,
}
155
156impl SimpleIdempotencyStore {
157 fn new() -> Self {
158 Self {
159 seen: Mutex::new(HashSet::new()),
160 order: Mutex::new(VecDeque::new()),
161 }
162 }
163 fn record_if_new(&self, key: &str) -> bool {
164 let mut seen = self.seen.lock();
165 if !seen.insert(key.to_string()) {
166 return false;
167 }
168
169 let mut order = self.order.lock();
170 order.push_back(key.to_string());
171 while order.len() > WECOM_IDEMPOTENCY_MAX_KEYS {
172 if let Some(old_key) = order.pop_front() {
173 seen.remove(&old_key);
174 }
175 }
176 true
177 }
178}
179
/// Runtime settings derived from `WeComWsConfig` when the channel is built.
#[derive(Clone)]
struct WeComRuntimeConfig {
    /// Workspace root; the attachment inbox lives in `wecom_ws_files` below it.
    workspace_dir: PathBuf,
    /// Normalized group allowlist used by the access check.
    allowed_groups: Vec<String>,
    /// Optional normalized bot name, passed as a hint when composing content.
    bot_name: Option<String>,
    /// Retention period for stored attachments, in days.
    file_retention_days: u32,
    /// Attachment size limit in bytes (configured megabytes times 1024 * 1024).
    max_file_size_bytes: u64,
    /// Streaming mode; `MultiMessage` is rejected at construction time.
    stream_mode: StreamMode,
    /// Optional proxy URL copied from the config.
    proxy_url: Option<String>,
}
190
191struct MediaDecryptor;
194
195impl MediaDecryptor {
196 fn decrypt(aeskey_b64: &str, encrypted: &[u8]) -> Result<Vec<u8>> {
199 let raw_key = base64::engine::general_purpose::STANDARD
200 .decode(aeskey_b64.trim())
201 .or_else(|_| base64::engine::general_purpose::STANDARD_NO_PAD.decode(aeskey_b64.trim()))
202 .or_else(|_| base64::engine::general_purpose::URL_SAFE.decode(aeskey_b64.trim()))
203 .context("failed to decode WeCom media aeskey")?;
204
205 if raw_key.len() < 32 {
206 anyhow::bail!(
207 "WeCom media aeskey too short: expected >= 32 bytes, got {}",
208 raw_key.len()
209 );
210 }
211
212 let key = &raw_key[..32];
213 let iv = &key[..16];
214
215 let mut buf = encrypted.to_vec();
216 let plaintext = cbc::Decryptor::<Aes256>::new(key.into(), iv.into())
217 .decrypt_padded_mut::<NoPadding>(&mut buf)
218 .map_err(|_| anyhow::Error::msg("failed to decrypt WeCom media attachment"))?;
219 Ok(strip_wecom_padding(plaintext)?.to_vec())
220 }
221}
222
/// WeCom WebSocket channel. Cloning is cheap for the shared state: all mutable
/// state sits behind `Arc`s, so clones observe the same connection and maps.
#[derive(Clone)]
pub struct WeComWsChannel {
    /// Bot id copied from the config.
    bot_id: String,
    /// Bot secret copied from the config.
    secret: String,
    /// Channel alias attached to every forwarded `ChannelMessage`.
    alias: String,
    /// Returns the current user allowlist; invoked on every access check.
    peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
    /// Derived runtime settings.
    cfg: WeComRuntimeConfig,
    /// HTTP client used for attachment downloads.
    client: reqwest::Client,
    /// Sender for outbound frames; `None` while no sender is installed.
    ws_tx: Arc<tokio::sync::Mutex<Option<mpsc::Sender<WsOutbound>>>>,
    /// Waiters for command acks, keyed by `req_id`.
    pending_responses:
        Arc<tokio::sync::Mutex<HashMap<String, tokio::sync::oneshot::Sender<Result<()>>>>>,
    /// Per-`req_id` locks that serialize acked stream replies.
    respond_msg_locks: Arc<tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>>,
    /// Time of the last attachment-inbox cleanup.
    last_cleanup: Arc<Mutex<Instant>>,
    /// De-duplication store for inbound message ids.
    idempotency: Arc<SimpleIdempotencyStore>,
    /// NOTE(review): usage is outside the visible part of the file.
    req_id_map: Arc<Mutex<HashMap<String, String>>>,
}
247
248impl WeComWsChannel {
251 pub fn new(config: &WeComWsConfig, workspace_dir: &Path) -> Result<Self> {
252 let allowed_users = normalize_wecom_allowlist(config.allowed_users.clone());
253 Self::new_with_alias(
254 config,
255 "default",
256 Arc::new(move || allowed_users.clone()),
257 workspace_dir,
258 )
259 }
260
261 pub fn new_with_alias(
262 config: &WeComWsConfig,
263 alias: impl Into<String>,
264 peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
265 workspace_dir: &Path,
266 ) -> Result<Self> {
267 if config.stream_mode == StreamMode::MultiMessage {
268 anyhow::bail!(
269 "WeCom WebSocket stream_mode=multi_message is not supported; use partial or off"
270 );
271 }
272
273 let client = zeroclaw_config::schema::build_channel_proxy_client_with_timeouts(
274 "channel.wecom_ws",
275 config.proxy_url.as_deref(),
276 WECOM_HTTP_TIMEOUT_SECS,
277 WECOM_CONNECT_TIMEOUT_SECS,
278 );
279
280 Ok(Self {
281 bot_id: config.bot_id.clone(),
282 secret: config.secret.clone(),
283 alias: alias.into(),
284 peer_resolver,
285 cfg: WeComRuntimeConfig {
286 workspace_dir: workspace_dir.to_path_buf(),
287 allowed_groups: normalize_wecom_allowlist(config.allowed_groups.clone()),
288 bot_name: normalize_optional_wecom_identity(config.bot_name.as_deref()),
289 file_retention_days: config.file_retention_days,
290 max_file_size_bytes: config.max_file_size_mb.saturating_mul(1024 * 1024),
291 stream_mode: config.stream_mode,
292 proxy_url: config.proxy_url.clone(),
293 },
294 client,
295 ws_tx: Arc::new(tokio::sync::Mutex::new(None)),
296 pending_responses: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
297 respond_msg_locks: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
298 last_cleanup: Arc::new(Mutex::new(Instant::now())),
299 idempotency: Arc::new(SimpleIdempotencyStore::new()),
300 req_id_map: Arc::new(Mutex::new(HashMap::new())),
301 })
302 }
303
304 async fn wait_for_ws_sender(&self) -> Result<mpsc::Sender<WsOutbound>> {
305 let deadline = Instant::now() + Duration::from_secs(WECOM_WS_READY_WAIT_SECS);
306
307 loop {
308 if let Some(tx) = self.ws_tx.lock().await.as_ref().cloned() {
309 return Ok(tx);
310 }
311
312 if Instant::now() >= deadline {
313 anyhow::bail!("WeCom WebSocket not connected");
314 }
315
316 tokio::time::sleep(Duration::from_millis(WECOM_WS_READY_POLL_MILLIS)).await;
317 }
318 }
319
320 async fn ws_send_frame(&self, frame: Value) -> Result<()> {
322 let tx = self.wait_for_ws_sender().await?;
323 tx.send(WsOutbound::Frame(frame))
324 .await
325 .map_err(|_| anyhow::Error::msg("WeCom WS outbound channel closed"))
326 }
327
328 async fn ws_send_frame_and_wait_for_response(
329 &self,
330 frame: Value,
331 req_id: &str,
332 command: &str,
333 ) -> Result<()> {
334 let (tx, rx) = tokio::sync::oneshot::channel();
335 self.pending_responses
336 .lock()
337 .await
338 .insert(req_id.to_string(), tx);
339
340 if let Err(err) = self.ws_send_frame(frame).await {
341 self.pending_responses.lock().await.remove(req_id);
342 return Err(err);
343 }
344
345 match tokio::time::timeout(Duration::from_secs(WECOM_COMMAND_TIMEOUT_SECS), rx).await {
346 Ok(Ok(result)) => result,
347 Ok(Err(_)) => anyhow::bail!(
348 "WeCom WS {command} response channel closed before ack (req_id={req_id})"
349 ),
350 Err(_) => {
351 self.pending_responses.lock().await.remove(req_id);
352 anyhow::bail!(
353 "WeCom WS {command} ack timeout after {}s (req_id={req_id})",
354 WECOM_COMMAND_TIMEOUT_SECS
355 );
356 }
357 }
358 }
359
360 async fn maybe_handle_command_response(&self, frame: &Value) -> bool {
361 let Some(req_id) = frame
362 .get("headers")
363 .and_then(|headers| headers.get("req_id"))
364 .and_then(Value::as_str)
365 else {
366 return false;
367 };
368
369 let Some(errcode) = frame.get("errcode").and_then(Value::as_i64) else {
370 return false;
371 };
372
373 let errmsg = frame
374 .get("errmsg")
375 .and_then(Value::as_str)
376 .unwrap_or("unknown");
377
378 if let Some(waiter) = self.pending_responses.lock().await.remove(req_id) {
379 let result = if errcode == 0 {
380 Ok(())
381 } else {
382 Err(anyhow::Error::msg(format!(
383 "WeCom command failed: req_id={req_id} errcode={errcode} errmsg={errmsg}"
384 )))
385 };
386 let _ = waiter.send(result);
387 return true;
388 }
389
390 if errcode == 0 {
391 wecom_log_debug!(
392 "[wecom_ws] unsolicited command response req_id={req_id} errcode={errcode} errmsg={errmsg}"
393 );
394 } else {
395 wecom_log_warn!(
396 "[wecom_ws] command response failed without a waiter req_id={req_id} errcode={errcode} errmsg={errmsg}"
397 );
398 }
399
400 true
401 }
402
403 async fn respond_msg_lock_for_req_id(&self, req_id: &str) -> Arc<tokio::sync::Mutex<()>> {
404 self.respond_msg_locks
405 .lock()
406 .await
407 .entry(req_id.to_string())
408 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
409 .clone()
410 }
411
/// Drops the per-request lock entry for `req_id`, if one exists.
async fn cleanup_respond_msg_lock(&self, req_id: &str) {
    self.respond_msg_locks.lock().await.remove(req_id);
}
415
416 async fn fail_pending_responses(&self, reason: &str) {
417 let pending = {
418 let mut guard = self.pending_responses.lock().await;
419 std::mem::take(&mut *guard)
420 };
421
422 for (req_id, waiter) in pending {
423 let _ = waiter.send(Err(anyhow::Error::msg(format!(
424 "WeCom WebSocket disconnected before response: req_id={req_id} reason={reason}"
425 ))));
426 }
427 }
428
/// Evaluates the access decision for `inbound` against the user allowlist
/// obtained from `peer_resolver` (normalized on every call) and the
/// configured group allowlist.
fn access_decision(&self, inbound: &ParsedInbound) -> AccessDecision {
    let allowed_users = normalize_wecom_allowlist((self.peer_resolver)());
    evaluate_access_decision(&allowed_users, &self.cfg.allowed_groups, inbound)
}
433
/// Delegates to `compose_content_for_framework`, passing the configured bot
/// name (if any) as the third argument.
fn compose_content_for_framework_with_bot_hint(
    &self,
    inbound: &ParsedInbound,
    normalized: &str,
) -> String {
    compose_content_for_framework(inbound, normalized, self.cfg.bot_name.as_deref())
}
441
442 async fn respond_access_denied(
443 &self,
444 req_id: &str,
445 inbound: &ParsedInbound,
446 decision: AccessDecision,
447 ) {
448 let message = build_access_denied_message(inbound, decision, &self.alias);
449 let stream_id = next_stream_id();
450 if let Err(err) = self
451 .ws_queue_respond_msg(req_id, &stream_id, &message, true)
452 .await
453 {
454 wecom_log_warn!(
455 "[wecom_ws] failed to send access-denied response sender_userid={} chat_type={} chat_id={} error={err:#}",
456 inbound.sender_userid,
457 inbound.chat_type,
458 inbound.chat_id.as_deref().unwrap_or("-")
459 );
460 }
461 }
462
463 fn build_respond_msg_frame(
465 req_id: &str,
466 stream_id: &str,
467 content: &str,
468 finish: bool,
469 ) -> Value {
470 let stream_obj = serde_json::json!({
471 "id": stream_id,
472 "finish": finish,
473 "content": normalize_stream_content(content),
474 });
475 serde_json::json!({
476 "cmd": "aibot_respond_msg",
477 "headers": { "req_id": req_id },
478 "body": {
479 "msgtype": "stream",
480 "stream": stream_obj,
481 },
482 })
483 }
484
/// Builds a stream reply frame and queues it without waiting for an ack
/// (contrast with `ws_send_respond_msg`, which waits and retries).
async fn ws_queue_respond_msg(
    &self,
    req_id: &str,
    stream_id: &str,
    content: &str,
    finish: bool,
) -> Result<()> {
    let frame = Self::build_respond_msg_frame(req_id, stream_id, content, finish);
    self.ws_send_frame(frame).await
}
495
496 async fn ws_send_respond_msg(
497 &self,
498 req_id: &str,
499 stream_id: &str,
500 content: &str,
501 finish: bool,
502 ) -> Result<()> {
503 let frame = Self::build_respond_msg_frame(req_id, stream_id, content, finish);
504 if req_id.is_empty() {
505 return self.ws_send_frame(frame).await;
506 }
507
508 let stream_lock = self.respond_msg_lock_for_req_id(req_id).await;
509 let _guard = stream_lock.lock().await;
510 let mut attempt = 0usize;
511
512 let result = loop {
513 match self
514 .ws_send_frame_and_wait_for_response(frame.clone(), req_id, "aibot_respond_msg")
515 .await
516 {
517 Ok(()) => break Ok(()),
518 Err(err)
519 if is_wecom_data_version_conflict_error(&err)
520 && attempt < WECOM_STREAM_CONFLICT_MAX_RETRIES =>
521 {
522 let retry_in_ms =
523 WECOM_STREAM_CONFLICT_RETRY_BASE_MILLIS.saturating_mul(1u64 << attempt);
524 attempt += 1;
525 wecom_log_warn!(
526 "WeCom stream reply hit data-version conflict; retrying req_id={req_id} stream_id={stream_id} attempt={attempt} retry_in_ms={retry_in_ms}"
527 );
528 tokio::time::sleep(Duration::from_millis(retry_in_ms)).await;
529 }
530 Err(err) => break Err(err),
531 }
532 };
533
534 if finish {
535 self.cleanup_respond_msg_lock(req_id).await;
536 }
537
538 result
539 }
540
541 fn maybe_cleanup_files(&self) {
544 let now = Instant::now();
545 {
546 let mut last = self.last_cleanup.lock();
547 if now.duration_since(*last) < Duration::from_secs(WECOM_FILE_CLEANUP_INTERVAL_SECS) {
548 return;
549 }
550 *last = now;
551 }
552
553 let retention = Duration::from_secs(u64::from(self.cfg.file_retention_days) * 86_400);
554 let root = self.cfg.workspace_dir.join("wecom_ws_files");
555 tokio::spawn(async move {
556 cleanup_inbox_files(root, retention).await;
557 });
558 }
559
560 async fn handle_ws_message(&self, frame: Value, tx: &mpsc::Sender<ChannelMessage>) -> bool {
564 if self.maybe_handle_command_response(&frame).await {
565 return false;
566 }
567
568 let cmd = frame.get("cmd").and_then(Value::as_str).unwrap_or("");
569
570 match cmd {
571 "aibot_msg_callback" => {
572 let channel = self.clone();
573 let tx = tx.clone();
574 tokio::spawn(async move {
575 channel.handle_msg_callback(frame, &tx).await;
576 });
577 false
578 }
579 "aibot_event_callback" => self.handle_event_callback(frame).await,
580 _ => {
581 wecom_log_debug!("[wecom_ws] ignoring WS frame cmd={cmd}");
582 false
583 }
584 }
585 }
586
/// Processes one `aibot_msg_callback` frame end to end.
///
/// Steps, in order: parse the body, drop duplicates by message id, log a
/// preview, enforce the allowlist (replying to denied senders), trigger file
/// cleanup, handle in-band commands (clear session, stop, runtime model
/// switch), reject voice messages without a transcript and unsupported
/// types, and finally normalize the content on a spawned task and forward it
/// to the framework through `tx`.
///
/// Send failures on `tx` and on best-effort replies are deliberately ignored.
async fn handle_msg_callback(&self, frame: Value, tx: &mpsc::Sender<ChannelMessage>) {
    // The request id ties replies to this callback; empty when absent.
    let req_id = frame
        .get("headers")
        .and_then(|h| h.get("req_id"))
        .and_then(Value::as_str)
        .unwrap_or("")
        .to_string();

    let body = match frame.get("body") {
        Some(b) => b.clone(),
        None => {
            wecom_log_warn!("[wecom_ws] msg_callback missing body");
            return;
        }
    };

    let parsed = match parse_inbound_payload(body) {
        Ok(p) => p,
        Err(err) => {
            wecom_log_warn!("[wecom_ws] msg_callback parse failed: {err:#}");
            return;
        }
    };

    // De-duplicate redelivered messages; messages without an id are never
    // de-duplicated.
    if !parsed.msg_id.is_empty() {
        let key = format!("wecom_ws_msg_{}", parsed.msg_id);
        if !self.idempotency.record_if_new(&key) {
            return;
        }
    }

    let scopes = compute_scopes(&parsed);

    let preview = crate::util::truncate_with_ellipsis(&inbound_content_preview(&parsed), 80);
    let msg_id_str = if parsed.msg_id.trim().is_empty() {
        "-"
    } else {
        parsed.msg_id.as_str()
    };
    wecom_log_info!(
        "[wecom_ws] from {} in {}: {} (msg_type={}, msg_id={}, aibot_id={})",
        parsed.sender_userid,
        scopes.conversation_scope,
        preview,
        parsed.msg_type,
        msg_id_str,
        parsed.aibot_id
    );

    // Allowlist enforcement: denied senders get a reply and processing stops.
    match self.access_decision(&parsed) {
        AccessDecision::Allowed => {}
        AccessDecision::AllowlistMissing => {
            wecom_log_warn!(
                "[wecom_ws] inbound denied because allowlist is not configured sender_userid={} chat_type={} chat_id={}",
                parsed.sender_userid,
                parsed.chat_type,
                parsed.chat_id.as_deref().unwrap_or("-")
            );
            self.respond_access_denied(&req_id, &parsed, AccessDecision::AllowlistMissing)
                .await;
            return;
        }
        AccessDecision::Denied => {
            wecom_log_warn!(
                "[wecom_ws] inbound denied by allowlist sender_userid={} chat_type={} chat_id={}",
                parsed.sender_userid,
                parsed.chat_type,
                parsed.chat_id.as_deref().unwrap_or("-")
            );
            self.respond_access_denied(&req_id, &parsed, AccessDecision::Denied)
                .await;
            return;
        }
    }

    self.maybe_cleanup_files();

    // Text used for command detection; empty when the message carries none.
    let stop_text = extract_stop_signal_text(&parsed).unwrap_or_default();

    // Clear-session command: forwarded to the framework as "/new".
    if is_clear_session_command(&stop_text) {
        wecom_log_info!(
            "WeCom session cleared: scope={} msg_id={}",
            scopes.conversation_scope,
            parsed.msg_id
        );
        let _ = tx
            .send(ChannelMessage {
                channel_alias: Some(self.alias.clone()),
                thread_ts: Some(req_id),
                ..ChannelMessage::new(
                    parsed.msg_id.clone(),
                    parsed.sender_userid.clone(),
                    scopes.conversation_scope.clone(),
                    "/new",
                    "wecom_ws",
                    bytes_timestamp_now(),
                )
            })
            .await;
        return;
    }

    // Stop command: acknowledged directly, then forwarded as "/stop".
    // NOTE(review): unlike the other forwards, this message sets no
    // `thread_ts` — confirm that is intended.
    if contains_stop_command(&stop_text) {
        let msg = wecom_ws_cli_string("channel-wecom-ws-stop-ack");
        let stream_id = next_stream_id();
        let _ = self
            .ws_queue_respond_msg(&req_id, &stream_id, &msg, true)
            .await;
        let _ = tx
            .send(ChannelMessage {
                channel_alias: Some(self.alias.clone()),
                ..ChannelMessage::new(
                    parsed.msg_id.clone(),
                    parsed.sender_userid.clone(),
                    scopes.conversation_scope.clone(),
                    "/stop",
                    "wecom_ws",
                    bytes_timestamp_now(),
                )
            })
            .await;
        return;
    }

    // Runtime model-switch command: forwarded verbatim.
    if let Some(runtime_command) = extract_runtime_model_switch_command(&stop_text) {
        wecom_log_info!(
            "WeCom runtime command forwarded: scope={} msg_id={} command={}",
            scopes.conversation_scope,
            parsed.msg_id,
            runtime_command
        );
        let _ = tx
            .send(ChannelMessage {
                channel_alias: Some(self.alias.clone()),
                thread_ts: Some(req_id),
                ..ChannelMessage::new(
                    parsed.msg_id.clone(),
                    parsed.sender_userid.clone(),
                    scopes.conversation_scope.clone(),
                    runtime_command,
                    "wecom_ws",
                    bytes_timestamp_now(),
                )
            })
            .await;
        return;
    }

    // Voice without a transcript cannot be processed; tell the user.
    if is_voice_without_transcript(&parsed) {
        let msg = wecom_ws_cli_string_with_args(
            "channel-wecom-ws-voice-unavailable",
            &[("emoji", random_emoji())],
        );
        let stream_id = next_stream_id();
        let _ = self
            .ws_queue_respond_msg(&req_id, &stream_id, &msg, true)
            .await;
        return;
    }

    // Unsupported message types are dropped without a reply.
    if !is_model_supported_msgtype(&parsed.msg_type) {
        wecom_log_info!(
            "WeCom unsupported message ignored: msg_type={} msg_id={}",
            parsed.msg_type,
            parsed.msg_id
        );
        return;
    }

    // Attachment download and normalization run on a separate task.
    let channel_self = self.clone();
    let tx = tx.clone();
    tokio::spawn(async move {
        let mut inbound = parsed;
        channel_self
            .materialize_quote_attachments(&mut inbound)
            .await;
        let normalized = channel_self.normalize_message(&inbound).await;

        let content = match normalized {
            NormalizedMessage::VoiceMissingTranscript => {
                let msg = wecom_ws_cli_string_with_args(
                    "channel-wecom-ws-voice-unavailable",
                    &[("emoji", random_emoji())],
                );
                let stream_id = next_stream_id();
                let _ = channel_self
                    .ws_queue_respond_msg(&req_id, &stream_id, &msg, true)
                    .await;
                return;
            }
            NormalizedMessage::Unsupported => {
                let msg = wecom_ws_cli_string("channel-wecom-ws-unsupported-message");
                let stream_id = next_stream_id();
                let _ = channel_self
                    .ws_queue_respond_msg(&req_id, &stream_id, &msg, true)
                    .await;
                return;
            }
            NormalizedMessage::Ready(content) => content,
        };

        let composed =
            channel_self.compose_content_for_framework_with_bot_hint(&inbound, &content);

        wecom_log_info!(
            "WeCom: forwarding to framework: msg_id={} req_id={} scope={}",
            inbound.msg_id,
            req_id,
            scopes.conversation_scope
        );

        let _ = tx
            .send(ChannelMessage {
                channel_alias: Some(channel_self.alias.clone()),
                thread_ts: Some(req_id),
                ..ChannelMessage::new(
                    inbound.msg_id.clone(),
                    inbound.sender_userid.clone(),
                    scopes.conversation_scope.clone(),
                    composed,
                    "wecom_ws",
                    bytes_timestamp_now(),
                )
            })
            .await;
    });
}
826
827 async fn handle_event_callback(&self, frame: Value) -> bool {
831 let req_id = frame
832 .get("headers")
833 .and_then(|h| h.get("req_id"))
834 .and_then(Value::as_str)
835 .unwrap_or("")
836 .to_string();
837
838 let body = frame.get("body").cloned().unwrap_or(Value::Null);
839 let event_type = parse_event_type(&body).unwrap_or_else(|| "unknown".to_string());
840
841 match event_type.as_str() {
842 "enter_chat" => {
843 let content = wecom_ws_cli_string_with_args(
844 "channel-wecom-ws-welcome",
845 &[("emoji", random_emoji())],
846 );
847 let welcome = serde_json::json!({
848 "cmd": "aibot_respond_welcome_msg",
849 "headers": { "req_id": req_id },
850 "body": {
851 "msgtype": "text",
852 "text": { "content": content }
853 }
854 });
855 let _ = self.ws_send_frame(welcome).await;
856 false
857 }
858 "template_card_event" => {
859 let event_key =
860 extract_template_card_event_key(&body).unwrap_or_else(|| "-".to_string());
861 wecom_log_info!("WeCom template_card_event received: event_key={event_key}");
862 false
863 }
864 "feedback_event" => {
865 let summary = extract_feedback_event_summary(&body)
866 .unwrap_or_else(|| "feedback=invalid-payload".to_string());
867 wecom_log_info!("WeCom feedback_event received: {summary}");
868 false
869 }
870 "disconnected_event" => {
871 wecom_log_warn!("[wecom_ws] received disconnected_event, triggering reconnect");
872 true
873 }
874 other => {
875 wecom_log_debug!("[wecom_ws] ignoring event_type={other}");
876 false
877 }
878 }
879 }
880
881 async fn materialize_quote_attachments(&self, inbound: &mut ParsedInbound) {
884 let quote_type = inbound
885 .raw_payload
886 .get("quote")
887 .and_then(|v| v.get("msgtype"))
888 .and_then(Value::as_str)
889 .map(str::trim)
890 .unwrap_or("");
891
892 if quote_type == "image" {
893 let quote_obj = inbound
894 .raw_payload
895 .get("quote")
896 .and_then(|v| v.get("image"));
897 let quote_url = quote_obj
898 .and_then(|v| v.get("url"))
899 .and_then(Value::as_str)
900 .map(str::trim)
901 .filter(|v| !v.is_empty())
902 .map(ToOwned::to_owned);
903 let aeskey = quote_obj
904 .and_then(|v| v.get("aeskey"))
905 .and_then(Value::as_str);
906 if let Some(url) = quote_url {
907 let marker = match self
908 .download_and_store_attachment(&url, AttachmentKind::Image, inbound, aeskey)
909 .await
910 {
911 Ok(value) => value,
912 Err(err) => {
913 log_attachment_processing_failure(
914 "WeCom quote image processing failed",
915 &err,
916 inbound,
917 AttachmentKind::Image,
918 &url,
919 );
920 "[\u{5f15}\u{7528}\u{56fe}\u{7247}\u{4e0b}\u{8f7d}\u{5931}\u{8d25}]"
921 .to_string()
922 }
923 };
924 if let Some(quote) = inbound.raw_payload.get_mut("quote") {
925 quote["image"] = serde_json::json!({ "local_path": marker });
926 }
927 }
928 return;
929 }
930
931 if quote_type == "file" {
932 let quote_obj = inbound.raw_payload.get("quote").and_then(|v| v.get("file"));
933 let quote_url = quote_obj
934 .and_then(|v| v.get("url"))
935 .and_then(Value::as_str)
936 .map(str::trim)
937 .filter(|v| !v.is_empty())
938 .map(ToOwned::to_owned);
939 let aeskey = quote_obj
940 .and_then(|v| v.get("aeskey"))
941 .and_then(Value::as_str);
942 if let Some(url) = quote_url {
943 let marker = match self
944 .download_and_store_attachment(&url, AttachmentKind::File, inbound, aeskey)
945 .await
946 {
947 Ok(value) => value,
948 Err(err) => {
949 log_attachment_processing_failure(
950 "WeCom quote file processing failed",
951 &err,
952 inbound,
953 AttachmentKind::File,
954 &url,
955 );
956 "[\u{5f15}\u{7528}\u{6587}\u{4ef6}\u{4e0b}\u{8f7d}\u{5931}\u{8d25}]"
957 .to_string()
958 }
959 };
960 if let Some(quote) = inbound.raw_payload.get_mut("quote") {
961 quote["file"] = serde_json::json!({ "local_path": marker });
962 }
963 }
964 return;
965 }
966
967 if quote_type == "mixed" {
968 let quote_images: Vec<(usize, String, Option<String>)> = inbound
969 .raw_payload
970 .get("quote")
971 .and_then(|v| v.get("mixed"))
972 .and_then(|v| v.get("msg_item"))
973 .and_then(Value::as_array)
974 .map(|items| {
975 items
976 .iter()
977 .enumerate()
978 .filter_map(|(idx, item)| {
979 let item_type = item
980 .get("msgtype")
981 .and_then(Value::as_str)
982 .unwrap_or_default();
983 if item_type != "image" {
984 return None;
985 }
986 let img = item.get("image")?;
987 let url = img
988 .get("url")
989 .and_then(Value::as_str)
990 .map(str::trim)
991 .filter(|v| !v.is_empty())?;
992 let aeskey = img
993 .get("aeskey")
994 .and_then(Value::as_str)
995 .map(ToOwned::to_owned);
996 Some((idx, url.to_string(), aeskey))
997 })
998 .collect()
999 })
1000 .unwrap_or_default();
1001
1002 if quote_images.is_empty() {
1003 return;
1004 }
1005
1006 let mut results: Vec<(usize, String)> = Vec::with_capacity(quote_images.len());
1007 for (idx, url, aeskey) in "e_images {
1008 let marker = match self
1009 .download_and_store_attachment(
1010 url,
1011 AttachmentKind::Image,
1012 inbound,
1013 aeskey.as_deref(),
1014 )
1015 .await
1016 {
1017 Ok(value) => value,
1018 Err(err) => {
1019 log_attachment_processing_failure(
1020 "WeCom quote mixed image processing failed",
1021 &err,
1022 inbound,
1023 AttachmentKind::Image,
1024 url,
1025 );
1026 "[\u{5f15}\u{7528}\u{56fe}\u{7247}\u{4e0b}\u{8f7d}\u{5931}\u{8d25}]"
1027 .to_string()
1028 }
1029 };
1030 results.push((*idx, marker));
1031 }
1032
1033 if let Some(items) = inbound
1034 .raw_payload
1035 .get_mut("quote")
1036 .and_then(|v| v.get_mut("mixed"))
1037 .and_then(|v| v.get_mut("msg_item"))
1038 .and_then(Value::as_array_mut)
1039 {
1040 for (idx, marker) in results {
1041 if let Some(item) = items.get_mut(idx) {
1042 item["image"] = serde_json::json!({ "local_path": marker });
1043 }
1044 }
1045 }
1046 }
1047 }
1048
1049 async fn normalize_message(&self, inbound: &ParsedInbound) -> NormalizedMessage {
1050 match inbound.msg_type.as_str() {
1051 "text" => {
1052 let content = inbound
1053 .raw_payload
1054 .get("text")
1055 .and_then(|v| v.get("content"))
1056 .and_then(Value::as_str)
1057 .unwrap_or("")
1058 .trim()
1059 .to_string();
1060
1061 if content.is_empty() {
1062 NormalizedMessage::Unsupported
1063 } else {
1064 NormalizedMessage::Ready(content)
1065 }
1066 }
1067 "voice" => {
1068 let content = inbound
1069 .raw_payload
1070 .get("voice")
1071 .and_then(|v| v.get("content"))
1072 .and_then(Value::as_str)
1073 .unwrap_or("")
1074 .trim()
1075 .to_string();
1076
1077 if content.is_empty() {
1078 NormalizedMessage::VoiceMissingTranscript
1079 } else {
1080 NormalizedMessage::Ready(format!("[Voice transcript]\n{content}"))
1081 }
1082 }
1083 "image" => {
1084 let image_obj = inbound.raw_payload.get("image");
1085 let url = image_obj
1086 .and_then(|v| v.get("url"))
1087 .and_then(Value::as_str)
1088 .unwrap_or("")
1089 .trim();
1090 let aeskey = image_obj
1091 .and_then(|v| v.get("aeskey"))
1092 .and_then(Value::as_str);
1093
1094 if url.is_empty() {
1095 return NormalizedMessage::Unsupported;
1096 }
1097
1098 match self
1099 .download_and_store_attachment(url, AttachmentKind::Image, inbound, aeskey)
1100 .await
1101 {
1102 Ok(marker) => NormalizedMessage::Ready(marker),
1103 Err(err) => {
1104 log_attachment_processing_failure(
1105 "WeCom image processing failed",
1106 &err,
1107 inbound,
1108 AttachmentKind::Image,
1109 url,
1110 );
1111 NormalizedMessage::Ready(
1112 "[Image attachment processing failed; please continue without this image.]"
1113 .to_string(),
1114 )
1115 }
1116 }
1117 }
1118 "file" => {
1119 let file_obj = inbound.raw_payload.get("file");
1120 let url = file_obj
1121 .and_then(|v| v.get("url"))
1122 .and_then(Value::as_str)
1123 .unwrap_or("")
1124 .trim();
1125 let aeskey = file_obj
1126 .and_then(|v| v.get("aeskey"))
1127 .and_then(Value::as_str);
1128
1129 if url.is_empty() {
1130 return NormalizedMessage::Unsupported;
1131 }
1132
1133 match self
1134 .download_and_store_attachment(url, AttachmentKind::File, inbound, aeskey)
1135 .await
1136 {
1137 Ok(marker) => NormalizedMessage::Ready(marker),
1138 Err(err) => {
1139 log_attachment_processing_failure(
1140 "WeCom file processing failed",
1141 &err,
1142 inbound,
1143 AttachmentKind::File,
1144 url,
1145 );
1146 NormalizedMessage::Ready(
1147 "[File attachment processing failed; please continue without this file.]"
1148 .to_string(),
1149 )
1150 }
1151 }
1152 }
1153 "mixed" => {
1154 let mut text_parts = Vec::new();
1155 if let Some(items) = inbound
1156 .raw_payload
1157 .get("mixed")
1158 .and_then(|v| v.get("msg_item"))
1159 .and_then(Value::as_array)
1160 {
1161 for item in items {
1162 let item_type = item
1163 .get("msgtype")
1164 .and_then(Value::as_str)
1165 .unwrap_or_default();
1166 if item_type == "text" {
1167 if let Some(text) = item
1168 .get("text")
1169 .and_then(|v| v.get("content"))
1170 .and_then(Value::as_str)
1171 {
1172 let trimmed = text.trim();
1173 if !trimmed.is_empty() {
1174 text_parts.push(trimmed.to_string());
1175 }
1176 }
1177 } else if item_type == "image" {
1178 let img = item.get("image");
1179 let url = img.and_then(|v| v.get("url")).and_then(Value::as_str);
1180 let aeskey = img.and_then(|v| v.get("aeskey")).and_then(Value::as_str);
1181 if let Some(url) = url {
1182 match self
1183 .download_and_store_attachment(
1184 url,
1185 AttachmentKind::Image,
1186 inbound,
1187 aeskey,
1188 )
1189 .await
1190 {
1191 Ok(marker) => text_parts.push(marker),
1192 Err(err) => {
1193 log_attachment_processing_failure(
1194 "WeCom mixed image processing failed",
1195 &err,
1196 inbound,
1197 AttachmentKind::Image,
1198 url,
1199 );
1200 text_parts.push(
1201 "[Image attachment processing failed in mixed message.]"
1202 .to_string(),
1203 );
1204 }
1205 }
1206 }
1207 }
1208 }
1209 }
1210
1211 if text_parts.is_empty() {
1212 NormalizedMessage::Unsupported
1213 } else {
1214 NormalizedMessage::Ready(text_parts.join("\n\n"))
1215 }
1216 }
1217 other => {
1218 wecom_log_info!(
1219 "[wecom_ws] unsupported msg_type={other}, raw_payload={}",
1220 inbound.raw_payload
1221 );
1222 NormalizedMessage::Unsupported
1223 }
1224 }
1225 }
1226
1227 async fn download_and_store_attachment(
1228 &self,
1229 url: &str,
1230 kind: AttachmentKind,
1231 inbound: &ParsedInbound,
1232 aeskey: Option<&str>,
1233 ) -> Result<String> {
1234 if self.cfg.max_file_size_bytes == 0 {
1235 anyhow::bail!("WeCom max_file_size_bytes is zero");
1236 }
1237
1238 let started = Instant::now();
1239 let chat_id = inbound.chat_id.as_deref().unwrap_or("single");
1240 let url_target = summarize_attachment_url_for_log(url);
1241 wecom_log_info!(
1242 "WeCom attachment download started msg_id={} msg_type={} chat_type={} chat_id={} sender_userid={} attachment_kind={} url_target={} has_aeskey={} timeout_secs={}",
1243 inbound.msg_id,
1244 inbound.msg_type,
1245 inbound.chat_type,
1246 chat_id,
1247 inbound.sender_userid,
1248 kind.as_str(),
1249 url_target,
1250 aeskey.is_some(),
1251 WECOM_HTTP_TIMEOUT_SECS
1252 );
1253
1254 let response = self
1255 .client
1256 .get(url)
1257 .send()
1258 .await
1259 .with_context(|| {
1260 format!(
1261 "failed to download WeCom attachment: kind={} msg_id={} url_target={} elapsed_ms={}",
1262 kind.as_str(),
1263 inbound.msg_id,
1264 url_target,
1265 started.elapsed().as_millis(),
1266 )
1267 })?;
1268 let status = response.status();
1269 if !status.is_success() {
1270 let body = response.text().await.unwrap_or_default();
1271 let body_preview = truncate_for_log(&body, 512);
1272 anyhow::bail!(
1273 "WeCom attachment download failed: kind={} msg_id={} url_target={} status={} body_preview={}",
1274 kind.as_str(),
1275 inbound.msg_id,
1276 url_target,
1277 status,
1278 body_preview
1279 );
1280 }
1281
1282 if let Some(len) = response.content_length()
1283 && len > self.cfg.max_file_size_bytes
1284 {
1285 wecom_log_warn!(
1286 "WeCom attachment skipped: declared size exceeds configured limit msg_id={} attachment_kind={} declared_bytes={} max_file_size_bytes={}",
1287 inbound.msg_id,
1288 kind.as_str(),
1289 len,
1290 self.cfg.max_file_size_bytes
1291 );
1292 return Ok(format!(
1293 "[AttachmentTooLarge kind={:?} size={}B limit={}B]",
1294 kind, len, self.cfg.max_file_size_bytes
1295 ));
1296 }
1297
1298 let bytes = response
1299 .bytes()
1300 .await
1301 .with_context(|| {
1302 format!(
1303 "failed to read WeCom attachment bytes: kind={} msg_id={} url_target={} elapsed_ms={}",
1304 kind.as_str(),
1305 inbound.msg_id,
1306 url_target,
1307 started.elapsed().as_millis(),
1308 )
1309 })?;
1310
1311 if bytes.len() as u64 > self.cfg.max_file_size_bytes {
1312 wecom_log_warn!(
1313 "WeCom attachment skipped: payload exceeds configured limit msg_id={} attachment_kind={} actual_bytes={} max_file_size_bytes={}",
1314 inbound.msg_id,
1315 kind.as_str(),
1316 bytes.len(),
1317 self.cfg.max_file_size_bytes
1318 );
1319 return Ok(format!(
1320 "[AttachmentTooLarge kind={:?} size={}B limit={}B]",
1321 kind,
1322 bytes.len(),
1323 self.cfg.max_file_size_bytes
1324 ));
1325 }
1326
1327 let stored_bytes: Cow<'_, [u8]> = match aeskey {
1329 Some(key) => Cow::Owned(MediaDecryptor::decrypt(key, &bytes).with_context(|| {
1330 format!(
1331 "failed to decrypt WeCom attachment: kind={} msg_id={} url_target={} encrypted_bytes={}",
1332 kind.as_str(),
1333 inbound.msg_id,
1334 url_target,
1335 bytes.len(),
1336 )
1337 })?),
1338 None => Cow::Borrowed(bytes.as_ref()),
1339 };
1340 let stored_len = stored_bytes.len();
1341
1342 let ext = match kind {
1343 AttachmentKind::Image => image_file_extension(stored_bytes.as_ref()),
1344 AttachmentKind::File => "bin",
1345 };
1346 let safe_scope = normalize_scope_component(&format!(
1347 "{}_{}",
1348 inbound.chat_id.as_deref().unwrap_or("single"),
1349 inbound.sender_userid
1350 ));
1351 let safe_msg_id = normalize_scope_component(&inbound.msg_id);
1352 let ts = bytes_timestamp_now();
1353 let file_name = format!(
1354 "{safe_scope}_{ts}_{safe_msg_id}_{}.{}",
1355 random_ascii_token(6),
1356 ext
1357 );
1358
1359 let dir = self.cfg.workspace_dir.join("wecom_ws_files");
1360 tokio::fs::create_dir_all(&dir).await.with_context(|| {
1361 format!(
1362 "failed to create WeCom inbox directory: msg_id={} path={}",
1363 inbound.msg_id,
1364 dir.display()
1365 )
1366 })?;
1367 let path = dir.join(file_name);
1368
1369 tokio::fs::write(&path, stored_bytes.as_ref())
1370 .await
1371 .with_context(|| {
1372 format!(
1373 "failed to persist WeCom attachment: kind={} msg_id={} path={}",
1374 kind.as_str(),
1375 inbound.msg_id,
1376 path.display()
1377 )
1378 })?;
1379
1380 self.maybe_cleanup_files();
1381
1382 let abs = path.canonicalize().unwrap_or(path);
1383 wecom_log_info!(
1384 "WeCom attachment download completed msg_id={} attachment_kind={} url_target={} encrypted_bytes={} stored_bytes={} local_path={} elapsed_ms={}",
1385 inbound.msg_id,
1386 kind.as_str(),
1387 url_target,
1388 bytes.len(),
1389 stored_len,
1390 abs.display(),
1391 started.elapsed().as_millis()
1392 );
1393 match kind {
1394 AttachmentKind::Image => Ok(format!("[IMAGE:{}]", abs.display())),
1395 AttachmentKind::File => Ok(format!("[Document: {}]", abs.display())),
1396 }
1397 }
1398
1399 async fn send_markdown_chunks_to_scope(&self, scope: &str, content: &str) -> Result<()> {
1400 let (chat_type, chatid) = parse_scope(scope)?;
1401 let chunks = split_markdown_chunks(content);
1402
1403 wecom_log_info!(
1404 "WeCom: sending message to scope={}, len={}, chunks={}",
1405 scope,
1406 content.len(),
1407 chunks.len()
1408 );
1409
1410 let total_chunks = chunks.len();
1411 for (idx, chunk) in chunks.into_iter().enumerate() {
1412 let req_id = random_ascii_token(16);
1413 let chunk_len = chunk.len();
1414 let frame = serde_json::json!({
1415 "cmd": "aibot_send_msg",
1416 "headers": { "req_id": req_id },
1417 "body": {
1418 "chatid": chatid,
1419 "chat_type": chat_type,
1420 "msgtype": "markdown",
1421 "markdown": { "content": chunk }
1422 }
1423 });
1424 self.ws_send_frame_and_wait_for_response(frame, &req_id, "aibot_send_msg")
1425 .await?;
1426 wecom_log_info!(
1427 "WeCom send ack received scope={scope} req_id={req_id} chunk_index={} chunk_count={total_chunks} chunk_len={chunk_len}",
1428 idx + 1
1429 );
1430 }
1431
1432 Ok(())
1433 }
1434}
1435
1436impl ::zeroclaw_api::attribution::Attributable for WeComWsChannel {
1439 fn role(&self) -> ::zeroclaw_api::attribution::Role {
1440 ::zeroclaw_api::attribution::Role::Channel(
1441 ::zeroclaw_api::attribution::ChannelKind::WeComWs,
1442 )
1443 }
1444
1445 fn alias(&self) -> &str {
1446 &self.alias
1447 }
1448}
1449
1450#[async_trait]
1451impl Channel for WeComWsChannel {
1452 fn name(&self) -> &str {
1453 "wecom_ws"
1454 }
1455
1456 async fn send(&self, message: &SendMessage) -> Result<()> {
1457 if let Some(req_id) = message
1458 .thread_ts
1459 .as_deref()
1460 .filter(|req_id| !req_id.is_empty())
1461 {
1462 let stream_id = next_stream_id();
1463 let (stream_content, overflow) = split_stream_content_and_overflow(&message.content);
1464
1465 self.ws_send_respond_msg(req_id, &stream_id, &stream_content, true)
1466 .await?;
1467
1468 if let Some(extra) = overflow {
1469 let extra_msg = wecom_ws_cli_string_with_args(
1470 "channel-wecom-ws-supplemental-message",
1471 &[("extra", &extra)],
1472 );
1473 self.send_markdown_chunks_to_scope(&message.recipient, &extra_msg)
1474 .await?;
1475 }
1476
1477 return Ok(());
1478 }
1479
1480 self.send_markdown_chunks_to_scope(&message.recipient, &message.content)
1481 .await
1482 }
1483
    /// Runs the WebSocket session loop: connect, subscribe, then pump frames until the
    /// connection drops, and start over.
    ///
    /// Each iteration of the outer loop:
    /// 1. connects to `WECOM_WS_URL` (through the configured proxy, if any);
    /// 2. sends an `aibot_subscribe` frame carrying `bot_id` and `secret` and waits up to
    ///    `WECOM_SUBSCRIBE_TIMEOUT_SECS` for a response whose `errcode` is `0`;
    /// 3. stores an outbound sender in `self.ws_tx` and multiplexes periodic pings, queued
    ///    outbound frames and inbound frames until the socket fails or closes;
    /// 4. clears `self.ws_tx`, fails pending responses and reconnects.
    ///
    /// Failures wait `backoff` seconds before retrying; the delay doubles each time up to
    /// `WECOM_BACKOFF_MAX_SECS` and is reset after a successful subscribe. The outer loop
    /// contains no `return` or `break`, so this function does not complete on its own.
    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> Result<()> {
        wecom_log_info!(
            "[wecom_ws] starting WebSocket listener (bot_id={})",
            self.bot_id
        );

        let mut backoff = WECOM_BACKOFF_INITIAL_SECS;

        loop {
            wecom_log_info!("[wecom_ws] connecting to {WECOM_WS_URL}");

            let ws_stream = match zeroclaw_config::schema::ws_connect_with_proxy(
                WECOM_WS_URL,
                "channel.wecom_ws",
                self.cfg.proxy_url.as_deref(),
            )
            .await
            {
                Ok((stream, _)) => {
                    wecom_log_info!("[wecom_ws] WebSocket connected");
                    stream
                }
                Err(err) => {
                    wecom_log_warn!(
                        "[wecom_ws] WebSocket connect failed: {err:#}, retrying in {backoff}s"
                    );
                    tokio::time::sleep(Duration::from_secs(backoff)).await;
                    backoff = (backoff * 2).min(WECOM_BACKOFF_MAX_SECS);
                    continue;
                }
            };

            let (mut ws_write, mut ws_read) = ws_stream.split();

            // Authenticate the connection; the request id lets the response be correlated.
            let subscribe_req_id = random_ascii_token(16);
            let subscribe = serde_json::json!({
                "cmd": "aibot_subscribe",
                "headers": { "req_id": subscribe_req_id },
                "body": {
                    "bot_id": self.bot_id,
                    "secret": self.secret,
                },
            });
            if let Err(err) = ws_write
                .send(WsMessage::Text(subscribe.to_string().into()))
                .await
            {
                wecom_log_warn!(
                    "[wecom_ws] subscribe send failed: {err:#}, retrying in {backoff}s"
                );
                tokio::time::sleep(Duration::from_secs(backoff)).await;
                backoff = (backoff * 2).min(WECOM_BACKOFF_MAX_SECS);
                continue;
            }

            // The first frame read after subscribing is treated as the subscribe response.
            let subscribe_ok = match tokio::time::timeout(
                Duration::from_secs(WECOM_SUBSCRIBE_TIMEOUT_SECS),
                ws_read.next(),
            )
            .await
            {
                Ok(Some(Ok(WsMessage::Text(text)))) => match serde_json::from_str::<Value>(&text) {
                    Ok(val) => {
                        // A mismatching req_id is only logged; the errcode below still decides.
                        if let Some(resp_req_id) = val
                            .get("headers")
                            .and_then(|h| h.get("req_id"))
                            .and_then(Value::as_str)
                            && resp_req_id != subscribe_req_id
                        {
                            wecom_log_warn!(
                                "[wecom_ws] subscribe response req_id mismatch expected_req_id={subscribe_req_id} got_req_id={resp_req_id}"
                            );
                        }
                        // A missing or non-integer errcode becomes -1 and counts as a rejection.
                        let errcode = val.get("errcode").and_then(Value::as_i64).unwrap_or(-1);
                        if errcode == 0 {
                            wecom_log_info!("[wecom_ws] subscribe succeeded");
                            true
                        } else {
                            let errmsg = val
                                .get("errmsg")
                                .and_then(Value::as_str)
                                .unwrap_or("unknown");
                            wecom_log_error!(
                                "[wecom_ws] subscribe rejected: errcode={errcode} errmsg={errmsg}"
                            );
                            false
                        }
                    }
                    Err(err) => {
                        wecom_log_warn!("[wecom_ws] subscribe response parse failed: {err:#}");
                        false
                    }
                },
                Ok(Some(Ok(_))) => {
                    wecom_log_warn!("[wecom_ws] unexpected subscribe response frame type");
                    false
                }
                Ok(Some(Err(err))) => {
                    wecom_log_warn!("[wecom_ws] subscribe response read error: {err:#}");
                    false
                }
                Ok(None) => {
                    wecom_log_warn!("[wecom_ws] WebSocket closed before subscribe response");
                    false
                }
                Err(_) => {
                    wecom_log_warn!("[wecom_ws] subscribe response timeout");
                    false
                }
            };

            if !subscribe_ok {
                tokio::time::sleep(Duration::from_secs(backoff)).await;
                backoff = (backoff * 2).min(WECOM_BACKOFF_MAX_SECS);
                continue;
            }

            // Publish the outbound sender; `health_check` reports healthy while it is `Some`.
            let (out_tx, mut out_rx) = mpsc::channel::<WsOutbound>(64);
            *self.ws_tx.lock().await = Some(out_tx);
            // A successful subscribe resets the reconnect delay.
            backoff = WECOM_BACKOFF_INITIAL_SECS;
            let mut ping_interval =
                tokio::time::interval(Duration::from_secs(WECOM_PING_INTERVAL_SECS));
            ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            // Set from `handle_ws_message`'s return value; when true the reconnect below
            // happens without a backoff sleep.
            let mut should_reconnect = false;

            // Session loop: every `break` below ends this connection.
            loop {
                tokio::select! {
                    // Keep-alive ping.
                    _ = ping_interval.tick() => {
                        let ping = serde_json::json!({
                            "cmd": "ping",
                            "headers": { "req_id": random_ascii_token(16) },
                        });
                        if let Err(err) = ws_write
                            .send(WsMessage::Text(ping.to_string().into()))
                            .await
                        {
                            wecom_log_warn!("[wecom_ws] ping send failed: {err:#}");
                            break;
                        }
                    }
                    // Frames queued through the sender stored in `self.ws_tx`.
                    Some(outbound) = out_rx.recv() => {
                        match outbound {
                            WsOutbound::Frame(value) => {
                                if let Err(err) = ws_write
                                    .send(WsMessage::Text(value.to_string().into()))
                                    .await
                                {
                                    wecom_log_warn!(
                                        "[wecom_ws] outbound frame send failed: {err:#}"
                                    );
                                    break;
                                }
                            }
                        }
                    }
                    // Frames received from the server.
                    msg = ws_read.next() => {
                        match msg {
                            Some(Ok(WsMessage::Text(text))) => {
                                match serde_json::from_str::<Value>(&text) {
                                    Ok(frame) => {
                                        should_reconnect =
                                            self.handle_ws_message(frame, &tx).await;
                                        if should_reconnect {
                                            break;
                                        }
                                    }
                                    // An unparseable text frame is logged and skipped.
                                    Err(err) => {
                                        wecom_log_warn!(
                                            "[wecom_ws] WS frame parse error: {err:#}"
                                        );
                                    }
                                }
                            }
                            Some(Ok(WsMessage::Close(_))) => {
                                wecom_log_info!("[wecom_ws] WebSocket closed by server");
                                break;
                            }
                            // Every other frame type (pong included) is ignored.
                            Some(Ok(WsMessage::Pong(_) | _)) => {}
                            Some(Err(err)) => {
                                wecom_log_warn!("[wecom_ws] WS read error: {err:#}");
                                break;
                            }
                            None => {
                                wecom_log_info!("[wecom_ws] WebSocket stream ended");
                                break;
                            }
                        }
                    }
                }
            }

            // Tear down session state before reconnecting.
            *self.ws_tx.lock().await = None;
            self.fail_pending_responses("socket disconnected").await;

            if should_reconnect {
                wecom_log_info!("[wecom_ws] disconnected (server event), reconnecting immediately");
                backoff = WECOM_BACKOFF_INITIAL_SECS;
            } else {
                wecom_log_info!("[wecom_ws] disconnected, will reconnect in {backoff}s");
                tokio::time::sleep(Duration::from_secs(backoff)).await;
                backoff = (backoff * 2).min(WECOM_BACKOFF_MAX_SECS);
            }
        }
    }
1696
1697 async fn health_check(&self) -> bool {
1698 self.ws_tx.lock().await.is_some()
1699 }
1700
1701 fn supports_draft_updates(&self) -> bool {
1702 self.cfg.stream_mode != StreamMode::Off
1703 }
1704
1705 async fn send_draft(&self, message: &SendMessage) -> Result<Option<String>> {
1706 if self.cfg.stream_mode == StreamMode::Off {
1707 return Ok(None);
1708 }
1709
1710 let req_id = message.thread_ts.as_deref().unwrap_or("");
1712 if req_id.is_empty() {
1713 return Ok(None);
1714 }
1715 let stream_id = next_stream_id();
1716
1717 let bootstrap = wecom_ws_cli_string("channel-wecom-ws-stream-bootstrap");
1718 self.ws_send_respond_msg(req_id, &stream_id, &bootstrap, false)
1719 .await?;
1720 self.req_id_map
1721 .lock()
1722 .insert(stream_id.clone(), req_id.to_string());
1723 Ok(Some(stream_id))
1724 }
1725
1726 async fn update_draft(&self, _recipient: &str, message_id: &str, content: &str) -> Result<()> {
1727 let req_id = self
1728 .req_id_map
1729 .lock()
1730 .get(message_id)
1731 .cloned()
1732 .unwrap_or_default();
1733 if req_id.is_empty() {
1734 return Ok(());
1735 }
1736 self.ws_send_respond_msg(&req_id, message_id, content, false)
1737 .await?;
1738 Ok(())
1739 }
1740
1741 async fn finalize_draft(&self, recipient: &str, message_id: &str, content: &str) -> Result<()> {
1742 let req_id = self
1743 .req_id_map
1744 .lock()
1745 .remove(message_id)
1746 .unwrap_or_default();
1747
1748 let (stream_content, overflow) = split_stream_content_and_overflow(content);
1749
1750 if !req_id.is_empty() {
1751 self.ws_send_respond_msg(&req_id, message_id, &stream_content, true)
1752 .await?;
1753 }
1754
1755 if let Some(extra) = overflow {
1757 let extra_msg = format!("[\u{8865}\u{5145}\u{6d88}\u{606f}]\n{extra}");
1758 if let Ok((chat_type, chatid)) = parse_scope(recipient) {
1759 for chunk in split_markdown_chunks(&extra_msg) {
1760 let frame = serde_json::json!({
1761 "cmd": "aibot_send_msg",
1762 "headers": { "req_id": random_ascii_token(16) },
1763 "body": {
1764 "chatid": chatid,
1765 "chat_type": chat_type,
1766 "msgtype": "markdown",
1767 "markdown": { "content": chunk }
1768 }
1769 });
1770 let _ = self.ws_send_frame(frame).await;
1771 }
1772 }
1773 }
1774
1775 Ok(())
1776 }
1777
1778 async fn cancel_draft(&self, _recipient: &str, message_id: &str) -> Result<()> {
1779 let req_id = self
1780 .req_id_map
1781 .lock()
1782 .remove(message_id)
1783 .unwrap_or_default();
1784 if !req_id.is_empty() {
1785 self.ws_send_respond_msg(&req_id, message_id, "", true)
1786 .await?;
1787 }
1788 Ok(())
1789 }
1790}
1791
1792fn strip_wecom_padding(input: &[u8]) -> Result<&[u8]> {
1795 let Some(last) = input.last() else {
1796 anyhow::bail!("invalid WeCom padding: empty payload");
1797 };
1798 let pad_len = *last as usize;
1799 if pad_len == 0 || pad_len > 32 || pad_len > input.len() {
1800 anyhow::bail!("invalid WeCom padding length");
1801 }
1802 Ok(&input[..input.len() - pad_len])
1803}
1804
1805fn is_wecom_data_version_conflict_error(err: &anyhow::Error) -> bool {
1806 let msg = err.to_string();
1807 msg.contains("errcode=6000") || msg.contains("data version conflict")
1808}
1809
1810fn parse_inbound_payload(payload: Value) -> Result<ParsedInbound> {
1811 let msg_type = payload
1812 .get("msgtype")
1813 .and_then(Value::as_str)
1814 .unwrap_or("")
1815 .to_string();
1816 if msg_type.is_empty() {
1817 anyhow::bail!("missing msgtype");
1818 }
1819
1820 let msg_id = payload
1821 .get("msgid")
1822 .and_then(Value::as_str)
1823 .unwrap_or("")
1824 .to_string();
1825
1826 let chat_type = payload
1827 .get("chattype")
1828 .and_then(Value::as_str)
1829 .unwrap_or("single")
1830 .to_string();
1831
1832 let chat_id = payload
1833 .get("chatid")
1834 .and_then(Value::as_str)
1835 .map(ToOwned::to_owned);
1836
1837 let sender_userid = payload
1838 .get("from")
1839 .and_then(|v| v.get("userid"))
1840 .and_then(Value::as_str)
1841 .unwrap_or("unknown")
1842 .to_string();
1843
1844 let aibot_id = payload
1845 .get("aibotid")
1846 .and_then(Value::as_str)
1847 .unwrap_or("unknown")
1848 .to_string();
1849
1850 Ok(ParsedInbound {
1851 msg_id,
1852 msg_type,
1853 chat_type,
1854 chat_id,
1855 sender_userid,
1856 aibot_id,
1857 raw_payload: payload,
1858 })
1859}
1860
1861fn compute_scopes(inbound: &ParsedInbound) -> ScopeDecision {
1862 let chat_type = inbound.chat_type.to_ascii_lowercase();
1863 if chat_type == "group" {
1864 let chat_id = inbound
1865 .chat_id
1866 .clone()
1867 .unwrap_or_else(|| "unknown".to_string());
1868 let scope = format!("group--{chat_id}");
1869 return ScopeDecision {
1870 conversation_scope: scope,
1871 };
1872 }
1873
1874 let scope = format!("user--{}", inbound.sender_userid);
1875 ScopeDecision {
1876 conversation_scope: scope,
1877 }
1878}
1879
/// Normalizes a WeCom user or chat identifier by trimming surrounding whitespace.
fn normalize_wecom_identity(value: &str) -> String {
    String::from(value.trim())
}
1883
1884fn normalize_optional_wecom_identity(value: Option<&str>) -> Option<String> {
1885 value
1886 .map(normalize_wecom_identity)
1887 .filter(|value| !value.is_empty())
1888}
1889
1890fn normalize_wecom_allowlist(entries: Vec<String>) -> Vec<String> {
1891 entries
1892 .into_iter()
1893 .map(|entry| normalize_wecom_identity(&entry))
1894 .filter(|entry| !entry.is_empty())
1895 .collect()
1896}
1897
1898fn allowlist_matches(allowlist: &[String], candidate: &str) -> bool {
1899 let candidate = normalize_wecom_identity(candidate);
1900 !candidate.is_empty()
1901 && allowlist
1902 .iter()
1903 .any(|entry| entry == "*" || entry == &candidate)
1904}
1905
1906fn evaluate_access_decision(
1907 allowed_users: &[String],
1908 allowed_groups: &[String],
1909 inbound: &ParsedInbound,
1910) -> AccessDecision {
1911 if allowed_users.is_empty() && allowed_groups.is_empty() {
1912 return AccessDecision::AllowlistMissing;
1913 }
1914
1915 if allowlist_matches(allowed_users, &inbound.sender_userid) {
1916 return AccessDecision::Allowed;
1917 }
1918
1919 if inbound.chat_type.eq_ignore_ascii_case("group")
1920 && inbound
1921 .chat_id
1922 .as_deref()
1923 .is_some_and(|chat_id| allowlist_matches(allowed_groups, chat_id))
1924 {
1925 return AccessDecision::Allowed;
1926 }
1927
1928 AccessDecision::Denied
1929}
1930
1931fn build_access_denied_message(
1932 inbound: &ParsedInbound,
1933 decision: AccessDecision,
1934 alias: &str,
1935) -> String {
1936 let userid = normalize_wecom_identity(&inbound.sender_userid);
1937 let userid = if userid.is_empty() {
1938 "unknown"
1939 } else {
1940 userid.as_str()
1941 };
1942
1943 if inbound.chat_type.eq_ignore_ascii_case("group") {
1944 let chatid = inbound
1945 .chat_id
1946 .as_deref()
1947 .map(normalize_wecom_identity)
1948 .filter(|chatid| !chatid.is_empty())
1949 .unwrap_or_else(|| "unknown".to_string());
1950 let allowed_groups_path = format!("channels.wecom_ws.{alias}.allowed_groups");
1951 let allowed_users_path = format!("channels.wecom_ws.{alias}.allowed_users");
1952 return match decision {
1953 AccessDecision::AllowlistMissing => wecom_ws_cli_string_with_args(
1954 "channel-wecom-ws-group-allowlist-missing",
1955 &[
1956 ("chatid", &chatid),
1957 ("userid", userid),
1958 ("allowed_groups_path", &allowed_groups_path),
1959 ("allowed_users_path", &allowed_users_path),
1960 ],
1961 ),
1962 AccessDecision::Denied => wecom_ws_cli_string_with_args(
1963 "channel-wecom-ws-group-access-denied",
1964 &[
1965 ("chatid", &chatid),
1966 ("userid", userid),
1967 ("allowed_groups_path", &allowed_groups_path),
1968 ("allowed_users_path", &allowed_users_path),
1969 ],
1970 ),
1971 AccessDecision::Allowed => String::new(),
1972 };
1973 }
1974
1975 let allowed_users_path = format!("channels.wecom_ws.{alias}.allowed_users");
1976 match decision {
1977 AccessDecision::AllowlistMissing => wecom_ws_cli_string_with_args(
1978 "channel-wecom-ws-dm-allowlist-missing",
1979 &[
1980 ("userid", userid),
1981 ("allowed_users_path", &allowed_users_path),
1982 ],
1983 ),
1984 AccessDecision::Denied => wecom_ws_cli_string_with_args(
1985 "channel-wecom-ws-dm-access-denied",
1986 &[
1987 ("userid", userid),
1988 ("allowed_users_path", &allowed_users_path),
1989 ],
1990 ),
1991 AccessDecision::Allowed => String::new(),
1992 }
1993}
1994
1995fn compose_content_for_framework(
1998 inbound: &ParsedInbound,
1999 normalized: &str,
2000 bot_name: Option<&str>,
2001) -> String {
2002 let quote_context = extract_quote_context(&inbound.raw_payload);
2003 let mention_hint = build_group_bot_mention_hint(inbound, normalized, bot_name);
2004 let body = match mention_hint {
2005 Some(hint) => format!("{hint}\n{normalized}"),
2006 None => normalized.to_string(),
2007 };
2008
2009 match quote_context {
2010 Some(quote) => format!("{quote}\n\n{body}"),
2011 None => body,
2012 }
2013}
2014
2015fn build_group_bot_mention_hint(
2016 inbound: &ParsedInbound,
2017 normalized: &str,
2018 bot_name: Option<&str>,
2019) -> Option<String> {
2020 if !inbound.chat_type.eq_ignore_ascii_case("group") {
2021 return None;
2022 }
2023
2024 let bot_name = bot_name.map(str::trim).filter(|name| !name.is_empty())?;
2025 if !text_mentions_bot_name(normalized, bot_name) {
2026 return None;
2027 }
2028
2029 Some(format!(
2030 "[WeCom group message addressed to this bot via @{bot_name}]"
2031 ))
2032}
2033
/// Tells whether `text` contains `@<bot_name>` as a complete mention.
///
/// The bot name is trimmed first; a blank name never matches. An occurrence counts only when
/// it is followed by the end of the text, whitespace, or ASCII punctuation, so `@bot` does not
/// match inside `@botty`.
fn text_mentions_bot_name(text: &str, bot_name: &str) -> bool {
    let handle = bot_name.trim();
    if handle.is_empty() {
        return false;
    }
    let needle = format!("@{handle}");

    text.match_indices(needle.as_str())
        .map(|(start, matched)| &text[start + matched.len()..])
        .any(|rest| match rest.chars().next() {
            None => true,
            Some(next) => next.is_whitespace() || next.is_ascii_punctuation(),
        })
}
2048
/// Makes `raw` safe to embed in a file name.
///
/// ASCII letters, digits, `-` and `_` are kept; every other character is replaced by `_`
/// (one underscore per character).
fn normalize_scope_component(raw: &str) -> String {
    let mut safe = String::with_capacity(raw.len());
    for ch in raw.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        safe.push(if keep { ch } else { '_' });
    }
    safe
}
2060
/// Picks a file extension from the leading magic bytes of an image.
///
/// Recognizes PNG, JPEG, GIF (87a/89a) and WebP (a RIFF container tagged `WEBP`); anything
/// else yields `bin`.
fn image_file_extension(bytes: &[u8]) -> &'static str {
    match bytes {
        [0x89, b'P', b'N', b'G', b'\r', b'\n', 0x1a, b'\n', ..] => "png",
        [0xff, 0xd8, 0xff, ..] => "jpg",
        [b'G', b'I', b'F', b'8', b'7' | b'9', b'a', ..] => "gif",
        // Bytes 4..8 hold the RIFF chunk size and are not inspected.
        [b'R', b'I', b'F', b'F', _, _, _, _, b'W', b'E', b'B', b'P', ..] => "webp",
        _ => "bin",
    }
}
2074
2075fn parse_scope(scope: &str) -> Result<(u32, &str)> {
2078 if let Some(userid) = scope.strip_prefix("user--") {
2079 Ok((1, userid))
2080 } else if let Some(chatid) = scope.strip_prefix("group--") {
2081 Ok((2, chatid))
2082 } else {
2083 anyhow::bail!("WeCom: invalid scope format: {scope}")
2084 }
2085}
2086
2087fn summarize_attachment_url_for_log(url: &str) -> String {
2088 let trimmed = url.trim();
2089 if trimmed.is_empty() {
2090 return "empty-url".to_string();
2091 }
2092 match reqwest::Url::parse(trimmed) {
2093 Ok(parsed) => {
2094 let host = parsed.host_str().unwrap_or("unknown-host");
2095 let query_state = if parsed.query().is_some() {
2096 "query=present"
2097 } else {
2098 "query=none"
2099 };
2100 format!(
2101 "{}://{}{} ({query_state})",
2102 parsed.scheme(),
2103 host,
2104 parsed.path()
2105 )
2106 }
2107 Err(_) => format!("invalid-url(len={})", trimmed.len()),
2108 }
2109}
2110
/// Limits `input` to `max_chars` characters for logging.
///
/// Text within the limit is returned unchanged; longer text is cut after `max_chars`
/// characters and suffixed with `...(truncated)`.
fn truncate_for_log(input: &str, max_chars: usize) -> String {
    // The byte offset of character number `max_chars` exists only when the text is too long.
    match input.char_indices().nth(max_chars) {
        None => input.to_string(),
        Some((cut, _)) => {
            let prefix = &input[..cut];
            format!("{prefix}...(truncated)")
        }
    }
}
2118
2119fn log_attachment_processing_failure(
2120 stage: &str,
2121 err: &anyhow::Error,
2122 inbound: &ParsedInbound,
2123 kind: AttachmentKind,
2124 url: &str,
2125) {
2126 wecom_log_warn!(
2127 "{stage} msg_id={} msg_type={} chat_type={} chat_id={} sender_userid={} attachment_kind={} url_target={} error={err:#}",
2128 inbound.msg_id,
2129 inbound.msg_type,
2130 inbound.chat_type,
2131 inbound.chat_id.as_deref().unwrap_or("single"),
2132 inbound.sender_userid,
2133 kind.as_str(),
2134 summarize_attachment_url_for_log(url)
2135 );
2136}
2137
2138fn random_emoji() -> &'static str {
2139 let idx = rand::rng().random_range(0..WECOM_EMOJIS.len());
2140 WECOM_EMOJIS[idx]
2141}
2142
2143fn random_ascii_token(len: usize) -> String {
2144 const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
2145 let mut out = String::with_capacity(len);
2146 let mut rng = rand::rng();
2147 for _ in 0..len {
2148 let idx = rng.random_range(0..CHARSET.len());
2149 out.push(CHARSET[idx] as char);
2150 }
2151 out
2152}
2153
2154fn next_stream_id() -> String {
2155 format!("zs_{}", random_ascii_token(20))
2156}
2157
2158fn contains_stop_command(text: &str) -> bool {
2159 let stripped = strip_edge_mentions(text);
2160 if stripped.contains("\u{505c}\u{6b62}") {
2161 return true;
2162 }
2163 stripped.split_whitespace().any(|word| {
2164 let token = word
2165 .trim_matches(|ch: char| !ch.is_ascii_alphanumeric() && ch != '/')
2166 .to_ascii_lowercase();
2167 token == "stop" || token == "/stop"
2168 })
2169}
2170
2171fn is_clear_session_command(text: &str) -> bool {
2172 let stripped = strip_edge_mentions(text);
2173 stripped.eq_ignore_ascii_case("/clear") || stripped.eq_ignore_ascii_case("/new")
2174}
2175
2176fn extract_runtime_model_switch_command(text: &str) -> Option<String> {
2177 let stripped = strip_edge_mentions(text);
2178 if stripped.is_empty() || !stripped.starts_with('/') {
2179 return None;
2180 }
2181
2182 let command_token = stripped.split_whitespace().next()?;
2183 let base_command = command_token.split('@').next().unwrap_or(command_token);
2184 if base_command.eq_ignore_ascii_case("/model") || base_command.eq_ignore_ascii_case("/models") {
2185 Some(stripped)
2186 } else {
2187 None
2188 }
2189}
2190
/// Removes `@mention` tokens from both ends of a message and trims the result.
///
/// Leading: every token that starts with `@` is dropped, up to the next whitespace.
/// Trailing: the text from the last `@` of the final token onwards is dropped, repeatedly;
/// a suffix such as `/clear@bot` therefore becomes `/clear`. Mentions in the middle of the
/// text are left alone.
///
/// Token boundaries use Unicode whitespace, so mentions separated from the text by e.g. an
/// ideographic space (U+3000) or a four-per-em space (U+2005) are recognized as well, instead
/// of the whole message being consumed as a single mention.
fn strip_edge_mentions(text: &str) -> String {
    let mut rest = text.trim();

    // Leading mentions: drop "@token" and the whitespace that follows it.
    while let Some(after_at) = rest.strip_prefix('@') {
        let mention_end = after_at
            .find(char::is_whitespace)
            .unwrap_or(after_at.len());
        rest = after_at[mention_end..].trim_start();
    }

    // Trailing mentions: `rest` never ends with whitespace here, so the last '@'-or-whitespace
    // character tells whether the final token contains an '@'.
    while let Some(idx) = rest.rfind(|ch: char| ch == '@' || ch.is_whitespace()) {
        if !rest[idx..].starts_with('@') {
            break;
        }
        rest = rest[..idx].trim_end();
    }

    rest.to_string()
}
2234
2235fn extract_stop_signal_text(inbound: &ParsedInbound) -> Option<String> {
2236 match inbound.msg_type.as_str() {
2237 "text" => inbound
2238 .raw_payload
2239 .get("text")
2240 .and_then(|v| v.get("content"))
2241 .and_then(Value::as_str)
2242 .map(str::trim)
2243 .filter(|v| !v.is_empty())
2244 .map(ToOwned::to_owned),
2245 "voice" => inbound
2246 .raw_payload
2247 .get("voice")
2248 .and_then(|v| v.get("content"))
2249 .and_then(Value::as_str)
2250 .map(str::trim)
2251 .filter(|v| !v.is_empty())
2252 .map(ToOwned::to_owned),
2253 "mixed" => {
2254 let mut texts = Vec::new();
2255 let items = inbound
2256 .raw_payload
2257 .get("mixed")
2258 .and_then(|v| v.get("msg_item"))
2259 .and_then(Value::as_array)?;
2260 for item in items {
2261 if item
2262 .get("msgtype")
2263 .and_then(Value::as_str)
2264 .is_some_and(|v| v == "text")
2265 && let Some(content) = item
2266 .get("text")
2267 .and_then(|v| v.get("content"))
2268 .and_then(Value::as_str)
2269 .map(str::trim)
2270 .filter(|v| !v.is_empty())
2271 {
2272 texts.push(content.to_string());
2273 }
2274 }
2275 if texts.is_empty() {
2276 None
2277 } else {
2278 Some(texts.join("\n"))
2279 }
2280 }
2281 _ => None,
2282 }
2283}
2284
2285fn inbound_content_preview(inbound: &ParsedInbound) -> String {
2286 if let Some(text) = extract_stop_signal_text(inbound) {
2287 return text;
2288 }
2289
2290 match inbound.msg_type.as_str() {
2291 "image" => "[Image message]".to_string(),
2292 "file" => inbound
2293 .raw_payload
2294 .get("file")
2295 .and_then(|v| v.get("filename"))
2296 .and_then(Value::as_str)
2297 .map(|name| format!("[File message: {name}]"))
2298 .unwrap_or_else(|| "[File message]".to_string()),
2299 "event" => "[Event callback]".to_string(),
2300 other => format!("[{other} message]"),
2301 }
2302}
2303
/// Returns the longest prefix of `input` that is at most `max_bytes` bytes long and does not
/// split a UTF-8 character.
fn trim_utf8_to_max_bytes(input: &str, max_bytes: usize) -> String {
    if input.len() <= max_bytes {
        return input.to_string();
    }

    // Walk back from the byte limit to the nearest character boundary (0 always is one).
    let mut cut = max_bytes;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    input[..cut].to_string()
}
2317
2318fn normalize_stream_content(input: &str) -> String {
2319 let sanitized = strip_trailing_provider_sentinels(input);
2320 trim_utf8_to_max_bytes(&sanitized, WECOM_MARKDOWN_MAX_BYTES)
2321}
2322
2323fn split_stream_content_and_overflow(input: &str) -> (String, Option<String>) {
2324 let input = strip_trailing_provider_sentinels(input);
2325 if input.len() <= WECOM_MARKDOWN_MAX_BYTES {
2326 return (input, None);
2327 }
2328
2329 let mut head = String::new();
2330 let mut tail = String::new();
2331 let mut overflow = false;
2332 for ch in input.chars() {
2333 if !overflow && head.len() + ch.len_utf8() <= WECOM_MARKDOWN_MAX_BYTES {
2334 head.push(ch);
2335 } else {
2336 overflow = true;
2337 tail.push(ch);
2338 }
2339 }
2340
2341 if tail.is_empty() {
2342 (head, None)
2343 } else {
2344 (head, Some(tail))
2345 }
2346}
2347
2348fn strip_trailing_provider_sentinels(input: &str) -> String {
2349 let mut trimmed = input.trim_end();
2350
2351 while let Some(sentinel) = WECOM_PROVIDER_TRAILING_SENTINELS
2352 .iter()
2353 .find(|sentinel| trimmed.ends_with(**sentinel))
2354 {
2355 trimmed = trimmed[..trimmed.len() - sentinel.len()].trim_end();
2356 }
2357
2358 trimmed.to_string()
2359}
2360
2361fn parse_event_type(payload: &Value) -> Option<String> {
2362 payload
2363 .get("event")
2364 .and_then(|v| v.get("eventtype"))
2365 .and_then(Value::as_str)
2366 .map(str::trim)
2367 .filter(|v| !v.is_empty())
2368 .map(ToOwned::to_owned)
2369}
2370
2371fn extract_template_card_event_key(payload: &Value) -> Option<String> {
2372 payload
2373 .get("event")
2374 .and_then(|v| v.get("template_card_event"))
2375 .and_then(|v| {
2376 v.get("event_key")
2377 .or_else(|| v.get("eventkey"))
2378 .and_then(Value::as_str)
2379 })
2380 .map(str::trim)
2381 .filter(|v| !v.is_empty())
2382 .map(ToOwned::to_owned)
2383}
2384
2385fn extract_feedback_event_summary(payload: &Value) -> Option<String> {
2386 let feedback = payload.get("event")?.get("feedback_event")?;
2387 let feedback_id = feedback
2388 .get("id")
2389 .and_then(Value::as_str)
2390 .map(str::trim)
2391 .filter(|v| !v.is_empty())
2392 .unwrap_or("-");
2393 let feedback_type = feedback
2394 .get("type")
2395 .and_then(Value::as_i64)
2396 .map(|v| v.to_string())
2397 .unwrap_or_else(|| "-".to_string());
2398 let content = feedback
2399 .get("content")
2400 .and_then(Value::as_str)
2401 .map(str::trim)
2402 .filter(|v| !v.is_empty())
2403 .unwrap_or("-");
2404 Some(format!(
2405 "feedback_id={feedback_id} feedback_type={feedback_type} content={content}"
2406 ))
2407}
2408
2409fn extract_quote_context(payload: &Value) -> Option<String> {
2410 let quote = payload.get("quote")?;
2411 let quote_type = quote
2412 .get("msgtype")
2413 .and_then(Value::as_str)
2414 .map(str::trim)
2415 .filter(|v| !v.is_empty())?;
2416
2417 let content = match quote_type {
2418 "text" => quote
2419 .get("text")
2420 .and_then(|v| v.get("content"))
2421 .and_then(Value::as_str)
2422 .map(str::trim)
2423 .filter(|v| !v.is_empty())
2424 .map(ToOwned::to_owned)
2425 .unwrap_or_else(|| "[\u{5f15}\u{7528}\u{6587}\u{672c}\u{4e3a}\u{7a7a}]".to_string()),
2426 "voice" => quote
2427 .get("voice")
2428 .and_then(|v| v.get("content"))
2429 .and_then(Value::as_str)
2430 .map(str::trim)
2431 .filter(|v| !v.is_empty())
2432 .map(|v| format!("[\u{5f15}\u{7528}\u{8bed}\u{97f3}\u{8f6c}\u{5199}] {v}"))
2433 .unwrap_or_else(|| {
2434 "[\u{5f15}\u{7528}\u{8bed}\u{97f3}\u{65e0}\u{8f6c}\u{5199}]".to_string()
2435 }),
2436 "image" => quote
2437 .get("image")
2438 .and_then(|v| v.get("local_path"))
2439 .and_then(Value::as_str)
2440 .map(str::trim)
2441 .filter(|v| !v.is_empty())
2442 .map(|v| format!("[\u{5f15}\u{7528}\u{56fe}\u{7247}] {v}"))
2443 .unwrap_or_else(|| "[\u{5f15}\u{7528}\u{56fe}\u{7247}]".to_string()),
2444 "file" => quote
2445 .get("file")
2446 .and_then(|v| v.get("local_path"))
2447 .and_then(Value::as_str)
2448 .map(str::trim)
2449 .filter(|v| !v.is_empty())
2450 .map(|v| format!("[\u{5f15}\u{7528}\u{6587}\u{4ef6}] {v}"))
2451 .unwrap_or_else(|| "[\u{5f15}\u{7528}\u{6587}\u{4ef6}]".to_string()),
2452 "mixed" => {
2453 let mut parts = Vec::new();
2454 if let Some(items) = quote
2455 .get("mixed")
2456 .and_then(|v| v.get("msg_item"))
2457 .and_then(Value::as_array)
2458 {
2459 for item in items {
2460 let item_type = item
2461 .get("msgtype")
2462 .and_then(Value::as_str)
2463 .unwrap_or_default();
2464 if item_type == "text" {
2465 if let Some(text) = item
2466 .get("text")
2467 .and_then(|v| v.get("content"))
2468 .and_then(Value::as_str)
2469 .map(str::trim)
2470 .filter(|v| !v.is_empty())
2471 {
2472 parts.push(text.to_string());
2473 }
2474 } else if item_type == "image" {
2475 if let Some(path) = item
2476 .get("image")
2477 .and_then(|v| v.get("local_path"))
2478 .and_then(Value::as_str)
2479 .map(str::trim)
2480 .filter(|v| !v.is_empty())
2481 {
2482 parts.push(format!("[\u{5f15}\u{7528}\u{56fe}\u{7247}] {path}"));
2483 } else {
2484 parts.push("[\u{5f15}\u{7528}\u{56fe}\u{7247}]".to_string());
2485 }
2486 }
2487 }
2488 }
2489
2490 if parts.is_empty() {
2491 "[\u{5f15}\u{7528}\u{56fe}\u{6587}\u{6d88}\u{606f}]".to_string()
2492 } else {
2493 parts.join("\n")
2494 }
2495 }
2496 _ => format!("[\u{5f15}\u{7528}\u{6d88}\u{606f} type={quote_type}]"),
2497 };
2498
2499 let content = trim_utf8_to_max_bytes(&content, 4_096);
2500 Some(format!(
2501 "[WECOM_QUOTE]\nmsgtype={quote_type}\ncontent={content}\n[/WECOM_QUOTE]"
2502 ))
2503}
2504
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn bytes_timestamp_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
2511
2512fn split_markdown_chunks(input: &str) -> Vec<String> {
2513 let input = strip_trailing_provider_sentinels(input);
2514 if input.is_empty() {
2515 return vec![String::new()];
2516 }
2517
2518 let mut chunks = Vec::new();
2519 let mut current = String::new();
2520
2521 for line in input.lines() {
2522 let candidate = if current.is_empty() {
2523 line.to_string()
2524 } else {
2525 format!("{current}\n{line}")
2526 };
2527
2528 if candidate.len() > WECOM_MARKDOWN_CHUNK_BYTES
2529 && !current.is_empty()
2530 && current.len() <= WECOM_MARKDOWN_MAX_BYTES
2531 {
2532 chunks.push(current);
2533 current = line.to_string();
2534 continue;
2535 }
2536
2537 current = candidate;
2538 }
2539
2540 if !current.is_empty() {
2541 if current.len() <= WECOM_MARKDOWN_MAX_BYTES {
2542 chunks.push(current);
2543 } else {
2544 let mut buf = String::new();
2545 for ch in current.chars() {
2546 if buf.len() + ch.len_utf8() > WECOM_MARKDOWN_CHUNK_BYTES {
2547 chunks.push(buf);
2548 buf = String::new();
2549 }
2550 buf.push(ch);
2551 }
2552 if !buf.is_empty() {
2553 chunks.push(buf);
2554 }
2555 }
2556 }
2557
2558 if chunks.is_empty() {
2559 chunks.push(String::new());
2560 }
2561
2562 chunks
2563}
2564
/// Reports whether `msg_type` is one of `text`, `voice`, `image`, `file` or
/// `mixed` (exact, case-sensitive match).
fn is_model_supported_msgtype(msg_type: &str) -> bool {
    ["text", "voice", "image", "file", "mixed"].contains(&msg_type)
}
2568
2569fn is_voice_without_transcript(inbound: &ParsedInbound) -> bool {
2570 if inbound.msg_type != "voice" {
2571 return false;
2572 }
2573 inbound
2574 .raw_payload
2575 .get("voice")
2576 .and_then(|v| v.get("content"))
2577 .and_then(Value::as_str)
2578 .map(str::trim)
2579 .unwrap_or("")
2580 .is_empty()
2581}
2582
2583async fn cleanup_inbox_files(root: PathBuf, retention: Duration) {
2584 if !root.exists() {
2585 return;
2586 }
2587
2588 let mut stack = vec![root];
2589 while let Some(dir) = stack.pop() {
2590 let Ok(mut rd) = tokio::fs::read_dir(&dir).await else {
2591 continue;
2592 };
2593
2594 while let Ok(Some(entry)) = rd.next_entry().await {
2595 let path = entry.path();
2596 let Ok(meta) = entry.metadata().await else {
2597 continue;
2598 };
2599
2600 if meta.is_dir() {
2601 stack.push(path);
2602 continue;
2603 }
2604
2605 let Ok(modified) = meta.modified() else {
2606 continue;
2607 };
2608
2609 let age = SystemTime::now()
2610 .duration_since(modified)
2611 .unwrap_or_else(|_| Duration::from_secs(0));
2612 if age > retention {
2613 let _ = tokio::fs::remove_file(&path).await;
2614 }
2615 }
2616 }
2617}
2618
2619#[cfg(test)]
2620mod tests {
2621 use super::*;
2622
2623 #[test]
2624 fn scope_uses_group_shared_mode_by_default_for_group_chat() {
2625 let inbound = ParsedInbound {
2626 msg_id: "m1".to_string(),
2627 msg_type: "text".to_string(),
2628 chat_type: "group".to_string(),
2629 chat_id: Some("g1".to_string()),
2630 sender_userid: "u1".to_string(),
2631 aibot_id: "b1".to_string(),
2632 raw_payload: serde_json::json!({}),
2633 };
2634
2635 let scopes = compute_scopes(&inbound);
2636 assert_eq!(scopes.conversation_scope, "group--g1");
2637 }
2638
2639 #[test]
2640 fn split_markdown_chunks_preserves_large_input() {
2641 let input = "a".repeat(WECOM_MARKDOWN_CHUNK_BYTES * 3 + 100);
2642 let chunks = split_markdown_chunks(&input);
2643 assert!(chunks.len() >= 3);
2644 for chunk in chunks {
2645 assert!(chunk.len() <= WECOM_MARKDOWN_MAX_BYTES);
2646 }
2647 }
2648
2649 #[test]
2650 fn split_markdown_chunks_small_input() {
2651 let input = "Hello WeCom!";
2652 let chunks = split_markdown_chunks(input);
2653 assert_eq!(chunks.len(), 1);
2654 assert_eq!(chunks[0], "Hello WeCom!");
2655 }
2656
2657 #[test]
2658 fn split_markdown_chunks_empty_input() {
2659 let chunks = split_markdown_chunks("");
2660 assert_eq!(chunks.len(), 1);
2661 assert_eq!(chunks[0], "");
2662 }
2663
2664 #[test]
2665 fn strip_trailing_provider_sentinels_removes_eom_token() {
2666 assert_eq!(
2667 strip_trailing_provider_sentinels("Hi there!<|eom|>"),
2668 "Hi there!"
2669 );
2670 assert_eq!(
2671 strip_trailing_provider_sentinels("Hi there! <|eom|>\n\n"),
2672 "Hi there!"
2673 );
2674 }
2675
2676 #[test]
2677 fn strip_trailing_provider_sentinels_keeps_mid_message_token() {
2678 assert_eq!(
2679 strip_trailing_provider_sentinels("Literal <|eom|> marker in text."),
2680 "Literal <|eom|> marker in text."
2681 );
2682 }
2683
2684 #[test]
2685 fn outbound_stream_normalization_strips_trailing_provider_sentinel() {
2686 assert_eq!(normalize_stream_content("Hi there!<|eom|>"), "Hi there!");
2687 assert_eq!(
2688 split_stream_content_and_overflow("Hi there!<|eom|>"),
2689 ("Hi there!".to_string(), None)
2690 );
2691 assert_eq!(split_markdown_chunks("Hi there!<|eom|>"), vec!["Hi there!"]);
2692 }
2693
2694 #[test]
2695 fn group_bot_mention_hint_marks_addressed_wecom_message() {
2696 let inbound = test_inbound("group", Some("group-1"), "user-1");
2697 let composed = compose_content_for_framework(&inbound, "@danya say hi", Some("danya"));
2698
2699 assert!(composed.starts_with("[WeCom group message addressed to this bot via @danya]"));
2700 assert!(composed.ends_with("@danya say hi"));
2701 }
2702
2703 #[test]
2704 fn group_bot_mention_hint_omits_non_matching_messages() {
2705 let inbound = test_inbound("group", Some("group-1"), "user-1");
2706 assert_eq!(
2707 compose_content_for_framework(&inbound, "@otherbot say hi", Some("danya")),
2708 "@otherbot say hi"
2709 );
2710 assert_eq!(
2711 compose_content_for_framework(&inbound, "@danya say hi", None),
2712 "@danya say hi"
2713 );
2714
2715 let dm = test_inbound("single", None, "user-1");
2716 assert_eq!(
2717 compose_content_for_framework(&dm, "@danya say hi", Some("danya")),
2718 "@danya say hi"
2719 );
2720 }
2721
2722 #[test]
2723 fn text_mentions_bot_name_uses_simple_boundary_check() {
2724 assert!(text_mentions_bot_name("@danya say hi", "danya"));
2725 assert!(text_mentions_bot_name("hey @danya, say hi", "danya"));
2726 assert!(!text_mentions_bot_name("@danyabot say hi", "danya"));
2727 }
2728
2729 #[test]
2730 fn summarize_attachment_url_for_log_redacts_query_string() {
2731 let url = "https://wework.qpic.cn/wwpic/123456/0?auth=secret_token&expires=123";
2732 let summary = summarize_attachment_url_for_log(url);
2733 assert_eq!(
2734 summary,
2735 "https://wework.qpic.cn/wwpic/123456/0 (query=present)"
2736 );
2737 assert!(!summary.contains("secret_token"));
2738 }
2739
2740 #[test]
2741 fn summarize_attachment_url_for_log_handles_invalid_input() {
2742 let summary = summarize_attachment_url_for_log("not a url");
2743 assert_eq!(summary, "invalid-url(len=9)");
2744 }
2745
2746 #[test]
2747 fn stop_command_detection_supports_cn_and_en() {
2748 assert!(contains_stop_command("\u{505c}\u{6b62}"));
2749 assert!(contains_stop_command("Please STOP now"));
2750 assert!(contains_stop_command("@bot /stop"));
2751 assert!(!contains_stop_command("\u{7ee7}\u{7eed}\u{5904}\u{7406}"));
2752 assert!(!contains_stop_command("explain nonstop operation"));
2753 assert!(!contains_stop_command("what are stopwords?"));
2754 }
2755
2756 #[test]
2757 fn image_file_extension_uses_magic_bytes() {
2758 assert_eq!(image_file_extension(b"\x89PNG\r\n\x1a\nrest"), "png");
2759 assert_eq!(image_file_extension(&[0xff, 0xd8, 0xff, 0x00]), "jpg");
2760 assert_eq!(image_file_extension(b"GIF89a rest"), "gif");
2761 assert_eq!(
2762 image_file_extension(b"RIFF\x00\x00\x00\x00WEBPrest"),
2763 "webp"
2764 );
2765 assert_eq!(image_file_extension(b"not an image"), "bin");
2766 }
2767
2768 #[test]
2769 fn filename_scope_components_reject_path_separators() {
2770 assert_eq!(normalize_scope_component("../room/msg-1"), "___room_msg-1");
2771 }
2772
2773 #[test]
2774 fn idempotency_store_is_bounded() {
2775 let store = SimpleIdempotencyStore::new();
2776 for idx in 0..(WECOM_IDEMPOTENCY_MAX_KEYS + 1) {
2777 assert!(store.record_if_new(&format!("msg-{idx}")));
2778 }
2779 assert_eq!(store.seen.lock().len(), WECOM_IDEMPOTENCY_MAX_KEYS);
2780 assert_eq!(store.order.lock().len(), WECOM_IDEMPOTENCY_MAX_KEYS);
2781 assert!(store.record_if_new("msg-0"));
2782 }
2783
2784 #[test]
2785 fn parse_event_type_extracts_enter_chat() {
2786 let payload = serde_json::json!({
2787 "event": {
2788 "eventtype": "enter_chat"
2789 }
2790 });
2791 assert_eq!(parse_event_type(&payload).as_deref(), Some("enter_chat"));
2792 }
2793
2794 #[test]
2795 fn extract_quote_context_from_text_quote() {
2796 let payload = serde_json::json!({
2797 "quote": {
2798 "msgtype": "text",
2799 "text": {
2800 "content": " \u{5f15}\u{7528}\u{5185}\u{5bb9} "
2801 }
2802 }
2803 });
2804
2805 let quote = extract_quote_context(&payload).expect("quote should be extracted");
2806 assert!(quote.contains("msgtype=text"));
2807 assert!(quote.contains("content=\u{5f15}\u{7528}\u{5185}\u{5bb9}"));
2808 }
2809
2810 #[test]
2811 fn extract_quote_context_from_mixed_quote() {
2812 let payload = serde_json::json!({
2813 "quote": {
2814 "msgtype": "mixed",
2815 "mixed": {
2816 "msg_item": [
2817 {
2818 "msgtype": "text",
2819 "text": {
2820 "content": "\u{7b2c}\u{4e00}\u{6bb5}"
2821 }
2822 },
2823 {
2824 "msgtype": "image",
2825 "image": {
2826 "url": "https://example.com/image.png"
2827 }
2828 }
2829 ]
2830 }
2831 }
2832 });
2833
2834 let quote = extract_quote_context(&payload).expect("quote should be extracted");
2835 assert!(quote.contains("\u{7b2c}\u{4e00}\u{6bb5}"));
2836 assert!(quote.contains("\u{5f15}\u{7528}\u{56fe}\u{7247}"));
2837 }
2838
2839 #[test]
2840 fn extract_quote_context_does_not_leak_remote_media_url() {
2841 let payload = serde_json::json!({
2842 "quote": {
2843 "msgtype": "image",
2844 "image": {
2845 "url": "https://example.com/tmp-sign-url"
2846 }
2847 }
2848 });
2849
2850 let quote = extract_quote_context(&payload).expect("quote should be extracted");
2851 assert!(quote.contains("[\u{5f15}\u{7528}\u{56fe}\u{7247}]"));
2852 assert!(!quote.contains("example.com/tmp-sign-url"));
2853 }
2854
2855 #[test]
2856 fn extract_template_card_event_key_reads_event_key() {
2857 let payload = serde_json::json!({
2858 "event": {
2859 "eventtype": "template_card_event",
2860 "template_card_event": {
2861 "event_key": "button_confirm"
2862 }
2863 }
2864 });
2865 assert_eq!(
2866 extract_template_card_event_key(&payload).as_deref(),
2867 Some("button_confirm")
2868 );
2869 }
2870
2871 #[test]
2872 fn extract_feedback_event_summary_reads_fields() {
2873 let payload = serde_json::json!({
2874 "event": {
2875 "eventtype": "feedback_event",
2876 "feedback_event": {
2877 "id": "fb_1",
2878 "type": 2,
2879 "content": "not accurate"
2880 }
2881 }
2882 });
2883 let summary = extract_feedback_event_summary(&payload).expect("summary should exist");
2884 assert!(summary.contains("feedback_id=fb_1"));
2885 assert!(summary.contains("feedback_type=2"));
2886 assert!(summary.contains("content=not accurate"));
2887 }
2888
2889 #[test]
2890 fn clear_session_bare_commands() {
2891 assert!(is_clear_session_command("/clear"));
2892 assert!(is_clear_session_command("/new"));
2893 assert!(is_clear_session_command("/CLEAR"));
2894 assert!(is_clear_session_command("/New"));
2895 assert!(is_clear_session_command(" /clear "));
2896 }
2897
2898 #[test]
2899 fn clear_session_with_mentions() {
2900 assert!(is_clear_session_command("@bot /clear"));
2901 assert!(is_clear_session_command("/clear @bot"));
2902 assert!(is_clear_session_command("@bot1 @bot2 /new"));
2903 assert!(is_clear_session_command("@bot /new @other"));
2904 }
2905
2906 #[test]
2907 fn clear_session_rejects_old_and_invalid() {
2908 assert!(!is_clear_session_command("\u{65b0}\u{4f1a}\u{8bdd}"));
2909 assert!(!is_clear_session_command("clear history"));
2910 assert!(!is_clear_session_command("/clear now"));
2911 assert!(!is_clear_session_command("please /new"));
2912 assert!(!is_clear_session_command(""));
2913 assert!(!is_clear_session_command(" "));
2914 }
2915
2916 #[test]
2917 fn runtime_model_switch_command_with_mentions() {
2918 assert_eq!(
2919 extract_runtime_model_switch_command("@bot /model gpt-5 @other"),
2920 Some("/model gpt-5".to_string())
2921 );
2922 assert_eq!(
2923 extract_runtime_model_switch_command("@bot /models openrouter"),
2924 Some("/models openrouter".to_string())
2925 );
2926 assert_eq!(
2927 extract_runtime_model_switch_command(" /MODEL@zeroclaw qwen-max "),
2928 Some("/MODEL@zeroclaw qwen-max".to_string())
2929 );
2930 }
2931
2932 #[test]
2933 fn runtime_model_switch_command_rejects_non_commands() {
2934 assert_eq!(extract_runtime_model_switch_command("/new"), None);
2935 assert_eq!(
2936 extract_runtime_model_switch_command("please /model gpt-5"),
2937 None
2938 );
2939 assert_eq!(extract_runtime_model_switch_command(""), None);
2940 }
2941
2942 #[test]
2943 fn parse_scope_user() {
2944 let (chat_type, chatid) = parse_scope("user--zeroclaw_user").unwrap();
2945 assert_eq!(chat_type, 1);
2946 assert_eq!(chatid, "zeroclaw_user");
2947 }
2948
2949 #[test]
2950 fn parse_scope_group() {
2951 let (chat_type, chatid) = parse_scope("group--zeroclaw_group").unwrap();
2952 assert_eq!(chat_type, 2);
2953 assert_eq!(chatid, "zeroclaw_group");
2954 }
2955
2956 #[test]
2957 fn parse_scope_invalid() {
2958 assert!(parse_scope("invalid_scope").is_err());
2959 }
2960
2961 fn test_inbound(chat_type: &str, chat_id: Option<&str>, sender_userid: &str) -> ParsedInbound {
2962 ParsedInbound {
2963 msg_id: "msg-1".to_string(),
2964 msg_type: "text".to_string(),
2965 chat_type: chat_type.to_string(),
2966 chat_id: chat_id.map(str::to_string),
2967 sender_userid: sender_userid.to_string(),
2968 aibot_id: "bot123".to_string(),
2969 raw_payload: serde_json::json!({
2970 "msgtype": "text",
2971 "msgid": "msg-1",
2972 "chattype": chat_type,
2973 "chatid": chat_id,
2974 "from": { "userid": sender_userid },
2975 "text": { "content": "@bot hello" }
2976 }),
2977 }
2978 }
2979
2980 fn test_wecom_ws_config() -> WeComWsConfig {
2981 WeComWsConfig {
2982 enabled: true,
2983 bot_id: "bot123".to_string(),
2984 secret: "secret456".to_string(),
2985 allowed_users: vec![],
2986 allowed_groups: vec![],
2987 bot_name: None,
2988 file_retention_days: 3,
2989 max_file_size_mb: 20,
2990 stream_mode: StreamMode::Partial,
2991 proxy_url: None,
2992 excluded_tools: vec![],
2993 }
2994 }
2995
2996 #[test]
2997 fn access_decision_denies_when_allowlists_missing() {
2998 let inbound = test_inbound("single", None, "zeroclaw_user");
2999 assert_eq!(
3000 evaluate_access_decision(&[], &[], &inbound),
3001 AccessDecision::AllowlistMissing
3002 );
3003 }
3004
3005 #[test]
3006 fn access_decision_allows_userid_in_single_chat() {
3007 let inbound = test_inbound("single", None, "zeroclaw_user");
3008 assert_eq!(
3009 evaluate_access_decision(&["zeroclaw_user".to_string()], &[], &inbound),
3010 AccessDecision::Allowed
3011 );
3012 }
3013
3014 #[test]
3015 fn access_decision_allows_group_chatid() {
3016 let inbound = test_inbound("group", Some("zeroclaw_group"), "zeroclaw_user");
3017 assert_eq!(
3018 evaluate_access_decision(&[], &["zeroclaw_group".to_string()], &inbound),
3019 AccessDecision::Allowed
3020 );
3021 }
3022
3023 #[test]
3024 fn access_decision_allows_wildcards() {
3025 let inbound = test_inbound("group", Some("zeroclaw_group"), "zeroclaw_user");
3026 assert_eq!(
3027 evaluate_access_decision(&["*".to_string()], &[], &inbound),
3028 AccessDecision::Allowed
3029 );
3030 assert_eq!(
3031 evaluate_access_decision(&[], &["*".to_string()], &inbound),
3032 AccessDecision::Allowed
3033 );
3034 }
3035
3036 #[test]
3037 fn denied_group_message_mentions_chatid_and_userid() {
3038 let inbound = test_inbound("group", Some("zeroclaw_group"), "zeroclaw_user");
3039 let text = build_access_denied_message(&inbound, AccessDecision::Denied, "primary");
3040 assert!(text.contains("zeroclaw_group"));
3041 assert!(text.contains("zeroclaw_user"));
3042 assert!(text.contains("allowed_groups"));
3043 assert!(text.contains("wecom_ws"));
3044 }
3045
3046 #[test]
3047 fn supports_draft_updates_respects_stream_mode() {
3048 let mut off_cfg = test_wecom_ws_config();
3049 off_cfg.stream_mode = StreamMode::Off;
3050 let off = WeComWsChannel::new(&off_cfg, Path::new("/tmp")).unwrap();
3051 assert!(!off.supports_draft_updates());
3052
3053 let partial = WeComWsChannel::new(&test_wecom_ws_config(), Path::new("/tmp")).unwrap();
3054 assert!(partial.supports_draft_updates());
3055 }
3056
3057 #[test]
3058 fn multi_message_stream_mode_is_rejected() {
3059 let mut cfg = test_wecom_ws_config();
3060 cfg.stream_mode = StreamMode::MultiMessage;
3061 let err = match WeComWsChannel::new(&cfg, Path::new("/tmp")) {
3062 Ok(_) => panic!("multi_message should be rejected"),
3063 Err(err) => err.to_string(),
3064 };
3065 assert!(err.contains("multi_message is not supported"));
3066 }
3067
3068 #[tokio::test]
3069 async fn send_draft_returns_none_when_stream_mode_off() {
3070 let mut cfg = test_wecom_ws_config();
3071 cfg.stream_mode = StreamMode::Off;
3072 let channel = WeComWsChannel::new(&cfg, Path::new("/tmp")).unwrap();
3073
3074 let id = channel
3075 .send_draft(&SendMessage::new("draft", "user--zeroclaw_user"))
3076 .await
3077 .unwrap();
3078
3079 assert!(id.is_none());
3080 }
3081
3082 #[tokio::test]
3083 async fn send_draft_failure_does_not_record_req_id_mapping() {
3084 let channel = WeComWsChannel::new(&test_wecom_ws_config(), Path::new("/tmp")).unwrap();
3085 let result = channel
3086 .send_draft(
3087 &SendMessage::new("draft", "user--zeroclaw_user")
3088 .in_thread(Some("req-draft".to_string())),
3089 )
3090 .await;
3091
3092 assert!(result.is_err());
3093 assert!(channel.req_id_map.lock().is_empty());
3094 }
3095
3096 #[tokio::test]
3097 async fn finalize_draft_failure_cleans_req_id_mapping() {
3098 let channel = WeComWsChannel::new(&test_wecom_ws_config(), Path::new("/tmp")).unwrap();
3099 channel
3100 .req_id_map
3101 .lock()
3102 .insert("stream-1".to_string(), "req-finalize".to_string());
3103
3104 let result = channel
3105 .finalize_draft("user--zeroclaw_user", "stream-1", "final")
3106 .await;
3107
3108 assert!(result.is_err());
3109 assert!(channel.req_id_map.lock().is_empty());
3110 }
3111
3112 #[tokio::test]
3113 async fn send_with_req_id_uses_respond_msg_when_stream_mode_off() {
3114 let mut cfg = test_wecom_ws_config();
3115 cfg.stream_mode = StreamMode::Off;
3116 let channel = WeComWsChannel::new(&cfg, Path::new("/tmp")).unwrap();
3117
3118 let (ws_tx, mut ws_rx) = mpsc::channel::<WsOutbound>(4);
3119 *channel.ws_tx.lock().await = Some(ws_tx);
3120
3121 let responder_channel = channel.clone();
3122 let responder = tokio::spawn(async move {
3123 let Some(WsOutbound::Frame(frame)) = ws_rx.recv().await else {
3124 panic!("expected respond_msg frame");
3125 };
3126 let req_id = frame
3127 .get("headers")
3128 .and_then(|headers| headers.get("req_id"))
3129 .and_then(Value::as_str)
3130 .unwrap_or("")
3131 .to_string();
3132 responder_channel
3133 .maybe_handle_command_response(&serde_json::json!({
3134 "headers": { "req_id": req_id },
3135 "errcode": 0,
3136 "errmsg": "ok"
3137 }))
3138 .await;
3139 frame
3140 });
3141
3142 channel
3143 .send(
3144 &SendMessage::new("runtime ok", "user--zeroclaw_user")
3145 .in_thread(Some("req-runtime".to_string())),
3146 )
3147 .await
3148 .unwrap();
3149
3150 let frame = responder.await.unwrap();
3151 assert_eq!(
3152 frame.get("cmd").and_then(Value::as_str),
3153 Some("aibot_respond_msg")
3154 );
3155 assert_eq!(
3156 frame
3157 .get("headers")
3158 .and_then(|headers| headers.get("req_id"))
3159 .and_then(Value::as_str),
3160 Some("req-runtime")
3161 );
3162 assert_eq!(
3163 frame
3164 .pointer("/body/stream/content")
3165 .and_then(Value::as_str),
3166 Some("runtime ok")
3167 );
3168 assert_eq!(
3169 frame
3170 .pointer("/body/stream/finish")
3171 .and_then(Value::as_bool),
3172 Some(true)
3173 );
3174 }
3175
3176 #[tokio::test]
3177 async fn send_without_req_id_uses_send_msg() {
3178 let channel = WeComWsChannel::new(&test_wecom_ws_config(), Path::new("/tmp")).unwrap();
3179
3180 let (ws_tx, mut ws_rx) = mpsc::channel::<WsOutbound>(4);
3181 *channel.ws_tx.lock().await = Some(ws_tx);
3182
3183 let responder_channel = channel.clone();
3184 let responder = tokio::spawn(async move {
3185 let Some(WsOutbound::Frame(frame)) = ws_rx.recv().await else {
3186 panic!("expected send_msg frame");
3187 };
3188 let req_id = frame
3189 .get("headers")
3190 .and_then(|headers| headers.get("req_id"))
3191 .and_then(Value::as_str)
3192 .unwrap_or("")
3193 .to_string();
3194 responder_channel
3195 .maybe_handle_command_response(&serde_json::json!({
3196 "headers": { "req_id": req_id },
3197 "errcode": 0,
3198 "errmsg": "ok"
3199 }))
3200 .await;
3201 frame
3202 });
3203
3204 channel
3205 .send(&SendMessage::new("hello proactive", "user--zeroclaw_user"))
3206 .await
3207 .unwrap();
3208
3209 let frame = responder.await.unwrap();
3210 assert_eq!(
3211 frame.get("cmd").and_then(Value::as_str),
3212 Some("aibot_send_msg")
3213 );
3214 assert_eq!(
3215 frame
3216 .pointer("/body/markdown/content")
3217 .and_then(Value::as_str),
3218 Some("hello proactive")
3219 );
3220 }
3221
3222 #[tokio::test]
3223 async fn command_response_resolves_waiter_successfully() {
3224 let config = test_wecom_ws_config();
3225 let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3226
3227 let (waiter, rx) = tokio::sync::oneshot::channel();
3228 channel
3229 .pending_responses
3230 .lock()
3231 .await
3232 .insert("req-ok".to_string(), waiter);
3233
3234 assert!(
3235 channel
3236 .maybe_handle_command_response(&serde_json::json!({
3237 "headers": { "req_id": "req-ok" },
3238 "errcode": 0,
3239 "errmsg": "ok"
3240 }))
3241 .await
3242 );
3243 assert!(rx.await.unwrap().is_ok());
3244 }
3245
3246 #[tokio::test]
3247 async fn command_response_resolves_waiter_failure() {
3248 let config = test_wecom_ws_config();
3249 let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3250
3251 let (waiter, rx) = tokio::sync::oneshot::channel();
3252 channel
3253 .pending_responses
3254 .lock()
3255 .await
3256 .insert("req-fail".to_string(), waiter);
3257
3258 assert!(
3259 channel
3260 .maybe_handle_command_response(&serde_json::json!({
3261 "headers": { "req_id": "req-fail" },
3262 "errcode": 93001,
3263 "errmsg": "session not allowed"
3264 }))
3265 .await
3266 );
3267 let err = rx.await.unwrap().unwrap_err().to_string();
3268 assert!(err.contains("errcode=93001"));
3269 assert!(err.contains("session not allowed"));
3270 }
3271
3272 #[tokio::test]
3273 async fn handle_ws_message_consumes_command_ack_without_forwarding() {
3274 let config = test_wecom_ws_config();
3275 let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3276
3277 let (waiter, ack_rx) = tokio::sync::oneshot::channel();
3278 channel
3279 .pending_responses
3280 .lock()
3281 .await
3282 .insert("req-ack".to_string(), waiter);
3283
3284 let (tx, mut rx) = mpsc::channel::<ChannelMessage>(1);
3285 let should_reconnect = channel
3286 .handle_ws_message(
3287 serde_json::json!({
3288 "cmd": "aibot_respond_msg",
3289 "headers": { "req_id": "req-ack" },
3290 "errcode": 0,
3291 "errmsg": "ok"
3292 }),
3293 &tx,
3294 )
3295 .await;
3296
3297 assert!(!should_reconnect);
3298 assert!(ack_rx.await.unwrap().is_ok());
3299 assert!(
3300 tokio::time::timeout(Duration::from_millis(100), rx.recv())
3301 .await
3302 .is_err(),
3303 "command ack must not be forwarded as an inbound channel message"
3304 );
3305 }
3306
3307 #[tokio::test]
3308 async fn clear_command_forwards_runtime_new_session_without_immediate_ws_reply() {
3309 let mut config = test_wecom_ws_config();
3310 config.allowed_users = vec!["zeroclaw_user".to_string()];
3311 let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3312
3313 let (ws_tx, mut ws_rx) = mpsc::channel::<WsOutbound>(1);
3314 *channel.ws_tx.lock().await = Some(ws_tx);
3315
3316 let (tx, mut rx) = mpsc::channel::<ChannelMessage>(1);
3317 channel
3318 .handle_msg_callback(
3319 serde_json::json!({
3320 "headers": { "req_id": "req-clear" },
3321 "body": {
3322 "msgtype": "text",
3323 "msgid": "msg-clear",
3324 "chattype": "single",
3325 "from": { "userid": "zeroclaw_user" },
3326 "text": { "content": "/clear" }
3327 }
3328 }),
3329 &tx,
3330 )
3331 .await;
3332
3333 let forwarded = tokio::time::timeout(Duration::from_millis(100), rx.recv())
3334 .await
3335 .expect("clear command should be forwarded promptly")
3336 .expect("clear command should produce a framework message");
3337 assert_eq!(forwarded.content, "/new");
3338 assert_eq!(forwarded.thread_ts.as_deref(), Some("req-clear"));
3339
3340 assert!(
3341 tokio::time::timeout(Duration::from_millis(100), ws_rx.recv())
3342 .await
3343 .is_err(),
3344 "clear command should not emit an immediate websocket reply"
3345 );
3346 }
3347
3348 #[tokio::test]
3349 async fn clear_command_ws_dispatch_does_not_block_when_framework_queue_is_full() {
3350 let mut config = test_wecom_ws_config();
3351 config.allowed_users = vec!["zeroclaw_user".to_string()];
3352 let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3353
3354 let (tx, mut rx) = mpsc::channel::<ChannelMessage>(1);
3355 tx.send(ChannelMessage::new(
3356 "prefill-clear",
3357 "tester",
3358 "user--zeroclaw_user",
3359 "prefill",
3360 "wecom_ws",
3361 bytes_timestamp_now(),
3362 ))
3363 .await
3364 .unwrap();
3365
3366 let should_reconnect = tokio::time::timeout(
3367 Duration::from_millis(100),
3368 channel.handle_ws_message(
3369 serde_json::json!({
3370 "cmd": "aibot_msg_callback",
3371 "headers": { "req_id": "req-clear-dispatch" },
3372 "body": {
3373 "msgtype": "text",
3374 "msgid": "msg-clear-dispatch",
3375 "chattype": "single",
3376 "from": { "userid": "zeroclaw_user" },
3377 "text": { "content": "/clear" }
3378 }
3379 }),
3380 &tx,
3381 ),
3382 )
3383 .await
3384 .expect("clear dispatch should not block the websocket loop");
3385
3386 assert!(!should_reconnect);
3387
3388 let first = tokio::time::timeout(Duration::from_millis(100), rx.recv())
3389 .await
3390 .expect("prefilled framework message should be readable")
3391 .expect("prefilled framework message should exist");
3392 assert_eq!(first.id, "prefill-clear");
3393
3394 let forwarded = tokio::time::timeout(Duration::from_millis(100), rx.recv())
3395 .await
3396 .expect("clear command should forward once queue space is available")
3397 .expect("clear command should produce a framework message");
3398 assert_eq!(forwarded.content, "/new");
3399 assert_eq!(forwarded.thread_ts.as_deref(), Some("req-clear-dispatch"));
3400 }
3401
3402 #[tokio::test]
3403 async fn unauthorized_group_message_replies_with_chatid_and_does_not_forward() {
3404 let config = test_wecom_ws_config();
3405 let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3406
3407 let (ws_tx, mut ws_rx) = mpsc::channel::<WsOutbound>(4);
3408 *channel.ws_tx.lock().await = Some(ws_tx);
3409
3410 let responder_channel = channel.clone();
3411 let responder = tokio::spawn(async move {
3412 let Some(WsOutbound::Frame(frame)) = ws_rx.recv().await else {
3413 panic!("expected access-denied response frame");
3414 };
3415 let req_id = frame
3416 .get("headers")
3417 .and_then(|headers| headers.get("req_id"))
3418 .and_then(Value::as_str)
3419 .unwrap_or("")
3420 .to_string();
3421 let content = frame
3422 .pointer("/body/stream/content")
3423 .and_then(Value::as_str)
3424 .unwrap_or("")
3425 .to_string();
3426 responder_channel
3427 .maybe_handle_command_response(&serde_json::json!({
3428 "headers": { "req_id": req_id },
3429 "errcode": 0,
3430 "errmsg": "ok"
3431 }))
3432 .await;
3433 content
3434 });
3435
3436 let (tx, mut rx) = mpsc::channel::<ChannelMessage>(1);
3437 channel
3438 .handle_msg_callback(
3439 serde_json::json!({
3440 "headers": { "req_id": "req-denied" },
3441 "body": {
3442 "msgtype": "text",
3443 "msgid": "msg-denied",
3444 "chattype": "group",
3445 "chatid": "zeroclaw_group",
3446 "from": { "userid": "zeroclaw_user" },
3447 "text": { "content": "@bot hello" }
3448 }
3449 }),
3450 &tx,
3451 )
3452 .await;
3453
3454 assert!(
3455 tokio::time::timeout(Duration::from_millis(100), rx.recv())
3456 .await
3457 .is_err(),
3458 "unauthorized message must not reach framework"
3459 );
3460
3461 let denied = responder.await.unwrap();
3462 assert!(denied.contains("zeroclaw_group"));
3463 assert!(denied.contains("zeroclaw_user"));
3464 assert!(denied.contains("allowed_groups"));
3465 }
3466
3467 #[tokio::test]
3468 async fn unauthorized_message_ws_dispatch_returns_without_waiting_for_ack() {
3469 let config = test_wecom_ws_config();
3470 let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3471
3472 let (ws_tx, mut ws_rx) = mpsc::channel::<WsOutbound>(4);
3473 *channel.ws_tx.lock().await = Some(ws_tx);
3474
3475 let (tx, mut rx) = mpsc::channel::<ChannelMessage>(1);
3476 let should_reconnect = tokio::time::timeout(
3477 Duration::from_millis(100),
3478 channel.handle_ws_message(
3479 serde_json::json!({
3480 "cmd": "aibot_msg_callback",
3481 "headers": { "req_id": "req-denied-no-ack" },
3482 "body": {
3483 "msgtype": "text",
3484 "msgid": "msg-denied-no-ack",
3485 "chattype": "single",
3486 "from": { "userid": "zeroclaw_user" },
3487 "text": { "content": "@bot hello" }
3488 }
3489 }),
3490 &tx,
3491 ),
3492 )
3493 .await
3494 .expect("access-denied dispatch should not block on websocket ack");
3495
3496 assert!(!should_reconnect);
3497
3498 assert!(
3499 tokio::time::timeout(Duration::from_millis(100), rx.recv())
3500 .await
3501 .is_err(),
3502 "unauthorized message must not reach framework"
3503 );
3504
3505 let Some(WsOutbound::Frame(frame)) =
3506 tokio::time::timeout(Duration::from_millis(100), ws_rx.recv())
3507 .await
3508 .expect("access-denied reply should be queued promptly")
3509 else {
3510 panic!("expected access-denied response frame");
3511 };
3512
3513 assert_eq!(
3514 frame.get("cmd").and_then(Value::as_str),
3515 Some("aibot_respond_msg")
3516 );
3517 assert_eq!(
3518 frame
3519 .get("headers")
3520 .and_then(|headers| headers.get("req_id"))
3521 .and_then(Value::as_str),
3522 Some("req-denied-no-ack")
3523 );
3524 assert!(
3525 frame
3526 .pointer("/body/stream/content")
3527 .and_then(Value::as_str)
3528 .is_some_and(|content| content.contains("allowed_users")),
3529 "access-denied reply should explain how to configure the allowlist"
3530 );
3531 }
3532
3533 #[tokio::test]
3534 async fn stream_reply_retries_data_version_conflict() {
3535 let config = test_wecom_ws_config();
3536 let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3537
3538 let (tx, mut rx) = mpsc::channel::<WsOutbound>(8);
3539 *channel.ws_tx.lock().await = Some(tx);
3540
3541 let attempts = Arc::new(std::sync::atomic::AtomicUsize::new(0));
3542 let responder_channel = channel.clone();
3543 let responder_attempts = Arc::clone(&attempts);
3544 let responder = tokio::spawn(async move {
3545 while let Some(WsOutbound::Frame(frame)) = rx.recv().await {
3546 let attempt = responder_attempts.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
3547 let req_id = frame
3548 .get("headers")
3549 .and_then(|headers| headers.get("req_id"))
3550 .and_then(Value::as_str)
3551 .unwrap_or("")
3552 .to_string();
3553
3554 let errcode = if attempt == 0 { 6000 } else { 0 };
3555 let errmsg = if errcode == 0 {
3556 "ok"
3557 } else {
3558 "more than one callers at the same time, data version conflict"
3559 };
3560 responder_channel
3561 .maybe_handle_command_response(&serde_json::json!({
3562 "headers": { "req_id": req_id },
3563 "errcode": errcode,
3564 "errmsg": errmsg
3565 }))
3566 .await;
3567
3568 if errcode == 0 {
3569 break;
3570 }
3571 }
3572 });
3573
3574 channel
3575 .ws_send_respond_msg("req-stream", "stream-1", "hello", false)
3576 .await
3577 .unwrap();
3578
3579 responder.await.unwrap();
3580 assert_eq!(attempts.load(std::sync::atomic::Ordering::SeqCst), 2);
3581 }
3582
3583 #[tokio::test]
3584 async fn stream_reply_serializes_same_req_id_updates() {
3585 let config = test_wecom_ws_config();
3586 let channel = WeComWsChannel::new(&config, Path::new("/tmp")).unwrap();
3587
3588 let (tx, mut rx) = mpsc::channel::<WsOutbound>(8);
3589 *channel.ws_tx.lock().await = Some(tx);
3590
3591 let first_channel = channel.clone();
3592 let first = tokio::spawn(async move {
3593 first_channel
3594 .ws_send_respond_msg("req-serial", "stream-1", "first", false)
3595 .await
3596 });
3597
3598 let second_channel = channel.clone();
3599 let second = tokio::spawn(async move {
3600 second_channel
3601 .ws_send_respond_msg("req-serial", "stream-1", "second", false)
3602 .await
3603 });
3604
3605 let first_frame = tokio::time::timeout(Duration::from_millis(250), rx.recv())
3606 .await
3607 .expect("first frame should arrive")
3608 .expect("first frame should exist");
3609 let WsOutbound::Frame(first_frame) = first_frame;
3610 assert_eq!(
3611 first_frame
3612 .get("body")
3613 .and_then(|body| body.get("stream"))
3614 .and_then(|stream| stream.get("content"))
3615 .and_then(Value::as_str),
3616 Some("first")
3617 );
3618
3619 assert!(
3620 tokio::time::timeout(Duration::from_millis(75), rx.recv())
3621 .await
3622 .is_err(),
3623 "second frame should wait for the first ack"
3624 );
3625
3626 channel
3627 .maybe_handle_command_response(&serde_json::json!({
3628 "headers": { "req_id": "req-serial" },
3629 "errcode": 0,
3630 "errmsg": "ok"
3631 }))
3632 .await;
3633 first.await.unwrap().unwrap();
3634
3635 let second_frame = tokio::time::timeout(Duration::from_millis(250), rx.recv())
3636 .await
3637 .expect("second frame should arrive after first ack")
3638 .expect("second frame should exist");
3639 let WsOutbound::Frame(second_frame) = second_frame;
3640 assert_eq!(
3641 second_frame
3642 .get("body")
3643 .and_then(|body| body.get("stream"))
3644 .and_then(|stream| stream.get("content"))
3645 .and_then(Value::as_str),
3646 Some("second")
3647 );
3648
3649 channel
3650 .maybe_handle_command_response(&serde_json::json!({
3651 "headers": { "req_id": "req-serial" },
3652 "errcode": 0,
3653 "errmsg": "ok"
3654 }))
3655 .await;
3656 second.await.unwrap().unwrap();
3657 }
3658}