1use anyhow::{Result, bail};
2use async_trait::async_trait;
3use chrono::{DateTime, NaiveDateTime, Utc};
4use serde::{Deserialize, Serialize};
5use std::time::Duration;
6use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
7
/// Number of retries performed after the first attempt when the caller gives none.
const DEFAULT_MAX_RETRIES: u32 = 3;
/// Base delay, in milliseconds, of the exponential backoff when the caller gives none.
const DEFAULT_RETRY_BASE_DELAY_MS: u64 = 500;
/// Ceiling, in milliseconds, applied to every retry delay when the caller gives none.
const DEFAULT_RETRY_MAX_DELAY_MS: u64 = 30 * 1_000;
11
/// Channel that accepts inbound messages over an HTTP webhook and, when a
/// `send_url` is configured, delivers outbound messages to an HTTP endpoint.
pub struct WebhookChannel {
    /// Alias reported through the `Attributable::alias` implementation.
    alias: String,
    /// TCP port the inbound listener binds to.
    listen_port: u16,
    /// Route path for inbound requests; `new` guarantees a leading `/`.
    listen_path: String,
    /// Outbound endpoint; when `None`, `send` skips delivery and returns `Ok`.
    send_url: Option<String>,
    /// Upper-cased HTTP method for outbound requests (`"PUT"` selects PUT,
    /// any other value is sent as POST).
    send_method: String,
    /// Value placed verbatim in the `Authorization` header of outbound requests.
    auth_header: Option<String>,
    /// HMAC-SHA256 key; when set, inbound requests must carry a matching
    /// `x-webhook-signature` header.
    secret: Option<String>,
    /// Number of retries after the first outbound attempt.
    max_retries: u32,
    /// Base delay (ms) of the exponential backoff; `new` clamps it to at least 1.
    retry_base_delay_ms: u64,
    /// Ceiling (ms) for every retry delay; `new` clamps it to at least 1.
    retry_max_delay_ms: u64,
}
27
/// JSON body accepted by the inbound webhook handler.
#[derive(Debug, Deserialize)]
struct IncomingWebhook {
    /// Identifier of the message author; also used as the reply target when
    /// no `thread_id` is supplied.
    sender: String,
    /// Message text; the handler rejects an empty value.
    content: String,
    /// Optional thread identifier; absent in the JSON means `None`.
    #[serde(default)]
    thread_id: Option<String>,
}
36
/// JSON body sent to `send_url` for every outbound message.
#[derive(Debug, Serialize)]
struct OutgoingWebhook {
    /// Message text.
    content: String,
    /// Thread identifier; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    thread_id: Option<String>,
    /// Intended recipient; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    recipient: Option<String>,
}
46
47impl WebhookChannel {
48 #[allow(clippy::too_many_arguments)]
49 pub fn new(
50 alias: String,
51 listen_port: u16,
52 listen_path: Option<String>,
53 send_url: Option<String>,
54 send_method: Option<String>,
55 auth_header: Option<String>,
56 secret: Option<String>,
57 max_retries: Option<u32>,
58 retry_base_delay_ms: Option<u64>,
59 retry_max_delay_ms: Option<u64>,
60 ) -> Self {
61 let path = listen_path.unwrap_or_else(|| "/webhook".to_string());
62 let listen_path = if path.starts_with('/') {
64 path
65 } else {
66 format!("/{path}")
67 };
68
69 Self {
70 alias,
71 listen_port,
72 listen_path,
73 send_url,
74 send_method: send_method
75 .unwrap_or_else(|| "POST".to_string())
76 .to_uppercase(),
77 auth_header,
78 secret,
79 max_retries: max_retries.unwrap_or(DEFAULT_MAX_RETRIES),
80 retry_base_delay_ms: retry_base_delay_ms
82 .unwrap_or(DEFAULT_RETRY_BASE_DELAY_MS)
83 .max(1),
84 retry_max_delay_ms: retry_max_delay_ms
85 .unwrap_or(DEFAULT_RETRY_MAX_DELAY_MS)
86 .max(1),
87 }
88 }
89
/// Returns the HTTP client used for outbound requests, obtained from the
/// runtime proxy configuration under the `"channel.webhook"` key.
fn http_client(&self) -> reqwest::Client {
    zeroclaw_config::schema::build_runtime_proxy_client("channel.webhook")
}
93
94 fn compute_backoff(&self, attempt: u32) -> Duration {
98 let multiplier = 1_u64.checked_shl(attempt).unwrap_or(u64::MAX);
99 let base = self.retry_base_delay_ms.saturating_mul(multiplier);
100 let jittered = apply_jitter(base);
101 let capped = jittered.min(self.retry_max_delay_ms);
102 Duration::from_millis(capped)
103 }
104
/// Test-only check of a webhook signature against the configured secret.
///
/// Returns `true` when no secret is configured. Otherwise `signature` must be
/// present and be the hex-encoded HMAC-SHA256 of `body`, optionally prefixed
/// with a single `sha256=`; anything else yields `false`.
#[cfg(test)]
fn verify_signature(&self, body: &[u8], signature: Option<&str>) -> bool {
    use hmac::{Hmac, Mac};
    use sha2::Sha256;

    type HmacSha256 = Hmac<Sha256>;

    let Some(secret) = self.secret.as_deref() else {
        return true;
    };
    let Some(sig) = signature else {
        return false;
    };

    // `strip_prefix` removes at most one leading `sha256=`. The previous
    // `trim_start_matches` removed the prefix repeatedly, so malformed values
    // such as `sha256=sha256=<hex>` were accepted.
    let hex_digest = sig.strip_prefix("sha256=").unwrap_or(sig);
    let Ok(expected) = hex::decode(hex_digest) else {
        return false;
    };

    let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) else {
        return false;
    };
    mac.update(body);
    mac.verify_slice(&expected).is_ok()
}
134
135 async fn attempt_send(
136 &self,
137 client: &reqwest::Client,
138 send_url: &str,
139 payload: &OutgoingWebhook,
140 ) -> AttemptOutcome {
141 let mut request = match self.send_method.as_str() {
142 "PUT" => client.put(send_url),
143 _ => client.post(send_url),
144 };
145
146 if let Some(ref auth) = self.auth_header {
147 request = request.header("Authorization", auth);
148 }
149
150 let resp = match request.json(payload).send().await {
151 Ok(r) => r,
152 Err(e) => return AttemptOutcome::Retry(format!("network error: {e}")),
153 };
154
155 let status = resp.status();
156 if status.is_success() {
157 return AttemptOutcome::Success;
158 }
159
160 let code = status.as_u16();
161 let retry_after = resp
162 .headers()
163 .get(reqwest::header::RETRY_AFTER)
164 .and_then(|v| v.to_str().ok())
165 .and_then(parse_retry_after_ms);
166
167 if (code == 429 || code == 503)
173 && let Some(ms) = retry_after
174 {
175 return AttemptOutcome::RetryAfter(Duration::from_millis(ms));
176 }
177
178 let body = resp
179 .text()
180 .await
181 .unwrap_or_else(|e| format!("<failed to read response: {e}>"));
182
183 if code == 429 || (500..600).contains(&code) {
185 return AttemptOutcome::Retry(format!("Webhook send failed ({status}): {body}"));
186 }
187
188 AttemptOutcome::Fatal(anyhow::Error::msg(format!(
190 "Webhook send failed ({status}): {body}"
191 )))
192 }
193}
194
195impl ::zeroclaw_api::attribution::Attributable for WebhookChannel {
196 fn role(&self) -> ::zeroclaw_api::attribution::Role {
197 ::zeroclaw_api::attribution::Role::Channel(
198 ::zeroclaw_api::attribution::ChannelKind::Webhook,
199 )
200 }
201 fn alias(&self) -> &str {
202 &self.alias
203 }
204}
205
206fn apply_jitter(delay_ms: u64) -> u64 {
208 if delay_ms == 0 {
209 return 0;
210 }
211 let jitter_factor = 0.75 + (rand::random::<f64>() * 0.5);
212 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
214 let jittered = ((delay_ms as f64) * jitter_factor) as u64;
215 jittered
216}
217
/// Parses a `Retry-After` header value into a delay in milliseconds, using the
/// current UTC time as the reference point for date-form values.
///
/// Returns `None` when the value cannot be interpreted.
fn parse_retry_after_ms(value: &str) -> Option<u64> {
    parse_retry_after_ms_at(value, Utc::now())
}
223
224fn parse_retry_after_ms_at(value: &str, now: DateTime<Utc>) -> Option<u64> {
225 let trimmed = value.trim();
226 if trimmed.is_empty() {
227 return None;
228 }
229 if let Ok(seconds) = trimmed.parse::<u64>() {
230 return Some(seconds.saturating_mul(1000));
231 }
232 let whole = trimmed
233 .split_once('.')
234 .map(|(whole, _)| whole)
235 .unwrap_or(trimmed);
236 if let Ok(seconds) = whole.parse::<u64>() {
237 return Some(seconds.saturating_mul(1000));
238 }
239
240 parse_retry_after_http_date(trimmed).map(|date| {
241 let delay_ms = date.signed_duration_since(now).num_milliseconds();
242 if delay_ms <= 0 {
243 0
244 } else {
245 u64::try_from(delay_ms).unwrap_or(u64::MAX)
246 }
247 })
248}
249
250fn parse_retry_after_http_date(value: &str) -> Option<DateTime<Utc>> {
251 if let Ok(date) = NaiveDateTime::parse_from_str(value, "%a, %d %b %Y %H:%M:%S GMT") {
252 return Some(DateTime::from_naive_utc_and_offset(date, Utc));
253 }
254 if let Ok(date) = NaiveDateTime::parse_from_str(value, "%A, %d-%b-%y %H:%M:%S GMT") {
255 return Some(DateTime::from_naive_utc_and_offset(date, Utc));
256 }
257 NaiveDateTime::parse_from_str(value, "%a %b %e %H:%M:%S %Y")
258 .ok()
259 .map(|date| DateTime::from_naive_utc_and_offset(date, Utc))
260}
261
/// Classification of a single outbound delivery attempt.
enum AttemptOutcome {
    /// The endpoint answered with a success status.
    Success,
    /// The endpoint answered 429 or 503 and supplied a usable `Retry-After`
    /// delay.
    RetryAfter(Duration),
    /// A transient failure (transport error, 429, or 5xx); the string
    /// describes it.
    Retry(String),
    /// A failure that is not retried.
    Fatal(anyhow::Error),
}
269
270#[async_trait]
271impl Channel for WebhookChannel {
/// Returns the fixed channel name, `"webhook"`.
fn name(&self) -> &str {
    "webhook"
}
275
276 async fn send(&self, message: &SendMessage) -> Result<()> {
277 let Some(ref send_url) = self.send_url else {
278 ::zeroclaw_log::record!(
279 DEBUG,
280 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
281 "channel: no send_url configured, skipping outbound message"
282 );
283 return Ok(());
284 };
285
286 let client = self.http_client();
287 let payload = OutgoingWebhook {
288 content: message.content.clone(),
289 thread_id: message.thread_ts.clone(),
290 recipient: if message.recipient.is_empty() {
291 None
292 } else {
293 Some(message.recipient.clone())
294 },
295 };
296
297 let total_attempts = self.max_retries.saturating_add(1);
298
299 for attempt in 0..total_attempts {
300 let outcome = self.attempt_send(&client, send_url, &payload).await;
301
302 match outcome {
303 AttemptOutcome::Success => return Ok(()),
304 AttemptOutcome::Fatal(err) => return Err(err),
305 AttemptOutcome::RetryAfter(delay) => {
306 if attempt + 1 >= total_attempts {
307 bail!(
308 "Webhook send failed after {total_attempts} attempt(s); last error: rate limited / server error with Retry-After"
309 );
310 }
311 let capped = delay.min(Duration::from_millis(self.retry_max_delay_ms));
312 ::zeroclaw_log::record!(
313 WARN,
314 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
315 &format!(
316 "Webhook send: server requested retry after {}ms (attempt {}/{}), waiting...",
317 capped.as_millis(),
318 attempt + 1,
319 total_attempts
320 )
321 );
322 tokio::time::sleep(capped).await;
323 }
324 AttemptOutcome::Retry(err_msg) => {
325 if attempt + 1 >= total_attempts {
326 bail!(
327 "Webhook send failed after {total_attempts} attempt(s); last error: {err_msg}"
328 );
329 }
330 let delay = self.compute_backoff(attempt);
331 ::zeroclaw_log::record!(
332 WARN,
333 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
334 &format!(
335 "Webhook send failed (attempt {}/{}): {}; retrying in {}ms",
336 attempt + 1,
337 total_attempts,
338 err_msg,
339 delay.as_millis()
340 )
341 );
342 tokio::time::sleep(delay).await;
343 }
344 }
345 }
346
347 unreachable!("send loop exits via return or bail on the final attempt")
348 }
349
350 async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> Result<()> {
351 use axum::{
352 Router,
353 body::Bytes,
354 extract::State,
355 http::{HeaderMap, StatusCode},
356 routing::post,
357 };
358 use portable_atomic::{AtomicU64, Ordering};
359 use std::sync::Arc;
360
361 let counter = Arc::new(AtomicU64::new(0));
362
363 struct WebhookState {
364 tx: tokio::sync::mpsc::Sender<ChannelMessage>,
365 secret: Option<String>,
366 counter: Arc<AtomicU64>,
367 }
368
369 let state = Arc::new(WebhookState {
370 tx: tx.clone(),
371 secret: self.secret.clone(),
372 counter: counter.clone(),
373 });
374
375 let listen_path = self.listen_path.clone();
376
377 async fn handle_webhook(
378 State(state): State<Arc<WebhookState>>,
379 headers: HeaderMap,
380 body: Bytes,
381 ) -> StatusCode {
382 if let Some(ref secret) = state.secret {
384 use hmac::{Hmac, Mac};
385 use sha2::Sha256;
386 type HmacSha256 = Hmac<Sha256>;
387
388 let signature = headers
389 .get("x-webhook-signature")
390 .and_then(|v| v.to_str().ok());
391
392 let valid = if let Some(sig) = signature {
393 if let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) {
394 mac.update(&body);
395 let expected =
396 hex::decode(sig.trim_start_matches("sha256=")).unwrap_or_default();
397 mac.verify_slice(&expected).is_ok()
398 } else {
399 false
400 }
401 } else {
402 false
403 };
404
405 if !valid {
406 ::zeroclaw_log::record!(
407 WARN,
408 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
409 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
410 "invalid signature, rejecting request"
411 );
412 return StatusCode::UNAUTHORIZED;
413 }
414 }
415
416 let payload: IncomingWebhook = match serde_json::from_slice(&body) {
417 Ok(p) => p,
418 Err(e) => {
419 ::zeroclaw_log::record!(
420 WARN,
421 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
422 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
423 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
424 "invalid JSON payload"
425 );
426 return StatusCode::BAD_REQUEST;
427 }
428 };
429
430 if payload.content.is_empty() {
431 return StatusCode::BAD_REQUEST;
432 }
433
434 let seq = state.counter.fetch_add(1, Ordering::Relaxed);
435
436 #[allow(clippy::cast_possible_truncation)]
437 let timestamp = std::time::SystemTime::now()
438 .duration_since(std::time::UNIX_EPOCH)
439 .unwrap_or_default()
440 .as_secs();
441
442 let reply_target = payload
443 .thread_id
444 .clone()
445 .unwrap_or_else(|| payload.sender.clone());
446
447 let msg = ChannelMessage {
448 id: format!("webhook_{seq}"),
449 sender: payload.sender,
450 reply_target,
451 content: payload.content,
452 channel: "webhook".to_string(),
453 channel_alias: None,
454 timestamp,
455 thread_ts: payload.thread_id,
456 interruption_scope_id: None,
457 attachments: vec![],
458 subject: None,
459 };
460
461 if state.tx.send(msg).await.is_err() {
462 return StatusCode::SERVICE_UNAVAILABLE;
463 }
464
465 StatusCode::OK
466 }
467
468 let app = Router::new()
469 .route(&listen_path, post(handle_webhook))
470 .with_state(state);
471
472 let addr = std::net::SocketAddr::from(([0, 0, 0, 0], self.listen_port));
473 ::zeroclaw_log::record!(
474 INFO,
475 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
476 &format!(
477 "Webhook channel listening on http://0.0.0.0:{}{} ...",
478 self.listen_port, self.listen_path
479 )
480 );
481
482 let listener = tokio::net::TcpListener::bind(addr).await?;
483 axum::serve(listener, app).await.map_err(|e| {
484 ::zeroclaw_log::record!(
485 ERROR,
486 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
487 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
488 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
489 "Webhook server error"
490 );
491 anyhow::Error::msg(format!("Webhook server error: {e}"))
492 })?;
493
494 Ok(())
495 }
496
/// Always reports the channel as healthy; no probe is performed.
async fn health_check(&self) -> bool {
    true
}
502}
503
504#[cfg(test)]
505mod tests {
506 use super::*;
507
508 fn make_channel() -> WebhookChannel {
509 WebhookChannel::new(
510 "test-hook".into(),
511 8080,
512 Some("/webhook".into()),
513 Some("https://example.com/callback".into()),
514 None,
515 None,
516 None,
517 None,
518 None,
519 None,
520 )
521 }
522
523 fn make_channel_with_secret() -> WebhookChannel {
524 WebhookChannel::new(
525 "test-hook".into(),
526 8080,
527 None,
528 Some("https://example.com/callback".into()),
529 None,
530 None,
531 Some("mysecret".into()),
532 None,
533 None,
534 None,
535 )
536 }
537
538 fn make_channel_to(url: &str) -> WebhookChannel {
539 WebhookChannel::new(
541 "test-hook".into(),
542 8080,
543 None,
544 Some(url.into()),
545 None,
546 None,
547 None,
548 Some(2),
549 Some(10),
550 Some(100),
551 )
552 }
553
/// With no `listen_path` the constructor falls back to `/webhook`.
#[test]
fn default_path() {
    let alias = String::from("test-hook");
    let channel = WebhookChannel::new(
        alias, 8080, None, None, None, None, None, None, None, None,
    );
    assert_eq!(channel.listen_path, "/webhook");
}
570
/// A path supplied without a leading slash gets one added.
#[test]
fn path_normalized() {
    let alias = String::from("test-hook");
    let raw_path = Some(String::from("hooks/incoming"));
    let channel = WebhookChannel::new(
        alias, 8080, raw_path, None, None, None, None, None, None, None,
    );
    assert_eq!(channel.listen_path, "/hooks/incoming");
}
587
/// The outbound method defaults to POST.
#[test]
fn send_method_default() {
    let channel = make_channel();
    assert_eq!(channel.send_method, "POST");
}
593
/// A lower-case method is stored upper-cased.
#[test]
fn send_method_put() {
    let alias = String::from("test-hook");
    let send_url = Some(String::from("https://example.com"));
    let method = Some(String::from("put"));
    let channel = WebhookChannel::new(
        alias, 8080, None, send_url, method, None, None, None, None, None,
    );
    assert_eq!(channel.send_method, "PUT");
}
610
/// Omitted retry settings fall back to the `DEFAULT_*` constants.
#[test]
fn retry_defaults_applied() {
    let channel = make_channel();
    assert_eq!(channel.max_retries, DEFAULT_MAX_RETRIES);
    assert_eq!(channel.retry_base_delay_ms, DEFAULT_RETRY_BASE_DELAY_MS);
    assert_eq!(channel.retry_max_delay_ms, DEFAULT_RETRY_MAX_DELAY_MS);
}
618
/// Explicit retry settings are stored as given.
#[test]
fn retry_overrides_applied() {
    let alias = String::from("test-hook");
    let send_url = Some(String::from("https://example.com"));
    let channel = WebhookChannel::new(
        alias,
        8080,
        None,
        send_url,
        None,
        None,
        None,
        Some(0),     // max_retries
        Some(50),    // retry_base_delay_ms
        Some(1_000), // retry_max_delay_ms
    );
    assert_eq!(channel.max_retries, 0);
    assert_eq!(channel.retry_base_delay_ms, 50);
    assert_eq!(channel.retry_max_delay_ms, 1_000);
}
637
/// A backoff far above the cap is clamped to exactly the cap.
#[test]
fn backoff_capped_by_max_delay() {
    let alias = String::from("test-hook");
    let send_url = Some(String::from("https://example.com"));
    let channel = WebhookChannel::new(
        alias,
        8080,
        None,
        send_url,
        None,
        None,
        None,
        Some(5),     // max_retries
        Some(1_000), // retry_base_delay_ms
        Some(2_000), // retry_max_delay_ms
    );
    // 1000ms * 2^10 is well above the 2000ms cap even at the lowest jitter.
    let delay = channel.compute_backoff(10);
    assert_eq!(delay.as_millis(), 2_000);
}
658
/// When the un-jittered delay equals the cap, jitter must never push the
/// result above it.
#[test]
fn backoff_never_exceeds_max_delay_near_cap() {
    let alias = String::from("test-hook");
    let send_url = Some(String::from("https://example.com"));
    let channel = WebhookChannel::new(
        alias,
        8080,
        None,
        send_url,
        None,
        None,
        None,
        Some(5),     // max_retries
        Some(1_000), // retry_base_delay_ms
        Some(2_000), // retry_max_delay_ms
    );
    // Jitter is random, so sample repeatedly.
    for _ in 0..256 {
        let delay = channel.compute_backoff(1);
        assert!(
            delay.as_millis() <= 2_000,
            "compute_backoff exceeded retry_max_delay_ms: {}ms",
            delay.as_millis()
        );
    }
}
685
/// Whole seconds are converted to milliseconds.
#[test]
fn parse_retry_after_integer_seconds() {
    let parsed = parse_retry_after_ms("5");
    assert_eq!(parsed, Some(5_000));
}
690
/// The fractional part of a decimal value is dropped, not rounded.
#[test]
fn parse_retry_after_decimal_seconds() {
    let parsed = parse_retry_after_ms("2.9");
    assert_eq!(parsed, Some(2_000));
}
695
/// A date two seconds after the reference time yields 2000 ms.
#[test]
fn parse_retry_after_http_date() {
    let reference = DateTime::parse_from_rfc2822("Sun, 06 Nov 1994 08:49:37 GMT")
        .unwrap()
        .with_timezone(&Utc);
    let parsed = parse_retry_after_ms_at("Sun, 06 Nov 1994 08:49:39 GMT", reference);
    assert_eq!(parsed, Some(2_000));
}
706
/// The two older date layouts are accepted as well.
#[test]
fn parse_retry_after_obsolete_http_dates() {
    let reference = DateTime::parse_from_rfc2822("Sun, 06 Nov 1994 08:49:37 GMT")
        .unwrap()
        .with_timezone(&Utc);

    let dashed = parse_retry_after_ms_at("Sunday, 06-Nov-94 08:49:39 GMT", reference);
    assert_eq!(dashed, Some(2_000));

    let asctime = parse_retry_after_ms_at("Sun Nov 6 08:49:39 1994", reference);
    assert_eq!(asctime, Some(2_000));
}
721
/// A date earlier than the reference time yields a zero delay.
#[test]
fn parse_retry_after_past_http_date_as_zero() {
    let reference = DateTime::parse_from_rfc2822("Sun, 06 Nov 1994 08:49:39 GMT")
        .unwrap()
        .with_timezone(&Utc);
    let parsed = parse_retry_after_ms_at("Sun, 06 Nov 1994 08:49:37 GMT", reference);
    assert_eq!(parsed, Some(0));
}
732
/// Text that is neither a number nor a date is rejected.
#[test]
fn parse_retry_after_rejects_non_numeric() {
    let parsed = parse_retry_after_ms("later");
    assert_eq!(parsed, None);
}
737
/// A whitespace-only value is rejected.
#[test]
fn parse_retry_after_empty() {
    let parsed = parse_retry_after_ms(" ");
    assert_eq!(parsed, None);
}
742
/// Zero seconds is a valid value and yields a zero delay.
#[test]
fn parse_retry_after_zero() {
    let parsed = parse_retry_after_ms("0");
    assert_eq!(parsed, Some(0));
}
747
/// All three fields are read from a complete inbound payload.
#[test]
fn incoming_payload_deserializes_all_fields() {
    let raw = r#"{"sender": "zeroclaw_user", "content": "hello", "thread_id": "t1"}"#;
    let parsed: IncomingWebhook = serde_json::from_str(raw).unwrap();
    assert_eq!(parsed.sender, "zeroclaw_user");
    assert_eq!(parsed.content, "hello");
    assert_eq!(parsed.thread_id.as_deref(), Some("t1"));
}
756
/// A payload without `thread_id` deserializes with `thread_id == None`.
#[test]
fn incoming_payload_without_thread() {
    let raw = r#"{"sender": "zeroclaw_user", "content": "hi"}"#;
    let parsed: IncomingWebhook = serde_json::from_str(raw).unwrap();
    assert_eq!(parsed.sender, "zeroclaw_user");
    assert_eq!(parsed.content, "hi");
    assert!(parsed.thread_id.is_none());
}
765
/// Every populated field appears in the serialized outbound payload.
#[test]
fn outgoing_payload_serializes_content() {
    let outgoing = OutgoingWebhook {
        content: String::from("response"),
        thread_id: Some(String::from("t1")),
        recipient: Some(String::from("zeroclaw_user")),
    };
    let value = serde_json::to_value(&outgoing).unwrap();
    assert_eq!(value["content"], "response");
    assert_eq!(value["thread_id"], "t1");
    assert_eq!(value["recipient"], "zeroclaw_user");
}
778
/// `None` fields are left out of the serialized outbound payload.
#[test]
fn outgoing_payload_omits_none_fields() {
    let outgoing = OutgoingWebhook {
        content: String::from("response"),
        thread_id: None,
        recipient: None,
    };
    let value = serde_json::to_value(&outgoing).unwrap();
    assert_eq!(value["content"], "response");
    assert!(value.get("thread_id").is_none());
    assert!(value.get("recipient").is_none());
}
791
/// Without a configured secret, a request with no signature is accepted.
#[test]
fn verify_signature_no_secret() {
    let channel = make_channel();
    let accepted = channel.verify_signature(b"body", None);
    assert!(accepted);
}
797
/// With a secret configured, a missing signature is rejected.
#[test]
fn verify_signature_missing_header() {
    let channel = make_channel_with_secret();
    let accepted = channel.verify_signature(b"body", None);
    assert!(!accepted);
}
803
/// A hex HMAC-SHA256 of the body computed with the shared secret is accepted.
#[test]
fn verify_signature_valid() {
    use hmac::{Hmac, Mac};
    use sha2::Sha256;
    type HmacSha256 = Hmac<Sha256>;

    let payload = b"test body";

    // Compute the signature independently with the secret the helper uses.
    let mut signer = HmacSha256::new_from_slice(b"mysecret").unwrap();
    signer.update(payload);
    let signature = hex::encode(signer.finalize().into_bytes());

    let channel = make_channel_with_secret();
    assert!(channel.verify_signature(payload, Some(&signature)));
}
819
/// A signature that is not valid hex is rejected.
#[test]
fn verify_signature_invalid() {
    let channel = make_channel_with_secret();
    let accepted = channel.verify_signature(b"body", Some("badhex"));
    assert!(!accepted);
}
825
/// Outbound message shared by the `send_*` tests.
// NOTE(review): presumably the arguments are (content, recipient) — confirm
// against `SendMessage::new`.
fn test_message() -> SendMessage {
    SendMessage::new("hello", "zeroclaw_user")
}
829
830 #[tokio::test]
831 async fn send_happy_path_returns_ok() {
832 use wiremock::matchers::{method, path};
833 use wiremock::{Mock, MockServer, ResponseTemplate};
834
835 let mock = MockServer::start().await;
836 Mock::given(method("POST"))
837 .and(path("/cb"))
838 .respond_with(ResponseTemplate::new(200))
839 .expect(1)
840 .mount(&mock)
841 .await;
842
843 let ch = make_channel_to(&format!("{}/cb", mock.uri()));
844 ch.send(&test_message()).await.unwrap();
845 }
846
847 #[tokio::test]
848 async fn send_retries_on_5xx_then_succeeds() {
849 use wiremock::matchers::{method, path};
850 use wiremock::{Mock, MockServer, ResponseTemplate};
851
852 let mock = MockServer::start().await;
853 Mock::given(method("POST"))
854 .and(path("/cb"))
855 .respond_with(ResponseTemplate::new(503))
856 .up_to_n_times(1)
857 .expect(1)
858 .mount(&mock)
859 .await;
860 Mock::given(method("POST"))
861 .and(path("/cb"))
862 .respond_with(ResponseTemplate::new(200))
863 .expect(1)
864 .mount(&mock)
865 .await;
866
867 let ch = make_channel_to(&format!("{}/cb", mock.uri()));
868 ch.send(&test_message()).await.unwrap();
869 }
870
871 #[tokio::test]
872 async fn send_does_not_retry_on_4xx() {
873 use wiremock::matchers::{method, path};
874 use wiremock::{Mock, MockServer, ResponseTemplate};
875
876 let mock = MockServer::start().await;
877 Mock::given(method("POST"))
878 .and(path("/cb"))
879 .respond_with(ResponseTemplate::new(400))
880 .expect(1) .mount(&mock)
882 .await;
883
884 let ch = make_channel_to(&format!("{}/cb", mock.uri()));
885 let err = ch.send(&test_message()).await.unwrap_err();
886 assert!(err.to_string().contains("400"));
887 }
888
889 #[tokio::test]
890 async fn send_retries_on_429_then_exhausts() {
891 use wiremock::matchers::{method, path};
892 use wiremock::{Mock, MockServer, ResponseTemplate};
893
894 let mock = MockServer::start().await;
895 Mock::given(method("POST"))
896 .and(path("/cb"))
897 .respond_with(ResponseTemplate::new(429))
898 .expect(3) .mount(&mock)
900 .await;
901
902 let ch = make_channel_to(&format!("{}/cb", mock.uri()));
903 let err = ch.send(&test_message()).await.unwrap_err();
904 let msg = err.to_string();
905 assert!(
906 msg.contains("3 attempt"),
907 "expected attempt count in error: {msg}"
908 );
909 }
910
911 #[tokio::test]
912 async fn send_honors_retry_after_header() {
913 use std::time::Instant;
914 use wiremock::matchers::{method, path};
915 use wiremock::{Mock, MockServer, ResponseTemplate};
916
917 let mock = MockServer::start().await;
918 Mock::given(method("POST"))
919 .and(path("/cb"))
920 .respond_with(ResponseTemplate::new(429).insert_header("Retry-After", "1"))
921 .up_to_n_times(1)
922 .expect(1)
923 .mount(&mock)
924 .await;
925 Mock::given(method("POST"))
926 .and(path("/cb"))
927 .respond_with(ResponseTemplate::new(200))
928 .expect(1)
929 .mount(&mock)
930 .await;
931
932 let ch = WebhookChannel::new(
935 "test-hook".into(),
936 8080,
937 None,
938 Some(format!("{}/cb", mock.uri())),
939 None,
940 None,
941 None,
942 Some(2),
943 Some(10),
944 Some(2_000),
945 );
946
947 let start = Instant::now();
948 ch.send(&test_message()).await.unwrap();
949 let elapsed = start.elapsed();
950 assert!(
951 elapsed >= Duration::from_millis(900),
952 "expected to wait ~1s for Retry-After, elapsed = {:?}",
953 elapsed
954 );
955 }
956
957 #[tokio::test]
958 async fn send_honors_retry_after_http_date_header() {
959 use std::time::Instant;
960 use wiremock::matchers::{method, path};
961 use wiremock::{Mock, MockServer, ResponseTemplate};
962
963 let mock = MockServer::start().await;
964 let retry_at = (Utc::now() + chrono::Duration::seconds(60))
965 .format("%a, %d %b %Y %H:%M:%S GMT")
966 .to_string();
967 Mock::given(method("POST"))
968 .and(path("/cb"))
969 .respond_with(ResponseTemplate::new(429).insert_header("Retry-After", retry_at))
970 .up_to_n_times(1)
971 .expect(1)
972 .mount(&mock)
973 .await;
974 Mock::given(method("POST"))
975 .and(path("/cb"))
976 .respond_with(ResponseTemplate::new(200))
977 .expect(1)
978 .mount(&mock)
979 .await;
980
981 let ch = WebhookChannel::new(
982 "test-hook".into(),
983 8080,
984 None,
985 Some(format!("{}/cb", mock.uri())),
986 None,
987 None,
988 None,
989 Some(2),
990 Some(10),
991 Some(150),
992 );
993
994 let start = Instant::now();
995 ch.send(&test_message()).await.unwrap();
996 let elapsed = start.elapsed();
997 assert!(
998 elapsed >= Duration::from_millis(100),
999 "expected to wait for date-form Retry-After, elapsed = {:?}",
1000 elapsed
1001 );
1002 }
1003
1004 #[tokio::test]
1005 async fn send_honors_retry_after_on_503() {
1006 use std::time::Instant;
1007 use wiremock::matchers::{method, path};
1008 use wiremock::{Mock, MockServer, ResponseTemplate};
1009
1010 let mock = MockServer::start().await;
1011 Mock::given(method("POST"))
1012 .and(path("/cb"))
1013 .respond_with(ResponseTemplate::new(503).insert_header("Retry-After", "1"))
1014 .up_to_n_times(1)
1015 .expect(1)
1016 .mount(&mock)
1017 .await;
1018 Mock::given(method("POST"))
1019 .and(path("/cb"))
1020 .respond_with(ResponseTemplate::new(200))
1021 .expect(1)
1022 .mount(&mock)
1023 .await;
1024
1025 let ch = WebhookChannel::new(
1026 "test-hook".into(),
1027 8080,
1028 None,
1029 Some(format!("{}/cb", mock.uri())),
1030 None,
1031 None,
1032 None,
1033 Some(2),
1034 Some(10),
1035 Some(2_000),
1036 );
1037
1038 let start = Instant::now();
1039 ch.send(&test_message()).await.unwrap();
1040 let elapsed = start.elapsed();
1041 assert!(
1042 elapsed >= Duration::from_millis(900),
1043 "expected to wait ~1s for Retry-After on 503, elapsed = {:?}",
1044 elapsed
1045 );
1046 }
1047
1048 #[tokio::test]
1049 async fn send_max_retries_zero_disables_retry() {
1050 use wiremock::matchers::{method, path};
1051 use wiremock::{Mock, MockServer, ResponseTemplate};
1052
1053 let mock = MockServer::start().await;
1054 Mock::given(method("POST"))
1055 .and(path("/cb"))
1056 .respond_with(ResponseTemplate::new(503))
1057 .expect(1) .mount(&mock)
1059 .await;
1060
1061 let ch = WebhookChannel::new(
1062 "test-hook".into(),
1063 8080,
1064 None,
1065 Some(format!("{}/cb", mock.uri())),
1066 None,
1067 None,
1068 None,
1069 Some(0),
1070 Some(10),
1071 Some(100),
1072 );
1073 assert!(ch.send(&test_message()).await.is_err());
1074 }
1075}