1use async_trait::async_trait;
2use std::sync::Arc;
3use uuid::Uuid;
4use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
5
/// Upper bound, in bytes, on an audio attachment downloaded for transcription:
/// 25 MiB (25 × 2^20). Downloads that grow past this size are abandoned.
const MAX_WATI_AUDIO_BYTES: u64 = 25 << 20;
7
8pub struct WatiChannel {
15 api_token: String,
16 api_url: String,
17 tenant_id: Option<String>,
18 alias: String,
21 peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
24 client: reqwest::Client,
25 transcription_manager: Option<std::sync::Arc<super::transcription::TranscriptionManager>>,
26}
27
28impl WatiChannel {
29 pub fn new(
30 api_token: String,
31 api_url: String,
32 tenant_id: Option<String>,
33 alias: impl Into<String>,
34 peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
35 ) -> Self {
36 Self::new_with_proxy(api_token, api_url, tenant_id, alias, peer_resolver, None)
37 }
38
39 pub fn new_with_proxy(
40 api_token: String,
41 api_url: String,
42 tenant_id: Option<String>,
43 alias: impl Into<String>,
44 peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
45 proxy_url: Option<String>,
46 ) -> Self {
47 Self {
48 api_token,
49 api_url,
50 tenant_id,
51 alias: alias.into(),
52 peer_resolver,
53 client: zeroclaw_config::schema::build_channel_proxy_client(
54 "channel.wati",
55 proxy_url.as_deref(),
56 ),
57 transcription_manager: None,
58 }
59 }
60
61 pub fn alias(&self) -> &str {
64 &self.alias
65 }
66
67 pub fn with_transcription(
68 mut self,
69 config: zeroclaw_config::schema::TranscriptionConfig,
70 ) -> Self {
71 if !config.enabled {
72 return self;
73 }
74 match super::transcription::TranscriptionManager::new(&config) {
75 Ok(m) => {
76 let names = m.available_providers();
85 let m = if names.len() == 1 {
86 let only = names[0].to_string();
87 m.with_agent_transcription_provider(only)
88 } else {
89 m
90 };
91 self.transcription_manager = Some(std::sync::Arc::new(m));
92 }
93 Err(e) => {
94 ::zeroclaw_log::record!(
95 WARN,
96 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
97 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
98 .with_attrs(::serde_json::json!({"e": e.to_string()})),
99 "transcription manager init failed, voice transcription disabled"
100 );
101 }
102 }
103 self
104 }
105
106 fn is_number_allowed(&self, phone: &str) -> bool {
108 let peers = (self.peer_resolver)();
109 crate::allowlist::is_user_allowed(&peers, phone, crate::allowlist::Match::Sensitive)
110 }
111
112 fn extract_sender(&self, payload: &serde_json::Value) -> Option<String> {
115 let wa_id = payload
117 .get("waId")
118 .or_else(|| payload.get("wa_id"))
119 .or_else(|| payload.get("from"))
120 .and_then(|v| v.as_str())
121 .unwrap_or("")
122 .trim();
123
124 if wa_id.is_empty() {
125 return None;
126 }
127
128 let normalized_phone = if wa_id.starts_with('+') {
130 wa_id.to_string()
131 } else {
132 format!("+{wa_id}")
133 };
134
135 if !self.is_number_allowed(&normalized_phone) {
137 ::zeroclaw_log::record!(
138 WARN,
139 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
140 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
141 .with_attrs(::serde_json::json!({"normalized_phone": normalized_phone})),
142 "ignoring message from unauthorized sender: . Add to channels.wati.allowed_numbers in config.toml, or run `zeroclaw onboard channels` to configure interactively."
143 );
144 return None;
145 }
146
147 Some(normalized_phone)
148 }
149
150 fn build_target(&self, phone: &str) -> String {
152 let bare = phone.strip_prefix('+').unwrap_or(phone);
154 if let Some(ref tid) = self.tenant_id {
155 if bare.starts_with(&format!("{tid}:")) {
156 bare.to_string()
157 } else {
158 format!("{tid}:{bare}")
159 }
160 } else {
161 bare.to_string()
162 }
163 }
164
165 fn extract_timestamp(payload: &serde_json::Value) -> u64 {
170 payload
171 .get("timestamp")
172 .or_else(|| payload.get("created"))
173 .map(|t| {
174 if let Some(secs) = t.as_u64() {
175 if secs > 10_000_000_000 {
176 secs / 1000
177 } else {
178 secs
179 }
180 } else if let Some(s) = t.as_str() {
181 chrono::DateTime::parse_from_rfc3339(s)
182 .ok()
183 .map(|dt| dt.timestamp().cast_unsigned())
184 .unwrap_or_else(|| {
185 std::time::SystemTime::now()
186 .duration_since(std::time::UNIX_EPOCH)
187 .unwrap_or_default()
188 .as_secs()
189 })
190 } else {
191 std::time::SystemTime::now()
192 .duration_since(std::time::UNIX_EPOCH)
193 .unwrap_or_default()
194 .as_secs()
195 }
196 })
197 .unwrap_or_else(|| {
198 std::time::SystemTime::now()
199 .duration_since(std::time::UNIX_EPOCH)
200 .unwrap_or_default()
201 .as_secs()
202 })
203 }
204
205 pub fn parse_webhook_payload(&self, payload: &serde_json::Value) -> Vec<ChannelMessage> {
210 let mut messages = Vec::new();
211
212 let text = payload
214 .get("text")
215 .and_then(|v| v.as_str())
216 .or_else(|| {
217 payload
218 .get("message")
219 .and_then(|m| m.get("text").or_else(|| m.get("body")))
220 .and_then(|v| v.as_str())
221 })
222 .unwrap_or("")
223 .trim();
224
225 if text.is_empty() {
226 return messages;
227 }
228
229 let from_me = payload
231 .get("fromMe")
232 .or_else(|| payload.get("from_me"))
233 .or_else(|| payload.get("owner"))
234 .and_then(|v| v.as_bool())
235 .unwrap_or(false);
236
237 if from_me {
238 ::zeroclaw_log::record!(
239 DEBUG,
240 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
241 "skipping fromMe message"
242 );
243 return messages;
244 }
245
246 let Some(normalized_phone) = self.extract_sender(payload) else {
248 return messages;
249 };
250
251 let timestamp = Self::extract_timestamp(payload);
252 messages.push(ChannelMessage {
253 id: Uuid::new_v4().to_string(),
254 reply_target: normalized_phone.clone(),
255 sender: normalized_phone,
256 content: text.to_string(),
257 channel: "wati".to_string(),
258 channel_alias: Some(self.alias.clone()),
259 timestamp,
260 thread_ts: None,
261 interruption_scope_id: None,
262 attachments: vec![],
263 subject: None,
264 });
265
266 messages
267 }
268
269 fn extract_host(url_str: &str) -> Option<String> {
271 reqwest::Url::parse(url_str)
272 .ok()?
273 .host_str()
274 .map(|h| h.to_ascii_lowercase())
275 }
276
277 pub async fn try_transcribe_audio(&self, payload: &serde_json::Value) -> Option<String> {
282 let manager = self.transcription_manager.as_deref()?;
283
284 let media_url = payload
285 .get("mediaUrl")
286 .or_else(|| payload.get("media_url"))
287 .and_then(|v| v.as_str())?;
288
289 let api_host = Self::extract_host(&self.api_url);
291 let media_host = Self::extract_host(media_url);
292 match (api_host, media_host) {
293 (Some(ref expected), Some(ref actual)) if actual == expected => {}
294 _ => {
295 ::zeroclaw_log::record!(
296 WARN,
297 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
298 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
299 .with_attrs(::serde_json::json!({"media_url": media_url})),
300 "blocked media URL with unexpected host"
301 );
302 return None;
303 }
304 }
305
306 let from_me = payload
308 .get("fromMe")
309 .or_else(|| payload.get("from_me"))
310 .or_else(|| payload.get("owner"))
311 .and_then(|v| v.as_bool())
312 .unwrap_or(false);
313 if from_me {
314 ::zeroclaw_log::record!(
315 DEBUG,
316 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
317 "skipping fromMe audio before download"
318 );
319 return None;
320 }
321
322 let msg_type = payload
323 .get("type")
324 .and_then(|v| v.as_str())
325 .unwrap_or("audio");
326
327 let file_name = match msg_type {
328 "voice" => "voice.ogg",
329 _ => "audio.ogg",
330 };
331
332 let mut resp = match self
333 .client
334 .get(media_url)
335 .bearer_auth(&self.api_token)
336 .send()
337 .await
338 {
339 Ok(r) => r,
340 Err(e) => {
341 ::zeroclaw_log::record!(
342 WARN,
343 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
344 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
345 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
346 "media download request failed"
347 );
348 return None;
349 }
350 };
351
352 if !resp.status().is_success() {
353 ::zeroclaw_log::record!(
354 WARN,
355 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
356 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
357 &format!("media download failed: {}", resp.status())
358 );
359 return None;
360 }
361
362 let mut audio_bytes = Vec::new();
363 while let Some(chunk) = resp.chunk().await.ok().flatten() {
364 audio_bytes.extend_from_slice(&chunk);
365 if audio_bytes.len() as u64 > MAX_WATI_AUDIO_BYTES {
366 ::zeroclaw_log::record!(
367 WARN,
368 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
369 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
370 &format!("audio download exceeds {} byte limit", MAX_WATI_AUDIO_BYTES)
371 );
372 return None;
373 }
374 }
375
376 match manager.transcribe(&audio_bytes, file_name).await {
377 Ok(transcript) => Some(transcript),
378 Err(e) => {
379 ::zeroclaw_log::record!(
380 WARN,
381 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
382 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
383 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
384 "transcription failed"
385 );
386 None
387 }
388 }
389 }
390
391 pub fn parse_audio_as_message(
396 &self,
397 payload: &serde_json::Value,
398 transcript: String,
399 ) -> Vec<ChannelMessage> {
400 let mut messages = Vec::new();
401
402 let from_me = payload
404 .get("fromMe")
405 .or_else(|| payload.get("from_me"))
406 .or_else(|| payload.get("owner"))
407 .and_then(|v| v.as_bool())
408 .unwrap_or(false);
409
410 if from_me {
411 ::zeroclaw_log::record!(
412 DEBUG,
413 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
414 "skipping fromMe audio message"
415 );
416 return messages;
417 }
418
419 if transcript.trim().is_empty() {
420 ::zeroclaw_log::record!(
421 DEBUG,
422 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
423 "skipping empty audio transcript"
424 );
425 return messages;
426 }
427
428 let Some(normalized_phone) = self.extract_sender(payload) else {
430 return messages;
431 };
432
433 let timestamp = Self::extract_timestamp(payload);
434 messages.push(ChannelMessage {
435 id: Uuid::new_v4().to_string(),
436 reply_target: normalized_phone.clone(),
437 sender: normalized_phone,
438 content: transcript,
439 channel: "wati".to_string(),
440 channel_alias: Some(self.alias.clone()),
441 timestamp,
442 thread_ts: None,
443 interruption_scope_id: None,
444 attachments: vec![],
445 subject: None,
446 });
447
448 messages
449 }
450}
451
452impl ::zeroclaw_api::attribution::Attributable for WatiChannel {
453 fn role(&self) -> ::zeroclaw_api::attribution::Role {
454 ::zeroclaw_api::attribution::Role::Channel(::zeroclaw_api::attribution::ChannelKind::Wati)
455 }
456 fn alias(&self) -> &str {
457 &self.alias
458 }
459}
460
461#[async_trait]
462impl Channel for WatiChannel {
463 fn name(&self) -> &str {
464 "wati"
465 }
466
467 async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
468 let target = self.build_target(&message.recipient);
469
470 let body = serde_json::json!({
471 "target": target,
472 "text": message.content
473 });
474
475 let url = format!("{}/api/ext/v3/conversations/messages/text", self.api_url);
476
477 let resp = self
478 .client
479 .post(&url)
480 .bearer_auth(&self.api_token)
481 .header("Content-Type", "application/json")
482 .json(&body)
483 .send()
484 .await?;
485
486 if !resp.status().is_success() {
487 let status = resp.status();
488 let error_body = resp.text().await.unwrap_or_default();
489 ::zeroclaw_log::record!(ERROR, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail).with_outcome(::zeroclaw_log::EventOutcome::Failure).with_attrs(::serde_json::json!({"status": status.to_string(), "error_body": error_body})), "send failed:");
490 anyhow::bail!("WATI API error: {status}");
491 }
492
493 Ok(())
494 }
495
496 async fn listen(&self, _tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
497 ::zeroclaw_log::record!(
500 INFO,
501 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
502 "WATI channel active (webhook mode). \
503 Configure WATI webhook to POST to your gateway's /wati endpoint."
504 );
505
506 loop {
508 tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
509 }
510 }
511
512 async fn health_check(&self) -> bool {
513 let url = format!("{}/api/ext/v3/contacts/count", self.api_url);
514
515 self.client
516 .get(&url)
517 .bearer_auth(&self.api_token)
518 .send()
519 .await
520 .map(|r| r.status().is_success())
521 .unwrap_or(false)
522 }
523
524 async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> {
525 Ok(())
527 }
528
529 async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
530 Ok(())
532 }
533}
534
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use zeroclaw_config::schema::{LocalWhisperConfig, TranscriptionConfig};

    const API_URL: &str = "https://live-mt-server.wati.io";
    const ALIAS: &str = "wati_test_alias";
    const PEER: &str = "+1234567890";

    // ---- fixtures -------------------------------------------------------

    /// Channel against `api_url` whose allowlist is exactly `peers`.
    fn channel_at(api_url: &str, tenant: Option<&str>, peers: &[&str]) -> WatiChannel {
        let peers: Vec<String> = peers.iter().map(|p| (*p).to_string()).collect();
        WatiChannel::new(
            "test-token".into(),
            api_url.into(),
            tenant.map(str::to_string),
            ALIAS,
            Arc::new(move || peers.clone()),
        )
    }

    /// Channel against the default API URL, no tenant.
    fn channel(peers: &[&str]) -> WatiChannel {
        channel_at(API_URL, None, peers)
    }

    /// Channel that only allows `PEER`.
    fn single_peer_channel() -> WatiChannel {
        channel(&[PEER])
    }

    /// Channel that allows every sender.
    fn open_channel() -> WatiChannel {
        channel(&["*"])
    }

    /// Hosted-provider config with every field spelled out, so a new config
    /// field breaks compilation here instead of being silently defaulted.
    fn groq_config(enabled: bool, api_key: Option<&str>) -> TranscriptionConfig {
        TranscriptionConfig {
            enabled,
            api_key: api_key.map(str::to_string),
            api_url: "https://api.groq.com/openai/v1/audio/transcriptions".to_string(),
            model: "distil-whisper-large-v3-en".to_string(),
            language: None,
            initial_prompt: None,
            max_duration_secs: 120,
            openai: None,
            deepgram: None,
            assemblyai: None,
            google: None,
            local_whisper: None,
            transcribe_non_ptt_audio: false,
        }
    }

    /// Local-whisper endpoint settings pointing at `url`.
    fn local_whisper(url: String) -> LocalWhisperConfig {
        LocalWhisperConfig {
            url,
            bearer_token: Some("test-token".to_string()),
            max_audio_bytes: 25 * 1024 * 1024,
            timeout_secs: 300,
        }
    }

    /// Enabled config whose only usable backend is local whisper at `url`.
    fn whisper_config(url: String) -> TranscriptionConfig {
        TranscriptionConfig {
            model: "whisper-1".to_string(),
            local_whisper: Some(local_whisper(url)),
            ..groq_config(true, None)
        }
    }

    /// Timestamp of the message parsed from a payload carrying `raw` as `timestamp`.
    fn parsed_timestamp(raw: serde_json::Value) -> u64 {
        let payload = json!({ "text": "Test", "waId": "1234567890", "timestamp": raw });
        let msgs = open_channel().parse_webhook_payload(&payload);
        msgs[0].timestamp
    }

    /// NOTE(review): mirrors the type-to-file-name mapping inside
    /// `try_transcribe_audio` instead of exercising it; the two can drift.
    fn file_name_for(payload: &serde_json::Value) -> &'static str {
        let msg_type = payload
            .get("type")
            .and_then(|v| v.as_str())
            .unwrap_or("audio");
        match msg_type {
            "voice" => "voice.ogg",
            _ => "audio.ogg",
        }
    }

    // ---- identity & allowlist ------------------------------------------

    #[test]
    fn wati_channel_name() {
        assert_eq!(single_peer_channel().name(), "wati");
    }

    #[test]
    fn wati_number_allowed_exact() {
        let ch = single_peer_channel();
        assert!(ch.is_number_allowed("+1234567890"));
        assert!(!ch.is_number_allowed("+9876543210"));
    }

    #[test]
    fn wati_number_allowed_wildcard() {
        let ch = open_channel();
        assert!(ch.is_number_allowed("+1234567890"));
        assert!(ch.is_number_allowed("+9999999999"));
    }

    #[test]
    fn wati_number_allowed_empty() {
        let ch = channel(&[]);
        assert!(!ch.is_number_allowed("+1234567890"));
    }

    // ---- target building -----------------------------------------------

    #[test]
    fn wati_build_target_with_tenant() {
        let ch = channel_at(API_URL, Some("tenant1"), &[]);
        assert_eq!(ch.build_target("+1234567890"), "tenant1:1234567890");
    }

    #[test]
    fn wati_build_target_without_tenant() {
        let ch = single_peer_channel();
        assert_eq!(ch.build_target("+1234567890"), "1234567890");
    }

    #[test]
    fn wati_build_target_already_prefixed() {
        let ch = channel_at(API_URL, Some("tenant1"), &[]);
        assert_eq!(ch.build_target("tenant1:1234567890"), "tenant1:1234567890");
    }

    // ---- text payload parsing ------------------------------------------

    #[test]
    fn wati_parse_valid_message() {
        let payload = json!({
            "text": "Hello from WATI!",
            "waId": "1234567890",
            "fromMe": false,
            "timestamp": 1_705_320_000_u64
        });

        let msgs = single_peer_channel().parse_webhook_payload(&payload);
        assert_eq!(msgs.len(), 1);
        let msg = &msgs[0];
        assert_eq!(msg.sender, "+1234567890");
        assert_eq!(msg.content, "Hello from WATI!");
        assert_eq!(msg.channel, "wati");
        assert_eq!(msg.reply_target, "+1234567890");
        assert_eq!(msg.timestamp, 1_705_320_000);
    }

    #[test]
    fn wati_parse_skip_from_me() {
        let payload = json!({
            "text": "My own message",
            "waId": "1234567890",
            "fromMe": true
        });

        let msgs = open_channel().parse_webhook_payload(&payload);
        assert!(msgs.is_empty(), "fromMe messages should be skipped");
    }

    #[test]
    fn wati_parse_skip_no_text() {
        let payload = json!({ "waId": "1234567890", "fromMe": false });

        let msgs = open_channel().parse_webhook_payload(&payload);
        assert!(msgs.is_empty(), "Messages without text should be skipped");
    }

    #[test]
    fn wati_parse_alternative_field_names() {
        let payload = json!({
            "message": { "body": "Alt field test" },
            "wa_id": "1234567890",
            "from_me": false,
            "timestamp": 1_705_320_000_u64
        });

        let msgs = open_channel().parse_webhook_payload(&payload);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, "Alt field test");
        assert_eq!(msgs[0].sender, "+1234567890");
    }

    #[test]
    fn wati_parse_timestamp_seconds() {
        assert_eq!(parsed_timestamp(json!(1_705_320_000_u64)), 1_705_320_000);
    }

    #[test]
    fn wati_parse_timestamp_milliseconds() {
        assert_eq!(
            parsed_timestamp(json!(1_705_320_000_000_u64)),
            1_705_320_000
        );
    }

    #[test]
    fn wati_parse_timestamp_iso() {
        assert_eq!(
            parsed_timestamp(json!("2025-01-15T12:00:00Z")),
            1_736_942_400
        );
    }

    #[test]
    fn wati_parse_normalizes_phone() {
        let payload = json!({
            "text": "Hi",
            "waId": "1234567890",
            "fromMe": false
        });

        let msgs = single_peer_channel().parse_webhook_payload(&payload);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].sender, "+1234567890");
    }

    #[test]
    fn wati_parse_empty_payload() {
        let msgs = single_peer_channel().parse_webhook_payload(&json!({}));
        assert!(msgs.is_empty());
    }

    #[test]
    fn wati_parse_from_field_fallback() {
        let payload = json!({
            "text": "Fallback test",
            "from": "1234567890",
            "fromMe": false
        });

        let msgs = open_channel().parse_webhook_payload(&payload);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].sender, "+1234567890");
    }

    #[test]
    fn wati_parse_message_text_fallback() {
        let payload = json!({
            "message": { "text": "Nested text" },
            "waId": "1234567890",
            "fromMe": false
        });

        let msgs = open_channel().parse_webhook_payload(&payload);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, "Nested text");
    }

    #[test]
    fn wati_parse_owner_field_as_from_me() {
        let payload = json!({
            "text": "Test",
            "waId": "1234567890",
            "owner": true
        });

        let msgs = open_channel().parse_webhook_payload(&payload);
        assert!(msgs.is_empty(), "owner=true messages should be skipped");
    }

    // ---- transcription manager setup -----------------------------------

    #[test]
    fn wati_manager_none_when_not_configured() {
        assert!(single_peer_channel().transcription_manager.is_none());
    }

    #[test]
    fn wati_manager_some_when_valid_config() {
        let ch = single_peer_channel().with_transcription(groq_config(true, Some("test-key")));
        assert!(ch.transcription_manager.is_some());
    }

    #[test]
    fn wati_manager_none_and_warn_on_init_failure() {
        // An empty API key makes manager construction fail.
        let ch = single_peer_channel().with_transcription(groq_config(true, Some("")));
        assert!(ch.transcription_manager.is_none());
    }

    // ---- try_transcribe_audio early exits ------------------------------

    #[tokio::test]
    async fn wati_try_transcribe_returns_none_when_manager_none() {
        let payload = json!({
            "type": "audio",
            "mediaUrl": "https://example.com/audio.ogg",
            "waId": "1234567890"
        });

        let result = single_peer_channel().try_transcribe_audio(&payload).await;
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn wati_try_transcribe_returns_none_when_no_media_url() {
        // Transcription must be enabled here; otherwise the call returns
        // early for lack of a manager and the media-URL check is never reached.
        let ch = single_peer_channel().with_transcription(groq_config(true, Some("test-key")));
        assert!(
            ch.transcription_manager.is_some(),
            "precondition: transcription must be configured"
        );

        let payload = json!({ "type": "audio", "waId": "1234567890" });

        let result = ch.try_transcribe_audio(&payload).await;
        assert!(result.is_none());
    }

    // ---- file name selection -------------------------------------------

    #[test]
    fn wati_filename_voice_type() {
        let payload = json!({
            "type": "voice",
            "mediaUrl": "https://example.com/media/123",
            "waId": "1234567890"
        });
        assert_eq!(file_name_for(&payload), "voice.ogg");
    }

    #[test]
    fn wati_filename_audio_type() {
        let payload = json!({
            "type": "audio",
            "mediaUrl": "https://example.com/media/123",
            "waId": "1234567890"
        });
        assert_eq!(file_name_for(&payload), "audio.ogg");
    }

    // ---- sender extraction ---------------------------------------------

    #[test]
    fn wati_extract_sender_absent_returns_none() {
        let payload = json!({ "type": "audio" });
        assert!(single_peer_channel().extract_sender(&payload).is_none());
    }

    #[test]
    fn wati_extract_sender_not_in_allowlist_returns_none() {
        let payload = json!({ "waId": "9999999999" });
        assert!(single_peer_channel().extract_sender(&payload).is_none());
    }

    // ---- audio payload parsing -----------------------------------------

    #[test]
    fn wati_parse_audio_as_message_uses_transcript_as_content() {
        let payload = json!({
            "type": "audio",
            "waId": "1234567890",
            "fromMe": false,
            "timestamp": 1_705_320_000_u64
        });

        let transcript = "This is a test transcript.".to_string();
        let msgs = open_channel().parse_audio_as_message(&payload, transcript.clone());

        assert_eq!(msgs.len(), 1);
        let msg = &msgs[0];
        assert_eq!(msg.content, transcript);
        assert_eq!(msg.sender, "+1234567890");
        assert_eq!(msg.channel, "wati");
        assert_eq!(msg.timestamp, 1_705_320_000);
    }

    // ---- end-to-end against mock servers -------------------------------

    #[tokio::test]
    async fn wati_transcribes_audio_via_local_whisper() {
        use wiremock::matchers::{method, path};
        use wiremock::{Mock, MockServer, ResponseTemplate};

        let media_server = MockServer::start().await;
        let whisper_server = MockServer::start().await;

        let audio_bytes = b"fake-audio-data";
        Mock::given(method("GET"))
            .and(path("/media/123"))
            .respond_with(ResponseTemplate::new(200).set_body_bytes(audio_bytes))
            .mount(&media_server)
            .await;

        let transcript = "Transcribed text from local whisper.";
        Mock::given(method("POST"))
            .and(path("/v1/transcribe"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({ "text": transcript })))
            .mount(&whisper_server)
            .await;

        let config = whisper_config(format!("{}/v1/transcribe", whisper_server.uri()));
        // The media server doubles as the API host so the host check passes.
        let ch = channel_at(&media_server.uri(), None, &[PEER]).with_transcription(config);

        let payload = json!({
            "type": "audio",
            "mediaUrl": format!("{}/media/123", media_server.uri()),
            "waId": "1234567890"
        });

        let result = ch.try_transcribe_audio(&payload).await;
        assert_eq!(result, Some(transcript.to_string()));
    }

    #[tokio::test]
    async fn wati_try_transcribe_returns_none_on_media_download_failure() {
        use wiremock::matchers::{method, path};
        use wiremock::{Mock, MockServer, ResponseTemplate};

        let media_server = MockServer::start().await;

        Mock::given(method("GET"))
            .and(path("/media/123"))
            .respond_with(ResponseTemplate::new(404))
            .mount(&media_server)
            .await;

        let config = whisper_config("http://localhost:8000/v1/transcribe".to_string());
        let ch = channel_at(&media_server.uri(), None, &[PEER]).with_transcription(config);

        let payload = json!({
            "type": "audio",
            "mediaUrl": format!("{}/media/123", media_server.uri()),
            "waId": "1234567890"
        });

        let result = ch.try_transcribe_audio(&payload).await;
        assert!(result.is_none());
    }

    // ---- media host validation -----------------------------------------

    #[test]
    fn extract_host_uses_url_parser() {
        assert_eq!(
            WatiChannel::extract_host("https://live-mt-server.wati.io/media/123"),
            Some("live-mt-server.wati.io".to_string())
        );
        // Everything before `@` is userinfo, so the real host is evil.com.
        assert_eq!(
            WatiChannel::extract_host("https://live-mt-server.wati.io@evil.com/media/123"),
            Some("evil.com".to_string())
        );
    }

    #[tokio::test]
    async fn wati_try_transcribe_blocks_host_mismatch() {
        let config = TranscriptionConfig {
            enabled: true,
            local_whisper: Some(LocalWhisperConfig {
                timeout_secs: 120,
                ..local_whisper("http://localhost:8001/v1/transcribe".into())
            }),
            ..Default::default()
        };

        let ch = single_peer_channel().with_transcription(config);

        let payload = json!({
            "type": "audio",
            "mediaUrl": "https://evil.com/media/123",
            "waId": "1234567890"
        });

        let result = ch.try_transcribe_audio(&payload).await;
        assert!(result.is_none());
    }
}