Skip to main content

zeroclaw_channels/
wati.rs

1use async_trait::async_trait;
2use std::sync::Arc;
3use uuid::Uuid;
4use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
5
/// Largest audio payload (25 MiB) that will be buffered from a WATI media URL
/// before the download is abandoned.
const MAX_WATI_AUDIO_BYTES: u64 = 25 << 20;
7
8/// WATI WhatsApp Business API channel.
9///
10/// This channel operates in webhook mode (push-based) rather than polling.
11/// Messages are received via the gateway's `/wati` webhook endpoint.
12/// The `listen` method here is a keepalive placeholder; actual message handling
13/// happens in the gateway when WATI sends webhook events.
14pub struct WatiChannel {
15    api_token: String,
16    api_url: String,
17    tenant_id: Option<String>,
18    /// The alias key under `[channels.wati.<alias>]` this handle is
19    /// bound to. Used to scope peer-group writes and resolver lookups.
20    alias: String,
21    /// Resolves inbound external peers from canonical state at message-time.
22    /// No cache (see AGENTS.md "ABSOLUTE RULE — SINGLE SOURCE OF TRUTH").
23    peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
24    client: reqwest::Client,
25    transcription_manager: Option<std::sync::Arc<super::transcription::TranscriptionManager>>,
26}
27
28impl WatiChannel {
29    pub fn new(
30        api_token: String,
31        api_url: String,
32        tenant_id: Option<String>,
33        alias: impl Into<String>,
34        peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
35    ) -> Self {
36        Self::new_with_proxy(api_token, api_url, tenant_id, alias, peer_resolver, None)
37    }
38
39    pub fn new_with_proxy(
40        api_token: String,
41        api_url: String,
42        tenant_id: Option<String>,
43        alias: impl Into<String>,
44        peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
45        proxy_url: Option<String>,
46    ) -> Self {
47        Self {
48            api_token,
49            api_url,
50            tenant_id,
51            alias: alias.into(),
52            peer_resolver,
53            client: zeroclaw_config::schema::build_channel_proxy_client(
54                "channel.wati",
55                proxy_url.as_deref(),
56            ),
57            transcription_manager: None,
58        }
59    }
60
61    /// Return the alias under `[channels.wati.<alias>]` that this
62    /// channel handle is bound to.
63    pub fn alias(&self) -> &str {
64        &self.alias
65    }
66
67    pub fn with_transcription(
68        mut self,
69        config: zeroclaw_config::schema::TranscriptionConfig,
70    ) -> Self {
71        if !config.enabled {
72            return self;
73        }
74        match super::transcription::TranscriptionManager::new(&config) {
75            Ok(m) => {
76                // Per-agent `transcription_provider` routes through the
77                // orchestrator's resolved-runtime path. For the
78                // `try_transcribe_audio` direct path (gateway WS handler /
79                // channel-side ingest), bind to the sole registered provider
80                // when only one is configured so the single-provider case
81                // dispatches without an agent context. Multi-provider setups
82                // still require explicit `agent.<alias>.transcription_provider`
83                // routing through the orchestrator.
84                let names = m.available_providers();
85                let m = if names.len() == 1 {
86                    let only = names[0].to_string();
87                    m.with_agent_transcription_provider(only)
88                } else {
89                    m
90                };
91                self.transcription_manager = Some(std::sync::Arc::new(m));
92            }
93            Err(e) => {
94                ::zeroclaw_log::record!(
95                    WARN,
96                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
97                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
98                        .with_attrs(::serde_json::json!({"e": e.to_string()})),
99                    "transcription manager init failed, voice transcription disabled"
100                );
101            }
102        }
103        self
104    }
105
106    /// Check if a phone number is allowed (E.164 format: +1234567890).
107    fn is_number_allowed(&self, phone: &str) -> bool {
108        let peers = (self.peer_resolver)();
109        crate::allowlist::is_user_allowed(&peers, phone, crate::allowlist::Match::Sensitive)
110    }
111
112    /// Extract and normalize the sender phone number from a WATI webhook payload.
113    /// Returns `None` if the sender is absent, empty, or not in the allowlist.
114    fn extract_sender(&self, payload: &serde_json::Value) -> Option<String> {
115        // Extract waId (sender phone number)
116        let wa_id = payload
117            .get("waId")
118            .or_else(|| payload.get("wa_id"))
119            .or_else(|| payload.get("from"))
120            .and_then(|v| v.as_str())
121            .unwrap_or("")
122            .trim();
123
124        if wa_id.is_empty() {
125            return None;
126        }
127
128        // Normalize phone to E.164 format
129        let normalized_phone = if wa_id.starts_with('+') {
130            wa_id.to_string()
131        } else {
132            format!("+{wa_id}")
133        };
134
135        // Check allowlist
136        if !self.is_number_allowed(&normalized_phone) {
137            ::zeroclaw_log::record!(
138                WARN,
139                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
140                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
141                    .with_attrs(::serde_json::json!({"normalized_phone": normalized_phone})),
142                "ignoring message from unauthorized sender: . Add to channels.wati.allowed_numbers in config.toml, or run `zeroclaw onboard channels` to configure interactively."
143            );
144            return None;
145        }
146
147        Some(normalized_phone)
148    }
149
150    /// Build the target field for the WATI API, prefixing with tenant_id if set.
151    fn build_target(&self, phone: &str) -> String {
152        // Strip leading '+' — WATI expects bare digits
153        let bare = phone.strip_prefix('+').unwrap_or(phone);
154        if let Some(ref tid) = self.tenant_id {
155            if bare.starts_with(&format!("{tid}:")) {
156                bare.to_string()
157            } else {
158                format!("{tid}:{bare}")
159            }
160        } else {
161            bare.to_string()
162        }
163    }
164
165    /// Extract and normalize a timestamp from a WATI webhook payload.
166    ///
167    /// Handles unix seconds, unix milliseconds (divided by 1000), and ISO 8601
168    /// strings. Falls back to the current system time if parsing fails.
169    fn extract_timestamp(payload: &serde_json::Value) -> u64 {
170        payload
171            .get("timestamp")
172            .or_else(|| payload.get("created"))
173            .map(|t| {
174                if let Some(secs) = t.as_u64() {
175                    if secs > 10_000_000_000 {
176                        secs / 1000
177                    } else {
178                        secs
179                    }
180                } else if let Some(s) = t.as_str() {
181                    chrono::DateTime::parse_from_rfc3339(s)
182                        .ok()
183                        .map(|dt| dt.timestamp().cast_unsigned())
184                        .unwrap_or_else(|| {
185                            std::time::SystemTime::now()
186                                .duration_since(std::time::UNIX_EPOCH)
187                                .unwrap_or_default()
188                                .as_secs()
189                        })
190                } else {
191                    std::time::SystemTime::now()
192                        .duration_since(std::time::UNIX_EPOCH)
193                        .unwrap_or_default()
194                        .as_secs()
195                }
196            })
197            .unwrap_or_else(|| {
198                std::time::SystemTime::now()
199                    .duration_since(std::time::UNIX_EPOCH)
200                    .unwrap_or_default()
201                    .as_secs()
202            })
203    }
204
205    /// Parse an incoming webhook payload from WATI and extract messages.
206    ///
207    /// WATI's webhook payloads have variable field names depending on the API
208    /// version and configuration, so we try multiple paths for each field.
209    pub fn parse_webhook_payload(&self, payload: &serde_json::Value) -> Vec<ChannelMessage> {
210        let mut messages = Vec::new();
211
212        // Extract text — try multiple field paths
213        let text = payload
214            .get("text")
215            .and_then(|v| v.as_str())
216            .or_else(|| {
217                payload
218                    .get("message")
219                    .and_then(|m| m.get("text").or_else(|| m.get("body")))
220                    .and_then(|v| v.as_str())
221            })
222            .unwrap_or("")
223            .trim();
224
225        if text.is_empty() {
226            return messages;
227        }
228
229        // Check fromMe — skip outgoing messages
230        let from_me = payload
231            .get("fromMe")
232            .or_else(|| payload.get("from_me"))
233            .or_else(|| payload.get("owner"))
234            .and_then(|v| v.as_bool())
235            .unwrap_or(false);
236
237        if from_me {
238            ::zeroclaw_log::record!(
239                DEBUG,
240                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
241                "skipping fromMe message"
242            );
243            return messages;
244        }
245
246        // Extract and validate sender
247        let Some(normalized_phone) = self.extract_sender(payload) else {
248            return messages;
249        };
250
251        let timestamp = Self::extract_timestamp(payload);
252        messages.push(ChannelMessage {
253            id: Uuid::new_v4().to_string(),
254            reply_target: normalized_phone.clone(),
255            sender: normalized_phone,
256            content: text.to_string(),
257            channel: "wati".to_string(),
258            channel_alias: Some(self.alias.clone()),
259            timestamp,
260            thread_ts: None,
261            interruption_scope_id: None,
262            attachments: vec![],
263            subject: None,
264        });
265
266        messages
267    }
268
269    /// Extract host from URL string.
270    fn extract_host(url_str: &str) -> Option<String> {
271        reqwest::Url::parse(url_str)
272            .ok()?
273            .host_str()
274            .map(|h| h.to_ascii_lowercase())
275    }
276
277    /// Attempt to download and transcribe an audio message from a WATI webhook payload.
278    ///
279    /// Returns `Some(transcript)` if transcription succeeds, `None` otherwise.
280    /// Called by the gateway after detecting `type == "audio"` or `type == "voice"`.
281    pub async fn try_transcribe_audio(&self, payload: &serde_json::Value) -> Option<String> {
282        let manager = self.transcription_manager.as_deref()?;
283
284        let media_url = payload
285            .get("mediaUrl")
286            .or_else(|| payload.get("media_url"))
287            .and_then(|v| v.as_str())?;
288
289        // Validate media_url host matches api_url to prevent SSRF
290        let api_host = Self::extract_host(&self.api_url);
291        let media_host = Self::extract_host(media_url);
292        match (api_host, media_host) {
293            (Some(ref expected), Some(ref actual)) if actual == expected => {}
294            _ => {
295                ::zeroclaw_log::record!(
296                    WARN,
297                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
298                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
299                        .with_attrs(::serde_json::json!({"media_url": media_url})),
300                    "blocked media URL with unexpected host"
301                );
302                return None;
303            }
304        }
305
306        // Check fromMe early to avoid downloading media for outgoing messages
307        let from_me = payload
308            .get("fromMe")
309            .or_else(|| payload.get("from_me"))
310            .or_else(|| payload.get("owner"))
311            .and_then(|v| v.as_bool())
312            .unwrap_or(false);
313        if from_me {
314            ::zeroclaw_log::record!(
315                DEBUG,
316                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
317                "skipping fromMe audio before download"
318            );
319            return None;
320        }
321
322        let msg_type = payload
323            .get("type")
324            .and_then(|v| v.as_str())
325            .unwrap_or("audio");
326
327        let file_name = match msg_type {
328            "voice" => "voice.ogg",
329            _ => "audio.ogg",
330        };
331
332        let mut resp = match self
333            .client
334            .get(media_url)
335            .bearer_auth(&self.api_token)
336            .send()
337            .await
338        {
339            Ok(r) => r,
340            Err(e) => {
341                ::zeroclaw_log::record!(
342                    WARN,
343                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
344                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
345                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
346                    "media download request failed"
347                );
348                return None;
349            }
350        };
351
352        if !resp.status().is_success() {
353            ::zeroclaw_log::record!(
354                WARN,
355                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
356                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
357                &format!("media download failed: {}", resp.status())
358            );
359            return None;
360        }
361
362        let mut audio_bytes = Vec::new();
363        while let Some(chunk) = resp.chunk().await.ok().flatten() {
364            audio_bytes.extend_from_slice(&chunk);
365            if audio_bytes.len() as u64 > MAX_WATI_AUDIO_BYTES {
366                ::zeroclaw_log::record!(
367                    WARN,
368                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
369                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
370                    &format!("audio download exceeds {} byte limit", MAX_WATI_AUDIO_BYTES)
371                );
372                return None;
373            }
374        }
375
376        match manager.transcribe(&audio_bytes, file_name).await {
377            Ok(transcript) => Some(transcript),
378            Err(e) => {
379                ::zeroclaw_log::record!(
380                    WARN,
381                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
382                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
383                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
384                    "transcription failed"
385                );
386                None
387            }
388        }
389    }
390
391    /// Build a ChannelMessage from an audio transcript.
392    ///
393    /// This helper reuses the same sender extraction and timestamp logic as
394    /// `parse_webhook_payload()` but substitutes the transcript as the message content.
395    pub fn parse_audio_as_message(
396        &self,
397        payload: &serde_json::Value,
398        transcript: String,
399    ) -> Vec<ChannelMessage> {
400        let mut messages = Vec::new();
401
402        // Check fromMe — skip outgoing messages
403        let from_me = payload
404            .get("fromMe")
405            .or_else(|| payload.get("from_me"))
406            .or_else(|| payload.get("owner"))
407            .and_then(|v| v.as_bool())
408            .unwrap_or(false);
409
410        if from_me {
411            ::zeroclaw_log::record!(
412                DEBUG,
413                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
414                "skipping fromMe audio message"
415            );
416            return messages;
417        }
418
419        if transcript.trim().is_empty() {
420            ::zeroclaw_log::record!(
421                DEBUG,
422                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
423                "skipping empty audio transcript"
424            );
425            return messages;
426        }
427
428        // Extract and validate sender
429        let Some(normalized_phone) = self.extract_sender(payload) else {
430            return messages;
431        };
432
433        let timestamp = Self::extract_timestamp(payload);
434        messages.push(ChannelMessage {
435            id: Uuid::new_v4().to_string(),
436            reply_target: normalized_phone.clone(),
437            sender: normalized_phone,
438            content: transcript,
439            channel: "wati".to_string(),
440            channel_alias: Some(self.alias.clone()),
441            timestamp,
442            thread_ts: None,
443            interruption_scope_id: None,
444            attachments: vec![],
445            subject: None,
446        });
447
448        messages
449    }
450}
451
452impl ::zeroclaw_api::attribution::Attributable for WatiChannel {
453    fn role(&self) -> ::zeroclaw_api::attribution::Role {
454        ::zeroclaw_api::attribution::Role::Channel(::zeroclaw_api::attribution::ChannelKind::Wati)
455    }
456    fn alias(&self) -> &str {
457        &self.alias
458    }
459}
460
461#[async_trait]
462impl Channel for WatiChannel {
463    fn name(&self) -> &str {
464        "wati"
465    }
466
467    async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
468        let target = self.build_target(&message.recipient);
469
470        let body = serde_json::json!({
471            "target": target,
472            "text": message.content
473        });
474
475        let url = format!("{}/api/ext/v3/conversations/messages/text", self.api_url);
476
477        let resp = self
478            .client
479            .post(&url)
480            .bearer_auth(&self.api_token)
481            .header("Content-Type", "application/json")
482            .json(&body)
483            .send()
484            .await?;
485
486        if !resp.status().is_success() {
487            let status = resp.status();
488            let error_body = resp.text().await.unwrap_or_default();
489            ::zeroclaw_log::record!(ERROR, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail).with_outcome(::zeroclaw_log::EventOutcome::Failure).with_attrs(::serde_json::json!({"status": status.to_string(), "error_body": error_body})), "send failed:");
490            anyhow::bail!("WATI API error: {status}");
491        }
492
493        Ok(())
494    }
495
496    async fn listen(&self, _tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
497        // WATI uses webhooks (push-based), not polling.
498        // Messages are received via the gateway's /wati endpoint.
499        ::zeroclaw_log::record!(
500            INFO,
501            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
502            "WATI channel active (webhook mode). \
503            Configure WATI webhook to POST to your gateway's /wati endpoint."
504        );
505
506        // Keep the task alive — it will be cancelled when the channel shuts down
507        loop {
508            tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
509        }
510    }
511
512    async fn health_check(&self) -> bool {
513        let url = format!("{}/api/ext/v3/contacts/count", self.api_url);
514
515        self.client
516            .get(&url)
517            .bearer_auth(&self.api_token)
518            .send()
519            .await
520            .map(|r| r.status().is_success())
521            .unwrap_or(false)
522    }
523
524    async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> {
525        // WATI API does not support typing indicators
526        Ok(())
527    }
528
529    async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
530        // WATI API does not support typing indicators
531        Ok(())
532    }
533}
534
535#[cfg(test)]
536mod tests {
537    use super::*;
538
539    #[test]
540    fn wati_channel_name() {
541        let ch = WatiChannel::new(
542            "test-token".into(),
543            "https://live-mt-server.wati.io".into(),
544            None,
545            "wati_test_alias",
546            Arc::new(|| vec!["+1234567890".into()]),
547        );
548        assert_eq!(ch.name(), "wati");
549    }
550
551    #[test]
552    fn wati_number_allowed_exact() {
553        let ch = WatiChannel::new(
554            "test-token".into(),
555            "https://live-mt-server.wati.io".into(),
556            None,
557            "wati_test_alias",
558            Arc::new(|| vec!["+1234567890".into()]),
559        );
560        assert!(ch.is_number_allowed("+1234567890"));
561        assert!(!ch.is_number_allowed("+9876543210"));
562    }
563
564    #[test]
565    fn wati_number_allowed_wildcard() {
566        let ch = WatiChannel::new(
567            "test-token".into(),
568            "https://live-mt-server.wati.io".into(),
569            None,
570            "wati_test_alias",
571            Arc::new(|| vec!["*".into()]),
572        );
573        assert!(ch.is_number_allowed("+1234567890"));
574        assert!(ch.is_number_allowed("+9999999999"));
575    }
576
577    #[test]
578    fn wati_number_allowed_empty() {
579        let ch = WatiChannel::new(
580            "tok".into(),
581            "https://live-mt-server.wati.io".into(),
582            None,
583            "wati_test_alias",
584            Arc::new(Vec::new),
585        );
586        assert!(!ch.is_number_allowed("+1234567890"));
587    }
588
589    #[test]
590    fn wati_build_target_with_tenant() {
591        let ch = WatiChannel::new(
592            "tok".into(),
593            "https://live-mt-server.wati.io".into(),
594            Some("tenant1".into()),
595            "wati_test_alias",
596            Arc::new(Vec::new),
597        );
598        assert_eq!(ch.build_target("+1234567890"), "tenant1:1234567890");
599    }
600
601    #[test]
602    fn wati_build_target_without_tenant() {
603        let ch = WatiChannel::new(
604            "test-token".into(),
605            "https://live-mt-server.wati.io".into(),
606            None,
607            "wati_test_alias",
608            Arc::new(|| vec!["+1234567890".into()]),
609        );
610        assert_eq!(ch.build_target("+1234567890"), "1234567890");
611    }
612
613    #[test]
614    fn wati_build_target_already_prefixed() {
615        let ch = WatiChannel::new(
616            "tok".into(),
617            "https://live-mt-server.wati.io".into(),
618            Some("tenant1".into()),
619            "wati_test_alias",
620            Arc::new(Vec::new),
621        );
622        // If the phone already has the tenant prefix, don't double it
623        assert_eq!(ch.build_target("tenant1:1234567890"), "tenant1:1234567890");
624    }
625
626    #[test]
627    fn wati_parse_valid_message() {
628        let ch = WatiChannel::new(
629            "test-token".into(),
630            "https://live-mt-server.wati.io".into(),
631            None,
632            "wati_test_alias",
633            Arc::new(|| vec!["+1234567890".into()]),
634        );
635        let payload = serde_json::json!({
636            "text": "Hello from WATI!",
637            "waId": "1234567890",
638            "fromMe": false,
639            "timestamp": 1_705_320_000_u64
640        });
641
642        let msgs = ch.parse_webhook_payload(&payload);
643        assert_eq!(msgs.len(), 1);
644        assert_eq!(msgs[0].sender, "+1234567890");
645        assert_eq!(msgs[0].content, "Hello from WATI!");
646        assert_eq!(msgs[0].channel, "wati");
647        assert_eq!(msgs[0].reply_target, "+1234567890");
648        assert_eq!(msgs[0].timestamp, 1_705_320_000);
649    }
650
651    #[test]
652    fn wati_parse_skip_from_me() {
653        let ch = WatiChannel::new(
654            "test-token".into(),
655            "https://live-mt-server.wati.io".into(),
656            None,
657            "wati_test_alias",
658            Arc::new(|| vec!["*".into()]),
659        );
660        let payload = serde_json::json!({
661            "text": "My own message",
662            "waId": "1234567890",
663            "fromMe": true
664        });
665
666        let msgs = ch.parse_webhook_payload(&payload);
667        assert!(msgs.is_empty(), "fromMe messages should be skipped");
668    }
669
670    #[test]
671    fn wati_parse_skip_no_text() {
672        let ch = WatiChannel::new(
673            "test-token".into(),
674            "https://live-mt-server.wati.io".into(),
675            None,
676            "wati_test_alias",
677            Arc::new(|| vec!["*".into()]),
678        );
679        let payload = serde_json::json!({
680            "waId": "1234567890",
681            "fromMe": false
682        });
683
684        let msgs = ch.parse_webhook_payload(&payload);
685        assert!(msgs.is_empty(), "Messages without text should be skipped");
686    }
687
688    #[test]
689    fn wati_parse_alternative_field_names() {
690        let ch = WatiChannel::new(
691            "test-token".into(),
692            "https://live-mt-server.wati.io".into(),
693            None,
694            "wati_test_alias",
695            Arc::new(|| vec!["*".into()]),
696        );
697
698        // wa_id instead of waId, message.body instead of text
699        let payload = serde_json::json!({
700            "message": { "body": "Alt field test" },
701            "wa_id": "1234567890",
702            "from_me": false,
703            "timestamp": 1_705_320_000_u64
704        });
705
706        let msgs = ch.parse_webhook_payload(&payload);
707        assert_eq!(msgs.len(), 1);
708        assert_eq!(msgs[0].content, "Alt field test");
709        assert_eq!(msgs[0].sender, "+1234567890");
710    }
711
712    #[test]
713    fn wati_parse_timestamp_seconds() {
714        let ch = WatiChannel::new(
715            "test-token".into(),
716            "https://live-mt-server.wati.io".into(),
717            None,
718            "wati_test_alias",
719            Arc::new(|| vec!["*".into()]),
720        );
721        let payload = serde_json::json!({
722            "text": "Test",
723            "waId": "1234567890",
724            "timestamp": 1_705_320_000_u64
725        });
726
727        let msgs = ch.parse_webhook_payload(&payload);
728        assert_eq!(msgs[0].timestamp, 1_705_320_000);
729    }
730
731    #[test]
732    fn wati_parse_timestamp_milliseconds() {
733        let ch = WatiChannel::new(
734            "test-token".into(),
735            "https://live-mt-server.wati.io".into(),
736            None,
737            "wati_test_alias",
738            Arc::new(|| vec!["*".into()]),
739        );
740        let payload = serde_json::json!({
741            "text": "Test",
742            "waId": "1234567890",
743            "timestamp": 1_705_320_000_000_u64
744        });
745
746        let msgs = ch.parse_webhook_payload(&payload);
747        assert_eq!(msgs[0].timestamp, 1_705_320_000);
748    }
749
750    #[test]
751    fn wati_parse_timestamp_iso() {
752        let ch = WatiChannel::new(
753            "test-token".into(),
754            "https://live-mt-server.wati.io".into(),
755            None,
756            "wati_test_alias",
757            Arc::new(|| vec!["*".into()]),
758        );
759        let payload = serde_json::json!({
760            "text": "Test",
761            "waId": "1234567890",
762            "timestamp": "2025-01-15T12:00:00Z"
763        });
764
765        let msgs = ch.parse_webhook_payload(&payload);
766        assert_eq!(msgs[0].timestamp, 1_736_942_400);
767    }
768
769    #[test]
770    fn wati_parse_normalizes_phone() {
771        let ch = WatiChannel::new(
772            "tok".into(),
773            "https://live-mt-server.wati.io".into(),
774            None,
775            "wati_test_alias",
776            Arc::new(|| vec!["+1234567890".into()]),
777        );
778
779        // Phone without + prefix
780        let payload = serde_json::json!({
781            "text": "Hi",
782            "waId": "1234567890",
783            "fromMe": false
784        });
785
786        let msgs = ch.parse_webhook_payload(&payload);
787        assert_eq!(msgs.len(), 1);
788        assert_eq!(msgs[0].sender, "+1234567890");
789    }
790
791    #[test]
792    fn wati_parse_empty_payload() {
793        let ch = WatiChannel::new(
794            "test-token".into(),
795            "https://live-mt-server.wati.io".into(),
796            None,
797            "wati_test_alias",
798            Arc::new(|| vec!["+1234567890".into()]),
799        );
800        let payload = serde_json::json!({});
801        let msgs = ch.parse_webhook_payload(&payload);
802        assert!(msgs.is_empty());
803    }
804
805    #[test]
806    fn wati_parse_from_field_fallback() {
807        let ch = WatiChannel::new(
808            "test-token".into(),
809            "https://live-mt-server.wati.io".into(),
810            None,
811            "wati_test_alias",
812            Arc::new(|| vec!["*".into()]),
813        );
814        // Uses "from" instead of "waId"
815        let payload = serde_json::json!({
816            "text": "Fallback test",
817            "from": "1234567890",
818            "fromMe": false
819        });
820
821        let msgs = ch.parse_webhook_payload(&payload);
822        assert_eq!(msgs.len(), 1);
823        assert_eq!(msgs[0].sender, "+1234567890");
824    }
825
826    #[test]
827    fn wati_parse_message_text_fallback() {
828        let ch = WatiChannel::new(
829            "test-token".into(),
830            "https://live-mt-server.wati.io".into(),
831            None,
832            "wati_test_alias",
833            Arc::new(|| vec!["*".into()]),
834        );
835        // Uses "message.text" instead of top-level "text"
836        let payload = serde_json::json!({
837            "message": { "text": "Nested text" },
838            "waId": "1234567890",
839            "fromMe": false
840        });
841
842        let msgs = ch.parse_webhook_payload(&payload);
843        assert_eq!(msgs.len(), 1);
844        assert_eq!(msgs[0].content, "Nested text");
845    }
846
847    #[test]
848    fn wati_parse_owner_field_as_from_me() {
849        let ch = WatiChannel::new(
850            "test-token".into(),
851            "https://live-mt-server.wati.io".into(),
852            None,
853            "wati_test_alias",
854            Arc::new(|| vec!["*".into()]),
855        );
856        // Uses "owner" field as fromMe indicator
857        let payload = serde_json::json!({
858            "text": "Test",
859            "waId": "1234567890",
860            "owner": true
861        });
862
863        let msgs = ch.parse_webhook_payload(&payload);
864        assert!(msgs.is_empty(), "owner=true messages should be skipped");
865    }
866
867    #[test]
868    fn wati_manager_none_when_not_configured() {
869        let ch = WatiChannel::new(
870            "test-token".into(),
871            "https://live-mt-server.wati.io".into(),
872            None,
873            "wati_test_alias",
874            Arc::new(|| vec!["+1234567890".into()]),
875        );
876        assert!(ch.transcription_manager.is_none());
877    }
878
879    #[test]
880    fn wati_manager_some_when_valid_config() {
881        let config = zeroclaw_config::schema::TranscriptionConfig {
882            enabled: true,
883            api_key: Some("test-key".to_string()),
884            api_url: "https://api.groq.com/openai/v1/audio/transcriptions".to_string(),
885            model: "distil-whisper-large-v3-en".to_string(),
886            language: None,
887            initial_prompt: None,
888            max_duration_secs: 120,
889            openai: None,
890            deepgram: None,
891            assemblyai: None,
892            google: None,
893            local_whisper: None,
894            transcribe_non_ptt_audio: false,
895        };
896
897        let ch = WatiChannel::new(
898            "test-token".into(),
899            "https://live-mt-server.wati.io".into(),
900            None,
901            "wati_test_alias",
902            Arc::new(|| vec!["+1234567890".into()]),
903        )
904        .with_transcription(config);
905
906        assert!(ch.transcription_manager.is_some());
907    }
908
909    #[test]
910    fn wati_manager_none_and_warn_on_init_failure() {
911        let config = zeroclaw_config::schema::TranscriptionConfig {
912            enabled: true,
913            api_key: Some(String::new()),
914            api_url: "https://api.groq.com/openai/v1/audio/transcriptions".to_string(),
915            model: "distil-whisper-large-v3-en".to_string(),
916            language: None,
917            initial_prompt: None,
918            max_duration_secs: 120,
919            openai: None,
920            deepgram: None,
921            assemblyai: None,
922            google: None,
923            local_whisper: None,
924            transcribe_non_ptt_audio: false,
925        };
926
927        let ch = WatiChannel::new(
928            "test-token".into(),
929            "https://live-mt-server.wati.io".into(),
930            None,
931            "wati_test_alias",
932            Arc::new(|| vec!["+1234567890".into()]),
933        )
934        .with_transcription(config);
935
936        assert!(ch.transcription_manager.is_none());
937    }
938
939    #[tokio::test]
940    async fn wati_try_transcribe_returns_none_when_manager_none() {
941        let ch = WatiChannel::new(
942            "test-token".into(),
943            "https://live-mt-server.wati.io".into(),
944            None,
945            "wati_test_alias",
946            Arc::new(|| vec!["+1234567890".into()]),
947        );
948        let payload = serde_json::json!({
949            "type": "audio",
950            "mediaUrl": "https://example.com/audio.ogg",
951            "waId": "1234567890"
952        });
953
954        let result = ch.try_transcribe_audio(&payload).await;
955        assert!(result.is_none());
956    }
957
958    #[tokio::test]
959    async fn wati_try_transcribe_returns_none_when_no_media_url() {
960        let config = zeroclaw_config::schema::TranscriptionConfig {
961            enabled: false,
962            api_key: Some("test-key".to_string()),
963            api_url: "https://api.groq.com/openai/v1/audio/transcriptions".to_string(),
964            model: "distil-whisper-large-v3-en".to_string(),
965            language: None,
966            initial_prompt: None,
967            max_duration_secs: 120,
968            openai: None,
969            deepgram: None,
970            assemblyai: None,
971            google: None,
972            local_whisper: None,
973            transcribe_non_ptt_audio: false,
974        };
975
976        let ch = WatiChannel::new(
977            "test-token".into(),
978            "https://live-mt-server.wati.io".into(),
979            None,
980            "wati_test_alias",
981            Arc::new(|| vec!["+1234567890".into()]),
982        )
983        .with_transcription(config);
984
985        let payload = serde_json::json!({
986            "type": "audio",
987            "waId": "1234567890"
988        });
989
990        let result = ch.try_transcribe_audio(&payload).await;
991        assert!(result.is_none());
992    }
993
994    #[test]
995    fn wati_filename_voice_type() {
996        let _ch = WatiChannel::new(
997            "test-token".into(),
998            "https://live-mt-server.wati.io".into(),
999            None,
1000            "wati_test_alias",
1001            Arc::new(|| vec!["+1234567890".into()]),
1002        );
1003        let payload = serde_json::json!({
1004            "type": "voice",
1005            "mediaUrl": "https://example.com/media/123",
1006            "waId": "1234567890"
1007        });
1008
1009        let msg_type = payload
1010            .get("type")
1011            .and_then(|v| v.as_str())
1012            .unwrap_or("audio");
1013        let file_name = match msg_type {
1014            "voice" => "voice.ogg",
1015            _ => "audio.ogg",
1016        };
1017
1018        assert_eq!(file_name, "voice.ogg");
1019    }
1020
1021    #[test]
1022    fn wati_filename_audio_type() {
1023        let _ch = WatiChannel::new(
1024            "test-token".into(),
1025            "https://live-mt-server.wati.io".into(),
1026            None,
1027            "wati_test_alias",
1028            Arc::new(|| vec!["+1234567890".into()]),
1029        );
1030        let payload = serde_json::json!({
1031            "type": "audio",
1032            "mediaUrl": "https://example.com/media/123",
1033            "waId": "1234567890"
1034        });
1035
1036        let msg_type = payload
1037            .get("type")
1038            .and_then(|v| v.as_str())
1039            .unwrap_or("audio");
1040        let file_name = match msg_type {
1041            "voice" => "voice.ogg",
1042            _ => "audio.ogg",
1043        };
1044
1045        assert_eq!(file_name, "audio.ogg");
1046    }
1047
1048    #[test]
1049    fn wati_extract_sender_absent_returns_none() {
1050        let ch = WatiChannel::new(
1051            "test-token".into(),
1052            "https://live-mt-server.wati.io".into(),
1053            None,
1054            "wati_test_alias",
1055            Arc::new(|| vec!["+1234567890".into()]),
1056        );
1057        let payload = serde_json::json!({
1058            "type": "audio"
1059        });
1060
1061        let result = ch.extract_sender(&payload);
1062        assert!(result.is_none());
1063    }
1064
1065    #[test]
1066    fn wati_extract_sender_not_in_allowlist_returns_none() {
1067        let ch = WatiChannel::new(
1068            "test-token".into(),
1069            "https://live-mt-server.wati.io".into(),
1070            None,
1071            "wati_test_alias",
1072            Arc::new(|| vec!["+1234567890".into()]),
1073        );
1074        let payload = serde_json::json!({
1075            "waId": "9999999999"
1076        });
1077
1078        let result = ch.extract_sender(&payload);
1079        assert!(result.is_none());
1080    }
1081
1082    #[test]
1083    fn wati_parse_audio_as_message_uses_transcript_as_content() {
1084        let ch = WatiChannel::new(
1085            "test-token".into(),
1086            "https://live-mt-server.wati.io".into(),
1087            None,
1088            "wati_test_alias",
1089            Arc::new(|| vec!["*".into()]),
1090        );
1091        let payload = serde_json::json!({
1092            "type": "audio",
1093            "waId": "1234567890",
1094            "fromMe": false,
1095            "timestamp": 1_705_320_000_u64
1096        });
1097
1098        let transcript = "This is a test transcript.".to_string();
1099        let msgs = ch.parse_audio_as_message(&payload, transcript.clone());
1100
1101        assert_eq!(msgs.len(), 1);
1102        assert_eq!(msgs[0].content, transcript);
1103        assert_eq!(msgs[0].sender, "+1234567890");
1104        assert_eq!(msgs[0].channel, "wati");
1105        assert_eq!(msgs[0].timestamp, 1_705_320_000);
1106    }
1107
1108    #[tokio::test]
1109    async fn wati_transcribes_audio_via_local_whisper() {
1110        use wiremock::matchers::{method, path};
1111        use wiremock::{Mock, MockServer, ResponseTemplate};
1112
1113        let media_server = MockServer::start().await;
1114        let whisper_server = MockServer::start().await;
1115
1116        let audio_bytes = b"fake-audio-data";
1117        Mock::given(method("GET"))
1118            .and(path("/media/123"))
1119            .respond_with(ResponseTemplate::new(200).set_body_bytes(audio_bytes))
1120            .mount(&media_server)
1121            .await;
1122
1123        let transcript = "Transcribed text from local whisper.";
1124        Mock::given(method("POST"))
1125            .and(path("/v1/transcribe"))
1126            .respond_with(
1127                ResponseTemplate::new(200).set_body_json(serde_json::json!({"text": transcript})),
1128            )
1129            .mount(&whisper_server)
1130            .await;
1131
1132        let config = zeroclaw_config::schema::TranscriptionConfig {
1133            enabled: true,
1134            api_key: None,
1135            api_url: "https://api.groq.com/openai/v1/audio/transcriptions".to_string(),
1136            model: "whisper-1".to_string(),
1137            language: None,
1138            initial_prompt: None,
1139            max_duration_secs: 120,
1140            openai: None,
1141            deepgram: None,
1142            assemblyai: None,
1143            google: None,
1144            local_whisper: Some(zeroclaw_config::schema::LocalWhisperConfig {
1145                url: format!("{}/v1/transcribe", whisper_server.uri()),
1146                bearer_token: Some("test-token".to_string()),
1147                max_audio_bytes: 25 * 1024 * 1024,
1148                timeout_secs: 300,
1149            }),
1150            transcribe_non_ptt_audio: false,
1151        };
1152
1153        let ch = WatiChannel::new(
1154            "test-token".into(),
1155            media_server.uri(),
1156            None,
1157            "wati_test_alias",
1158            Arc::new(|| vec!["+1234567890".into()]),
1159        )
1160        .with_transcription(config);
1161
1162        let payload = serde_json::json!({
1163            "type": "audio",
1164            "mediaUrl": format!("{}/media/123", media_server.uri()),
1165            "waId": "1234567890"
1166        });
1167
1168        let result = ch.try_transcribe_audio(&payload).await;
1169        assert_eq!(result, Some(transcript.to_string()));
1170    }
1171
1172    #[tokio::test]
1173    async fn wati_try_transcribe_returns_none_on_media_download_failure() {
1174        use wiremock::matchers::{method, path};
1175        use wiremock::{Mock, MockServer, ResponseTemplate};
1176
1177        let media_server = MockServer::start().await;
1178
1179        Mock::given(method("GET"))
1180            .and(path("/media/123"))
1181            .respond_with(ResponseTemplate::new(404))
1182            .mount(&media_server)
1183            .await;
1184
1185        let config = zeroclaw_config::schema::TranscriptionConfig {
1186            enabled: true,
1187            api_key: None,
1188            api_url: "https://api.groq.com/openai/v1/audio/transcriptions".to_string(),
1189            model: "whisper-1".to_string(),
1190            language: None,
1191            initial_prompt: None,
1192            max_duration_secs: 120,
1193            openai: None,
1194            deepgram: None,
1195            assemblyai: None,
1196            google: None,
1197            local_whisper: Some(zeroclaw_config::schema::LocalWhisperConfig {
1198                url: "http://localhost:8000/v1/transcribe".to_string(),
1199                bearer_token: Some("test-token".to_string()),
1200                max_audio_bytes: 25 * 1024 * 1024,
1201                timeout_secs: 300,
1202            }),
1203            transcribe_non_ptt_audio: false,
1204        };
1205
1206        let ch = WatiChannel::new(
1207            "test-token".into(),
1208            media_server.uri(),
1209            None,
1210            "wati_test_alias",
1211            Arc::new(|| vec!["+1234567890".into()]),
1212        )
1213        .with_transcription(config);
1214
1215        let payload = serde_json::json!({
1216            "type": "audio",
1217            "mediaUrl": format!("{}/media/123", media_server.uri()),
1218            "waId": "1234567890"
1219        });
1220
1221        let result = ch.try_transcribe_audio(&payload).await;
1222        assert!(result.is_none());
1223    }
1224
1225    #[test]
1226    fn extract_host_uses_url_parser() {
1227        assert_eq!(
1228            WatiChannel::extract_host("https://live-mt-server.wati.io/media/123"),
1229            Some("live-mt-server.wati.io".to_string())
1230        );
1231        // URL with userinfo@ — proper parser extracts the real host, not the
1232        // attacker-controlled host that naive string splitting would produce
1233        assert_eq!(
1234            WatiChannel::extract_host("https://live-mt-server.wati.io@evil.com/media/123"),
1235            Some("evil.com".to_string())
1236        );
1237    }
1238
1239    #[tokio::test]
1240    async fn wati_try_transcribe_blocks_host_mismatch() {
1241        let config = zeroclaw_config::schema::TranscriptionConfig {
1242            enabled: true,
1243            local_whisper: Some(zeroclaw_config::schema::LocalWhisperConfig {
1244                url: "http://localhost:8001/v1/transcribe".into(),
1245                bearer_token: Some("test-token".into()),
1246                max_audio_bytes: 25 * 1024 * 1024,
1247                timeout_secs: 120,
1248            }),
1249            ..Default::default()
1250        };
1251
1252        let ch = WatiChannel::new(
1253            "test-token".into(),
1254            "https://live-mt-server.wati.io".into(),
1255            None,
1256            "wati_test_alias",
1257            Arc::new(|| vec!["+1234567890".into()]),
1258        )
1259        .with_transcription(config);
1260
1261        let payload = serde_json::json!({
1262            "type": "audio",
1263            "mediaUrl": "https://evil.com/media/123",
1264            "waId": "1234567890"
1265        });
1266
1267        let result = ch.try_transcribe_audio(&payload).await;
1268        assert!(result.is_none());
1269    }
1270}