1use anyhow::Context;
2use async_trait::async_trait;
3use base64::Engine as _;
4use chrono::Utc;
5use futures_util::{SinkExt, StreamExt};
6use reqwest::header::HeaderMap;
7use std::collections::{HashMap, HashSet};
8use std::path::{Path, PathBuf};
9use std::sync::{Arc, Mutex};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11use tokio::io::AsyncWriteExt;
12use tokio::sync::{Mutex as AsyncMutex, oneshot};
13use tokio_tungstenite::tungstenite::Message as WsMessage;
14use zeroclaw_api::channel::{
15 Channel, ChannelApprovalRequest, ChannelApprovalResponse, ChannelMessage, SendMessage,
16};
17
18#[derive(Clone)]
19struct CachedSlackDisplayName {
20 display_name: String,
21 expires_at: Instant,
22}
23
24#[allow(clippy::struct_excessive_bools)]
26pub struct SlackChannel {
27 bot_token: String,
28 app_token: Option<String>,
29 channel_ids: Vec<String>,
30 alias: String,
33 peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
36 thread_replies: bool,
37 mention_only: bool,
38 strict_mention_in_thread: bool,
39 group_reply_allowed_sender_ids: Vec<String>,
40 user_display_name_cache: Mutex<HashMap<String, CachedSlackDisplayName>>,
41 workspace_dir: Option<PathBuf>,
42 active_assistant_thread: Mutex<HashMap<String, String>>,
44 use_markdown_blocks: bool,
46 proxy_url: Option<String>,
48 transcription: Option<zeroclaw_config::schema::TranscriptionConfig>,
51 transcription_manager: Option<std::sync::Arc<super::transcription::TranscriptionManager>>,
52 stream_drafts: bool,
54 draft_update_interval_ms: u64,
56 last_draft_edit: Mutex<HashMap<String, Instant>>,
58 lazy_draft_ts: tokio::sync::Mutex<HashMap<String, String>>,
62 cancel_reaction: Option<String>,
64 pending_approvals: Arc<AsyncMutex<HashMap<String, oneshot::Sender<ChannelApprovalResponse>>>>,
65 approval_timeout_secs: u64,
68 cached_bot_user_id: Mutex<Option<String>>,
73}
74
75const SLACK_HISTORY_MAX_RETRIES: u32 = 3;
76const SLACK_HISTORY_DEFAULT_RETRY_AFTER_SECS: u64 = 1;
77const SLACK_HISTORY_MAX_BACKOFF_SECS: u64 = 120;
78const SLACK_HISTORY_MAX_JITTER_MS: u64 = 500;
79const SLACK_SOCKET_MODE_INITIAL_BACKOFF_SECS: u64 = 3;
80const SLACK_SOCKET_MODE_MAX_BACKOFF_SECS: u64 = 120;
81const SLACK_SOCKET_MODE_MAX_JITTER_MS: u64 = 500;
82const SLACK_USER_CACHE_TTL_SECS: u64 = 6 * 60 * 60;
83const SLACK_ATTACHMENT_IMAGE_MAX_BYTES: usize = 5 * 1024 * 1024;
84const SLACK_ATTACHMENT_IMAGE_INLINE_FALLBACK_MAX_BYTES: usize = 512 * 1024;
85const SLACK_ATTACHMENT_TEXT_DOWNLOAD_MAX_BYTES: usize = 256 * 1024;
86const SLACK_ATTACHMENT_TEXT_INLINE_MAX_CHARS: usize = 12_000;
87const SLACK_MARKDOWN_BLOCK_MAX_CHARS: usize = 12_000;
88const SLACK_BLOCK_TEXT_MAX_CHARS: usize = 3_000;
89const SLACK_MAX_BLOCKS_PER_MESSAGE: usize = 50;
90const SLACK_ATTACHMENT_FILENAME_MAX_CHARS: usize = 128;
91const SLACK_USER_CACHE_MAX_ENTRIES: usize = 1000;
92const SLACK_ATTACHMENT_SAVE_SUBDIR: &str = "slack_files";
93const SLACK_ATTACHMENT_MAX_FILES_PER_MESSAGE: usize = 8;
94const SLACK_PERMALINK_MAX_LINKS_PER_MESSAGE: usize = 3;
95const SLACK_PERMALINK_THREAD_MAX_REPLIES: usize = 20;
96const SLACK_PERMALINK_TEXT_MAX_CHARS: usize = 8_000;
97
98#[derive(Debug, Clone, PartialEq, Eq)]
99struct SlackPermalinkRef {
100 url: String,
101 channel_id: String,
102 message_ts: String,
103 thread_ts_hint: Option<String>,
104}
105
106#[derive(Debug, Clone, PartialEq, Eq)]
107enum SlackPermalinkLookup {
108 Message(serde_json::Value),
109 AccessDenied(String),
110 NotFound,
111}
112
113fn extract_slack_ts(message_id: &str) -> &str {
119 message_id
120 .strip_prefix("slack_")
121 .and_then(|rest| {
122 rest.find('.').map(|dot_pos| {
123 let underscore = rest[..dot_pos].rfind('_').unwrap_or(0);
124 &rest[underscore + 1..]
125 })
126 })
127 .unwrap_or(message_id)
128}
129
130fn unicode_emoji_to_slack_name(emoji: &str) -> &str {
135 match emoji {
136 "\u{1F440}" => "eyes", "\u{2705}" => "white_check_mark", "\u{26A0}\u{FE0F}" | "\u{26A0}" => "warning", "\u{274C}" => "x", "\u{1F44D}" => "thumbsup", "\u{1F44E}" => "thumbsdown", "\u{2B50}" => "star", "\u{1F389}" => "tada", "\u{1F914}" => "thinking_face", "\u{1F525}" => "fire", _ => emoji.trim_matches(':'),
147 }
148}
149const SLACK_DRAFT_UPDATE_INTERVAL_MS: u64 = 1200;
152
153const SLACK_MESSAGE_MAX_CHARS: usize = 40_000;
155
156const LAZY_DRAFT_PREFIX: &str = "lazy:";
158
159const SLACK_ATTACHMENT_RENDER_CONCURRENCY: usize = 3;
160const SLACK_POLL_ACTIVE_THREAD_MAX: usize = 50;
161const SLACK_POLL_THREAD_EXPIRE_SECS: u64 = 24 * 60 * 60;
162const SLACK_MEDIA_REDIRECT_MAX_HOPS: usize = 5;
163const SLACK_ALLOWED_MEDIA_HOST_SUFFIXES: &[&str] =
164 &["slack.com", "slack-edge.com", "slack-files.com"];
165const SLACK_SUPPORTED_IMAGE_MIME_TYPES: &[&str] = &[
166 "image/png",
167 "image/jpeg",
168 "image/webp",
169 "image/gif",
170 "image/bmp",
171];
172
173impl SlackChannel {
174 pub fn new(
175 bot_token: String,
176 app_token: Option<String>,
177 channel_ids: Vec<String>,
178 alias: impl Into<String>,
179 peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
180 ) -> Self {
181 Self {
182 bot_token,
183 app_token,
184 channel_ids,
185 alias: alias.into(),
186 peer_resolver,
187 thread_replies: true,
188 mention_only: false,
189 strict_mention_in_thread: false,
190 group_reply_allowed_sender_ids: Vec::new(),
191 user_display_name_cache: Mutex::new(HashMap::new()),
192 workspace_dir: None,
193 active_assistant_thread: Mutex::new(HashMap::new()),
194 use_markdown_blocks: false,
195 proxy_url: None,
196 transcription: None,
197 transcription_manager: None,
198 stream_drafts: false,
199 draft_update_interval_ms: SLACK_DRAFT_UPDATE_INTERVAL_MS,
200 last_draft_edit: Mutex::new(HashMap::new()),
201 lazy_draft_ts: tokio::sync::Mutex::new(HashMap::new()),
202 cancel_reaction: None,
203 pending_approvals: Arc::new(AsyncMutex::new(HashMap::new())),
204 approval_timeout_secs: 300,
205 cached_bot_user_id: Mutex::new(None),
206 }
207 }
208
209 pub fn alias(&self) -> &str {
212 &self.alias
213 }
214
215 async fn cache_bot_user_id(&self) {
220 if let Ok(guard) = self.cached_bot_user_id.lock()
221 && guard.is_some()
222 {
223 return;
224 }
225 if let Some(uid) = self.get_bot_user_id().await
226 && let Ok(mut guard) = self.cached_bot_user_id.lock()
227 {
228 *guard = Some(uid);
229 }
230 }
231
232 pub fn with_group_reply_policy(
234 mut self,
235 mention_only: bool,
236 allowed_sender_ids: Vec<String>,
237 ) -> Self {
238 self.mention_only = mention_only;
239 self.group_reply_allowed_sender_ids =
240 Self::normalize_group_reply_allowed_sender_ids(allowed_sender_ids);
241 self
242 }
243
244 pub fn with_thread_replies(mut self, thread_replies: bool) -> Self {
246 self.thread_replies = thread_replies;
247 self
248 }
249
250 pub fn with_strict_mention_in_thread(mut self, strict: bool) -> Self {
254 self.strict_mention_in_thread = strict;
255 self
256 }
257
258 pub fn with_workspace_dir(mut self, dir: PathBuf) -> Self {
260 self.workspace_dir = Some(dir);
261 self
262 }
263
264 pub fn with_markdown_blocks(mut self, enabled: bool) -> Self {
268 self.use_markdown_blocks = enabled;
269 self
270 }
271
272 pub fn with_proxy_url(mut self, proxy_url: Option<String>) -> Self {
273 self.proxy_url = proxy_url;
274 self
275 }
276
277 pub fn with_approval_timeout_secs(mut self, secs: u64) -> Self {
278 self.approval_timeout_secs = secs;
279 self
280 }
281
282 pub fn with_transcription(
284 mut self,
285 config: zeroclaw_config::schema::TranscriptionConfig,
286 ) -> Self {
287 if !config.enabled {
288 return self;
289 }
290 match super::transcription::TranscriptionManager::new(&config) {
291 Ok(m) => {
292 self.transcription_manager = Some(std::sync::Arc::new(m));
293 self.transcription = Some(config);
294 }
295 Err(e) => {
296 ::zeroclaw_log::record!(
297 WARN,
298 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
299 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
300 .with_attrs(::serde_json::json!({"e": e.to_string()})),
301 "transcription manager init failed, voice transcription disabled"
302 );
303 }
304 }
305 self
306 }
307
308 pub fn with_streaming(mut self, enabled: bool, interval_ms: u64) -> Self {
310 self.stream_drafts = enabled;
311 if interval_ms > 0 {
312 self.draft_update_interval_ms = interval_ms;
313 }
314 self
315 }
316
317 pub fn with_cancel_reaction(mut self, reaction: Option<String>) -> Self {
319 self.cancel_reaction = reaction;
320 self
321 }
322
323 async fn delete_message(&self, channel_id: &str, ts: &str) -> anyhow::Result<()> {
325 let body = serde_json::json!({
326 "channel": channel_id,
327 "ts": ts,
328 });
329
330 let resp = self
331 .http_client()
332 .post("https://slack.com/api/chat.delete")
333 .bearer_auth(&self.bot_token)
334 .json(&body)
335 .send()
336 .await?;
337
338 let resp_body: serde_json::Value = resp.json().await?;
339 if resp_body.get("ok") != Some(&serde_json::Value::Bool(true)) {
340 let err = resp_body
341 .get("error")
342 .and_then(|e| e.as_str())
343 .unwrap_or("unknown");
344 ::zeroclaw_log::record!(
345 DEBUG,
346 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
347 .with_attrs(::serde_json::json!({"error": format!("{}", err)})),
348 "chat.delete failed"
349 );
350 }
351
352 Ok(())
353 }
354
355 async fn resolve_draft_ts(&self, message_id: &str) -> Option<String> {
360 if !message_id.starts_with(LAZY_DRAFT_PREFIX) {
361 return Some(message_id.to_string());
362 }
363 self.lazy_draft_ts.lock().await.get(message_id).cloned()
364 }
365
366 async fn materialize_lazy_draft(
369 &self,
370 lazy_id: &str,
371 text: &str,
372 ) -> anyhow::Result<Option<String>> {
373 let rest = lazy_id.strip_prefix(LAZY_DRAFT_PREFIX).unwrap_or(lazy_id);
375 let (channel_id, thread_ts) = match rest.find(':') {
376 Some(pos) => {
377 let ts = &rest[pos + 1..];
378 (&rest[..pos], if ts.is_empty() { None } else { Some(ts) })
379 }
380 None => (rest, None),
381 };
382
383 let mut body = serde_json::json!({
384 "channel": channel_id,
385 "text": text,
386 });
387 if text.len() <= SLACK_MARKDOWN_BLOCK_MAX_CHARS {
388 body["blocks"] = serde_json::json!([{
389 "type": "markdown",
390 "text": text
391 }]);
392 }
393 if let Some(ts) = thread_ts {
394 body["thread_ts"] = serde_json::json!(ts);
395 }
396
397 let resp = self
398 .http_client()
399 .post("https://slack.com/api/chat.postMessage")
400 .bearer_auth(&self.bot_token)
401 .json(&body)
402 .send()
403 .await?;
404
405 let resp_body: serde_json::Value = resp.json().await?;
406 if resp_body.get("ok") != Some(&serde_json::Value::Bool(true)) {
407 let err = resp_body
408 .get("error")
409 .and_then(|e| e.as_str())
410 .unwrap_or("unknown");
411 anyhow::bail!("chat.postMessage (lazy draft) failed: {err}");
412 }
413
414 let ts = resp_body
415 .get("ts")
416 .and_then(|v| v.as_str())
417 .map(ToString::to_string);
418
419 if let Some(ref real_ts) = ts {
420 self.lazy_draft_ts
421 .lock()
422 .await
423 .insert(lazy_id.to_string(), real_ts.clone());
424 }
425
426 Ok(ts)
427 }
428
429 async fn set_assistant_status(&self, channel_id: &str, status: &str) {
431 let thread_ts = {
432 let map = match self.active_assistant_thread.lock() {
433 Ok(m) => m,
434 Err(_) => return,
435 };
436 match map.get(channel_id) {
437 Some(ts) => ts.clone(),
438 None => return,
439 }
440 };
441
442 let body = serde_json::json!({
443 "channel_id": channel_id,
444 "thread_ts": thread_ts,
445 "status": status,
446 });
447
448 let _ = self
449 .http_client()
450 .post("https://slack.com/api/assistant.threads.setStatus")
451 .bearer_auth(&self.bot_token)
452 .json(&body)
453 .send()
454 .await;
455 }
456
457 fn http_client(&self) -> reqwest::Client {
458 zeroclaw_config::schema::build_channel_proxy_client_with_timeouts(
459 "channel.slack",
460 self.proxy_url.as_deref(),
461 30,
462 10,
463 )
464 }
465
466 pub async fn post_message(&self, channel: &str, text: &str) -> anyhow::Result<String> {
472 let body = serde_json::json!({
473 "channel": channel,
474 "text": text,
475 });
476
477 let resp = self
478 .http_client()
479 .post("https://slack.com/api/chat.postMessage")
480 .bearer_auth(&self.bot_token)
481 .json(&body)
482 .send()
483 .await?;
484
485 let status = resp.status();
486 let raw = resp
487 .text()
488 .await
489 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
490
491 if !status.is_success() {
492 let sanitized = zeroclaw_providers::sanitize_api_error(&raw);
493 anyhow::bail!("chat.postMessage failed ({status}): {sanitized}");
494 }
495
496 let parsed: serde_json::Value = serde_json::from_str(&raw).unwrap_or_default();
497 if parsed.get("ok") == Some(&serde_json::Value::Bool(false)) {
498 let err = parsed
499 .get("error")
500 .and_then(|e| e.as_str())
501 .unwrap_or("unknown");
502 anyhow::bail!("chat.postMessage failed: {err}");
503 }
504
505 parsed
506 .get("ts")
507 .and_then(|v| v.as_str())
508 .map(String::from)
509 .ok_or_else(|| {
510 ::zeroclaw_log::record!(
511 WARN,
512 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
513 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
514 .with_attrs(
515 ::serde_json::json!({"field": "ts", "api": "chat.postMessage"})
516 ),
517 "slack: chat.postMessage response missing ts"
518 );
519 anyhow::Error::msg("chat.postMessage response missing 'ts'")
520 })
521 }
522
523 pub async fn update_message(&self, channel: &str, ts: &str, text: &str) -> anyhow::Result<()> {
528 let body = serde_json::json!({
529 "channel": channel,
530 "ts": ts,
531 "text": text,
532 });
533
534 let resp = self
535 .http_client()
536 .post("https://slack.com/api/chat.update")
537 .bearer_auth(&self.bot_token)
538 .json(&body)
539 .send()
540 .await?;
541
542 let status = resp.status();
543 let raw = resp
544 .text()
545 .await
546 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
547
548 if !status.is_success() {
549 let sanitized = zeroclaw_providers::sanitize_api_error(&raw);
550 anyhow::bail!("chat.update failed ({status}): {sanitized}");
551 }
552
553 let parsed: serde_json::Value = serde_json::from_str(&raw).unwrap_or_default();
554 if parsed.get("ok") == Some(&serde_json::Value::Bool(false)) {
555 let err = parsed
556 .get("error")
557 .and_then(|e| e.as_str())
558 .unwrap_or("unknown");
559 anyhow::bail!("chat.update failed: {err}");
560 }
561
562 Ok(())
563 }
564
565 fn is_user_allowed(&self, user_id: &str) -> bool {
569 let peers = (self.peer_resolver)();
570 crate::allowlist::is_user_allowed(&peers, user_id, crate::allowlist::Match::Sensitive)
571 }
572
573 fn is_group_sender_trigger_enabled(&self, user_id: &str) -> bool {
574 let user_id = user_id.trim();
575 if user_id.is_empty() {
576 return false;
577 }
578
579 self.group_reply_allowed_sender_ids
580 .iter()
581 .any(|entry| entry == "*" || entry == user_id)
582 }
583
584 fn outbound_thread_ts<'a>(&self, message: &'a SendMessage) -> Option<&'a str> {
585 if self.thread_replies {
586 message.thread_ts.as_deref()
587 } else {
588 None
589 }
590 }
591
592 async fn get_bot_user_id(&self) -> Option<String> {
594 let resp: serde_json::Value = self
595 .http_client()
596 .get("https://slack.com/api/auth.test")
597 .bearer_auth(&self.bot_token)
598 .send()
599 .await
600 .ok()?
601 .json()
602 .await
603 .ok()?;
604
605 resp.get("user_id")
606 .and_then(|u| u.as_str())
607 .map(String::from)
608 }
609
610 fn inbound_thread_ts(msg: &serde_json::Value, ts: &str) -> Option<String> {
613 msg.get("thread_ts")
614 .and_then(|t| t.as_str())
615 .or(if ts.is_empty() { None } else { Some(ts) })
616 .map(str::to_string)
617 }
618
619 fn inbound_thread_ts_genuine_only(msg: &serde_json::Value) -> Option<String> {
625 msg.get("thread_ts")
626 .and_then(|t| t.as_str())
627 .map(str::to_string)
628 }
629
630 fn inbound_interruption_scope_id(msg: &serde_json::Value, ts: &str) -> Option<String> {
641 msg.get("thread_ts")
642 .and_then(|t| t.as_str())
643 .filter(|&t| t != ts)
644 .map(str::to_string)
645 }
646
647 fn normalized_channel_id(input: Option<&str>) -> Option<String> {
648 input
649 .map(str::trim)
650 .filter(|v| !v.is_empty() && *v != "*")
651 .map(ToOwned::to_owned)
652 }
653
654 fn scoped_channel_ids(&self) -> Option<Vec<String>> {
657 let mut seen = HashSet::new();
658 let ids: Vec<String> = self
659 .channel_ids
660 .iter()
661 .filter_map(|entry| Self::normalized_channel_id(Some(entry)))
662 .filter(|id| seen.insert(id.clone()))
663 .collect();
664 if ids.is_empty() { None } else { Some(ids) }
665 }
666
667 fn configured_app_token(&self) -> Option<String> {
668 self.app_token
669 .as_deref()
670 .map(str::trim)
671 .filter(|value| !value.is_empty())
672 .map(ToOwned::to_owned)
673 }
674
675 fn normalize_group_reply_allowed_sender_ids(sender_ids: Vec<String>) -> Vec<String> {
676 let mut normalized = sender_ids
677 .into_iter()
678 .map(|entry| entry.trim().to_string())
679 .filter(|entry| !entry.is_empty())
680 .collect::<Vec<_>>();
681 normalized.sort();
682 normalized.dedup();
683 normalized
684 }
685
686 fn user_cache_ttl() -> Duration {
687 Duration::from_secs(SLACK_USER_CACHE_TTL_SECS)
688 }
689
690 fn sanitize_display_name(name: &str) -> Option<String> {
691 let trimmed = name.trim();
692 if trimmed.is_empty() {
693 None
694 } else {
695 Some(trimmed.to_string())
696 }
697 }
698
699 fn extract_user_display_name(payload: &serde_json::Value) -> Option<String> {
700 let user = payload.get("user")?;
701 let profile = user.get("profile");
702
703 let candidates = [
704 profile
705 .and_then(|p| p.get("display_name"))
706 .and_then(|v| v.as_str()),
707 profile
708 .and_then(|p| p.get("display_name_normalized"))
709 .and_then(|v| v.as_str()),
710 profile
711 .and_then(|p| p.get("real_name_normalized"))
712 .and_then(|v| v.as_str()),
713 profile
714 .and_then(|p| p.get("real_name"))
715 .and_then(|v| v.as_str()),
716 user.get("real_name").and_then(|v| v.as_str()),
717 user.get("name").and_then(|v| v.as_str()),
718 ];
719
720 for candidate in candidates.into_iter().flatten() {
721 if let Some(display_name) = Self::sanitize_display_name(candidate) {
722 return Some(display_name);
723 }
724 }
725
726 None
727 }
728
729 fn cached_sender_display_name(&self, user_id: &str) -> Option<String> {
730 let now = Instant::now();
731 let Ok(mut cache) = self.user_display_name_cache.lock() else {
732 return None;
733 };
734
735 if let Some(entry) = cache.get(user_id)
736 && now <= entry.expires_at
737 {
738 return Some(entry.display_name.clone());
739 }
740
741 cache.remove(user_id);
742 None
743 }
744
745 fn cache_sender_display_name(&self, user_id: &str, display_name: &str) {
746 let Ok(mut cache) = self.user_display_name_cache.lock() else {
747 return;
748 };
749 if cache.len() >= SLACK_USER_CACHE_MAX_ENTRIES {
750 let now = Instant::now();
751 cache.retain(|_, v| v.expires_at > now);
752 }
753 cache.insert(
754 user_id.to_string(),
755 CachedSlackDisplayName {
756 display_name: display_name.to_string(),
757 expires_at: Instant::now() + Self::user_cache_ttl(),
758 },
759 );
760 }
761
762 async fn fetch_sender_display_name(&self, user_id: &str) -> Option<String> {
763 let resp = match self
764 .http_client()
765 .get("https://slack.com/api/users.info")
766 .bearer_auth(&self.bot_token)
767 .query(&[("user", user_id)])
768 .send()
769 .await
770 {
771 Ok(response) => response,
772 Err(err) => {
773 ::zeroclaw_log::record!(
774 WARN,
775 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
776 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
777 .with_attrs(
778 ::serde_json::json!({"error": format!("{}", err), "user_id": user_id})
779 ),
780 "users.info request failed for"
781 );
782 return None;
783 }
784 };
785
786 let status = resp.status();
787 let body = resp
788 .text()
789 .await
790 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
791
792 if !status.is_success() {
793 let sanitized = zeroclaw_providers::sanitize_api_error(&body);
794 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"user_id": user_id, "status": status.to_string(), "sanitized": sanitized})), "users.info failed for");
795 return None;
796 }
797
798 let payload: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
799 if payload.get("ok") == Some(&serde_json::Value::Bool(false)) {
800 let err = payload
801 .get("error")
802 .and_then(|e| e.as_str())
803 .unwrap_or("unknown");
804 ::zeroclaw_log::record!(
805 WARN,
806 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
807 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
808 .with_attrs(
809 ::serde_json::json!({"error": format!("{}", err), "user_id": user_id})
810 ),
811 "users.info returned error for"
812 );
813 return None;
814 }
815
816 Self::extract_user_display_name(&payload)
817 }
818
819 async fn resolve_sender_identity(&self, user_id: &str) -> String {
820 let user_id = user_id.trim();
821 if user_id.is_empty() {
822 return String::new();
823 }
824
825 if let Some(display_name) = self.cached_sender_display_name(user_id) {
826 return display_name;
827 }
828
829 if let Some(display_name) = self.fetch_sender_display_name(user_id).await {
830 self.cache_sender_display_name(user_id, &display_name);
831 return display_name;
832 }
833
834 user_id.to_string()
835 }
836
837 fn is_group_channel_id(channel_id: &str) -> bool {
838 matches!(channel_id.chars().next(), Some('C' | 'G'))
839 }
840
841 fn contains_bot_mention(text: &str, bot_user_id: &str) -> bool {
842 if bot_user_id.is_empty() {
843 return false;
844 }
845 text.contains(&format!("<@{bot_user_id}>"))
846 }
847
848 fn normalize_incoming_text(
849 text: &str,
850 require_mention: bool,
851 bot_user_id: &str,
852 ) -> Option<String> {
853 if require_mention && !Self::contains_bot_mention(text, bot_user_id) {
854 return None;
855 }
856 Some(text.trim().to_string())
857 }
858
859 #[cfg(test)]
860 fn normalize_incoming_content(
861 text: &str,
862 require_mention: bool,
863 bot_user_id: &str,
864 ) -> Option<String> {
865 let normalized = Self::normalize_incoming_text(text, require_mention, bot_user_id)?;
866 if normalized.is_empty() {
867 return None;
868 }
869 Some(normalized)
870 }
871
872 fn is_supported_message_subtype(subtype: Option<&str>) -> bool {
873 matches!(subtype, None | Some("file_share" | "thread_broadcast"))
874 }
875
876 fn compose_incoming_content(text: String, attachment_blocks: Vec<String>) -> Option<String> {
877 let mut sections = Vec::new();
878 if !text.trim().is_empty() {
879 sections.push(text.trim().to_string());
880 }
881 for block in attachment_blocks {
882 if !block.trim().is_empty() {
883 sections.push(block);
884 }
885 }
886
887 if sections.is_empty() {
888 None
889 } else {
890 Some(sections.join("\n\n"))
891 }
892 }
893
894 async fn build_incoming_content(
895 &self,
896 message: &serde_json::Value,
897 require_mention: bool,
898 bot_user_id: &str,
899 ) -> Option<String> {
900 let text = message
901 .get("text")
902 .and_then(|value| value.as_str())
903 .unwrap_or_default();
904 let normalized_text = Self::normalize_incoming_text(text, require_mention, bot_user_id)?;
905 let attachment_blocks = self.render_file_attachments(message).await;
906 let permalink_blocks = self.resolve_permalink_blocks(&normalized_text).await;
907 let mut blocks = attachment_blocks;
908 blocks.extend(permalink_blocks);
909 Self::compose_incoming_content(normalized_text, blocks)
910 }
911
912 async fn resolve_permalink_blocks(&self, text: &str) -> Vec<String> {
913 let permalinks = Self::extract_slack_permalinks(text);
914 if permalinks.is_empty() {
915 return Vec::new();
916 }
917 let tasks = permalinks
918 .into_iter()
919 .map(|permalink| async move { self.resolve_slack_permalink(&permalink).await });
920
921 futures_util::stream::iter(tasks)
922 .buffer_unordered(SLACK_ATTACHMENT_RENDER_CONCURRENCY)
923 .filter_map(|block| async move { block })
924 .collect()
925 .await
926 }
927
928 fn extract_slack_permalinks(text: &str) -> Vec<SlackPermalinkRef> {
929 let mut permalinks = Vec::new();
930 let mut seen = HashSet::new();
931
932 for token in text.split_whitespace() {
933 if permalinks.len() >= SLACK_PERMALINK_MAX_LINKS_PER_MESSAGE {
934 break;
935 }
936
937 let Some(url) = Self::extract_url_token(token) else {
938 continue;
939 };
940 let Some(permalink) = Self::parse_slack_permalink(&url) else {
941 continue;
942 };
943 if seen.insert((permalink.channel_id.clone(), permalink.message_ts.clone())) {
944 permalinks.push(permalink);
945 }
946 }
947
948 permalinks
949 }
950
951 fn extract_url_token(token: &str) -> Option<String> {
952 let trimmed = token.trim();
953 if trimmed.is_empty() {
954 return None;
955 }
956
957 let candidate = if trimmed.starts_with('<') && trimmed.ends_with('>') {
958 trimmed
959 .trim_start_matches('<')
960 .trim_end_matches('>')
961 .split('|')
962 .next()
963 .unwrap_or_default()
964 .trim()
965 } else {
966 trimmed.trim_matches(|ch: char| {
967 matches!(
968 ch,
969 '(' | ')' | '[' | ']' | '{' | '}' | '"' | '\'' | ',' | ';'
970 )
971 })
972 };
973
974 if candidate.starts_with("https://") || candidate.starts_with("http://") {
975 Some(candidate.to_string())
976 } else {
977 None
978 }
979 }
980
981 fn parse_slack_permalink(raw_url: &str) -> Option<SlackPermalinkRef> {
982 let url = reqwest::Url::parse(raw_url).ok()?;
983 let host = url.host_str()?.trim_end_matches('.').to_ascii_lowercase();
984 if host != "slack.com" && !host.ends_with(".slack.com") {
985 return None;
986 }
987
988 let mut segments = url.path_segments()?;
989 let first = segments.next()?;
990 let second = segments.next()?;
991 let third = segments.next()?;
992 if first != "archives" || segments.next().is_some() {
993 return None;
994 }
995
996 let channel_id = second.trim();
997 if channel_id.is_empty() {
998 return None;
999 }
1000
1001 let message_ts = Self::parse_slack_permalink_ts(third)?;
1002 let thread_ts_hint = url
1003 .query_pairs()
1004 .find(|(key, _)| key == "thread_ts")
1005 .map(|(_, value)| value.trim().to_string())
1006 .filter(|value| Self::is_valid_slack_ts(value));
1007
1008 Some(SlackPermalinkRef {
1009 url: raw_url.to_string(),
1010 channel_id: channel_id.to_string(),
1011 message_ts,
1012 thread_ts_hint,
1013 })
1014 }
1015
1016 fn parse_slack_permalink_ts(segment: &str) -> Option<String> {
1017 let digits = segment.strip_prefix('p')?.trim();
1018 if digits.len() <= 6 || !digits.chars().all(|ch| ch.is_ascii_digit()) {
1019 return None;
1020 }
1021
1022 let (secs, micros) = digits.split_at(digits.len() - 6);
1023 Some(format!("{secs}.{micros}"))
1024 }
1025
1026 fn is_valid_slack_ts(ts: &str) -> bool {
1027 let Some((secs, micros)) = ts.split_once('.') else {
1028 return false;
1029 };
1030 !secs.is_empty()
1031 && micros.len() == 6
1032 && secs.chars().all(|ch| ch.is_ascii_digit())
1033 && micros.chars().all(|ch| ch.is_ascii_digit())
1034 }
1035
1036 async fn resolve_slack_permalink(&self, permalink: &SlackPermalinkRef) -> Option<String> {
1037 let message_lookup = self
1038 .fetch_permalink_message(&permalink.channel_id, &permalink.message_ts)
1039 .await;
1040 let message = match message_lookup {
1041 SlackPermalinkLookup::Message(message) => message,
1042 SlackPermalinkLookup::AccessDenied(reason) => {
1043 return Some(Self::format_permalink_access_denied(permalink, &reason));
1044 }
1045 SlackPermalinkLookup::NotFound => {
1046 let thread_ts = permalink.thread_ts_hint.as_deref()?;
1047 let replies = self
1048 .fetch_thread_messages_with_retry(&permalink.channel_id, thread_ts)
1049 .await?;
1050 let target = replies.into_iter().find(|reply| {
1051 reply.get("ts").and_then(|value| value.as_str())
1052 == Some(permalink.message_ts.as_str())
1053 });
1054 let target = target?;
1055 return self
1056 .format_permalink_context(permalink, target, Some(thread_ts))
1057 .await;
1058 }
1059 };
1060
1061 let thread_ts = message
1062 .get("thread_ts")
1063 .and_then(|value| value.as_str())
1064 .filter(|thread_ts| Self::is_valid_slack_ts(thread_ts))
1065 .map(str::to_string);
1066
1067 self.format_permalink_context(permalink, message, thread_ts.as_deref())
1068 .await
1069 }
1070
1071 async fn fetch_permalink_message(
1072 &self,
1073 channel_id: &str,
1074 message_ts: &str,
1075 ) -> SlackPermalinkLookup {
1076 let resp = match self
1077 .http_client()
1078 .get("https://slack.com/api/conversations.history")
1079 .bearer_auth(&self.bot_token)
1080 .query(&[
1081 ("channel", channel_id),
1082 ("oldest", message_ts),
1083 ("latest", message_ts),
1084 ("inclusive", "true"),
1085 ("limit", "1"),
1086 ])
1087 .send()
1088 .await
1089 {
1090 Ok(response) => response,
1091 Err(err) => {
1092 ::zeroclaw_log::record!(
1093 WARN,
1094 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1095 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1096 &format!(
1097 "Slack permalink resolver: conversations.history request failed for channel={} ts={}: {}",
1098 channel_id, message_ts, err
1099 )
1100 );
1101 return SlackPermalinkLookup::NotFound;
1102 }
1103 };
1104
1105 let status = resp.status();
1106 let body = resp
1107 .text()
1108 .await
1109 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
1110 if !status.is_success() {
1111 let sanitized = zeroclaw_providers::sanitize_api_error(&body);
1112 ::zeroclaw_log::record!(
1113 WARN,
1114 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1115 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1116 &format!(
1117 "Slack permalink resolver: conversations.history failed for channel={} ts={} ({}): {}",
1118 channel_id, message_ts, status, sanitized
1119 )
1120 );
1121 return SlackPermalinkLookup::NotFound;
1122 }
1123
1124 let payload: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
1125 if payload.get("ok") == Some(&serde_json::Value::Bool(false)) {
1126 let err = payload
1127 .get("error")
1128 .and_then(|value| value.as_str())
1129 .unwrap_or("unknown");
1130 return match err {
1131 "not_in_channel" => SlackPermalinkLookup::AccessDenied(
1132 "The Slack bot is not in that channel. Invite the app to the channel and try again."
1133 .to_string(),
1134 ),
1135 "missing_scope" => SlackPermalinkLookup::AccessDenied(
1136 "The Slack app is missing the scope needed to read that channel."
1137 .to_string(),
1138 ),
1139 _ => {
1140 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), &format!("Slack permalink resolver: conversations.history returned error for channel={} ts={}: {}", channel_id, message_ts, err));
1141 SlackPermalinkLookup::NotFound
1142 }
1143 };
1144 }
1145
1146 let messages = payload
1147 .get("messages")
1148 .and_then(|messages| messages.as_array())
1149 .cloned()
1150 .unwrap_or_default();
1151 messages
1152 .first()
1153 .cloned()
1154 .map(SlackPermalinkLookup::Message)
1155 .unwrap_or(SlackPermalinkLookup::NotFound)
1156 }
1157
1158 fn format_permalink_access_denied(permalink: &SlackPermalinkRef, reason: &str) -> String {
1159 format!(
1160 "[Slack Link Access]\nURL: {}\nStatus: {}",
1161 permalink.url, reason
1162 )
1163 }
1164
1165 async fn fetch_thread_messages_with_retry(
1166 &self,
1167 channel_id: &str,
1168 thread_ts: &str,
1169 ) -> Option<Vec<serde_json::Value>> {
1170 let payload = self
1171 .fetch_thread_replies_with_retry(channel_id, thread_ts, "0")
1172 .await?;
1173 let messages = payload
1174 .get("messages")
1175 .and_then(|messages| messages.as_array())
1176 .cloned()
1177 .unwrap_or_default();
1178 Some(messages)
1179 }
1180
1181 async fn format_permalink_context(
1182 &self,
1183 permalink: &SlackPermalinkRef,
1184 message: serde_json::Value,
1185 thread_ts: Option<&str>,
1186 ) -> Option<String> {
1187 let mut lines = vec![
1188 "[Slack Link Context]".to_string(),
1189 format!("URL: {}", permalink.url),
1190 ];
1191
1192 if let Some(thread_ts) = thread_ts {
1193 let replies = self
1194 .fetch_thread_messages_with_retry(&permalink.channel_id, thread_ts)
1195 .await
1196 .unwrap_or_else(|| vec![message.clone()]);
1197 let rendered = self
1198 .render_permalink_thread_messages(&replies, &permalink.message_ts)
1199 .await;
1200 if rendered.is_empty() {
1201 return None;
1202 }
1203 lines.push("Thread:".to_string());
1204 lines.extend(rendered);
1205 } else {
1206 let rendered = self.render_permalink_message_line(&message, true).await?;
1207 lines.push("Message:".to_string());
1208 lines.push(rendered);
1209 }
1210
1211 Self::truncate_text(&lines.join("\n"), SLACK_PERMALINK_TEXT_MAX_CHARS)
1212 }
1213
1214 async fn render_permalink_thread_messages(
1215 &self,
1216 messages: &[serde_json::Value],
1217 target_ts: &str,
1218 ) -> Vec<String> {
1219 let mut rendered = Vec::new();
1220 let total = messages.len();
1221 let start = total.saturating_sub(SLACK_PERMALINK_THREAD_MAX_REPLIES);
1222
1223 if start > 0 {
1224 rendered.push(format!("… {} earlier thread messages omitted …", start));
1225 }
1226
1227 for message in &messages[start..] {
1228 if let Some(line) = self
1229 .render_permalink_message_line(
1230 message,
1231 message.get("ts").and_then(|value| value.as_str()) == Some(target_ts),
1232 )
1233 .await
1234 {
1235 rendered.push(line);
1236 }
1237 }
1238
1239 rendered
1240 }
1241
1242 async fn render_permalink_message_line(
1243 &self,
1244 message: &serde_json::Value,
1245 highlight: bool,
1246 ) -> Option<String> {
1247 let user_id = message
1248 .get("user")
1249 .or_else(|| message.get("bot_id"))
1250 .and_then(|value| value.as_str())
1251 .unwrap_or_default();
1252 let sender = if user_id.is_empty() {
1253 "unknown".to_string()
1254 } else {
1255 self.resolve_sender_identity(user_id).await
1256 };
1257
1258 let text = message
1259 .get("text")
1260 .and_then(|value| value.as_str())
1261 .map(str::trim)
1262 .filter(|value| !value.is_empty())
1263 .unwrap_or("[no text]");
1264 let attachment_blocks = self.render_file_attachments(message).await;
1265 let content = Self::compose_incoming_content(text.to_string(), attachment_blocks)
1266 .unwrap_or_else(|| text.to_string())
1267 .replace('\n', " ");
1268 let prefix = if highlight { ">" } else { "-" };
1269 Some(format!("{prefix} {sender}: {content}"))
1270 }
1271
1272 async fn render_file_attachments(&self, message: &serde_json::Value) -> Vec<String> {
1273 let Some(files) = message.get("files").and_then(|value| value.as_array()) else {
1274 return Vec::new();
1275 };
1276
1277 if files.len() > SLACK_ATTACHMENT_MAX_FILES_PER_MESSAGE {
1278 ::zeroclaw_log::record!(
1279 WARN,
1280 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1281 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1282 &format!(
1283 "message has {} files; processing first {} only",
1284 files.len(),
1285 SLACK_ATTACHMENT_MAX_FILES_PER_MESSAGE
1286 )
1287 );
1288 }
1289
1290 let limited_files = files
1291 .iter()
1292 .take(SLACK_ATTACHMENT_MAX_FILES_PER_MESSAGE)
1293 .cloned()
1294 .collect::<Vec<_>>();
1295
1296 let tasks =
1297 limited_files
1298 .into_iter()
1299 .enumerate()
1300 .map(|(idx, raw_file)| async move {
1301 (idx, self.render_file_attachment(&raw_file).await)
1302 });
1303
1304 let mut rendered = futures_util::stream::iter(tasks)
1305 .buffer_unordered(SLACK_ATTACHMENT_RENDER_CONCURRENCY)
1306 .collect::<Vec<_>>()
1307 .await;
1308 rendered.sort_by_key(|(idx, _)| *idx);
1309 rendered
1310 .into_iter()
1311 .filter_map(|(_, block)| block)
1312 .collect()
1313 }
1314
1315 async fn render_file_attachment(&self, raw_file: &serde_json::Value) -> Option<String> {
1316 let file = self
1317 .hydrate_file_object(raw_file)
1318 .await
1319 .unwrap_or_else(|| raw_file.clone());
1320
1321 if Self::is_audio_file(&file)
1324 && let Some(transcribed) = self.try_transcribe_audio_file(&file).await
1325 {
1326 return Some(transcribed);
1327 }
1328 if Self::is_image_file(&file)
1329 && let Some(marker) = self.fetch_image_marker(&file).await
1330 {
1331 return Some(marker);
1332 }
1333
1334 let mut snippet = Self::file_text_preview(&file);
1335 if snippet.is_none() && Self::is_probably_text_file(&file) {
1336 snippet = self.download_text_snippet(&file).await;
1337 }
1338
1339 if let Some(text) = snippet
1340 && !text.trim().is_empty()
1341 {
1342 return Some(Self::format_snippet_attachment(&file, &text));
1343 }
1344
1345 Some(Self::format_attachment_summary(&file))
1346 }
1347
1348 async fn hydrate_file_object(&self, file: &serde_json::Value) -> Option<serde_json::Value> {
1349 let file_id = Self::slack_file_id(file)?;
1350 let file_access = file
1351 .get("file_access")
1352 .and_then(|value| value.as_str())
1353 .unwrap_or_default();
1354 let mode = Self::slack_file_mode(file).unwrap_or_default();
1355
1356 let requires_lookup = file_access.eq_ignore_ascii_case("check_file_info")
1357 || Self::slack_file_download_url(file).is_none()
1358 || (Self::is_probably_text_file(file) && Self::file_text_preview(file).is_none())
1359 || (mode == "snippet" && file.get("preview").is_none());
1360 if !requires_lookup {
1361 return Some(file.clone());
1362 }
1363
1364 self.fetch_file_info(file_id)
1365 .await
1366 .or_else(|| Some(file.clone()))
1367 }
1368
1369 async fn fetch_file_info(&self, file_id: &str) -> Option<serde_json::Value> {
1370 let resp = match self
1371 .http_client()
1372 .get("https://slack.com/api/files.info")
1373 .bearer_auth(&self.bot_token)
1374 .query(&[("file", file_id)])
1375 .send()
1376 .await
1377 {
1378 Ok(response) => response,
1379 Err(err) => {
1380 ::zeroclaw_log::record!(
1381 WARN,
1382 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1383 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1384 .with_attrs(
1385 ::serde_json::json!({"error": format!("{}", err), "file_id": file_id})
1386 ),
1387 "files.info request failed for"
1388 );
1389 return None;
1390 }
1391 };
1392
1393 let status = resp.status();
1394 let body = resp
1395 .text()
1396 .await
1397 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
1398 if !status.is_success() {
1399 let sanitized = zeroclaw_providers::sanitize_api_error(&body);
1400 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"file_id": file_id, "status": status.to_string(), "sanitized": sanitized})), "files.info failed for");
1401 return None;
1402 }
1403
1404 let payload: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
1405 if payload.get("ok") == Some(&serde_json::Value::Bool(false)) {
1406 let err = payload
1407 .get("error")
1408 .and_then(|value| value.as_str())
1409 .unwrap_or("unknown");
1410 ::zeroclaw_log::record!(
1411 WARN,
1412 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1413 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1414 .with_attrs(
1415 ::serde_json::json!({"error": format!("{}", err), "file_id": file_id})
1416 ),
1417 "files.info returned error for"
1418 );
1419 return None;
1420 }
1421
1422 payload.get("file").cloned()
1423 }
1424
1425 fn slack_file_id(file: &serde_json::Value) -> Option<&str> {
1426 file.get("id").and_then(|value| value.as_str())
1427 }
1428
1429 fn slack_file_name(file: &serde_json::Value) -> String {
1430 file.get("title")
1431 .and_then(|value| value.as_str())
1432 .filter(|value| !value.trim().is_empty())
1433 .or_else(|| file.get("name").and_then(|value| value.as_str()))
1434 .unwrap_or("attachment")
1435 .trim()
1436 .to_string()
1437 }
1438
1439 fn slack_file_mode(file: &serde_json::Value) -> Option<String> {
1440 file.get("mode")
1441 .and_then(|value| value.as_str())
1442 .map(|value| value.to_ascii_lowercase())
1443 }
1444
1445 fn slack_file_mime(file: &serde_json::Value) -> Option<String> {
1446 file.get("mimetype")
1447 .and_then(|value| value.as_str())
1448 .map(|value| value.to_ascii_lowercase())
1449 }
1450
1451 fn slack_file_download_url(file: &serde_json::Value) -> Option<&str> {
1452 file.get("url_private_download")
1453 .and_then(|value| value.as_str())
1454 .or_else(|| file.get("url_private").and_then(|value| value.as_str()))
1455 }
1456
1457 fn slack_image_candidate_urls(file: &serde_json::Value) -> Vec<String> {
1458 let mut urls = Vec::new();
1459 let mut seen = HashSet::new();
1460 for key in [
1461 "thumb_1024",
1462 "thumb_960",
1463 "thumb_800",
1464 "thumb_720",
1465 "thumb_480",
1466 "thumb_360",
1467 "thumb_160",
1468 "url_private_download",
1469 "url_private",
1470 ] {
1471 if let Some(url) = file.get(key).and_then(|value| value.as_str()) {
1472 let trimmed = url.trim();
1473 if trimmed.is_empty() {
1474 continue;
1475 }
1476 if seen.insert(trimmed.to_string()) {
1477 urls.push(trimmed.to_string());
1478 }
1479 }
1480 }
1481 urls
1482 }
1483
1484 fn is_allowed_slack_media_hostname(host: &str) -> bool {
1485 let normalized = host.trim().trim_end_matches('.').to_ascii_lowercase();
1486 if normalized.is_empty() {
1487 return false;
1488 }
1489
1490 SLACK_ALLOWED_MEDIA_HOST_SUFFIXES
1491 .iter()
1492 .any(|suffix| normalized == *suffix || normalized.ends_with(&format!(".{suffix}")))
1493 }
1494
1495 fn redact_slack_url(url: &reqwest::Url) -> String {
1496 let host = url.host_str().unwrap_or("unknown-host");
1497 let tail = url
1498 .path_segments()
1499 .and_then(|mut segments| {
1500 segments
1501 .rfind(|segment| !segment.is_empty())
1502 .map(str::to_string)
1503 })
1504 .unwrap_or_else(|| "root".to_string());
1505 format!("{host}/.../{tail}")
1506 }
1507
1508 fn redact_raw_slack_url(raw_url: &str) -> String {
1509 reqwest::Url::parse(raw_url)
1510 .map(|parsed| Self::redact_slack_url(&parsed))
1511 .unwrap_or_else(|_| "<invalid-url>".to_string())
1512 }
1513
1514 fn redact_redirect_location(location: &str) -> String {
1515 match reqwest::Url::parse(location) {
1516 Ok(url) => Self::redact_slack_url(&url),
1517 Err(_) => {
1518 let tail = location
1519 .split(['?', '#'])
1520 .next()
1521 .unwrap_or_default()
1522 .trim_end_matches('/')
1523 .rsplit('/')
1524 .next()
1525 .filter(|segment| !segment.is_empty())
1526 .unwrap_or("relative");
1527 format!("relative/.../{tail}")
1528 }
1529 }
1530 }
1531
1532 fn validate_slack_private_file_url(raw_url: &str) -> Option<reqwest::Url> {
1533 let parsed = match reqwest::Url::parse(raw_url) {
1534 Ok(url) => url,
1535 Err(err) => {
1536 let redacted_raw = Self::redact_raw_slack_url(raw_url);
1537 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"error": format!("{}", err), "redacted_raw": redacted_raw})), "file URL parse failed for");
1538 return None;
1539 }
1540 };
1541 let redacted = Self::redact_slack_url(&parsed);
1542
1543 if parsed.scheme() != "https" {
1544 ::zeroclaw_log::record!(
1545 WARN,
1546 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1547 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1548 &format!(
1549 "file URL rejected due to non-HTTPS scheme for {}: {}",
1550 redacted,
1551 parsed.scheme()
1552 )
1553 );
1554 return None;
1555 }
1556
1557 let Some(host) = parsed.host_str() else {
1558 ::zeroclaw_log::record!(
1559 WARN,
1560 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1561 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1562 .with_attrs(::serde_json::json!({"redacted": redacted})),
1563 "file URL rejected due to missing host"
1564 );
1565 return None;
1566 };
1567 if !Self::is_allowed_slack_media_hostname(host) {
1568 ::zeroclaw_log::record!(
1569 WARN,
1570 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1571 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1572 .with_attrs(::serde_json::json!({"redacted": redacted})),
1573 "file URL rejected due to non-Slack host"
1574 );
1575 return None;
1576 }
1577
1578 Some(parsed)
1579 }
1580
1581 fn resolve_https_redirect_target(base: &reqwest::Url, location: &str) -> Option<reqwest::Url> {
1582 let redacted_base = Self::redact_slack_url(base);
1583 let redacted_location = Self::redact_redirect_location(location);
1584 let target = match base.join(location) {
1585 Ok(url) => url,
1586 Err(err) => {
1587 ::zeroclaw_log::record!(
1588 WARN,
1589 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1590 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1591 &format!(
1592 "file redirect URL parse failed for base {} and location {}: {}",
1593 redacted_base, redacted_location, err
1594 )
1595 );
1596 return None;
1597 }
1598 };
1599 let redacted_target = Self::redact_slack_url(&target);
1600 if target.scheme() != "https" {
1601 ::zeroclaw_log::record!(
1602 WARN,
1603 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1604 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1605 &format!(
1606 "file redirect rejected due to non-HTTPS scheme for {}",
1607 redacted_target
1608 )
1609 );
1610 return None;
1611 }
1612 let Some(host) = target.host_str() else {
1613 ::zeroclaw_log::record!(
1614 WARN,
1615 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1616 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1617 &format!(
1618 "file redirect rejected due to missing host for {}",
1619 redacted_target
1620 )
1621 );
1622 return None;
1623 };
1624 if !Self::is_allowed_slack_media_hostname(host) {
1625 ::zeroclaw_log::record!(
1626 WARN,
1627 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1628 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1629 &format!(
1630 "file redirect rejected due to non-Slack host for {}",
1631 redacted_target
1632 )
1633 );
1634 return None;
1635 }
1636 Some(target)
1637 }
1638
1639 fn slack_media_http_client_no_redirect(&self) -> anyhow::Result<reqwest::Client> {
1640 let builder = zeroclaw_config::schema::apply_channel_proxy_to_builder(
1641 reqwest::Client::builder()
1642 .redirect(reqwest::redirect::Policy::none())
1643 .timeout(Duration::from_secs(30))
1644 .connect_timeout(Duration::from_secs(10)),
1645 "channel.slack",
1646 self.proxy_url.as_deref(),
1647 );
1648 builder
1649 .build()
1650 .context("failed to build Slack media no-redirect HTTP client")
1651 }
1652
1653 async fn fetch_slack_private_file(&self, raw_url: &str) -> Option<reqwest::Response> {
1654 let parsed = Self::validate_slack_private_file_url(raw_url)?;
1655 let redacted_parsed = Self::redact_slack_url(&parsed);
1656 let client = match self.slack_media_http_client_no_redirect() {
1657 Ok(client) => client,
1658 Err(err) => {
1659 ::zeroclaw_log::record!(
1660 WARN,
1661 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1662 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1663 &format!("file fetch failed for {}: {}", redacted_parsed, err)
1664 );
1665 return None;
1666 }
1667 };
1668 let mut current_url = parsed;
1669
1670 for redirect_hop in 0..=SLACK_MEDIA_REDIRECT_MAX_HOPS {
1671 let redacted_current = Self::redact_slack_url(¤t_url);
1672 let mut req = client.get(current_url.clone());
1673 if redirect_hop == 0 {
1674 req = req.bearer_auth(&self.bot_token);
1675 }
1676 let response = match req.send().await {
1677 Ok(response) => response,
1678 Err(err) => {
1679 ::zeroclaw_log::record!(
1680 WARN,
1681 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1682 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1683 &format!("file fetch failed for {}: {}", redacted_current, err)
1684 );
1685 return None;
1686 }
1687 };
1688
1689 if !response.status().is_redirection() {
1690 return Some(response);
1691 }
1692
1693 if redirect_hop == SLACK_MEDIA_REDIRECT_MAX_HOPS {
1694 ::zeroclaw_log::record!(
1695 WARN,
1696 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1697 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1698 &format!(
1699 "file redirect limit exceeded for {} after {} hops",
1700 redacted_current, SLACK_MEDIA_REDIRECT_MAX_HOPS
1701 )
1702 );
1703 return Some(response);
1704 }
1705
1706 let Some(location) = response.headers().get(reqwest::header::LOCATION) else {
1707 return Some(response);
1708 };
1709 let Ok(location) = location.to_str() else {
1710 ::zeroclaw_log::record!(
1711 WARN,
1712 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1713 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1714 &format!(
1715 "file redirect location header is not valid UTF-8 for {}",
1716 redacted_current
1717 )
1718 );
1719 return Some(response);
1720 };
1721 let Some(next_url) = Self::resolve_https_redirect_target(¤t_url, location) else {
1722 return Some(response);
1723 };
1724 current_url = next_url;
1725 }
1726
1727 None
1728 }
1729
1730 async fn fetch_image_marker(&self, file: &serde_json::Value) -> Option<String> {
1731 let file_name = Self::slack_file_name(file);
1732 let image_urls = Self::slack_image_candidate_urls(file);
1733 if image_urls.is_empty() {
1734 ::zeroclaw_log::record!(
1735 WARN,
1736 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1737 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1738 &format!(
1739 "file attachment is image-like but has no downloadable URL: {}",
1740 file_name
1741 )
1742 );
1743 return None;
1744 }
1745
1746 for url in image_urls {
1747 if let Some(marker) = self.download_private_image_as_marker(&url, file).await {
1748 return Some(marker);
1749 }
1750 }
1751
1752 ::zeroclaw_log::record!(
1753 WARN,
1754 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1755 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1756 .with_attrs(::serde_json::json!({"file_name": file_name})),
1757 "image attachment download failed for"
1758 );
1759 None
1760 }
1761
1762 async fn download_private_image_as_marker(
1763 &self,
1764 url: &str,
1765 file: &serde_json::Value,
1766 ) -> Option<String> {
1767 let redacted_url = Self::redact_raw_slack_url(url);
1768 let resp = self.fetch_slack_private_file(url).await?;
1769
1770 let status = resp.status();
1771 if !status.is_success() {
1772 let body = resp
1773 .text()
1774 .await
1775 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
1776 let sanitized = zeroclaw_providers::sanitize_api_error(&body);
1777 ::zeroclaw_log::record!(
1778 WARN,
1779 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1780 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1781 &format!(
1782 "image fetch failed for {} ({status}): {sanitized}",
1783 redacted_url
1784 )
1785 );
1786 return None;
1787 }
1788
1789 let content_type = resp
1790 .headers()
1791 .get(reqwest::header::CONTENT_TYPE)
1792 .and_then(|value| value.to_str().ok())
1793 .map(str::to_string);
1794 if let Some(content_length) = resp.content_length() {
1795 let content_length = usize::try_from(content_length).unwrap_or(usize::MAX);
1796 if content_length > SLACK_ATTACHMENT_IMAGE_MAX_BYTES {
1797 ::zeroclaw_log::record!(
1798 WARN,
1799 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1800 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1801 &format!(
1802 "image fetch skipped for {}: content-length {} exceeds {} bytes",
1803 redacted_url, content_length, SLACK_ATTACHMENT_IMAGE_MAX_BYTES
1804 )
1805 );
1806 return None;
1807 }
1808 }
1809
1810 let bytes = match resp.bytes().await {
1811 Ok(bytes) => bytes,
1812 Err(err) => {
1813 ::zeroclaw_log::record!(
1814 WARN,
1815 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1816 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1817 .with_attrs(::serde_json::json!({"error": format!("{}", err)})),
1818 &format!("image body read failed for {}", redacted_url)
1819 );
1820 return None;
1821 }
1822 };
1823 if bytes.is_empty() {
1824 ::zeroclaw_log::record!(
1825 WARN,
1826 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1827 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1828 &format!("image body is empty for {}", redacted_url)
1829 );
1830 return None;
1831 }
1832 if bytes.len() > SLACK_ATTACHMENT_IMAGE_MAX_BYTES {
1833 ::zeroclaw_log::record!(
1834 WARN,
1835 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1836 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1837 &format!(
1838 "image body too large for {}: {} bytes exceeds {} bytes",
1839 redacted_url,
1840 bytes.len(),
1841 SLACK_ATTACHMENT_IMAGE_MAX_BYTES
1842 )
1843 );
1844 return None;
1845 }
1846
1847 let Some(mime) =
1848 Self::detect_image_mime(content_type.as_deref(), file, bytes.as_ref(), url)
1849 else {
1850 ::zeroclaw_log::record!(
1851 WARN,
1852 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1853 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1854 &format!("image MIME detection failed for {}", redacted_url)
1855 );
1856 return None;
1857 };
1858 if !Self::is_supported_image_mime(&mime) {
1859 ::zeroclaw_log::record!(
1860 WARN,
1861 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1862 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1863 &format!("image MIME not supported for {}: {mime}", redacted_url)
1864 );
1865 return None;
1866 }
1867
1868 let file_name = Self::slack_file_name(file);
1869 if let Some(saved_path) = self
1870 .persist_image_attachment(file, &file_name, &mime, bytes.as_ref())
1871 .await
1872 {
1873 return Some(format!("[IMAGE:{}]", saved_path.display()));
1874 }
1875
1876 if bytes.len() > SLACK_ATTACHMENT_IMAGE_INLINE_FALLBACK_MAX_BYTES {
1877 ::zeroclaw_log::record!(
1878 WARN,
1879 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1880 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1881 &format!(
1882 "image inline fallback skipped for {}: {} bytes exceeds {} bytes",
1883 redacted_url,
1884 bytes.len(),
1885 SLACK_ATTACHMENT_IMAGE_INLINE_FALLBACK_MAX_BYTES
1886 )
1887 );
1888 return None;
1889 }
1890
1891 let encoded = base64::engine::general_purpose::STANDARD.encode(bytes);
1892 Some(format!("[IMAGE:data:{mime};base64,{encoded}]"))
1893 }
1894
1895 fn detect_image_mime(
1896 content_type_header: Option<&str>,
1897 file: &serde_json::Value,
1898 bytes: &[u8],
1899 source_url: &str,
1900 ) -> Option<String> {
1901 let redacted_source = Self::redact_raw_slack_url(source_url);
1902 if let Some(magic_mime) = Self::mime_from_magic(bytes) {
1903 return Some(magic_mime.to_string());
1904 }
1905
1906 if let Some(header_mime) = content_type_header
1907 .and_then(Self::normalized_content_type)
1908 .filter(|mime| mime.starts_with("image/"))
1909 {
1910 ::zeroclaw_log::record!(
1911 WARN,
1912 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1913 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1914 &format!(
1915 "image MIME mismatch for {}: HTTP header claims {}, but bytes do not match a supported image signature",
1916 redacted_source, header_mime
1917 )
1918 );
1919 }
1920
1921 if let Some(file_mime) =
1922 Self::slack_file_mime(file).filter(|mime| mime.starts_with("image/"))
1923 {
1924 ::zeroclaw_log::record!(
1925 WARN,
1926 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1927 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1928 &format!(
1929 "image MIME mismatch for {}: file metadata claims {}, but bytes do not match a supported image signature",
1930 redacted_source, file_mime
1931 )
1932 );
1933 }
1934
1935 if let Some(ext) = Self::file_extension(source_url)
1936 .or_else(|| Self::file_extension(&Self::slack_file_name(file)))
1937 && let Some(mime) = Self::mime_from_extension(&ext)
1938 {
1939 ::zeroclaw_log::record!(
1940 WARN,
1941 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1942 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1943 &format!(
1944 "image MIME mismatch for {}: filename extension implies {}, but bytes do not match a supported image signature",
1945 redacted_source, mime
1946 )
1947 );
1948 }
1949
1950 None
1951 }
1952
1953 fn normalized_content_type(content_type: &str) -> Option<String> {
1954 let mime = content_type
1955 .split(';')
1956 .next()
1957 .unwrap_or_default()
1958 .trim()
1959 .to_ascii_lowercase();
1960 if mime.is_empty() { None } else { Some(mime) }
1961 }
1962
1963 fn is_supported_image_mime(mime: &str) -> bool {
1964 SLACK_SUPPORTED_IMAGE_MIME_TYPES.contains(&mime)
1965 }
1966
1967 fn mime_from_extension(ext: &str) -> Option<&'static str> {
1968 match ext.to_ascii_lowercase().as_str() {
1969 "png" => Some("image/png"),
1970 "jpg" | "jpeg" => Some("image/jpeg"),
1971 "gif" => Some("image/gif"),
1972 "webp" => Some("image/webp"),
1973 "bmp" => Some("image/bmp"),
1974 _ => None,
1975 }
1976 }
1977
1978 fn mime_from_magic(bytes: &[u8]) -> Option<&'static str> {
1979 if bytes.len() >= 8
1980 && bytes.starts_with(&[0x89, b'P', b'N', b'G', b'\r', b'\n', 0x1a, b'\n'])
1981 {
1982 return Some("image/png");
1983 }
1984 if bytes.len() >= 3 && bytes.starts_with(&[0xff, 0xd8, 0xff]) {
1985 return Some("image/jpeg");
1986 }
1987 if bytes.len() >= 6 && (bytes.starts_with(b"GIF87a") || bytes.starts_with(b"GIF89a")) {
1988 return Some("image/gif");
1989 }
1990 if bytes.len() >= 12 && bytes.starts_with(b"RIFF") && &bytes[8..12] == b"WEBP" {
1991 return Some("image/webp");
1992 }
1993 if bytes.len() >= 2 && bytes.starts_with(b"BM") {
1994 return Some("image/bmp");
1995 }
1996 None
1997 }
1998
1999 async fn persist_image_attachment(
2000 &self,
2001 file: &serde_json::Value,
2002 file_name: &str,
2003 mime: &str,
2004 bytes: &[u8],
2005 ) -> Option<PathBuf> {
2006 let workspace = self.workspace_dir.as_ref()?;
2007 let safe_name = Self::sanitize_attachment_filename(file_name)
2008 .unwrap_or_else(|| "attachment".to_string());
2009 let ext = Self::image_extension_for_mime(mime).unwrap_or("png");
2010 let safe_name = Self::ensure_file_extension(&safe_name, ext);
2011 let file_id = Self::slack_file_id(file)
2012 .map(Self::sanitize_file_id)
2013 .unwrap_or_else(|| "file".to_string());
2014 let generated_name = format!(
2015 "slack_{}_{}_{}",
2016 Utc::now().timestamp_millis(),
2017 file_id,
2018 safe_name
2019 );
2020
2021 let output_path = match Self::resolve_workspace_attachment_output_path(
2022 workspace,
2023 &generated_name,
2024 )
2025 .await
2026 {
2027 Ok(path) => path,
2028 Err(err) => {
2029 ::zeroclaw_log::record!(
2030 WARN,
2031 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2032 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2033 &format!(
2034 "image attachment path resolution failed for {}: {err}",
2035 file_name
2036 )
2037 );
2038 return None;
2039 }
2040 };
2041
2042 let Some(parent_dir) = output_path.parent() else {
2043 ::zeroclaw_log::record!(
2044 WARN,
2045 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2046 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2047 &format!(
2048 "image attachment write failed for {}: missing parent directory",
2049 output_path.display()
2050 )
2051 );
2052 return None;
2053 };
2054
2055 let file_tail = output_path
2056 .file_name()
2057 .and_then(|name| name.to_str())
2058 .unwrap_or("attachment");
2059 let temp_name = format!(
2060 ".{file_tail}.{}.part",
2061 Utc::now().timestamp_nanos_opt().unwrap_or_default()
2062 );
2063 let temp_path = parent_dir.join(temp_name);
2064
2065 let mut temp_file = match tokio::fs::OpenOptions::new()
2066 .create_new(true)
2067 .write(true)
2068 .open(&temp_path)
2069 .await
2070 {
2071 Ok(file) => file,
2072 Err(err) => {
2073 ::zeroclaw_log::record!(
2074 WARN,
2075 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2076 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2077 &format!(
2078 "image attachment temp open failed for {}: {err}",
2079 temp_path.display()
2080 )
2081 );
2082 return None;
2083 }
2084 };
2085
2086 if let Err(err) = temp_file.write_all(bytes).await {
2087 ::zeroclaw_log::record!(
2088 WARN,
2089 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2090 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2091 &format!(
2092 "image attachment temp write failed for {}: {err}",
2093 temp_path.display()
2094 )
2095 );
2096 let _ = tokio::fs::remove_file(&temp_path).await;
2097 return None;
2098 }
2099 if let Err(err) = temp_file.sync_all().await {
2100 ::zeroclaw_log::record!(
2101 WARN,
2102 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2103 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2104 &format!(
2105 "image attachment temp sync failed for {}: {err}",
2106 temp_path.display()
2107 )
2108 );
2109 let _ = tokio::fs::remove_file(&temp_path).await;
2110 return None;
2111 }
2112 drop(temp_file);
2113
2114 match tokio::fs::symlink_metadata(&output_path).await {
2118 Ok(meta) if meta.file_type().is_symlink() => {
2119 ::zeroclaw_log::record!(
2120 WARN,
2121 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2122 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2123 &format!(
2124 "image attachment refused: output path is a symlink: {}",
2125 output_path.display()
2126 )
2127 );
2128 let _ = tokio::fs::remove_file(&temp_path).await;
2129 return None;
2130 }
2131 _ => {}
2132 }
2133
2134 if let Err(err) = tokio::fs::rename(&temp_path, &output_path).await {
2135 ::zeroclaw_log::record!(
2136 WARN,
2137 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2138 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2139 &format!(
2140 "image attachment finalize failed for {}: {err}",
2141 output_path.display()
2142 )
2143 );
2144 let _ = tokio::fs::remove_file(&temp_path).await;
2145 return None;
2146 }
2147
2148 Some(output_path)
2149 }
2150
2151 async fn resolve_workspace_attachment_output_path(
2152 workspace: &Path,
2153 file_name: &str,
2154 ) -> anyhow::Result<PathBuf> {
2155 let safe_name = Self::sanitize_attachment_filename(file_name).ok_or_else(|| {
2156 ::zeroclaw_log::record!(
2157 WARN,
2158 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
2159 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2160 .with_attrs(::serde_json::json!({"file_name": file_name})),
2161 "invalid attachment filename"
2162 );
2163 anyhow::Error::msg(format!("invalid attachment filename: {file_name}"))
2164 })?;
2165
2166 tokio::fs::create_dir_all(workspace).await?;
2167 let workspace_root = tokio::fs::canonicalize(workspace)
2168 .await
2169 .unwrap_or_else(|_| workspace.to_path_buf());
2170
2171 let save_dir = workspace.join(SLACK_ATTACHMENT_SAVE_SUBDIR);
2172 tokio::fs::create_dir_all(&save_dir).await?;
2173 let resolved_save_dir = tokio::fs::canonicalize(&save_dir).await.with_context(|| {
2174 format!(
2175 "failed to resolve Slack attachment save directory: {}",
2176 save_dir.display()
2177 )
2178 })?;
2179
2180 if !resolved_save_dir.starts_with(&workspace_root) {
2181 anyhow::bail!(
2182 "Slack attachment save directory escapes workspace: {}",
2183 resolved_save_dir.display()
2184 );
2185 }
2186
2187 Ok(resolved_save_dir.join(safe_name))
2188 }
2189
2190 fn sanitize_attachment_filename(file_name: &str) -> Option<String> {
2191 let basename = Path::new(file_name).file_name()?.to_str()?.trim();
2192 if basename.is_empty() || basename == "." || basename == ".." {
2193 return None;
2194 }
2195
2196 let sanitized: String = basename
2197 .replace(['/', '\\'], "_")
2198 .chars()
2199 .take(SLACK_ATTACHMENT_FILENAME_MAX_CHARS)
2200 .collect();
2201 if sanitized.is_empty() || sanitized == "." || sanitized == ".." {
2202 None
2203 } else {
2204 Some(sanitized)
2205 }
2206 }
2207
2208 fn sanitize_file_id(file_id: &str) -> String {
2209 let cleaned: String = file_id
2210 .chars()
2211 .filter(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '_' | '-'))
2212 .take(64)
2213 .collect();
2214 if cleaned.is_empty() {
2215 "file".to_string()
2216 } else {
2217 cleaned
2218 }
2219 }
2220
2221 fn ensure_file_extension(file_name: &str, extension: &str) -> String {
2222 if Path::new(file_name).extension().is_some() {
2223 file_name.to_string()
2224 } else {
2225 format!("{file_name}.{extension}")
2226 }
2227 }
2228
2229 fn image_extension_for_mime(mime: &str) -> Option<&'static str> {
2230 match mime {
2231 "image/png" => Some("png"),
2232 "image/jpeg" => Some("jpg"),
2233 "image/webp" => Some("webp"),
2234 "image/gif" => Some("gif"),
2235 "image/bmp" => Some("bmp"),
2236 _ => None,
2237 }
2238 }
2239
2240 fn file_extension(value: &str) -> Option<String> {
2241 let before_query = value.split('?').next().unwrap_or(value);
2242 before_query
2243 .rsplit('/')
2244 .next()
2245 .unwrap_or(before_query)
2246 .rsplit_once('.')
2247 .map(|(_, ext)| ext.to_ascii_lowercase())
2248 }
2249
2250 fn file_text_preview(file: &serde_json::Value) -> Option<String> {
2251 let preview = file
2252 .get("preview")
2253 .and_then(|value| value.as_str())
2254 .or_else(|| {
2255 file.get("preview_highlight")
2256 .and_then(|value| value.as_str())
2257 })
2258 .or_else(|| {
2259 file.get("initial_comment")
2260 .and_then(|comment| comment.get("comment"))
2261 .and_then(|value| value.as_str())
2262 })?;
2263 Self::truncate_text(preview, SLACK_ATTACHMENT_TEXT_INLINE_MAX_CHARS)
2264 }
2265
2266 fn truncate_text(value: &str, max_chars: usize) -> Option<String> {
2267 let mut out = String::new();
2268 let mut count = 0usize;
2269 for ch in value.chars() {
2270 if count >= max_chars {
2271 break;
2272 }
2273 out.push(ch);
2274 count += 1;
2275 }
2276 let was_truncated = count >= max_chars && value.chars().nth(max_chars).is_some();
2277 let mut out = out.trim().to_string();
2278 if out.is_empty() {
2279 return None;
2280 }
2281 if was_truncated {
2282 out.push_str("\n…[truncated]");
2283 }
2284 Some(out)
2285 }
2286
2287 fn is_probably_text_file(file: &serde_json::Value) -> bool {
2288 if matches!(
2289 Self::slack_file_mode(file).as_deref(),
2290 Some("snippet" | "post")
2291 ) {
2292 return true;
2293 }
2294
2295 if Self::slack_file_mime(file)
2296 .as_deref()
2297 .is_some_and(|mime| mime.starts_with("text/"))
2298 {
2299 return true;
2300 }
2301
2302 if file
2303 .get("filetype")
2304 .and_then(|value| value.as_str())
2305 .map(|value| value.to_ascii_lowercase())
2306 .as_deref()
2307 .is_some_and(Self::is_text_filetype)
2308 {
2309 return true;
2310 }
2311
2312 Self::file_extension(&Self::slack_file_name(file))
2313 .as_deref()
2314 .is_some_and(Self::is_text_filetype)
2315 }
2316
2317 fn is_text_filetype(filetype: &str) -> bool {
2318 matches!(
2319 filetype,
2320 "txt"
2321 | "text"
2322 | "md"
2323 | "markdown"
2324 | "csv"
2325 | "tsv"
2326 | "json"
2327 | "yaml"
2328 | "yml"
2329 | "toml"
2330 | "xml"
2331 | "html"
2332 | "css"
2333 | "js"
2334 | "ts"
2335 | "jsx"
2336 | "tsx"
2337 | "py"
2338 | "rs"
2339 | "go"
2340 | "java"
2341 | "kt"
2342 | "c"
2343 | "cc"
2344 | "cpp"
2345 | "h"
2346 | "hpp"
2347 | "cs"
2348 | "php"
2349 | "rb"
2350 | "swift"
2351 | "sql"
2352 | "log"
2353 | "ini"
2354 | "conf"
2355 | "cfg"
2356 | "env"
2357 | "sh"
2358 | "bash"
2359 | "zsh"
2360 )
2361 }
2362
2363 fn is_image_file(file: &serde_json::Value) -> bool {
2364 if Self::slack_file_mime(file)
2365 .as_deref()
2366 .is_some_and(|mime| mime.starts_with("image/"))
2367 {
2368 return true;
2369 }
2370
2371 if file
2372 .get("filetype")
2373 .and_then(|value| value.as_str())
2374 .map(|value| value.to_ascii_lowercase())
2375 .as_deref()
2376 .is_some_and(|filetype| Self::mime_from_extension(filetype).is_some())
2377 {
2378 return true;
2379 }
2380
2381 Self::file_extension(&Self::slack_file_name(file))
2382 .as_deref()
2383 .is_some_and(|ext| Self::mime_from_extension(ext).is_some())
2384 }
2385
2386 const AUDIO_EXTENSIONS: &[&str] = &[
2388 "flac", "mp3", "mpeg", "mpga", "mp4", "m4a", "ogg", "oga", "opus", "wav", "webm",
2389 ];
2390
2391 fn is_audio_file(file: &serde_json::Value) -> bool {
2394 if let Some(subtype) = file.get("subtype").and_then(|v| v.as_str())
2396 && subtype == "slack_audio"
2397 {
2398 return true;
2399 }
2400
2401 if Self::slack_file_mime(file)
2402 .as_deref()
2403 .is_some_and(|mime| mime.starts_with("audio/"))
2404 {
2405 return true;
2406 }
2407
2408 if let Some(ft) = file
2409 .get("filetype")
2410 .and_then(|v| v.as_str())
2411 .map(|v| v.to_ascii_lowercase())
2412 && Self::AUDIO_EXTENSIONS.contains(&ft.as_str())
2413 {
2414 return true;
2415 }
2416
2417 Self::file_extension(&Self::slack_file_name(file))
2418 .as_deref()
2419 .is_some_and(|ext| Self::AUDIO_EXTENSIONS.contains(&ext))
2420 }
2421
2422 async fn try_transcribe_audio_file(&self, file: &serde_json::Value) -> Option<String> {
2426 let manager = self.transcription_manager.as_deref()?;
2427
2428 let url = Self::slack_file_download_url(file)?;
2429 let file_name = Self::slack_file_name(file);
2430 let redacted_url = Self::redact_raw_slack_url(url);
2431
2432 let resp = self.fetch_slack_private_file(url).await?;
2433 let status = resp.status();
2434 if !status.is_success() {
2435 ::zeroclaw_log::record!(
2436 WARN,
2437 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2438 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2439 &format!("voice file download failed for {} ({status})", redacted_url)
2440 );
2441 return None;
2442 }
2443
2444 let audio_data = match resp.bytes().await {
2445 Ok(bytes) => bytes.to_vec(),
2446 Err(e) => {
2447 ::zeroclaw_log::record!(
2448 WARN,
2449 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2450 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2451 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2452 &format!("voice file read failed for {}", redacted_url)
2453 );
2454 return None;
2455 }
2456 };
2457
2458 let transcription_filename = if Self::file_extension(&file_name).is_some() {
2460 file_name.clone()
2461 } else {
2462 let mime_ext = Self::slack_file_mime(file)
2464 .and_then(|mime| mime.rsplit('/').next().map(|s| s.to_string()))
2465 .unwrap_or_else(|| "ogg".to_string());
2466 format!("voice.{mime_ext}")
2467 };
2468
2469 match manager
2470 .transcribe(&audio_data, &transcription_filename)
2471 .await
2472 {
2473 Ok(text) => {
2474 let trimmed = text.trim();
2475 if trimmed.is_empty() {
2476 ::zeroclaw_log::record!(
2477 INFO,
2478 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
2479 "voice transcription returned empty text, skipping"
2480 );
2481 None
2482 } else {
2483 ::zeroclaw_log::record!(
2484 INFO,
2485 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
2486 &format!(
2487 "transcribed voice file {} ({} chars)",
2488 file_name,
2489 trimmed.len()
2490 )
2491 );
2492 Some(format!("[Voice] {trimmed}"))
2493 }
2494 }
2495 Err(e) => {
2496 ::zeroclaw_log::record!(
2497 WARN,
2498 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2499 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2500 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2501 &format!("voice transcription failed for {}", file_name)
2502 );
2503 Some(Self::format_attachment_summary(file))
2504 }
2505 }
2506 }
2507
2508 async fn download_text_snippet(&self, file: &serde_json::Value) -> Option<String> {
2509 let url = Self::slack_file_download_url(file)?;
2510 let redacted_url = Self::redact_raw_slack_url(url);
2511 let resp = self.fetch_slack_private_file(url).await?;
2512
2513 let status = resp.status();
2514 if !status.is_success() {
2515 let body = resp
2516 .text()
2517 .await
2518 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
2519 let sanitized = zeroclaw_providers::sanitize_api_error(&body);
2520 ::zeroclaw_log::record!(
2521 WARN,
2522 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2523 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2524 &format!(
2525 "snippet fetch failed for {} ({status}): {sanitized}",
2526 redacted_url
2527 )
2528 );
2529 return None;
2530 }
2531
2532 if let Some(content_length) = resp.content_length() {
2533 let content_length = usize::try_from(content_length).unwrap_or(usize::MAX);
2534 if content_length > SLACK_ATTACHMENT_TEXT_DOWNLOAD_MAX_BYTES {
2535 ::zeroclaw_log::record!(
2536 WARN,
2537 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2538 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2539 &format!(
2540 "snippet download skipped for {}: content-length {} exceeds {} bytes",
2541 redacted_url, content_length, SLACK_ATTACHMENT_TEXT_DOWNLOAD_MAX_BYTES
2542 )
2543 );
2544 return None;
2545 }
2546 }
2547
2548 let bytes = match resp.bytes().await {
2549 Ok(bytes) => bytes,
2550 Err(err) => {
2551 ::zeroclaw_log::record!(
2552 WARN,
2553 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2554 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2555 .with_attrs(::serde_json::json!({"error": format!("{}", err)})),
2556 &format!("snippet body read failed for {}", redacted_url)
2557 );
2558 return None;
2559 }
2560 };
2561 if bytes.is_empty() {
2562 return None;
2563 }
2564 if bytes.len() > SLACK_ATTACHMENT_TEXT_DOWNLOAD_MAX_BYTES {
2565 ::zeroclaw_log::record!(
2566 WARN,
2567 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2568 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2569 &format!(
2570 "snippet body too large for {}: {} bytes exceeds {} bytes",
2571 redacted_url,
2572 bytes.len(),
2573 SLACK_ATTACHMENT_TEXT_DOWNLOAD_MAX_BYTES
2574 )
2575 );
2576 return None;
2577 }
2578 if bytes.contains(&0) {
2579 ::zeroclaw_log::record!(
2580 WARN,
2581 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2582 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2583 &format!("snippet body appears binary for {}", redacted_url)
2584 );
2585 return None;
2586 }
2587
2588 let text = String::from_utf8_lossy(&bytes);
2589 Self::truncate_text(&text, SLACK_ATTACHMENT_TEXT_INLINE_MAX_CHARS)
2590 }
2591
2592 fn format_snippet_attachment(file: &serde_json::Value, snippet: &str) -> String {
2593 let file_name = Self::slack_file_name(file);
2594 let language = file
2595 .get("filetype")
2596 .and_then(|value| value.as_str())
2597 .map(Self::sanitize_code_fence_language)
2598 .unwrap_or_else(|| "text".to_string());
2599
2600 let fence = if snippet.contains("```") {
2601 "````"
2602 } else {
2603 "```"
2604 };
2605 format!("[SNIPPET:{file_name}]\n{fence}{language}\n{snippet}\n{fence}")
2606 }
2607
2608 fn sanitize_code_fence_language(input: &str) -> String {
2609 let normalized = input
2610 .trim()
2611 .chars()
2612 .filter(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '_' | '-' | '+'))
2613 .collect::<String>();
2614 if normalized.is_empty() {
2615 "text".to_string()
2616 } else {
2617 normalized
2618 }
2619 }
2620
2621 fn format_attachment_summary(file: &serde_json::Value) -> String {
2622 let file_name = Self::slack_file_name(file);
2623 let mime = Self::slack_file_mime(file).unwrap_or_else(|| "unknown".to_string());
2624 let size = file
2625 .get("size")
2626 .and_then(|value| value.as_u64())
2627 .map(|value| format!("{value} bytes"))
2628 .unwrap_or_else(|| "unknown size".to_string());
2629 format!("[ATTACHMENT:{file_name} | mime={mime} | size={size}]")
2630 }
2631
2632 fn extract_channel_ids(list_payload: &serde_json::Value) -> Vec<String> {
2633 let mut ids = list_payload
2634 .get("channels")
2635 .and_then(|c| c.as_array())
2636 .into_iter()
2637 .flatten()
2638 .filter_map(|channel| {
2639 let id = channel.get("id").and_then(|id| id.as_str())?;
2640 let is_archived = channel
2641 .get("is_archived")
2642 .and_then(|v| v.as_bool())
2643 .unwrap_or(false);
2644 let is_member = channel
2645 .get("is_member")
2646 .and_then(|v| v.as_bool())
2647 .unwrap_or(true);
2648 if is_archived || !is_member {
2649 return None;
2650 }
2651 Some(id.to_string())
2652 })
2653 .collect::<Vec<_>>();
2654 ids.sort();
2655 ids.dedup();
2656 ids
2657 }
2658
2659 async fn list_accessible_channels(&self) -> anyhow::Result<Vec<String>> {
2660 let mut channels = Vec::new();
2661 let mut cursor: Option<String> = None;
2662
2663 loop {
2664 let mut query_params = vec![
2665 ("exclude_archived", "true".to_string()),
2666 ("limit", "200".to_string()),
2667 (
2668 "types",
2669 "public_channel,private_channel,mpim,im".to_string(),
2670 ),
2671 ];
2672 if let Some(ref next) = cursor {
2673 query_params.push(("cursor", next.clone()));
2674 }
2675
2676 let resp = self
2677 .http_client()
2678 .get("https://slack.com/api/conversations.list")
2679 .bearer_auth(&self.bot_token)
2680 .query(&query_params)
2681 .send()
2682 .await?;
2683
2684 let status = resp.status();
2685 let body = resp
2686 .text()
2687 .await
2688 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
2689
2690 if !status.is_success() {
2691 let sanitized = zeroclaw_providers::sanitize_api_error(&body);
2692 anyhow::bail!("Slack conversations.list failed ({status}): {sanitized}");
2693 }
2694
2695 let data: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
2696 if data.get("ok") == Some(&serde_json::Value::Bool(false)) {
2697 let err = data
2698 .get("error")
2699 .and_then(|e| e.as_str())
2700 .unwrap_or("unknown");
2701 anyhow::bail!("Slack conversations.list failed: {err}");
2702 }
2703
2704 channels.extend(Self::extract_channel_ids(&data));
2705
2706 cursor = data
2707 .get("response_metadata")
2708 .and_then(|rm| rm.get("next_cursor"))
2709 .and_then(|c| c.as_str())
2710 .map(str::trim)
2711 .filter(|c| !c.is_empty())
2712 .map(ToOwned::to_owned);
2713
2714 if cursor.is_none() {
2715 break;
2716 }
2717 }
2718
2719 channels.sort();
2720 channels.dedup();
2721 Ok(channels)
2722 }
2723
2724 fn slack_now_ts() -> String {
2725 let now = SystemTime::now()
2726 .duration_since(UNIX_EPOCH)
2727 .unwrap_or_default();
2728 format!("{}.{:06}", now.as_secs(), now.subsec_micros())
2729 }
2730
2731 fn ensure_poll_cursor(
2732 cursors: &mut HashMap<String, String>,
2733 channel_id: &str,
2734 now_ts: &str,
2735 ) -> String {
2736 cursors
2737 .entry(channel_id.to_string())
2738 .or_insert_with(|| now_ts.to_string())
2739 .clone()
2740 }
2741
2742 fn try_parse_approval_block_action(
2747 envelope: &serde_json::Value,
2748 ) -> Option<(String, ChannelApprovalResponse)> {
2749 let payload = envelope.get("payload")?;
2750 if payload.get("type").and_then(|v| v.as_str())? != "block_actions" {
2751 return None;
2752 }
2753 let action_id = payload
2754 .get("actions")
2755 .and_then(|a| a.as_array())
2756 .and_then(|a| a.first())
2757 .and_then(|a| a.get("action_id"))
2758 .and_then(|v| v.as_str())?;
2759 let rest = action_id.strip_prefix("approval_")?;
2760 let (token, action) = rest.rsplit_once('_')?;
2761 if token.len() != 6 || !token.chars().all(|c| c.is_ascii_alphanumeric()) {
2762 return None;
2763 }
2764 let response = match action {
2765 "approve" => ChannelApprovalResponse::Approve,
2766 "deny" => ChannelApprovalResponse::Deny,
2767 "always" => ChannelApprovalResponse::AlwaysApprove,
2768 _ => return None,
2769 };
2770 Some((token.to_string(), response))
2771 }
2772
2773 fn parse_block_action_as_command(
2778 envelope: &serde_json::Value,
2779 _bot_user_id: &str,
2780 alias: &str,
2781 ) -> Option<ChannelMessage> {
2782 let payload = envelope.get("payload")?;
2783
2784 let payload_type = payload.get("type").and_then(|v| v.as_str())?;
2785 if payload_type != "block_actions" {
2786 return None;
2787 }
2788
2789 let actions = payload.get("actions").and_then(|v| v.as_array())?;
2790 let action = actions.first()?;
2791
2792 let action_id = action.get("action_id").and_then(|v| v.as_str())?;
2793 let selected_value = action
2794 .get("selected_option")
2795 .and_then(|o| o.get("value"))
2796 .and_then(|v| v.as_str())?;
2797
2798 let command = match action_id {
2799 "zeroclaw_config_provider" => format!("/models {selected_value}"),
2800 "zeroclaw_config_model" => format!("/model {selected_value}"),
2801 _ => return None,
2802 };
2803
2804 let user = payload
2805 .get("user")
2806 .and_then(|u| u.get("id"))
2807 .and_then(|v| v.as_str())
2808 .unwrap_or("unknown");
2809
2810 let channel_id = payload
2811 .get("channel")
2812 .and_then(|c| c.get("id"))
2813 .and_then(|v| v.as_str())
2814 .unwrap_or_default();
2815
2816 if channel_id.is_empty() {
2817 ::zeroclaw_log::record!(
2818 WARN,
2819 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2820 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2821 "block_actions: missing channel ID in interactive payload"
2822 );
2823 return None;
2824 }
2825
2826 let ts = payload
2827 .get("message")
2828 .and_then(|m| m.get("ts"))
2829 .and_then(|v| v.as_str())
2830 .unwrap_or("0");
2831
2832 Some(ChannelMessage {
2833 id: format!("slack_{channel_id}_{ts}_action"),
2834 sender: user.to_string(),
2835 reply_target: channel_id.to_string(),
2836 content: command,
2837 channel: "slack".to_string(),
2838 channel_alias: Some(alias.to_string()),
2839 timestamp: SystemTime::now()
2840 .duration_since(UNIX_EPOCH)
2841 .unwrap_or_default()
2842 .as_secs(),
2843 thread_ts: payload
2844 .get("message")
2845 .and_then(|m| m.get("thread_ts"))
2846 .and_then(|v| v.as_str())
2847 .map(str::to_string),
2848 interruption_scope_id: None,
2849 attachments: vec![],
2850 subject: None,
2851 })
2852 }
2853
2854 async fn open_socket_mode_url(&self) -> anyhow::Result<String> {
2855 let app_token = self.configured_app_token().ok_or_else(|| {
2856 ::zeroclaw_log::record!(
2857 ERROR,
2858 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2859 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
2860 "Slack Socket Mode requires app_token"
2861 );
2862 anyhow::Error::msg("Slack Socket Mode requires app_token")
2863 })?;
2864
2865 let resp = self
2866 .http_client()
2867 .post("https://slack.com/api/apps.connections.open")
2868 .bearer_auth(app_token)
2869 .send()
2870 .await?;
2871
2872 let status = resp.status();
2873 let body = resp
2874 .text()
2875 .await
2876 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
2877
2878 if !status.is_success() {
2879 let sanitized = zeroclaw_providers::sanitize_api_error(&body);
2880 anyhow::bail!("Slack apps.connections.open failed ({status}): {sanitized}");
2881 }
2882
2883 let parsed: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
2884 if parsed.get("ok") == Some(&serde_json::Value::Bool(false)) {
2885 let err = parsed
2886 .get("error")
2887 .and_then(|e| e.as_str())
2888 .unwrap_or("unknown");
2889 anyhow::bail!("Slack apps.connections.open failed: {err}");
2890 }
2891
2892 parsed
2893 .get("url")
2894 .and_then(|v| v.as_str())
2895 .map(ToOwned::to_owned)
2896 .ok_or_else(|| {
2897 ::zeroclaw_log::record!(
2898 WARN,
2899 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
2900 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2901 .with_attrs(
2902 ::serde_json::json!({"field": "url", "api": "apps.connections.open"})
2903 ),
2904 "slack: apps.connections.open did not return url"
2905 );
2906 anyhow::Error::msg("Slack apps.connections.open did not return url")
2907 })
2908 }
2909
2910 async fn listen_socket_mode(
2911 &self,
2912 tx: tokio::sync::mpsc::Sender<ChannelMessage>,
2913 bot_user_id: &str,
2914 scoped_channels: Option<Vec<String>>,
2915 ) -> anyhow::Result<()> {
2916 let mut last_ts_by_channel: HashMap<String, String> = HashMap::new();
2917 let mut open_url_attempt: u32 = 0;
2918 let mut socket_reconnect_attempt: u32 = 0;
2919
2920 loop {
2921 let ws_url = match self.open_socket_mode_url().await {
2922 Ok(url) => {
2923 open_url_attempt = 0;
2924 url
2925 }
2926 Err(e) => {
2927 let wait = Self::compute_socket_mode_retry_delay(open_url_attempt);
2928 ::zeroclaw_log::record!(
2929 WARN,
2930 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2931 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2932 &format!(
2933 "Socket Mode: failed to open websocket URL: {e}; retrying in {:.3}s (attempt #{})",
2934 wait.as_secs_f64(),
2935 open_url_attempt.saturating_add(1)
2936 )
2937 );
2938 open_url_attempt = open_url_attempt.saturating_add(1);
2939 tokio::time::sleep(wait).await;
2940 continue;
2941 }
2942 };
2943
2944 let (ws_stream, _) = match zeroclaw_config::schema::ws_connect_with_proxy(
2945 &ws_url,
2946 "channel.slack",
2947 self.proxy_url.as_deref(),
2948 )
2949 .await
2950 {
2951 Ok(connection) => {
2952 socket_reconnect_attempt = 0;
2953 connection
2954 }
2955 Err(e) => {
2956 let wait = Self::compute_socket_mode_retry_delay(socket_reconnect_attempt);
2957 ::zeroclaw_log::record!(
2958 WARN,
2959 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2960 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2961 &format!(
2962 "Socket Mode: websocket connect failed: {e}; retrying in {:.3}s (attempt #{})",
2963 wait.as_secs_f64(),
2964 socket_reconnect_attempt.saturating_add(1)
2965 )
2966 );
2967 socket_reconnect_attempt = socket_reconnect_attempt.saturating_add(1);
2968 tokio::time::sleep(wait).await;
2969 continue;
2970 }
2971 };
2972 ::zeroclaw_log::record!(
2973 INFO,
2974 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
2975 "Socket Mode: websocket connected"
2976 );
2977
2978 let (mut write, mut read) = ws_stream.split();
2979
2980 while let Some(frame) = read.next().await {
2981 let text = match frame {
2982 Ok(WsMessage::Text(text)) => text,
2983 Ok(WsMessage::Ping(payload)) => {
2984 if let Err(e) = write.send(WsMessage::Pong(payload)).await {
2985 ::zeroclaw_log::record!(
2986 WARN,
2987 ::zeroclaw_log::Event::new(
2988 module_path!(),
2989 ::zeroclaw_log::Action::Note
2990 )
2991 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2992 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2993 "Socket Mode: pong send failed"
2994 );
2995 break;
2996 }
2997 continue;
2998 }
2999 Ok(WsMessage::Close(_)) => {
3000 ::zeroclaw_log::record!(
3001 WARN,
3002 ::zeroclaw_log::Event::new(
3003 module_path!(),
3004 ::zeroclaw_log::Action::Note
3005 )
3006 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
3007 "Socket Mode: websocket closed by server"
3008 );
3009 break;
3010 }
3011 Ok(_) => continue,
3012 Err(e) => {
3013 ::zeroclaw_log::record!(
3014 WARN,
3015 ::zeroclaw_log::Event::new(
3016 module_path!(),
3017 ::zeroclaw_log::Action::Note
3018 )
3019 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
3020 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
3021 "Socket Mode: websocket read failed"
3022 );
3023 break;
3024 }
3025 };
3026
3027 let envelope: serde_json::Value = match serde_json::from_str(text.as_ref()) {
3028 Ok(value) => value,
3029 Err(e) => {
3030 ::zeroclaw_log::record!(
3031 WARN,
3032 ::zeroclaw_log::Event::new(
3033 module_path!(),
3034 ::zeroclaw_log::Action::Note
3035 )
3036 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
3037 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
3038 "Socket Mode: invalid JSON payload"
3039 );
3040 continue;
3041 }
3042 };
3043
3044 if let Some(envelope_id) = envelope.get("envelope_id").and_then(|v| v.as_str()) {
3045 let ack = serde_json::json!({ "envelope_id": envelope_id });
3046 if let Err(e) = write.send(WsMessage::Text(ack.to_string().into())).await {
3047 ::zeroclaw_log::record!(
3048 WARN,
3049 ::zeroclaw_log::Event::new(
3050 module_path!(),
3051 ::zeroclaw_log::Action::Note
3052 )
3053 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
3054 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
3055 "Socket Mode: ack send failed"
3056 );
3057 break;
3058 }
3059 }
3060
3061 let envelope_type = envelope
3062 .get("type")
3063 .and_then(|v| v.as_str())
3064 .unwrap_or_default();
3065 if envelope_type == "disconnect" {
3066 ::zeroclaw_log::record!(
3067 WARN,
3068 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3069 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
3070 "Socket Mode: received disconnect event"
3071 );
3072 break;
3073 }
3074
3075 if envelope_type == "interactive" {
3077 if let Some((token, response)) =
3078 Self::try_parse_approval_block_action(&envelope)
3079 {
3080 let mut map = self.pending_approvals.lock().await;
3081 if let Some(sender) = map.remove(&token) {
3082 let _ = sender.send(response);
3083 }
3084 continue;
3085 }
3086 if let Some(msg) =
3087 Self::parse_block_action_as_command(&envelope, bot_user_id, &self.alias)
3088 && tx.send(msg).await.is_err()
3089 {
3090 return Ok(());
3091 }
3092 continue;
3093 }
3094
3095 if envelope_type != "events_api" {
3096 continue;
3097 }
3098
3099 let Some(event) = envelope
3100 .get("payload")
3101 .and_then(|payload| payload.get("event"))
3102 else {
3103 continue;
3104 };
3105 let event_type = event
3106 .get("type")
3107 .and_then(|v| v.as_str())
3108 .unwrap_or_default();
3109
3110 if event_type == "assistant_thread_started"
3112 || event_type == "assistant_thread_context_changed"
3113 {
3114 if let Some(thread) = event.get("assistant_thread") {
3115 let ch = thread
3116 .get("channel_id")
3117 .and_then(|v| v.as_str())
3118 .unwrap_or_default();
3119 let tts = thread
3120 .get("thread_ts")
3121 .and_then(|v| v.as_str())
3122 .unwrap_or_default();
3123 if !ch.is_empty()
3124 && !tts.is_empty()
3125 && let Ok(mut map) = self.active_assistant_thread.lock()
3126 {
3127 map.insert(ch.to_string(), tts.to_string());
3128 }
3129 }
3130 continue;
3131 }
3132
3133 if event_type == "reaction_added" {
3135 if let Some(ref cancel_emoji) = self.cancel_reaction {
3136 let reaction = event
3137 .get("reaction")
3138 .and_then(|v| v.as_str())
3139 .unwrap_or_default();
3140 if reaction == cancel_emoji.as_str() {
3141 let user = event
3142 .get("user")
3143 .and_then(|v| v.as_str())
3144 .unwrap_or_default();
3145 if !user.is_empty() && self.is_user_allowed(user) {
3146 let item = event.get("item");
3147 let item_channel = item
3148 .and_then(|i| i.get("channel"))
3149 .and_then(|v| v.as_str())
3150 .unwrap_or_default();
3151 let item_ts = item
3152 .and_then(|i| i.get("ts"))
3153 .and_then(|v| v.as_str())
3154 .unwrap_or_default();
3155 if !item_channel.is_empty() && !item_ts.is_empty() {
3156 let thread_ts = Some(item_ts.to_string());
3160 let scope_id = Some(item_ts.to_string());
3161 let sender = self.resolve_sender_identity(user).await;
3162 let cancel_msg = ChannelMessage {
3163 id: format!("slack_{item_channel}_{item_ts}_cancel"),
3164 sender,
3165 reply_target: item_channel.to_string(),
3166 content: "/stop".to_string(),
3167 channel: "slack".to_string(),
3168 channel_alias: Some(self.alias.clone()),
3169 timestamp: std::time::SystemTime::now()
3170 .duration_since(std::time::UNIX_EPOCH)
3171 .unwrap_or_default()
3172 .as_secs(),
3173 thread_ts,
3174 interruption_scope_id: scope_id,
3175 attachments: vec![],
3176 subject: None,
3177 };
3178 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"cancel_emoji": cancel_emoji, "user": user, "item_channel": item_channel, "item_ts": item_ts})), ":: reaction from on / — sending /stop");
3179 if tx.send(cancel_msg).await.is_err() {
3180 return Ok(());
3181 }
3182 }
3183 }
3184 }
3185 }
3186 continue;
3187 }
3188
3189 if event_type != "message" {
3190 continue;
3191 }
3192 let subtype = event.get("subtype").and_then(|v| v.as_str());
3193 if !Self::is_supported_message_subtype(subtype) {
3194 continue;
3195 }
3196
3197 let channel_id = event
3198 .get("channel")
3199 .and_then(|v| v.as_str())
3200 .map(str::to_string)
3201 .unwrap_or_default();
3202 if channel_id.is_empty() {
3203 continue;
3204 }
3205 if let Some(ref configured_channels) = scoped_channels
3206 && !configured_channels.iter().any(|id| id == &channel_id)
3207 {
3208 continue;
3209 }
3210
3211 let user = event
3212 .get("user")
3213 .and_then(|v| v.as_str())
3214 .unwrap_or_default();
3215 if user.is_empty() || user == bot_user_id {
3216 continue;
3217 }
3218 if !self.is_user_allowed(user) {
3219 ::zeroclaw_log::record!(
3220 WARN,
3221 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3222 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
3223 .with_attrs(::serde_json::json!({"user": user})),
3224 "ignoring message from unauthorized user"
3225 );
3226 continue;
3227 }
3228
3229 let ts = event.get("ts").and_then(|v| v.as_str()).unwrap_or_default();
3230 if ts.is_empty() {
3231 continue;
3232 }
3233 let last_ts = last_ts_by_channel
3234 .get(&channel_id)
3235 .map(String::as_str)
3236 .unwrap_or_default();
3237 if ts <= last_ts {
3238 continue;
3239 }
3240
3241 let is_group_message = Self::is_group_channel_id(&channel_id);
3242 let is_thread_reply = event.get("thread_ts").and_then(|v| v.as_str()).is_some();
3243 let allow_sender_without_mention =
3244 is_group_message && self.is_group_sender_trigger_enabled(user);
3245 let require_mention = self.mention_only
3246 && is_group_message
3247 && !allow_sender_without_mention
3248 && (!is_thread_reply || self.strict_mention_in_thread);
3249
3250 let Some(normalized_text) = self
3251 .build_incoming_content(event, require_mention, bot_user_id)
3252 .await
3253 else {
3254 continue;
3255 };
3256
3257 if let Some((token, response)) = crate::util::parse_approval_reply(&normalized_text)
3258 {
3259 let mut map = self.pending_approvals.lock().await;
3260 if let Some(ap_sender) = map.remove(&token) {
3261 let _ = ap_sender.send(response);
3262 continue;
3263 }
3264 }
3265
3266 last_ts_by_channel.insert(channel_id.clone(), ts.to_string());
3267 let sender = self.resolve_sender_identity(user).await;
3268
3269 let channel_msg = ChannelMessage {
3270 id: format!("slack_{channel_id}_{ts}"),
3271 sender,
3272 reply_target: channel_id.clone(),
3273 content: normalized_text,
3274 channel: "slack".to_string(),
3275 channel_alias: Some(self.alias.clone()),
3276 timestamp: std::time::SystemTime::now()
3277 .duration_since(std::time::UNIX_EPOCH)
3278 .unwrap_or_default()
3279 .as_secs(),
3280 thread_ts: if self.thread_replies {
3281 Self::inbound_thread_ts(event, ts)
3282 } else {
3283 Self::inbound_thread_ts_genuine_only(event)
3284 },
3285 interruption_scope_id: Self::inbound_interruption_scope_id(event, ts),
3286 attachments: vec![],
3287 subject: None,
3288 };
3289
3290 if let Some(ref tts) = channel_msg.thread_ts
3292 && let Ok(mut map) = self.active_assistant_thread.lock()
3293 {
3294 map.insert(channel_id.clone(), tts.clone());
3295 }
3296
3297 if tx.send(channel_msg).await.is_err() {
3298 return Ok(());
3299 }
3300 }
3301
3302 let wait = Self::compute_socket_mode_retry_delay(socket_reconnect_attempt);
3303 ::zeroclaw_log::record!(
3304 WARN,
3305 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3306 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
3307 &format!(
3308 "Socket Mode: reconnecting in {:.3}s (attempt #{})...",
3309 wait.as_secs_f64(),
3310 socket_reconnect_attempt.saturating_add(1)
3311 )
3312 );
3313 socket_reconnect_attempt = socket_reconnect_attempt.saturating_add(1);
3314 tokio::time::sleep(wait).await;
3315 }
3316 }
3317
3318 fn parse_retry_after_secs(headers: &HeaderMap) -> Option<u64> {
3319 let value = headers
3320 .get(reqwest::header::RETRY_AFTER)?
3321 .to_str()
3322 .ok()?
3323 .trim();
3324 Self::parse_retry_after_value(value)
3325 }
3326
3327 fn parse_retry_after_value(value: &str) -> Option<u64> {
3328 if value.is_empty() {
3329 return None;
3330 }
3331
3332 if let Ok(seconds) = value.parse::<u64>() {
3333 return Some(seconds);
3334 }
3335
3336 let truncated = value
3337 .split_once('.')
3338 .map(|(whole, _)| whole)
3339 .unwrap_or(value);
3340 truncated.parse::<u64>().ok()
3341 }
3342
3343 fn jitter_ms(max_jitter_ms: u64) -> u64 {
3344 if max_jitter_ms == 0 {
3345 return 0;
3346 }
3347 rand::random::<u64>() % (max_jitter_ms + 1)
3348 }
3349
3350 fn compute_exponential_backoff_delay(
3351 base_retry_after_secs: u64,
3352 attempt: u32,
3353 max_backoff_secs: u64,
3354 jitter_ms: u64,
3355 ) -> Duration {
3356 let multiplier = 1_u64.checked_shl(attempt).unwrap_or(u64::MAX);
3357 let backoff_secs = base_retry_after_secs
3358 .saturating_mul(multiplier)
3359 .min(max_backoff_secs);
3360 Duration::from_secs(backoff_secs) + Duration::from_millis(jitter_ms)
3361 }
3362
3363 fn compute_retry_delay(base_retry_after_secs: u64, attempt: u32, jitter_ms: u64) -> Duration {
3364 Self::compute_exponential_backoff_delay(
3365 base_retry_after_secs,
3366 attempt,
3367 SLACK_HISTORY_MAX_BACKOFF_SECS,
3368 jitter_ms,
3369 )
3370 }
3371
3372 fn compute_socket_mode_retry_delay(attempt: u32) -> Duration {
3373 let jitter_ms = Self::jitter_ms(SLACK_SOCKET_MODE_MAX_JITTER_MS);
3374 Self::compute_exponential_backoff_delay(
3375 SLACK_SOCKET_MODE_INITIAL_BACKOFF_SECS,
3376 attempt,
3377 SLACK_SOCKET_MODE_MAX_BACKOFF_SECS,
3378 jitter_ms,
3379 )
3380 }
3381
3382 fn next_retry_timestamp(wait: Duration) -> String {
3383 match chrono::Duration::from_std(wait) {
3384 Ok(delta) => (Utc::now() + delta).to_rfc3339(),
3385 Err(_) => Utc::now().to_rfc3339(),
3386 }
3387 }
3388
3389 fn evaluate_health(bot_ok: bool, socket_mode_enabled: bool, socket_mode_ok: bool) -> bool {
3390 if !bot_ok {
3391 return false;
3392 }
3393 if socket_mode_enabled {
3394 return socket_mode_ok;
3395 }
3396 true
3397 }
3398
3399 fn slack_api_call_succeeded(status: reqwest::StatusCode, body: &str) -> bool {
3400 if !status.is_success() {
3401 return false;
3402 }
3403
3404 let parsed: serde_json::Value = serde_json::from_str(body).unwrap_or_default();
3405 parsed
3406 .get("ok")
3407 .and_then(|value| value.as_bool())
3408 .unwrap_or(false)
3409 }
3410
3411 async fn fetch_history_with_retry(
3412 &self,
3413 channel_id: &str,
3414 params: &[(&str, String)],
3415 ) -> Option<serde_json::Value> {
3416 let mut total_wait = Duration::from_secs(0);
3417
3418 for attempt in 0..=SLACK_HISTORY_MAX_RETRIES {
3419 let resp = match self
3420 .http_client()
3421 .get("https://slack.com/api/conversations.history")
3422 .bearer_auth(&self.bot_token)
3423 .query(params)
3424 .send()
3425 .await
3426 {
3427 Ok(r) => r,
3428 Err(e) => {
3429 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"error": format!("{}", e), "channel_id": channel_id})), "poll error for channel");
3430 return None;
3431 }
3432 };
3433
3434 let status = resp.status();
3435 let headers = resp.headers().clone();
3436 let body = resp
3437 .text()
3438 .await
3439 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
3440
3441 let is_ratelimited_http = status == reqwest::StatusCode::TOO_MANY_REQUESTS;
3442 let payload: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
3443 let is_ratelimited_payload = payload.get("ok") == Some(&serde_json::Value::Bool(false))
3444 && payload
3445 .get("error")
3446 .and_then(|e| e.as_str())
3447 .is_some_and(|err| err == "ratelimited");
3448
3449 if is_ratelimited_http || is_ratelimited_payload {
3450 if attempt >= SLACK_HISTORY_MAX_RETRIES {
3451 ::zeroclaw_log::record!(
3452 ERROR,
3453 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
3454 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
3455 &format!(
3456 "Slack rate limit retries exhausted for conversations.history on channel {}. Total wait: {}s across {} attempts. Proceeding without channel history.",
3457 channel_id,
3458 total_wait.as_secs(),
3459 SLACK_HISTORY_MAX_RETRIES
3460 )
3461 );
3462 return None;
3463 }
3464
3465 let retry_after_secs = Self::parse_retry_after_secs(&headers)
3466 .unwrap_or(SLACK_HISTORY_DEFAULT_RETRY_AFTER_SECS);
3467 let jitter_ms = Self::jitter_ms(SLACK_HISTORY_MAX_JITTER_MS);
3468 let wait = Self::compute_retry_delay(retry_after_secs, attempt, jitter_ms);
3469 total_wait += wait;
3470 let next_retry_at = Self::next_retry_timestamp(wait);
3471 ::zeroclaw_log::record!(
3472 WARN,
3473 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3474 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
3475 &format!(
3476 "Slack conversations.history rate limited for channel {}. Retry-After: {}s. Attempt {}/{}. Next retry at {}.",
3477 channel_id,
3478 retry_after_secs,
3479 attempt + 1,
3480 SLACK_HISTORY_MAX_RETRIES,
3481 next_retry_at
3482 )
3483 );
3484 tokio::time::sleep(wait).await;
3485 continue;
3486 }
3487
3488 if !status.is_success() {
3489 let sanitized = zeroclaw_providers::sanitize_api_error(&body);
3490 ::zeroclaw_log::record!(
3491 WARN,
3492 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3493 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
3494 &format!(
3495 "history request failed for channel {} ({}): {}",
3496 channel_id, status, sanitized
3497 )
3498 );
3499 return None;
3500 }
3501
3502 if payload.get("ok") == Some(&serde_json::Value::Bool(false)) {
3503 let err = payload
3504 .get("error")
3505 .and_then(|e| e.as_str())
3506 .unwrap_or("unknown");
3507 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"error": format!("{}", err), "channel_id": channel_id})), "history error for channel");
3508 return None;
3509 }
3510
3511 return Some(payload);
3512 }
3513
3514 None
3515 }
3516
3517 async fn fetch_thread_replies_with_retry(
3518 &self,
3519 channel_id: &str,
3520 thread_ts: &str,
3521 oldest: &str,
3522 ) -> Option<serde_json::Value> {
3523 let mut total_wait = Duration::from_secs(0);
3524
3525 for attempt in 0..=SLACK_HISTORY_MAX_RETRIES {
3526 let resp = match self
3527 .http_client()
3528 .get("https://slack.com/api/conversations.replies")
3529 .bearer_auth(&self.bot_token)
3530 .query(&[
3531 ("channel", channel_id),
3532 ("ts", thread_ts),
3533 ("oldest", oldest),
3534 ("limit", "50"),
3535 ])
3536 .send()
3537 .await
3538 {
3539 Ok(r) => r,
3540 Err(e) => {
3541 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"thread_ts": thread_ts, "channel_id": channel_id, "e": e.to_string()})), "Slack conversations.replies error for thread in");
3542 return None;
3543 }
3544 };
3545
3546 let status = resp.status();
3547 let headers = resp.headers().clone();
3548 let body = resp
3549 .text()
3550 .await
3551 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
3552
3553 let is_ratelimited_http = status == reqwest::StatusCode::TOO_MANY_REQUESTS;
3554 let payload: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
3555 let is_ratelimited_payload = payload.get("ok") == Some(&serde_json::Value::Bool(false))
3556 && payload
3557 .get("error")
3558 .and_then(|e| e.as_str())
3559 .is_some_and(|err| err == "ratelimited");
3560
3561 if is_ratelimited_http || is_ratelimited_payload {
3562 if attempt >= SLACK_HISTORY_MAX_RETRIES {
3563 ::zeroclaw_log::record!(
3564 ERROR,
3565 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
3566 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
3567 &format!(
3568 "Slack rate limit retries exhausted for conversations.replies on thread {} in channel {}. Total wait: {}s across {} attempts.",
3569 thread_ts,
3570 channel_id,
3571 total_wait.as_secs(),
3572 SLACK_HISTORY_MAX_RETRIES
3573 )
3574 );
3575 return None;
3576 }
3577
3578 let retry_after_secs = Self::parse_retry_after_secs(&headers)
3579 .unwrap_or(SLACK_HISTORY_DEFAULT_RETRY_AFTER_SECS);
3580 let jitter_ms = Self::jitter_ms(SLACK_HISTORY_MAX_JITTER_MS);
3581 let wait = Self::compute_retry_delay(retry_after_secs, attempt, jitter_ms);
3582 total_wait += wait;
3583 let next_retry_at = Self::next_retry_timestamp(wait);
3584 ::zeroclaw_log::record!(
3585 WARN,
3586 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3587 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
3588 &format!(
3589 "Slack conversations.replies rate limited for thread {} in channel {}. Retry-After: {}s. Attempt {}/{}. Next retry at {}.",
3590 thread_ts,
3591 channel_id,
3592 retry_after_secs,
3593 attempt + 1,
3594 SLACK_HISTORY_MAX_RETRIES,
3595 next_retry_at
3596 )
3597 );
3598 tokio::time::sleep(wait).await;
3599 continue;
3600 }
3601
3602 if !status.is_success() {
3603 let sanitized = zeroclaw_providers::sanitize_api_error(&body);
3604 ::zeroclaw_log::record!(
3605 WARN,
3606 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3607 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
3608 &format!(
3609 "Slack conversations.replies failed for thread {} in channel {} ({}): {}",
3610 thread_ts, channel_id, status, sanitized
3611 )
3612 );
3613 return None;
3614 }
3615
3616 if payload.get("ok") == Some(&serde_json::Value::Bool(false)) {
3617 let err = payload
3618 .get("error")
3619 .and_then(|e| e.as_str())
3620 .unwrap_or("unknown");
3621 ::zeroclaw_log::record!(
3622 WARN,
3623 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3624 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
3625 &format!(
3626 "Slack conversations.replies error for thread {} in channel {}: {}",
3627 thread_ts, channel_id, err
3628 )
3629 );
3630 return None;
3631 }
3632
3633 return Some(payload);
3634 }
3635
3636 None
3637 }
3638
3639 fn extract_active_threads(messages: &[serde_json::Value]) -> Vec<(String, String)> {
3642 messages
3643 .iter()
3644 .filter_map(|msg| {
3645 let thread_ts = msg.get("thread_ts").and_then(|v| v.as_str())?;
3646 let ts = msg.get("ts").and_then(|v| v.as_str()).unwrap_or_default();
3647 if ts != thread_ts {
3649 return None;
3650 }
3651 let reply_count = msg.get("reply_count").and_then(|v| v.as_u64()).unwrap_or(0);
3652 if reply_count == 0 {
3653 return None;
3654 }
3655 let latest_reply = msg
3656 .get("latest_reply")
3657 .and_then(|v| v.as_str())
3658 .unwrap_or(thread_ts);
3659 Some((thread_ts.to_string(), latest_reply.to_string()))
3660 })
3661 .collect()
3662 }
3663
3664 fn evict_stale_threads(
3667 active_threads: &mut HashMap<String, (String, String, Instant)>,
3668 now: Instant,
3669 ) {
3670 let max_age = Duration::from_secs(SLACK_POLL_THREAD_EXPIRE_SECS);
3671 active_threads
3672 .retain(|_, (_, _, last_activity)| now.duration_since(*last_activity) < max_age);
3673 if active_threads.len() > SLACK_POLL_ACTIVE_THREAD_MAX {
3674 let overflow = active_threads.len() - SLACK_POLL_ACTIVE_THREAD_MAX;
3675 let mut entries: Vec<_> = active_threads
3676 .iter()
3677 .map(|(k, (_, _, t))| (k.clone(), *t))
3678 .collect();
3679 entries.sort_by_key(|(_, t)| *t);
3680 for (key, _) in entries.into_iter().take(overflow) {
3681 active_threads.remove(&key);
3682 }
3683 }
3684 }
3685}
3686
3687const SLACK_TRUNCATION_INDICATOR: &str = "\n\n...[message truncated]";
3688
3689fn split_text_into_chunks(text: &str, max_chars: usize, max_chunks: usize) -> Vec<String> {
3693 if text.len() <= max_chars {
3694 return vec![text.to_string()];
3695 }
3696
3697 let mut chunks: Vec<String> = Vec::new();
3698 let mut remaining = text;
3699
3700 while !remaining.is_empty() && chunks.len() < max_chunks {
3701 let is_last_slot = chunks.len() + 1 == max_chunks;
3702
3703 if remaining.len() <= max_chars && !is_last_slot {
3704 chunks.push(remaining.to_string());
3705 break;
3706 }
3707
3708 if is_last_slot {
3709 if remaining.len() <= max_chars {
3711 chunks.push(remaining.to_string());
3712 } else {
3713 let avail = max_chars - SLACK_TRUNCATION_INDICATOR.len();
3715 let break_at = remaining[..avail]
3716 .rfind('\n')
3717 .map(|i| i + 1)
3718 .or_else(|| remaining[..avail].rfind(' ').map(|i| i + 1))
3719 .unwrap_or(avail);
3720 let mut chunk = remaining[..break_at].to_string();
3721 chunk.push_str(SLACK_TRUNCATION_INDICATOR);
3722 chunks.push(chunk);
3723 }
3724 break;
3725 }
3726
3727 let limit = max_chars.min(remaining.len());
3729 let break_at = remaining[..limit]
3730 .rfind('\n')
3731 .map(|i| i + 1)
3732 .or_else(|| remaining[..limit].rfind(' ').map(|i| i + 1))
3733 .unwrap_or(limit);
3734
3735 chunks.push(remaining[..break_at].to_string());
3736 remaining = &remaining[break_at..];
3737 }
3738
3739 chunks
3740}
3741
3742impl ::zeroclaw_api::attribution::Attributable for SlackChannel {
3743 fn role(&self) -> ::zeroclaw_api::attribution::Role {
3744 ::zeroclaw_api::attribution::Role::Channel(::zeroclaw_api::attribution::ChannelKind::Slack)
3745 }
3746 fn alias(&self) -> &str {
3747 &self.alias
3748 }
3749}
3750
3751#[async_trait]
3752impl Channel for SlackChannel {
3753 fn name(&self) -> &str {
3754 "slack"
3755 }
3756
3757 fn self_handle(&self) -> Option<String> {
3765 self.cached_bot_user_id
3766 .lock()
3767 .ok()
3768 .and_then(|guard| guard.clone())
3769 }
3770
3771 fn self_addressed_mention(&self) -> Option<String> {
3776 self.self_handle().map(|id| format!("<@{id}>"))
3777 }
3778
3779 async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
3780 let body = if let Some(blocks_json) =
3782 message.content.strip_prefix(crate::util::BLOCK_KIT_PREFIX)
3783 {
3784 let blocks: serde_json::Value = serde_json::from_str(blocks_json)
3785 .context("invalid Block Kit JSON in runtime command response")?;
3786 let mut body = serde_json::json!({
3787 "channel": message.recipient,
3788 "text": "Model configuration",
3789 "blocks": blocks
3790 });
3791 if let Some(ts) = self.outbound_thread_ts(message) {
3792 body["thread_ts"] = serde_json::json!(ts);
3793 }
3794 body
3795 } else {
3796 let mut body = serde_json::json!({
3797 "channel": message.recipient,
3798 "text": message.content
3799 });
3800
3801 let block_limit = if self.use_markdown_blocks {
3806 SLACK_MARKDOWN_BLOCK_MAX_CHARS
3807 } else {
3808 SLACK_BLOCK_TEXT_MAX_CHARS
3809 };
3810 if message.content.len() <= SLACK_MARKDOWN_BLOCK_MAX_CHARS {
3811 let chunks = split_text_into_chunks(
3812 &message.content,
3813 block_limit,
3814 SLACK_MAX_BLOCKS_PER_MESSAGE,
3815 );
3816 let blocks: Vec<serde_json::Value> = chunks
3817 .into_iter()
3818 .map(|chunk| {
3819 if self.use_markdown_blocks {
3820 serde_json::json!({
3821 "type": "markdown",
3822 "text": chunk
3823 })
3824 } else {
3825 serde_json::json!({
3826 "type": "section",
3827 "text": {
3828 "type": "mrkdwn",
3829 "text": chunk
3830 }
3831 })
3832 }
3833 })
3834 .collect();
3835 body["blocks"] = serde_json::Value::Array(blocks);
3836 }
3837
3838 if let Some(ts) = self.outbound_thread_ts(message) {
3839 body["thread_ts"] = serde_json::json!(ts);
3840 }
3841 body
3842 };
3843
3844 let resp = self
3845 .http_client()
3846 .post("https://slack.com/api/chat.postMessage")
3847 .bearer_auth(&self.bot_token)
3848 .json(&body)
3849 .send()
3850 .await?;
3851
3852 let status = resp.status();
3853 let body = resp
3854 .text()
3855 .await
3856 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
3857
3858 if !status.is_success() {
3859 let sanitized = zeroclaw_providers::sanitize_api_error(&body);
3860 anyhow::bail!("chat.postMessage failed ({status}): {sanitized}");
3861 }
3862
3863 let parsed: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
3865 if parsed.get("ok") == Some(&serde_json::Value::Bool(false)) {
3866 let err = parsed
3867 .get("error")
3868 .and_then(|e| e.as_str())
3869 .unwrap_or("unknown");
3870 anyhow::bail!("chat.postMessage failed: {err}");
3871 }
3872
3873 Ok(())
3874 }
3875
3876 fn supports_draft_updates(&self) -> bool {
3877 self.stream_drafts
3878 }
3879
3880 async fn send_draft(&self, message: &SendMessage) -> anyhow::Result<Option<String>> {
3881 if !self.stream_drafts {
3882 return Ok(None);
3883 }
3884
3885 let thread_ts = self.outbound_thread_ts(message).unwrap_or_default();
3888 let lazy_id = format!("{LAZY_DRAFT_PREFIX}{}:{}", message.recipient, thread_ts);
3889 Ok(Some(lazy_id))
3890 }
3891
3892 async fn update_draft(
3893 &self,
3894 recipient: &str,
3895 message_id: &str,
3896 text: &str,
3897 ) -> anyhow::Result<()> {
3898 if message_id.starts_with(LAZY_DRAFT_PREFIX)
3900 && self.resolve_draft_ts(message_id).await.is_none()
3901 {
3902 let _ = self.materialize_lazy_draft(message_id, text).await;
3905 self.last_draft_edit
3906 .lock()
3907 .expect("last_draft_edit lock")
3908 .insert(recipient.to_string(), Instant::now());
3909 return Ok(());
3910 }
3911
3912 let real_ts = match self.resolve_draft_ts(message_id).await {
3914 Some(ts) => ts,
3915 None => return Ok(()),
3916 };
3917
3918 {
3920 let last_edits = self.last_draft_edit.lock().expect("last_draft_edit lock");
3921 if let Some(last_time) = last_edits.get(recipient) {
3922 let elapsed_ms = u64::try_from(last_time.elapsed().as_millis()).unwrap_or(u64::MAX);
3923 if elapsed_ms < self.draft_update_interval_ms {
3924 return Ok(());
3925 }
3926 }
3927 }
3928
3929 self.last_draft_edit
3932 .lock()
3933 .expect("last_draft_edit lock")
3934 .insert(recipient.to_string(), Instant::now());
3935
3936 let display_text = if text.len() > SLACK_MESSAGE_MAX_CHARS {
3939 text[..text
3940 .char_indices()
3941 .take_while(|(idx, _)| *idx < SLACK_MESSAGE_MAX_CHARS)
3942 .last()
3943 .map_or(0, |(idx, ch)| idx + ch.len_utf8())]
3944 .to_string()
3945 } else {
3946 text.to_string()
3947 };
3948
3949 let client = self.http_client();
3950 let token = self.bot_token.clone();
3951 let channel = recipient.to_string();
3952 tokio::spawn(async move {
3953 let mut body = serde_json::json!({
3954 "channel": channel,
3955 "ts": real_ts,
3956 "text": &display_text,
3957 });
3958 if display_text.len() <= SLACK_MARKDOWN_BLOCK_MAX_CHARS {
3959 body["blocks"] = serde_json::json!([{
3960 "type": "markdown",
3961 "text": &display_text
3962 }]);
3963 }
3964 match client
3965 .post("https://slack.com/api/chat.update")
3966 .bearer_auth(&token)
3967 .json(&body)
3968 .send()
3969 .await
3970 {
3971 Ok(resp) => {
3972 if let Ok(resp_body) = resp.json::<serde_json::Value>().await
3973 && resp_body.get("ok") != Some(&serde_json::Value::Bool(true))
3974 {
3975 let err = resp_body
3976 .get("error")
3977 .and_then(|e| e.as_str())
3978 .unwrap_or("unknown");
3979 ::zeroclaw_log::record!(
3980 DEBUG,
3981 ::zeroclaw_log::Event::new(
3982 module_path!(),
3983 ::zeroclaw_log::Action::Note
3984 )
3985 .with_attrs(::serde_json::json!({"error": format!("{}", err)})),
3986 "chat.update (draft) failed"
3987 );
3988 }
3989 }
3990 Err(e) => {
3991 ::zeroclaw_log::record!(
3992 DEBUG,
3993 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3994 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
3995 "chat.update (draft) HTTP error"
3996 );
3997 }
3998 }
3999 });
4000
4001 Ok(())
4002 }
4003
4004 async fn update_draft_progress(
4005 &self,
4006 recipient: &str,
4007 _message_id: &str,
4008 text: &str,
4009 ) -> anyhow::Result<()> {
4010 let status_line = text.trim().lines().last().unwrap_or("").trim();
4011 if status_line.is_empty() || status_line.starts_with("\u{1f914}") {
4014 return Ok(());
4015 }
4016 self.set_assistant_status(recipient, status_line).await;
4017 Ok(())
4018 }
4019
4020 async fn finalize_draft(
4021 &self,
4022 recipient: &str,
4023 message_id: &str,
4024 text: &str,
4025 ) -> anyhow::Result<()> {
4026 self.last_draft_edit
4028 .lock()
4029 .expect("last_draft_edit lock")
4030 .remove(recipient);
4031
4032 let draft_thread_ts = message_id
4035 .strip_prefix(LAZY_DRAFT_PREFIX)
4036 .and_then(|rest| rest.find(':').map(|pos| &rest[pos + 1..]))
4037 .filter(|ts| !ts.is_empty())
4038 .map(String::from);
4039
4040 let real_ts = self.resolve_draft_ts(message_id).await;
4041 self.lazy_draft_ts.lock().await.remove(message_id);
4043
4044 let Some(real_ts) = real_ts else {
4045 let msg = SendMessage::new(text, recipient).in_thread(draft_thread_ts);
4047 return self.send(&msg).await;
4048 };
4049
4050 if text.len() > SLACK_MESSAGE_MAX_CHARS {
4052 let _ = self.delete_message(recipient, &real_ts).await;
4053 let msg = SendMessage::new(text, recipient).in_thread(draft_thread_ts);
4054 return self.send(&msg).await;
4055 }
4056
4057 let mut body = serde_json::json!({
4059 "channel": recipient,
4060 "ts": real_ts,
4061 "text": text,
4062 });
4063
4064 if text.len() <= SLACK_MARKDOWN_BLOCK_MAX_CHARS {
4066 body["blocks"] = serde_json::json!([{
4067 "type": "markdown",
4068 "text": text
4069 }]);
4070 }
4071
4072 let resp = self
4073 .http_client()
4074 .post("https://slack.com/api/chat.update")
4075 .bearer_auth(&self.bot_token)
4076 .json(&body)
4077 .send()
4078 .await?;
4079
4080 let resp_body: serde_json::Value = resp.json().await?;
4081 if resp_body.get("ok") == Some(&serde_json::Value::Bool(true)) {
4082 return Ok(());
4083 }
4084
4085 let err = resp_body
4087 .get("error")
4088 .and_then(|e| e.as_str())
4089 .unwrap_or("unknown");
4090 ::zeroclaw_log::record!(
4091 DEBUG,
4092 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
4093 .with_attrs(::serde_json::json!({"error": format!("{}", err)})),
4094 "chat.update (finalize) failed; falling back to delete+send"
4095 );
4096
4097 let _ = self.delete_message(recipient, &real_ts).await;
4098 let msg = SendMessage::new(text, recipient).in_thread(draft_thread_ts);
4099 self.send(&msg).await
4100 }
4101
4102 async fn cancel_draft(&self, recipient: &str, message_id: &str) -> anyhow::Result<()> {
4103 self.last_draft_edit
4104 .lock()
4105 .expect("last_draft_edit lock")
4106 .remove(recipient);
4107 let real_ts = self.resolve_draft_ts(message_id).await;
4108 self.lazy_draft_ts.lock().await.remove(message_id);
4109 if let Some(ts) = real_ts {
4110 self.delete_message(recipient, &ts).await
4111 } else {
4112 Ok(())
4113 }
4114 }
4115
4116 async fn add_reaction(
4117 &self,
4118 channel_id: &str,
4119 message_id: &str,
4120 emoji: &str,
4121 ) -> anyhow::Result<()> {
4122 let ts = extract_slack_ts(message_id);
4123 let name = unicode_emoji_to_slack_name(emoji);
4124
4125 let body = serde_json::json!({
4126 "channel": channel_id,
4127 "timestamp": ts,
4128 "name": name
4129 });
4130
4131 let resp = self
4132 .http_client()
4133 .post("https://slack.com/api/reactions.add")
4134 .bearer_auth(&self.bot_token)
4135 .json(&body)
4136 .send()
4137 .await?;
4138
4139 let status = resp.status();
4140 let text = resp.text().await.unwrap_or_default();
4141
4142 if !status.is_success() {
4143 let sanitized = zeroclaw_providers::sanitize_api_error(&text);
4144 anyhow::bail!("Slack reactions.add failed ({status}): {sanitized}");
4145 }
4146
4147 let parsed: serde_json::Value = serde_json::from_str(&text).unwrap_or_default();
4148 if parsed.get("ok") == Some(&serde_json::Value::Bool(false)) {
4149 let err = parsed
4150 .get("error")
4151 .and_then(|e| e.as_str())
4152 .unwrap_or("unknown");
4153 if err != "already_reacted" {
4154 anyhow::bail!("Slack reactions.add failed: {err}");
4155 }
4156 }
4157
4158 Ok(())
4159 }
4160
4161 async fn remove_reaction(
4162 &self,
4163 channel_id: &str,
4164 message_id: &str,
4165 emoji: &str,
4166 ) -> anyhow::Result<()> {
4167 let ts = extract_slack_ts(message_id);
4168 let name = unicode_emoji_to_slack_name(emoji);
4169
4170 let body = serde_json::json!({
4171 "channel": channel_id,
4172 "timestamp": ts,
4173 "name": name
4174 });
4175
4176 let resp = self
4177 .http_client()
4178 .post("https://slack.com/api/reactions.remove")
4179 .bearer_auth(&self.bot_token)
4180 .json(&body)
4181 .send()
4182 .await?;
4183
4184 let status = resp.status();
4185 let text = resp.text().await.unwrap_or_default();
4186
4187 if !status.is_success() {
4188 let sanitized = zeroclaw_providers::sanitize_api_error(&text);
4189 anyhow::bail!("Slack reactions.remove failed ({status}): {sanitized}");
4190 }
4191
4192 let parsed: serde_json::Value = serde_json::from_str(&text).unwrap_or_default();
4193 if parsed.get("ok") == Some(&serde_json::Value::Bool(false)) {
4194 let err = parsed
4195 .get("error")
4196 .and_then(|e| e.as_str())
4197 .unwrap_or("unknown");
4198 if err != "no_reaction" {
4199 anyhow::bail!("Slack reactions.remove failed: {err}");
4200 }
4201 }
4202
4203 Ok(())
4204 }
4205
4206 async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
4207 self.cache_bot_user_id().await;
4211 let bot_user_id = self.get_bot_user_id().await.unwrap_or_default();
4212 let scoped_channels = self.scoped_channel_ids();
4213 if self.configured_app_token().is_some() {
4214 ::zeroclaw_log::record!(
4215 INFO,
4216 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
4217 "channel listening in Socket Mode"
4218 );
4219 return self
4220 .listen_socket_mode(tx, &bot_user_id, scoped_channels)
4221 .await;
4222 }
4223
4224 let mut discovered_channels: Vec<String> = Vec::new();
4225 let mut last_discovery = Instant::now();
4226 let mut last_ts_by_channel: HashMap<String, String> = HashMap::new();
4227 let mut active_threads: HashMap<String, (String, String, Instant)> = HashMap::new();
4229
4230 if let Some(ref channel_ids) = scoped_channels {
4231 ::zeroclaw_log::record!(
4232 INFO,
4233 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
4234 &format!(
4235 "channel listening on {} configured channel(s): {}",
4236 channel_ids.len(),
4237 channel_ids.join(", ")
4238 )
4239 );
4240 } else {
4241 ::zeroclaw_log::record!(
4242 INFO,
4243 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
4244 "Slack channel_id/channel_ids not set (or wildcard only); listening across all accessible channels."
4245 );
4246 }
4247
4248 loop {
4249 tokio::time::sleep(Duration::from_secs(3)).await;
4250
4251 let target_channels = if let Some(ref channel_ids) = scoped_channels {
4252 channel_ids.clone()
4253 } else {
4254 if discovered_channels.is_empty()
4255 || last_discovery.elapsed() >= Duration::from_secs(60)
4256 {
4257 match self.list_accessible_channels().await {
4258 Ok(channels) => {
4259 if channels != discovered_channels {
4260 ::zeroclaw_log::record!(
4261 INFO,
4262 ::zeroclaw_log::Event::new(
4263 module_path!(),
4264 ::zeroclaw_log::Action::Note
4265 ),
4266 &format!(
4267 "Slack auto-discovery refreshed: listening on {} channel(s).",
4268 channels.len()
4269 )
4270 );
4271 }
4272 discovered_channels = channels;
4273 }
4274 Err(e) => {
4275 ::zeroclaw_log::record!(
4276 WARN,
4277 ::zeroclaw_log::Event::new(
4278 module_path!(),
4279 ::zeroclaw_log::Action::Note
4280 )
4281 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
4282 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
4283 "channel discovery failed"
4284 );
4285 }
4286 }
4287 last_discovery = Instant::now();
4288 }
4289
4290 discovered_channels.clone()
4291 };
4292
4293 if target_channels.is_empty() {
4294 ::zeroclaw_log::record!(
4295 DEBUG,
4296 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
4297 "no accessible channels discovered yet"
4298 );
4299 continue;
4300 }
4301
4302 for channel_id in target_channels {
4303 let had_cursor = last_ts_by_channel.contains_key(&channel_id);
4304 let bootstrap_ts = Self::slack_now_ts();
4305 let cursor_ts =
4306 Self::ensure_poll_cursor(&mut last_ts_by_channel, &channel_id, &bootstrap_ts);
4307 if !had_cursor {
4308 ::zeroclaw_log::record!(
4309 DEBUG,
4310 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
4311 &format!(
4312 "initialized cursor for channel {} at {} to prevent historical replay",
4313 channel_id, cursor_ts
4314 )
4315 );
4316 }
4317 let params = vec![
4318 ("channel", channel_id.clone()),
4319 ("limit", "10".to_string()),
4320 ("oldest", cursor_ts),
4321 ];
4322
4323 let Some(data) = self.fetch_history_with_retry(&channel_id, ¶ms).await else {
4324 continue;
4325 };
4326
4327 if let Some(messages) = data.get("messages").and_then(|m| m.as_array()) {
4328 for (thread_ts, latest_reply) in Self::extract_active_threads(messages) {
4330 let entry = active_threads.entry(thread_ts.clone()).or_insert_with(|| {
4331 (channel_id.clone(), thread_ts.clone(), Instant::now())
4332 });
4333 if latest_reply > entry.1 {
4334 entry.1 = latest_reply;
4335 }
4336 entry.2 = Instant::now();
4337 }
4338
4339 for msg in messages.iter().rev() {
4341 let subtype = msg.get("subtype").and_then(|value| value.as_str());
4342 if !Self::is_supported_message_subtype(subtype) {
4343 continue;
4344 }
4345 let ts = msg.get("ts").and_then(|t| t.as_str()).unwrap_or("");
4346 let user = msg
4347 .get("user")
4348 .and_then(|u| u.as_str())
4349 .unwrap_or("unknown");
4350 let last_ts = last_ts_by_channel
4351 .get(&channel_id)
4352 .map(String::as_str)
4353 .unwrap_or("");
4354
4355 if user == bot_user_id {
4357 continue;
4358 }
4359
4360 if !self.is_user_allowed(user) {
4362 ::zeroclaw_log::record!(
4363 WARN,
4364 ::zeroclaw_log::Event::new(
4365 module_path!(),
4366 ::zeroclaw_log::Action::Note
4367 )
4368 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
4369 .with_attrs(::serde_json::json!({"user": user})),
4370 "ignoring message from unauthorized user"
4371 );
4372 continue;
4373 }
4374
4375 if ts <= last_ts {
4376 continue;
4377 }
4378
4379 let is_group_message = Self::is_group_channel_id(&channel_id);
4380 let is_thread_reply =
4381 msg.get("thread_ts").and_then(|v| v.as_str()).is_some();
4382 let allow_sender_without_mention =
4383 is_group_message && self.is_group_sender_trigger_enabled(user);
4384 let require_mention = self.mention_only
4385 && is_group_message
4386 && !allow_sender_without_mention
4387 && (!is_thread_reply || self.strict_mention_in_thread);
4388 let Some(normalized_text) = self
4389 .build_incoming_content(msg, require_mention, &bot_user_id)
4390 .await
4391 else {
4392 continue;
4393 };
4394
4395 last_ts_by_channel.insert(channel_id.clone(), ts.to_string());
4396 let sender = self.resolve_sender_identity(user).await;
4397
4398 if let Some((token, response)) =
4399 crate::util::parse_approval_reply(&normalized_text)
4400 {
4401 let mut map = self.pending_approvals.lock().await;
4402 if let Some(ap_sender) = map.remove(&token) {
4403 let _ = ap_sender.send(response);
4404 continue;
4405 }
4406 }
4407
4408 let channel_msg = ChannelMessage {
4409 id: format!("slack_{channel_id}_{ts}"),
4410 sender,
4411 reply_target: channel_id.clone(),
4412 content: normalized_text,
4413 channel: "slack".to_string(),
4414 channel_alias: Some(self.alias.clone()),
4415 timestamp: std::time::SystemTime::now()
4416 .duration_since(std::time::UNIX_EPOCH)
4417 .unwrap_or_default()
4418 .as_secs(),
4419 thread_ts: if self.thread_replies {
4420 Self::inbound_thread_ts(msg, ts)
4421 } else {
4422 Self::inbound_thread_ts_genuine_only(msg)
4423 },
4424 interruption_scope_id: Self::inbound_interruption_scope_id(msg, ts),
4425 attachments: vec![],
4426 subject: None,
4427 };
4428
4429 if tx.send(channel_msg).await.is_err() {
4430 return Ok(());
4431 }
4432 }
4433 }
4434 }
4435
4436 Self::evict_stale_threads(&mut active_threads, Instant::now());
4438 let thread_snapshot: Vec<(String, String, String)> = active_threads
4439 .iter()
4440 .map(|(thread_ts, (ch, last_reply, _))| {
4441 (thread_ts.clone(), ch.clone(), last_reply.clone())
4442 })
4443 .collect();
4444
4445 for (thread_ts, thread_channel_id, last_reply_ts) in thread_snapshot {
4446 let Some(data) = self
4447 .fetch_thread_replies_with_retry(&thread_channel_id, &thread_ts, &last_reply_ts)
4448 .await
4449 else {
4450 continue;
4451 };
4452
4453 let Some(replies) = data.get("messages").and_then(|m| m.as_array()) else {
4454 continue;
4455 };
4456
4457 for reply in replies {
4458 let reply_ts = reply.get("ts").and_then(|v| v.as_str()).unwrap_or_default();
4459 if reply_ts.is_empty() || reply_ts <= last_reply_ts.as_str() {
4460 continue;
4461 }
4462 let subtype = reply.get("subtype").and_then(|v| v.as_str());
4463 if !Self::is_supported_message_subtype(subtype) {
4464 continue;
4465 }
4466
4467 let user = reply
4468 .get("user")
4469 .and_then(|u| u.as_str())
4470 .unwrap_or_default();
4471 if user.is_empty() || user == bot_user_id {
4472 continue;
4473 }
4474 if !self.is_user_allowed(user) {
4475 continue;
4476 }
4477
4478 let require_mention = false;
4481 let Some(normalized_text) = self
4482 .build_incoming_content(reply, require_mention, &bot_user_id)
4483 .await
4484 else {
4485 continue;
4486 };
4487
4488 if let Some(entry) = active_threads.get_mut(&thread_ts) {
4490 if reply_ts > entry.1.as_str() {
4491 entry.1 = reply_ts.to_string();
4492 }
4493 entry.2 = Instant::now();
4494 }
4495
4496 let sender = self.resolve_sender_identity(user).await;
4497
4498 if let Some((token, response)) =
4499 crate::util::parse_approval_reply(&normalized_text)
4500 {
4501 let mut map = self.pending_approvals.lock().await;
4502 if let Some(ap_sender) = map.remove(&token) {
4503 let _ = ap_sender.send(response);
4504 continue;
4505 }
4506 }
4507
4508 let channel_msg = ChannelMessage {
4509 id: format!("slack_{thread_channel_id}_{reply_ts}"),
4510 sender,
4511 reply_target: thread_channel_id.clone(),
4512 content: normalized_text,
4513 channel: "slack".to_string(),
4514 channel_alias: Some(self.alias.clone()),
4515 timestamp: std::time::SystemTime::now()
4516 .duration_since(std::time::UNIX_EPOCH)
4517 .unwrap_or_default()
4518 .as_secs(),
4519 thread_ts: Some(thread_ts.clone()),
4520 interruption_scope_id: Some(thread_ts.clone()),
4521 attachments: vec![],
4522 subject: None,
4523 };
4524
4525 if tx.send(channel_msg).await.is_err() {
4526 return Ok(());
4527 }
4528 }
4529 }
4530 }
4531 }
4532
4533 async fn health_check(&self) -> bool {
4534 let bot_ok = match self
4535 .http_client()
4536 .get("https://slack.com/api/auth.test")
4537 .bearer_auth(&self.bot_token)
4538 .send()
4539 .await
4540 {
4541 Ok(response) => {
4542 let status = response.status();
4543 let body = response.text().await.unwrap_or_default();
4544 Self::slack_api_call_succeeded(status, &body)
4545 }
4546 Err(_) => false,
4547 };
4548 let socket_mode_enabled = self.configured_app_token().is_some();
4549 let socket_mode_ok = if socket_mode_enabled {
4550 self.open_socket_mode_url().await.is_ok()
4551 } else {
4552 true
4553 };
4554 Self::evaluate_health(bot_ok, socket_mode_enabled, socket_mode_ok)
4555 }
4556
4557 async fn start_typing(&self, recipient: &str) -> anyhow::Result<()> {
4558 let thread_ts = {
4559 let map = self.active_assistant_thread.lock().map_err(|e| {
4560 ::zeroclaw_log::record!(
4561 ERROR,
4562 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
4563 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
4564 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
4565 "lock poisoned"
4566 );
4567 anyhow::Error::msg(format!("lock poisoned: {e}"))
4568 })?;
4569 match map.get(recipient) {
4570 Some(ts) => ts.clone(),
4571 None => return Ok(()),
4572 }
4573 };
4574
4575 let body = serde_json::json!({
4576 "channel_id": recipient,
4577 "thread_ts": thread_ts,
4578 "status": "is thinking...",
4579 });
4580
4581 if let Ok(resp) = self
4583 .http_client()
4584 .post("https://slack.com/api/assistant.threads.setStatus")
4585 .bearer_auth(&self.bot_token)
4586 .json(&body)
4587 .send()
4588 .await
4589 && !resp.status().is_success()
4590 {
4591 ::zeroclaw_log::record!(
4592 DEBUG,
4593 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
4594 &format!(
4595 "assistant.threads.setStatus returned {}; ignoring",
4596 resp.status()
4597 )
4598 );
4599 }
4600
4601 Ok(())
4602 }
4603
4604 async fn stop_typing(&self, recipient: &str) -> anyhow::Result<()> {
4605 if self.stream_drafts {
4609 self.set_assistant_status(recipient, "").await;
4610 }
4611 Ok(())
4612 }
4613
4614 async fn request_approval(
4615 &self,
4616 recipient: &str,
4617 request: &ChannelApprovalRequest,
4618 ) -> anyhow::Result<Option<ChannelApprovalResponse>> {
4619 let token = crate::util::new_approval_token();
4620
4621 let (tx, rx) = oneshot::channel();
4622 self.pending_approvals
4623 .lock()
4624 .await
4625 .insert(token.clone(), tx);
4626
4627 let send_result = if self.app_token.is_some() {
4630 let body = serde_json::json!({
4631 "channel": recipient,
4632 "text": format!("APPROVAL REQUIRED [{token}]\nTool: {}\nArgs: {}", request.tool_name, request.arguments_summary),
4633 "blocks": [{
4634 "type": "section",
4635 "text": {
4636 "type": "mrkdwn",
4637 "text": format!("*APPROVAL REQUIRED* [`{token}`]\n*Tool:* `{}`\n*Args:* {}", request.tool_name, request.arguments_summary),
4638 }
4639 }, {
4640 "type": "actions",
4641 "elements": [
4642 { "type": "button", "text": { "type": "plain_text", "text": "Approve" }, "action_id": format!("approval_{token}_approve"), "style": "primary" },
4643 { "type": "button", "text": { "type": "plain_text", "text": "Deny" }, "action_id": format!("approval_{token}_deny"), "style": "danger" },
4644 { "type": "button", "text": { "type": "plain_text", "text": "Always" }, "action_id": format!("approval_{token}_always") },
4645 ]
4646 }]
4647 });
4648 self.http_client()
4649 .post("https://slack.com/api/chat.postMessage")
4650 .bearer_auth(&self.bot_token)
4651 .json(&body)
4652 .send()
4653 .await
4654 .map(|_| ())
4655 .map_err(anyhow::Error::from)
4656 } else {
4657 self.send(&SendMessage::new(
4658 format!(
4659 "APPROVAL REQUIRED [{token}]\nTool: {}\nArgs: {}\n\nReply: \"{token} yes\", \"{token} no\", or \"{token} always\"",
4660 request.tool_name, request.arguments_summary,
4661 ),
4662 recipient,
4663 ))
4664 .await
4665 };
4666
4667 if let Err(err) = send_result {
4668 self.pending_approvals.lock().await.remove(&token);
4669 return Err(err);
4670 }
4671
4672 let response =
4673 match tokio::time::timeout(Duration::from_secs(self.approval_timeout_secs), rx).await {
4674 Ok(Ok(resp)) => resp,
4675 _ => {
4676 self.pending_approvals.lock().await.remove(&token);
4677 ChannelApprovalResponse::Deny
4678 }
4679 };
4680 Ok(Some(response))
4681 }
4682}
4683
4684#[cfg(test)]
4685mod tests {
4686 use super::*;
4687
4688 #[test]
4689 fn slack_channel_name() {
4690 let ch = SlackChannel::new(
4691 "xoxb-fake".into(),
4692 None,
4693 vec![],
4694 "slack_test_alias",
4695 Arc::new(Vec::new),
4696 );
4697 assert_eq!(ch.name(), "slack");
4698 }
4699
4700 #[test]
4701 fn slack_channel_with_channel_ids() {
4702 let ch = SlackChannel::new(
4703 "xoxb-fake".into(),
4704 None,
4705 vec!["C12345".into()],
4706 "slack_test_alias",
4707 Arc::new(Vec::new),
4708 );
4709 assert_eq!(ch.channel_ids, vec!["C12345".to_string()]);
4710 }
4711
4712 #[test]
4713 fn slack_group_reply_policy_defaults_to_all_messages() {
4714 let ch = SlackChannel::new(
4715 "xoxb-fake".into(),
4716 None,
4717 vec![],
4718 "slack_test_alias",
4719 Arc::new(|| vec!["*".into()]),
4720 );
4721 assert!(ch.thread_replies);
4722 assert!(!ch.mention_only);
4723 assert!(ch.group_reply_allowed_sender_ids.is_empty());
4724 }
4725
4726 #[test]
4727 fn with_thread_replies_sets_flag() {
4728 let ch = SlackChannel::new(
4729 "xoxb-fake".into(),
4730 None,
4731 vec![],
4732 "slack_test_alias",
4733 Arc::new(Vec::new),
4734 )
4735 .with_thread_replies(false);
4736 assert!(!ch.thread_replies);
4737 }
4738
4739 #[test]
4740 fn with_strict_mention_in_thread_sets_flag() {
4741 let ch = SlackChannel::new(
4742 "xoxb-fake".into(),
4743 None,
4744 vec![],
4745 "slack_test_alias",
4746 Arc::new(Vec::new),
4747 );
4748 assert!(!ch.strict_mention_in_thread);
4749 let ch = ch.with_strict_mention_in_thread(true);
4750 assert!(ch.strict_mention_in_thread);
4751 }
4752
4753 #[test]
4754 fn outbound_thread_ts_respects_thread_replies_setting() {
4755 let msg = SendMessage::new("hello", "C123").in_thread(Some("1741234567.100001".into()));
4756
4757 let threaded = SlackChannel::new(
4758 "xoxb-fake".into(),
4759 None,
4760 vec![],
4761 "slack_test_alias",
4762 Arc::new(Vec::new),
4763 );
4764 assert_eq!(threaded.outbound_thread_ts(&msg), Some("1741234567.100001"));
4765
4766 let channel_root = SlackChannel::new(
4767 "xoxb-fake".into(),
4768 None,
4769 vec![],
4770 "slack_test_alias",
4771 Arc::new(Vec::new),
4772 )
4773 .with_thread_replies(false);
4774 assert_eq!(channel_root.outbound_thread_ts(&msg), None);
4775 }
4776
4777 #[test]
4778 fn with_workspace_dir_sets_field() {
4779 let ch = SlackChannel::new(
4780 "xoxb-fake".into(),
4781 None,
4782 vec![],
4783 "slack_test_alias",
4784 Arc::new(Vec::new),
4785 )
4786 .with_workspace_dir(PathBuf::from("/tmp/slack-workspace"));
4787 assert_eq!(
4788 ch.workspace_dir.as_deref(),
4789 Some(std::path::Path::new("/tmp/slack-workspace"))
4790 );
4791 }
4792
4793 #[test]
4794 fn slack_group_reply_policy_applies_sender_overrides() {
4795 let ch = SlackChannel::new(
4796 "xoxb-fake".into(),
4797 None,
4798 vec![],
4799 "slack_test_alias",
4800 Arc::new(|| vec!["*".into()]),
4801 )
4802 .with_group_reply_policy(true, vec![" U111 ".into(), "U111".into(), "U222".into()]);
4803
4804 assert!(ch.mention_only);
4805 assert_eq!(
4806 ch.group_reply_allowed_sender_ids,
4807 vec!["U111".to_string(), "U222".to_string()]
4808 );
4809 assert!(ch.is_group_sender_trigger_enabled("U111"));
4810 assert!(!ch.is_group_sender_trigger_enabled("U999"));
4811 }
4812
4813 #[test]
4814 fn normalized_channel_id_respects_wildcard_and_blank() {
4815 assert_eq!(SlackChannel::normalized_channel_id(None), None);
4816 assert_eq!(SlackChannel::normalized_channel_id(Some("")), None);
4817 assert_eq!(SlackChannel::normalized_channel_id(Some(" ")), None);
4818 assert_eq!(SlackChannel::normalized_channel_id(Some("*")), None);
4819 assert_eq!(SlackChannel::normalized_channel_id(Some(" * ")), None);
4820 assert_eq!(
4821 SlackChannel::normalized_channel_id(Some(" C12345 ")),
4822 Some("C12345".to_string())
4823 );
4824 }
4825
4826 #[test]
4827 fn configured_app_token_ignores_blank_values() {
4828 let ch = SlackChannel::new(
4829 "xoxb-fake".into(),
4830 Some(" ".into()),
4831 vec![],
4832 "slack_test_alias",
4833 Arc::new(Vec::new),
4834 );
4835 assert_eq!(ch.configured_app_token(), None);
4836 }
4837
4838 #[test]
4839 fn configured_app_token_trims_value() {
4840 let ch = SlackChannel::new(
4841 "xoxb-fake".into(),
4842 Some(" xapp-123 ".into()),
4843 vec![],
4844 "slack_test_alias",
4845 Arc::new(Vec::new),
4846 );
4847 assert_eq!(ch.configured_app_token().as_deref(), Some("xapp-123"));
4848 }
4849
4850 #[test]
4851 fn scoped_channel_ids_uses_explicit_list() {
4852 let ch = SlackChannel::new(
4853 "xoxb-fake".into(),
4854 None,
4855 vec!["C_LIST1".into(), "D_DM1".into()],
4856 "slack_test_alias",
4857 Arc::new(Vec::new),
4858 );
4859 assert_eq!(
4860 ch.scoped_channel_ids(),
4861 Some(vec!["C_LIST1".to_string(), "D_DM1".to_string()])
4862 );
4863 }
4864
4865 #[test]
4866 fn scoped_channel_ids_with_single_entry() {
4867 let ch = SlackChannel::new(
4868 "xoxb-fake".into(),
4869 None,
4870 vec!["C_SINGLE".into()],
4871 "slack_test_alias",
4872 Arc::new(Vec::new),
4873 );
4874 assert_eq!(ch.scoped_channel_ids(), Some(vec!["C_SINGLE".to_string()]));
4875 }
4876
4877 #[test]
4878 fn scoped_channel_ids_returns_none_for_wildcard_mode() {
4879 let ch = SlackChannel::new(
4880 "xoxb-fake".into(),
4881 None,
4882 vec![],
4883 "slack_test_alias",
4884 Arc::new(Vec::new),
4885 );
4886 assert_eq!(ch.scoped_channel_ids(), None);
4887 }
4888
4889 #[test]
4890 fn is_group_channel_id_detects_channel_prefixes() {
4891 assert!(SlackChannel::is_group_channel_id("C123"));
4892 assert!(SlackChannel::is_group_channel_id("G123"));
4893 assert!(!SlackChannel::is_group_channel_id("D123"));
4894 assert!(!SlackChannel::is_group_channel_id(""));
4895 }
4896
4897 #[test]
4898 fn extract_channel_ids_filters_archived_and_non_member_entries() {
4899 let payload = serde_json::json!({
4900 "channels": [
4901 {"id": "C1", "is_archived": false, "is_member": true},
4902 {"id": "C2", "is_archived": true, "is_member": true},
4903 {"id": "C3", "is_archived": false, "is_member": false},
4904 {"id": "C1", "is_archived": false, "is_member": true},
4905 {"id": "C4"}
4906 ]
4907 });
4908 let ids = SlackChannel::extract_channel_ids(&payload);
4909 assert_eq!(ids, vec!["C1".to_string(), "C4".to_string()]);
4910 }
4911
4912 #[test]
4913 fn empty_allowlist_denies_everyone() {
4914 let ch = SlackChannel::new(
4915 "xoxb-fake".into(),
4916 None,
4917 vec![],
4918 "slack_test_alias",
4919 Arc::new(Vec::new),
4920 );
4921 assert!(!ch.is_user_allowed("U12345"));
4922 assert!(!ch.is_user_allowed("anyone"));
4923 }
4924
4925 #[test]
4926 fn wildcard_allows_everyone() {
4927 let ch = SlackChannel::new(
4928 "xoxb-fake".into(),
4929 None,
4930 vec![],
4931 "slack_test_alias",
4932 Arc::new(|| vec!["*".into()]),
4933 );
4934 assert!(ch.is_user_allowed("U12345"));
4935 }
4936
4937 #[test]
4938 fn extract_user_display_name_prefers_profile_display_name() {
4939 let payload = serde_json::json!({
4940 "ok": true,
4941 "user": {
4942 "name": "fallback_name",
4943 "profile": {
4944 "display_name": "Display Name",
4945 "real_name": "Real Name"
4946 }
4947 }
4948 });
4949
4950 assert_eq!(
4951 SlackChannel::extract_user_display_name(&payload).as_deref(),
4952 Some("Display Name")
4953 );
4954 }
4955
4956 #[test]
4957 fn extract_user_display_name_falls_back_to_username() {
4958 let payload = serde_json::json!({
4959 "ok": true,
4960 "user": {
4961 "name": "fallback_name",
4962 "profile": {
4963 "display_name": " ",
4964 "real_name": ""
4965 }
4966 }
4967 });
4968
4969 assert_eq!(
4970 SlackChannel::extract_user_display_name(&payload).as_deref(),
4971 Some("fallback_name")
4972 );
4973 }
4974
4975 #[test]
4976 fn cached_sender_display_name_returns_none_when_expired() {
4977 let ch = SlackChannel::new(
4978 "xoxb-fake".into(),
4979 None,
4980 vec![],
4981 "slack_test_alias",
4982 Arc::new(|| vec!["*".into()]),
4983 );
4984 {
4985 let mut cache = ch.user_display_name_cache.lock().unwrap();
4986 cache.insert(
4987 "U123".to_string(),
4988 CachedSlackDisplayName {
4989 display_name: "Expired Name".to_string(),
4990 expires_at: Instant::now()
4991 .checked_sub(Duration::from_secs(1))
4992 .expect("instant should allow subtracting one second in tests"),
4993 },
4994 );
4995 }
4996
4997 assert_eq!(ch.cached_sender_display_name("U123"), None);
4998 }
4999
5000 #[test]
5001 fn cached_sender_display_name_returns_cached_value_when_valid() {
5002 let ch = SlackChannel::new(
5003 "xoxb-fake".into(),
5004 None,
5005 vec![],
5006 "slack_test_alias",
5007 Arc::new(|| vec!["*".into()]),
5008 );
5009 ch.cache_sender_display_name("U123", "Cached Name");
5010
5011 assert_eq!(
5012 ch.cached_sender_display_name("U123").as_deref(),
5013 Some("Cached Name")
5014 );
5015 }
5016
5017 #[test]
5018 fn normalize_incoming_content_requires_mention_when_enabled() {
5019 assert!(SlackChannel::normalize_incoming_content("hello", true, "U_BOT").is_none());
5020 assert_eq!(
5021 SlackChannel::normalize_incoming_content("<@U_BOT> run", true, "U_BOT").as_deref(),
5022 Some("<@U_BOT> run")
5023 );
5024 }
5025
5026 #[test]
5027 fn normalize_incoming_content_without_mention_mode_keeps_message() {
5028 assert_eq!(
5029 SlackChannel::normalize_incoming_content(" hello world ", false, "U_BOT").as_deref(),
5030 Some("hello world")
5031 );
5032 }
5033
5034 #[test]
5035 fn compose_incoming_content_allows_attachment_only_messages() {
5036 let composed = SlackChannel::compose_incoming_content(
5037 String::new(),
5038 vec!["[IMAGE:data:image/png;base64,aaaa]".to_string()],
5039 );
5040 assert_eq!(
5041 composed.as_deref(),
5042 Some("[IMAGE:data:image/png;base64,aaaa]")
5043 );
5044 }
5045
5046 #[test]
5047 fn parse_slack_permalink_accepts_standard_archives_link() {
5048 let parsed = SlackChannel::parse_slack_permalink(
5049 "https://acme.slack.com/archives/C12345678/p1712345678901234",
5050 )
5051 .expect("permalink");
5052
5053 assert_eq!(parsed.channel_id, "C12345678");
5054 assert_eq!(parsed.message_ts, "1712345678.901234");
5055 assert_eq!(parsed.thread_ts_hint, None);
5056 }
5057
5058 #[test]
5059 fn parse_slack_permalink_reads_thread_hint_when_present() {
5060 let parsed = SlackChannel::parse_slack_permalink(
5061 "https://acme.slack.com/archives/C12345678/p1712345678901234?thread_ts=1712345600.000100&cid=C12345678",
5062 )
5063 .expect("permalink");
5064
5065 assert_eq!(parsed.thread_ts_hint.as_deref(), Some("1712345600.000100"));
5066 }
5067
5068 #[test]
5069 fn parse_slack_permalink_rejects_non_message_links() {
5070 assert!(SlackChannel::parse_slack_permalink("https://example.com/path").is_none());
5071 assert!(
5072 SlackChannel::parse_slack_permalink("https://acme.slack.com/client/T1/C1").is_none()
5073 );
5074 assert!(
5075 SlackChannel::parse_slack_permalink("https://acme.slack.com/archives/C1/not-a-message")
5076 .is_none()
5077 );
5078 }
5079
5080 #[test]
5081 fn extract_slack_permalinks_handles_slack_angle_bracket_format() {
5082 let permalinks = SlackChannel::extract_slack_permalinks(
5083 "Please inspect <https://acme.slack.com/archives/C123/p1712345678901234|message> now",
5084 );
5085
5086 assert_eq!(permalinks.len(), 1);
5087 assert_eq!(permalinks[0].channel_id, "C123");
5088 assert_eq!(permalinks[0].message_ts, "1712345678.901234");
5089 }
5090
5091 #[test]
5092 fn extract_slack_permalinks_deduplicates_message_targets() {
5093 let permalinks = SlackChannel::extract_slack_permalinks(
5094 "https://acme.slack.com/archives/C123/p1712345678901234 again <https://acme.slack.com/archives/C123/p1712345678901234|same>",
5095 );
5096
5097 assert_eq!(permalinks.len(), 1);
5098 }
5099
5100 #[test]
5101 fn message_subtype_support_allows_file_share() {
5102 assert!(SlackChannel::is_supported_message_subtype(None));
5103 assert!(SlackChannel::is_supported_message_subtype(Some(
5104 "file_share"
5105 )));
5106 assert!(SlackChannel::is_supported_message_subtype(Some(
5107 "thread_broadcast"
5108 )));
5109 assert!(!SlackChannel::is_supported_message_subtype(Some(
5110 "message_changed"
5111 )));
5112 assert!(!SlackChannel::is_supported_message_subtype(Some(
5113 "channel_join"
5114 )));
5115 }
5116
5117 #[test]
5118 fn file_text_preview_prefers_preview_field() {
5119 let file = serde_json::json!({
5120 "preview": "line 1\nline 2",
5121 "preview_highlight": "ignored"
5122 });
5123 assert_eq!(
5124 SlackChannel::file_text_preview(&file).as_deref(),
5125 Some("line 1\nline 2")
5126 );
5127 }
5128
5129 #[test]
5130 fn is_image_file_detects_mimetype_or_extension() {
5131 let from_mime = serde_json::json!({"mimetype":"image/png"});
5132 let from_ext = serde_json::json!({"name":"photo.jpeg"});
5133 let non_image = serde_json::json!({"name":"notes.txt","mimetype":"text/plain"});
5134 assert!(SlackChannel::is_image_file(&from_mime));
5135 assert!(SlackChannel::is_image_file(&from_ext));
5136 assert!(!SlackChannel::is_image_file(&non_image));
5137 }
5138
5139 #[test]
5140 fn detect_image_mime_rejects_non_image_bytes_despite_image_metadata() {
5141 let file = serde_json::json!({"mimetype":"image/png","name":"wow.png"});
5142 let html_bytes = b"<!DOCTYPE html><html><body>login required</body></html>";
5143 assert_eq!(
5144 SlackChannel::detect_image_mime(
5145 Some("image/png"),
5146 &file,
5147 html_bytes,
5148 "https://files.slack.com/files-pri/T1/F2/wow.png"
5149 ),
5150 None
5151 );
5152 }
5153
5154 #[test]
5155 fn detect_image_mime_prefers_magic_bytes_over_misleading_metadata() {
5156 let file = serde_json::json!({"mimetype":"image/bmp","name":"wow.png"});
5157 let png_header = [0x89, b'P', b'N', b'G', b'\r', b'\n', 0x1a, b'\n'];
5158 assert_eq!(
5159 SlackChannel::detect_image_mime(
5160 Some("image/bmp"),
5161 &file,
5162 &png_header,
5163 "https://files.slack.com/files-pri/T1/F2/wow.png"
5164 )
5165 .as_deref(),
5166 Some("image/png")
5167 );
5168 }
5169
5170 #[test]
5171 fn is_probably_text_file_accepts_snippet_mode() {
5172 let snippet = serde_json::json!({"mode":"snippet"});
5173 let plain = serde_json::json!({"mimetype":"text/plain"});
5174 let binary = serde_json::json!({"mimetype":"application/octet-stream","name":"a.bin"});
5175 assert!(SlackChannel::is_probably_text_file(&snippet));
5176 assert!(SlackChannel::is_probably_text_file(&plain));
5177 assert!(!SlackChannel::is_probably_text_file(&binary));
5178 }
5179
5180 #[test]
5181 fn sanitize_attachment_filename_strips_path_traversal() {
5182 assert_eq!(
5183 SlackChannel::sanitize_attachment_filename("../../secret.txt").as_deref(),
5184 Some("secret.txt")
5185 );
5186 assert_eq!(
5187 SlackChannel::sanitize_attachment_filename(r"..\\..\\secret.txt").as_deref(),
5188 Some("..__..__secret.txt")
5189 );
5190 assert!(SlackChannel::sanitize_attachment_filename("..").is_none());
5191 }
5192
5193 #[test]
5194 fn ensure_file_extension_appends_when_missing() {
5195 assert_eq!(
5196 SlackChannel::ensure_file_extension("capture", "png"),
5197 "capture.png"
5198 );
5199 assert_eq!(
5200 SlackChannel::ensure_file_extension("capture.jpeg", "png"),
5201 "capture.jpeg"
5202 );
5203 }
5204
5205 #[test]
5206 fn is_allowed_slack_media_hostname_matches_suffixes() {
5207 assert!(SlackChannel::is_allowed_slack_media_hostname(
5208 "files.slack.com"
5209 ));
5210 assert!(SlackChannel::is_allowed_slack_media_hostname(
5211 "downloads.slack-edge.com"
5212 ));
5213 assert!(SlackChannel::is_allowed_slack_media_hostname(
5214 "foo.slack-files.com"
5215 ));
5216 assert!(!SlackChannel::is_allowed_slack_media_hostname(
5217 "example.com"
5218 ));
5219 }
5220
5221 #[test]
5222 fn validate_slack_private_file_url_rejects_invalid_schemes_and_hosts() {
5223 assert!(
5224 SlackChannel::validate_slack_private_file_url("https://files.slack.com/f").is_some()
5225 );
5226 assert!(
5227 SlackChannel::validate_slack_private_file_url("http://files.slack.com/f").is_none()
5228 );
5229 assert!(SlackChannel::validate_slack_private_file_url("https://example.com/f").is_none());
5230 assert!(SlackChannel::validate_slack_private_file_url("not a url").is_none());
5231 }
5232
5233 #[test]
5234 fn resolve_https_redirect_target_enforces_https() {
5235 let base = reqwest::Url::parse("https://files.slack.com/path/file").unwrap();
5236 let ok = SlackChannel::resolve_https_redirect_target(&base, "/next");
5237 assert_eq!(
5238 ok.as_ref().map(|url| url.as_str()),
5239 Some("https://files.slack.com/next")
5240 );
5241
5242 let rejected =
5243 SlackChannel::resolve_https_redirect_target(&base, "http://files.slack.com/next");
5244 assert!(rejected.is_none());
5245
5246 let rejected_host =
5247 SlackChannel::resolve_https_redirect_target(&base, "https://example.com/next");
5248 assert!(rejected_host.is_none());
5249 }
5250
5251 #[test]
5252 fn redact_slack_url_hides_query_fragments() {
5253 let url = reqwest::Url::parse(
5254 "https://files.slack.com/files-pri/T1/F2/wow.png?token=secret#fragment",
5255 )
5256 .unwrap();
5257 let redacted = SlackChannel::redact_slack_url(&url);
5258 assert_eq!(redacted, "files.slack.com/.../wow.png");
5259 assert!(!redacted.contains('?'));
5260 assert!(!redacted.contains("token="));
5261 assert!(!redacted.contains('#'));
5262 }
5263
5264 #[test]
5265 fn redact_redirect_location_keeps_only_relative_tail() {
5266 let redacted =
5267 SlackChannel::redact_redirect_location("/files-pri/T1/F2/wow.png?token=secret");
5268 assert_eq!(redacted, "relative/.../wow.png");
5269 assert!(!redacted.contains("token="));
5270 }
5271
5272 #[tokio::test]
5273 async fn resolve_workspace_attachment_output_path_stays_in_workspace() {
5274 let workspace = tempfile::tempdir().unwrap();
5275 let output =
5276 SlackChannel::resolve_workspace_attachment_output_path(workspace.path(), "capture.png")
5277 .await
5278 .unwrap();
5279
5280 let root = tokio::fs::canonicalize(workspace.path()).await.unwrap();
5281 assert!(output.starts_with(&root));
5282 assert!(output.to_string_lossy().contains("slack_files"));
5283 }
5284
5285 #[tokio::test]
5286 async fn persist_image_attachment_writes_bytes_without_part_leftovers() {
5287 let workspace = tempfile::tempdir().unwrap();
5288 let channel = SlackChannel::new(
5289 "xoxb-fake".into(),
5290 None,
5291 vec![],
5292 "slack_test_alias",
5293 Arc::new(Vec::new),
5294 )
5295 .with_workspace_dir(workspace.path().to_path_buf());
5296 let file = serde_json::json!({"id":"F1","name":"wow.png"});
5297 let png_bytes = vec![
5298 0x89, b'P', b'N', b'G', b'\r', b'\n', 0x1a, b'\n', 0x00, 0x01, 0x02, 0x03,
5299 ];
5300
5301 let output = channel
5302 .persist_image_attachment(&file, "wow.png", "image/png", &png_bytes)
5303 .await
5304 .expect("attachment path");
5305 let stored = tokio::fs::read(&output).await.expect("stored bytes");
5306 assert_eq!(stored, png_bytes);
5307
5308 let save_dir = output.parent().unwrap();
5309 let mut entries = tokio::fs::read_dir(save_dir).await.unwrap();
5310 while let Some(entry) = entries.next_entry().await.unwrap() {
5311 let name = entry.file_name().to_string_lossy().to_string();
5312 assert!(
5313 !name.ends_with(".part"),
5314 "unexpected temp artifact left behind: {name}"
5315 );
5316 }
5317 }
5318
5319 #[test]
5320 fn evaluate_health_enforces_socket_mode_probe_when_enabled() {
5321 assert!(!SlackChannel::evaluate_health(false, false, true));
5322 assert!(!SlackChannel::evaluate_health(false, true, true));
5323 assert!(SlackChannel::evaluate_health(true, false, false));
5324 assert!(SlackChannel::evaluate_health(true, false, true));
5325 assert!(!SlackChannel::evaluate_health(true, true, false));
5326 assert!(SlackChannel::evaluate_health(true, true, true));
5327 }
5328
5329 #[test]
5330 fn slack_api_call_succeeded_requires_ok_true_in_body() {
5331 assert!(!SlackChannel::slack_api_call_succeeded(
5332 reqwest::StatusCode::OK,
5333 r#"{"ok":false,"error":"invalid_auth"}"#
5334 ));
5335 }
5336
5337 #[test]
5338 fn slack_api_call_succeeded_accepts_ok_true() {
5339 assert!(SlackChannel::slack_api_call_succeeded(
5340 reqwest::StatusCode::OK,
5341 r#"{"ok":true}"#
5342 ));
5343 }
5344
5345 #[test]
5346 fn specific_allowlist_filters() {
5347 let ch = SlackChannel::new(
5348 "xoxb-fake".into(),
5349 None,
5350 vec![],
5351 "slack_test_alias",
5352 Arc::new(|| vec!["U111".into(), "U222".into()]),
5353 );
5354 assert!(ch.is_user_allowed("U111"));
5355 assert!(ch.is_user_allowed("U222"));
5356 assert!(!ch.is_user_allowed("U333"));
5357 }
5358
5359 #[test]
5360 fn allowlist_exact_match_not_substring() {
5361 let ch = SlackChannel::new(
5362 "xoxb-fake".into(),
5363 None,
5364 vec![],
5365 "slack_test_alias",
5366 Arc::new(|| vec!["U111".into()]),
5367 );
5368 assert!(!ch.is_user_allowed("U1111"));
5369 assert!(!ch.is_user_allowed("U11"));
5370 }
5371
5372 #[test]
5373 fn allowlist_empty_user_id() {
5374 let ch = SlackChannel::new(
5375 "xoxb-fake".into(),
5376 None,
5377 vec![],
5378 "slack_test_alias",
5379 Arc::new(|| vec!["U111".into()]),
5380 );
5381 assert!(!ch.is_user_allowed(""));
5382 }
5383
5384 #[test]
5385 fn allowlist_case_sensitive() {
5386 let ch = SlackChannel::new(
5387 "xoxb-fake".into(),
5388 None,
5389 vec![],
5390 "slack_test_alias",
5391 Arc::new(|| vec!["U111".into()]),
5392 );
5393 assert!(ch.is_user_allowed("U111"));
5394 assert!(!ch.is_user_allowed("u111"));
5395 }
5396
5397 #[test]
5398 fn allowlist_wildcard_and_specific() {
5399 let ch = SlackChannel::new(
5400 "xoxb-fake".into(),
5401 None,
5402 vec![],
5403 "slack_test_alias",
5404 Arc::new(|| vec!["U111".into(), "*".into()]),
5405 );
5406 assert!(ch.is_user_allowed("U111"));
5407 assert!(ch.is_user_allowed("anyone"));
5408 }
5409
5410 #[test]
5413 fn slack_message_id_format_includes_channel_and_ts() {
5414 let ts = "1234567890.123456";
5416 let channel_id = "C12345";
5417 let expected_id = format!("slack_{channel_id}_{ts}");
5418 assert_eq!(expected_id, "slack_C12345_1234567890.123456");
5419 }
5420
5421 #[test]
5422 fn slack_message_id_is_deterministic() {
5423 let ts = "1234567890.123456";
5425 let channel_id = "C12345";
5426 let id1 = format!("slack_{channel_id}_{ts}");
5427 let id2 = format!("slack_{channel_id}_{ts}");
5428 assert_eq!(id1, id2);
5429 }
5430
5431 #[test]
5432 fn slack_message_id_different_ts_different_id() {
5433 let channel_id = "C12345";
5435 let id1 = format!("slack_{channel_id}_1234567890.123456");
5436 let id2 = format!("slack_{channel_id}_1234567890.123457");
5437 assert_ne!(id1, id2);
5438 }
5439
5440 #[test]
5441 fn slack_message_id_different_channel_different_id() {
5442 let ts = "1234567890.123456";
5444 let id1 = format!("slack_C12345_{ts}");
5445 let id2 = format!("slack_C67890_{ts}");
5446 assert_ne!(id1, id2);
5447 }
5448
5449 #[test]
5450 fn slack_message_id_no_uuid_randomness() {
5451 let ts = "1234567890.123456";
5453 let channel_id = "C12345";
5454 let id = format!("slack_{channel_id}_{ts}");
5455 assert!(!id.contains('-')); assert!(id.starts_with("slack_"));
5457 }
5458
5459 #[test]
5460 fn inbound_thread_ts_prefers_explicit_thread_ts() {
5461 let msg = serde_json::json!({
5462 "ts": "123.002",
5463 "thread_ts": "123.001"
5464 });
5465
5466 let thread_ts = SlackChannel::inbound_thread_ts(&msg, "123.002");
5467 assert_eq!(thread_ts.as_deref(), Some("123.001"));
5468 }
5469
5470 #[test]
5471 fn inbound_thread_ts_falls_back_to_ts() {
5472 let msg = serde_json::json!({
5473 "ts": "123.001"
5474 });
5475
5476 let thread_ts = SlackChannel::inbound_thread_ts(&msg, "123.001");
5477 assert_eq!(thread_ts.as_deref(), Some("123.001"));
5478 }
5479
5480 #[test]
5481 fn inbound_thread_ts_none_when_ts_missing() {
5482 let msg = serde_json::json!({});
5483
5484 let thread_ts = SlackChannel::inbound_thread_ts(&msg, "");
5485 assert_eq!(thread_ts, None);
5486 }
5487
5488 #[test]
5489 fn ensure_poll_cursor_bootstraps_new_channel() {
5490 let mut cursors = HashMap::new();
5491 let now_ts = "1700000000.123456";
5492
5493 let cursor = SlackChannel::ensure_poll_cursor(&mut cursors, "C123", now_ts);
5494 assert_eq!(cursor, now_ts);
5495 assert_eq!(cursors.get("C123").map(String::as_str), Some(now_ts));
5496 }
5497
5498 #[test]
5499 fn ensure_poll_cursor_keeps_existing_cursor() {
5500 let mut cursors = HashMap::from([("C123".to_string(), "1700000000.000001".to_string())]);
5501 let cursor = SlackChannel::ensure_poll_cursor(&mut cursors, "C123", "9999999999.999999");
5502
5503 assert_eq!(cursor, "1700000000.000001");
5504 assert_eq!(
5505 cursors.get("C123").map(String::as_str),
5506 Some("1700000000.000001")
5507 );
5508 }
5509
5510 #[test]
5511 fn parse_retry_after_value_accepts_integer_seconds() {
5512 assert_eq!(SlackChannel::parse_retry_after_value("30"), Some(30));
5513 }
5514
5515 #[test]
5516 fn parse_retry_after_value_accepts_decimal_seconds() {
5517 assert_eq!(SlackChannel::parse_retry_after_value("2.9"), Some(2));
5518 }
5519
5520 #[test]
5521 fn parse_retry_after_value_rejects_non_numeric_values() {
5522 assert_eq!(SlackChannel::parse_retry_after_value("later"), None);
5523 assert_eq!(SlackChannel::parse_retry_after_value(""), None);
5524 }
5525
5526 #[test]
5527 fn parse_retry_after_secs_reads_header_value() {
5528 let mut headers = HeaderMap::new();
5529 headers.insert(reqwest::header::RETRY_AFTER, "45".parse().unwrap());
5530 assert_eq!(SlackChannel::parse_retry_after_secs(&headers), Some(45));
5531 }
5532
5533 #[test]
5534 fn compute_retry_delay_applies_backoff_and_jitter_with_cap() {
5535 let delay = SlackChannel::compute_retry_delay(30, 3, 250);
5536 assert_eq!(delay, Duration::from_secs(120) + Duration::from_millis(250));
5537 }
5538
5539 #[test]
5542 fn extract_active_threads_finds_thread_parents_with_replies() {
5543 let messages = vec![
5544 serde_json::json!({
5545 "ts": "100.000",
5546 "thread_ts": "100.000",
5547 "reply_count": 3,
5548 "latest_reply": "103.000"
5549 }),
5550 serde_json::json!({
5551 "ts": "200.000",
5552 "text": "no thread"
5553 }),
5554 serde_json::json!({
5555 "ts": "300.000",
5556 "thread_ts": "300.000",
5557 "reply_count": 0
5558 }),
5559 ];
5560
5561 let threads = SlackChannel::extract_active_threads(&messages);
5562 assert_eq!(threads.len(), 1);
5563 assert_eq!(threads[0].0, "100.000");
5564 assert_eq!(threads[0].1, "103.000");
5565 }
5566
5567 #[test]
5568 fn extract_active_threads_ignores_reply_messages() {
5569 let messages = vec![serde_json::json!({
5571 "ts": "101.000",
5572 "thread_ts": "100.000",
5573 "text": "reply in thread"
5574 })];
5575
5576 let threads = SlackChannel::extract_active_threads(&messages);
5577 assert!(threads.is_empty());
5578 }
5579
5580 #[test]
5581 fn extract_active_threads_uses_thread_ts_as_fallback_latest_reply() {
5582 let messages = vec![serde_json::json!({
5583 "ts": "100.000",
5584 "thread_ts": "100.000",
5585 "reply_count": 1
5586 })];
5587
5588 let threads = SlackChannel::extract_active_threads(&messages);
5589 assert_eq!(threads.len(), 1);
5590 assert_eq!(threads[0].1, "100.000");
5591 }
5592
5593 #[test]
5594 fn evict_stale_threads_removes_expired_entries() {
5595 let mut threads: HashMap<String, (String, String, Instant)> = HashMap::new();
5596 let old = Instant::now()
5597 .checked_sub(Duration::from_secs(SLACK_POLL_THREAD_EXPIRE_SECS + 1))
5598 .unwrap();
5599 threads.insert(
5600 "old.thread".to_string(),
5601 ("C1".to_string(), "old.reply".to_string(), old),
5602 );
5603 threads.insert(
5604 "new.thread".to_string(),
5605 ("C1".to_string(), "new.reply".to_string(), Instant::now()),
5606 );
5607
5608 SlackChannel::evict_stale_threads(&mut threads, Instant::now());
5609 assert_eq!(threads.len(), 1);
5610 assert!(threads.contains_key("new.thread"));
5611 }
5612
5613 #[test]
5614 fn evict_stale_threads_trims_excess_by_oldest_key() {
5615 let mut threads: HashMap<String, (String, String, Instant)> = HashMap::new();
5616 let now = Instant::now();
5617 for i in 0..(SLACK_POLL_ACTIVE_THREAD_MAX + 5) {
5618 threads.insert(
5619 format!("{i:06}.000"),
5620 ("C1".to_string(), format!("{i:06}.001"), now),
5621 );
5622 }
5623
5624 SlackChannel::evict_stale_threads(&mut threads, now);
5625 assert_eq!(threads.len(), SLACK_POLL_ACTIVE_THREAD_MAX);
5626 }
5627
5628 #[test]
5629 fn is_supported_message_subtype_rejects_message_replied() {
5630 assert!(!SlackChannel::is_supported_message_subtype(Some(
5632 "message_replied"
5633 )));
5634 }
5635
5636 #[test]
5637 fn extract_slack_ts_from_standard_message_id() {
5638 assert_eq!(
5639 extract_slack_ts("slack_C1234567890_1234567890.123456"),
5640 "1234567890.123456"
5641 );
5642 }
5643
5644 #[test]
5645 fn extract_slack_ts_from_raw_ts_passthrough() {
5646 assert_eq!(extract_slack_ts("1234567890.123456"), "1234567890.123456");
5647 }
5648
5649 #[test]
5650 fn extract_slack_ts_from_unprefixed_id() {
5651 assert_eq!(extract_slack_ts("unknown_format"), "unknown_format");
5652 }
5653
5654 #[test]
5655 fn unicode_emoji_maps_to_slack_eyes() {
5656 assert_eq!(unicode_emoji_to_slack_name("\u{1F440}"), "eyes");
5657 }
5658
5659 #[test]
5660 fn unicode_emoji_maps_to_slack_check_mark() {
5661 assert_eq!(unicode_emoji_to_slack_name("\u{2705}"), "white_check_mark");
5662 }
5663
5664 #[test]
5665 fn unicode_emoji_maps_to_slack_warning() {
5666 assert_eq!(unicode_emoji_to_slack_name("\u{26A0}\u{FE0F}"), "warning");
5667 assert_eq!(unicode_emoji_to_slack_name("\u{26A0}"), "warning");
5668 }
5669
5670 #[test]
5671 fn unicode_emoji_colon_wrapped_passthrough() {
5672 assert_eq!(
5673 unicode_emoji_to_slack_name(":custom_emoji:"),
5674 "custom_emoji"
5675 );
5676 }
5677
5678 #[test]
5679 fn inbound_thread_ts_on_thread_reply_uses_thread_ts() {
5680 let reply = serde_json::json!({
5681 "ts": "200.000",
5682 "thread_ts": "100.000",
5683 "text": "a thread reply"
5684 });
5685 let thread_ts = SlackChannel::inbound_thread_ts(&reply, "200.000");
5686 assert_eq!(thread_ts.as_deref(), Some("100.000"));
5687 }
5688
5689 #[test]
5690 fn inbound_thread_ts_genuine_only_returns_none_for_top_level() {
5691 let msg = serde_json::json!({
5693 "ts": "100.000",
5694 "text": "hello"
5695 });
5696 assert_eq!(SlackChannel::inbound_thread_ts_genuine_only(&msg), None);
5697 }
5698
5699 #[test]
5700 fn inbound_thread_ts_genuine_only_returns_thread_ts_for_replies() {
5701 let reply = serde_json::json!({
5703 "ts": "200.000",
5704 "thread_ts": "100.000",
5705 "text": "a reply"
5706 });
5707 assert_eq!(
5708 SlackChannel::inbound_thread_ts_genuine_only(&reply).as_deref(),
5709 Some("100.000")
5710 );
5711 }
5712
5713 #[test]
5714 fn session_key_stable_without_thread_replies() {
5715 use zeroclaw_api::channel::ChannelMessage;
5718
5719 let make_msg = |ts: &str| ChannelMessage {
5720 id: format!("slack_C123_{ts}"),
5721 sender: "U_alice".into(),
5722 reply_target: "C123".into(),
5723 content: "text".into(),
5724 channel: "slack".into(),
5725 channel_alias: None,
5726 timestamp: 0,
5727 thread_ts: None, interruption_scope_id: None,
5729 attachments: vec![],
5730 subject: None,
5731 };
5732
5733 let msg1 = make_msg("100.000");
5734 let msg2 = make_msg("200.000");
5735
5736 let key1 = crate::util::conversation_history_key(&msg1);
5737 let key2 = crate::util::conversation_history_key(&msg2);
5738 assert_eq!(key1, key2, "session key should be stable across messages");
5739 }
5740
5741 #[test]
5742 fn session_key_varies_with_thread_replies() {
5743 use zeroclaw_api::channel::ChannelMessage;
5746
5747 let make_msg = |ts: &str| ChannelMessage {
5748 id: format!("slack_C123_{ts}"),
5749 sender: "U_alice".into(),
5750 reply_target: "C123".into(),
5751 content: "text".into(),
5752 channel: "slack".into(),
5753 channel_alias: None,
5754 timestamp: 0,
5755 thread_ts: Some(ts.to_string()), interruption_scope_id: None,
5757 attachments: vec![],
5758 subject: None,
5759 };
5760
5761 let msg1 = make_msg("100.000");
5762 let msg2 = make_msg("200.000");
5763
5764 let key1 = crate::util::conversation_history_key(&msg1);
5765 let key2 = crate::util::conversation_history_key(&msg2);
5766 assert_ne!(key1, key2, "session key should differ per thread");
5767 }
5768
5769 #[test]
5770 fn slack_send_uses_markdown_blocks() {
5771 let msg = SendMessage::new("**bold** and _italic_", "C123");
5772 let ch = SlackChannel::new(
5773 "xoxb-fake".into(),
5774 None,
5775 vec![],
5776 "slack_test_alias",
5777 Arc::new(Vec::new),
5778 );
5779
5780 let mut body = serde_json::json!({
5782 "channel": msg.recipient,
5783 "text": msg.content
5784 });
5785 if msg.content.len() <= SLACK_MARKDOWN_BLOCK_MAX_CHARS {
5786 body["blocks"] = serde_json::json!([{
5787 "type": "markdown",
5788 "text": msg.content
5789 }]);
5790 }
5791
5792 let blocks = body["blocks"]
5794 .as_array()
5795 .expect("blocks should be an array");
5796 assert_eq!(blocks.len(), 1);
5797 assert_eq!(blocks[0]["type"], "markdown");
5798 assert_eq!(blocks[0]["text"], msg.content);
5799 assert_eq!(body["text"], msg.content);
5801 let _ = ch.name();
5803 }
5804
5805 #[test]
5806 fn slack_send_skips_markdown_blocks_for_long_content() {
5807 let long_content = "x".repeat(SLACK_MARKDOWN_BLOCK_MAX_CHARS + 1);
5808 let msg = SendMessage::new(long_content.clone(), "C123");
5809
5810 let mut body = serde_json::json!({
5811 "channel": msg.recipient,
5812 "text": msg.content
5813 });
5814 if msg.content.len() <= SLACK_MARKDOWN_BLOCK_MAX_CHARS {
5815 body["blocks"] = serde_json::json!([{
5816 "type": "markdown",
5817 "text": msg.content
5818 }]);
5819 }
5820
5821 assert!(
5822 body.get("blocks").is_none(),
5823 "blocks should not be set for oversized content"
5824 );
5825 }
5826
5827 #[tokio::test]
5828 async fn start_typing_requires_thread_context() {
5829 let ch = SlackChannel::new(
5830 "xoxb-fake".into(),
5831 None,
5832 vec![],
5833 "slack_test_alias",
5834 Arc::new(Vec::new),
5835 );
5836 let result = ch.start_typing("C999").await;
5838 assert!(
5839 result.is_ok(),
5840 "start_typing should succeed as no-op without thread context"
5841 );
5842 }
5843
5844 #[test]
5845 fn assistant_thread_tracking() {
5846 let ch = SlackChannel::new(
5847 "xoxb-fake".into(),
5848 None,
5849 vec![],
5850 "slack_test_alias",
5851 Arc::new(Vec::new),
5852 );
5853
5854 {
5856 let map = ch.active_assistant_thread.lock().unwrap();
5857 assert!(map.is_empty());
5858 }
5859
5860 {
5862 let mut map = ch.active_assistant_thread.lock().unwrap();
5863 map.insert("C123".to_string(), "1741234567.000100".to_string());
5864 }
5865
5866 {
5868 let map = ch.active_assistant_thread.lock().unwrap();
5869 assert_eq!(map.get("C123"), Some(&"1741234567.000100".to_string()),);
5870 assert_eq!(map.get("C999"), None);
5871 }
5872 }
5873
5874 #[test]
5875 fn pending_approvals_map_is_initially_empty() {
5876 let ch = SlackChannel::new(
5877 "xoxb-token".into(),
5878 None,
5879 vec![],
5880 "slack_test_alias",
5881 Arc::new(Vec::new),
5882 );
5883 let map = ch.pending_approvals.try_lock().unwrap();
5884 assert!(map.is_empty());
5885 }
5886
5887 #[test]
5888 fn approval_timeout_defaults_to_300_and_is_overridable() {
5889 let ch = SlackChannel::new(
5890 "xoxb-token".into(),
5891 None,
5892 vec![],
5893 "slack_test_alias",
5894 Arc::new(Vec::new),
5895 );
5896 assert_eq!(ch.approval_timeout_secs, 300);
5897 let ch = ch.with_approval_timeout_secs(90);
5898 assert_eq!(ch.approval_timeout_secs, 90);
5899 }
5900
5901 #[tokio::test]
5902 async fn pending_approval_oneshot_delivers_response() {
5903 let ch = SlackChannel::new(
5904 "xoxb-token".into(),
5905 None,
5906 vec![],
5907 "slack_test_alias",
5908 Arc::new(Vec::new),
5909 );
5910 let (tx, rx) = oneshot::channel();
5911 ch.pending_approvals
5912 .lock()
5913 .await
5914 .insert("abc123".to_string(), tx);
5915 let sender = ch.pending_approvals.lock().await.remove("abc123").unwrap();
5916 sender.send(ChannelApprovalResponse::AlwaysApprove).unwrap();
5917 assert_eq!(rx.await.unwrap(), ChannelApprovalResponse::AlwaysApprove);
5918 }
5919
5920 #[test]
5921 fn approval_block_action_parsed_correctly() {
5922 let envelope = serde_json::json!({
5923 "payload": {
5924 "type": "block_actions",
5925 "actions": [{ "action_id": "approval_abc123_approve" }]
5926 }
5927 });
5928 let (token, response) = SlackChannel::try_parse_approval_block_action(&envelope).unwrap();
5929 assert_eq!(token, "abc123");
5930 assert_eq!(response, ChannelApprovalResponse::Approve);
5931 }
5932
5933 #[test]
5934 fn approval_block_action_deny_parsed() {
5935 let envelope = serde_json::json!({
5936 "payload": {
5937 "type": "block_actions",
5938 "actions": [{ "action_id": "approval_xz9q1w_deny" }]
5939 }
5940 });
5941 let (token, response) = SlackChannel::try_parse_approval_block_action(&envelope).unwrap();
5942 assert_eq!(token, "xz9q1w");
5943 assert_eq!(response, ChannelApprovalResponse::Deny);
5944 }
5945
5946 #[test]
5947 fn approval_block_action_non_approval_returns_none() {
5948 let envelope = serde_json::json!({
5949 "payload": {
5950 "type": "block_actions",
5951 "actions": [{ "action_id": "zeroclaw_config_provider", "selected_option": { "value": "anthropic" } }]
5952 }
5953 });
5954 assert!(SlackChannel::try_parse_approval_block_action(&envelope).is_none());
5955 }
5956}