1use anyhow::{Result, bail};
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use std::time::Duration;
5use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
6
/// Number of retries used when `WebhookChannel::new` is given no `max_retries`.
const DEFAULT_MAX_RETRIES: u32 = 3;
/// Base backoff delay (milliseconds) used when `WebhookChannel::new` is given
/// no `retry_base_delay_ms`.
const DEFAULT_RETRY_BASE_DELAY_MS: u64 = 500;
/// Backoff ceiling (milliseconds) used when `WebhookChannel::new` is given no
/// `retry_max_delay_ms`.
const DEFAULT_RETRY_MAX_DELAY_MS: u64 = 30_000;
10
/// Channel that accepts inbound messages on an HTTP webhook endpoint and
/// delivers outbound messages to an optional callback URL, retrying failed
/// deliveries with backoff.
pub struct WebhookChannel {
    /// Alias returned by the `Attributable::alias` implementation.
    alias: String,
    /// TCP port the inbound listener binds to.
    listen_port: u16,
    /// Route path for inbound requests; `new` guarantees a leading `/`.
    listen_path: String,
    /// Destination for outbound messages; when `None`, `send` is a no-op.
    send_url: Option<String>,
    /// Upper-cased HTTP method for outbound requests. Only `"PUT"` is treated
    /// specially; any other value results in a POST.
    send_method: String,
    /// Value sent verbatim in the `Authorization` header of outbound requests.
    auth_header: Option<String>,
    /// Shared secret for HMAC-SHA256 verification of inbound request bodies;
    /// when `None`, inbound requests are not signature-checked.
    secret: Option<String>,
    /// Number of retries after the first attempt (total attempts = this + 1).
    max_retries: u32,
    /// Base delay in milliseconds for exponential backoff (at least 1).
    retry_base_delay_ms: u64,
    /// Upper bound in milliseconds on any single retry wait (at least 1).
    retry_max_delay_ms: u64,
}
26
/// JSON body accepted by the inbound webhook endpoint.
#[derive(Debug, Deserialize)]
struct IncomingWebhook {
    /// Identifier of the message author; also the reply target when no
    /// `thread_id` is supplied.
    sender: String,
    /// Message text; the handler rejects requests where this is empty.
    content: String,
    /// Optional thread identifier; absent in the JSON means `None`.
    #[serde(default)]
    thread_id: Option<String>,
}
35
/// JSON body sent to the configured callback URL for an outbound message.
#[derive(Debug, Serialize)]
struct OutgoingWebhook {
    /// Message text.
    content: String,
    /// Thread identifier; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    thread_id: Option<String>,
    /// Intended recipient; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    recipient: Option<String>,
}
45
46impl WebhookChannel {
47 #[allow(clippy::too_many_arguments)]
48 pub fn new(
49 alias: String,
50 listen_port: u16,
51 listen_path: Option<String>,
52 send_url: Option<String>,
53 send_method: Option<String>,
54 auth_header: Option<String>,
55 secret: Option<String>,
56 max_retries: Option<u32>,
57 retry_base_delay_ms: Option<u64>,
58 retry_max_delay_ms: Option<u64>,
59 ) -> Self {
60 let path = listen_path.unwrap_or_else(|| "/webhook".to_string());
61 let listen_path = if path.starts_with('/') {
63 path
64 } else {
65 format!("/{path}")
66 };
67
68 Self {
69 alias,
70 listen_port,
71 listen_path,
72 send_url,
73 send_method: send_method
74 .unwrap_or_else(|| "POST".to_string())
75 .to_uppercase(),
76 auth_header,
77 secret,
78 max_retries: max_retries.unwrap_or(DEFAULT_MAX_RETRIES),
79 retry_base_delay_ms: retry_base_delay_ms
81 .unwrap_or(DEFAULT_RETRY_BASE_DELAY_MS)
82 .max(1),
83 retry_max_delay_ms: retry_max_delay_ms
84 .unwrap_or(DEFAULT_RETRY_MAX_DELAY_MS)
85 .max(1),
86 }
87 }
88
    /// Returns the HTTP client used for outbound requests, obtained from
    /// `build_runtime_proxy_client` with the identifier `"channel.webhook"`.
    // NOTE(review): presumably the identifier selects per-service proxy
    // settings — confirm against `zeroclaw_config::schema`.
    fn http_client(&self) -> reqwest::Client {
        zeroclaw_config::schema::build_runtime_proxy_client("channel.webhook")
    }
92
93 fn compute_backoff(&self, attempt: u32) -> Duration {
97 let multiplier = 1_u64.checked_shl(attempt).unwrap_or(u64::MAX);
98 let base = self.retry_base_delay_ms.saturating_mul(multiplier);
99 let jittered = apply_jitter(base);
100 let capped = jittered.min(self.retry_max_delay_ms);
101 Duration::from_millis(capped)
102 }
103
104 #[cfg(test)]
106 fn verify_signature(&self, body: &[u8], signature: Option<&str>) -> bool {
107 let Some(ref secret) = self.secret else {
108 return true; };
110
111 let Some(sig) = signature else {
112 return false; };
114
115 use hmac::{Hmac, Mac};
117 use sha2::Sha256;
118
119 type HmacSha256 = Hmac<Sha256>;
120
121 let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) else {
122 return false;
123 };
124 mac.update(body);
125
126 let Ok(expected) = hex::decode(sig.trim_start_matches("sha256=")) else {
128 return false;
129 };
130
131 mac.verify_slice(&expected).is_ok()
132 }
133
134 async fn attempt_send(
135 &self,
136 client: &reqwest::Client,
137 send_url: &str,
138 payload: &OutgoingWebhook,
139 ) -> AttemptOutcome {
140 let mut request = match self.send_method.as_str() {
141 "PUT" => client.put(send_url),
142 _ => client.post(send_url),
143 };
144
145 if let Some(ref auth) = self.auth_header {
146 request = request.header("Authorization", auth);
147 }
148
149 let resp = match request.json(payload).send().await {
150 Ok(r) => r,
151 Err(e) => return AttemptOutcome::Retry(format!("network error: {e}")),
152 };
153
154 let status = resp.status();
155 if status.is_success() {
156 return AttemptOutcome::Success;
157 }
158
159 let code = status.as_u16();
160 let retry_after = resp
161 .headers()
162 .get(reqwest::header::RETRY_AFTER)
163 .and_then(|v| v.to_str().ok())
164 .and_then(parse_retry_after_ms);
165
166 if (code == 429 || code == 503)
172 && let Some(ms) = retry_after
173 {
174 return AttemptOutcome::RetryAfter(Duration::from_millis(ms));
175 }
176
177 let body = resp
178 .text()
179 .await
180 .unwrap_or_else(|e| format!("<failed to read response: {e}>"));
181
182 if code == 429 || (500..600).contains(&code) {
184 return AttemptOutcome::Retry(format!("Webhook send failed ({status}): {body}"));
185 }
186
187 AttemptOutcome::Fatal(anyhow::Error::msg(format!(
189 "Webhook send failed ({status}): {body}"
190 )))
191 }
192}
193
194impl ::zeroclaw_api::attribution::Attributable for WebhookChannel {
195 fn role(&self) -> ::zeroclaw_api::attribution::Role {
196 ::zeroclaw_api::attribution::Role::Channel(
197 ::zeroclaw_api::attribution::ChannelKind::Webhook,
198 )
199 }
200 fn alias(&self) -> &str {
201 &self.alias
202 }
203}
204
205fn apply_jitter(delay_ms: u64) -> u64 {
207 if delay_ms == 0 {
208 return 0;
209 }
210 let jitter_factor = 0.75 + (rand::random::<f64>() * 0.5);
211 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
213 let jittered = ((delay_ms as f64) * jitter_factor) as u64;
214 jittered
215}
216
/// Parses a `Retry-After` header value given in seconds into milliseconds.
///
/// Accepts a non-negative integer (`"5"`) and, leniently, a decimal
/// (`"2.9"`) whose fractional part is discarded. Surrounding whitespace is
/// ignored and the result saturates at `u64::MAX`.
///
/// Returns `None` for empty input, HTTP-date values, and anything else that
/// is not numeric. That includes a malformed fraction such as `"1.abc"` or
/// `"2.5.5"`, so a garbage header falls back to normal backoff instead of
/// being read as a server-requested delay.
fn parse_retry_after_ms(value: &str) -> Option<u64> {
    let trimmed = value.trim();

    // With no '.', the whole value is the integer part and the fraction is empty.
    let (whole, fraction) = trimmed.split_once('.').unwrap_or((trimmed, ""));

    // The fraction is discarded, but it must still be made of digits.
    if !fraction.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }

    whole
        .parse::<u64>()
        .ok()
        .map(|seconds| seconds.saturating_mul(1000))
}
233
/// Classification of a single outbound delivery attempt, produced by
/// `attempt_send` and consumed by the retry loop in `send`.
enum AttemptOutcome {
    /// The server answered with a success status.
    Success,
    /// The server asked for a specific wait (via `Retry-After`) before retrying.
    RetryAfter(Duration),
    /// A retryable failure; the string describes the error for logs and the
    /// final error message.
    Retry(String),
    /// A non-retryable failure that `send` returns immediately.
    Fatal(anyhow::Error),
}
241
242#[async_trait]
243impl Channel for WebhookChannel {
    /// Returns the fixed channel name, `"webhook"`.
    fn name(&self) -> &str {
        "webhook"
    }
247
248 async fn send(&self, message: &SendMessage) -> Result<()> {
249 let Some(ref send_url) = self.send_url else {
250 ::zeroclaw_log::record!(
251 DEBUG,
252 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
253 "channel: no send_url configured, skipping outbound message"
254 );
255 return Ok(());
256 };
257
258 let client = self.http_client();
259 let payload = OutgoingWebhook {
260 content: message.content.clone(),
261 thread_id: message.thread_ts.clone(),
262 recipient: if message.recipient.is_empty() {
263 None
264 } else {
265 Some(message.recipient.clone())
266 },
267 };
268
269 let total_attempts = self.max_retries.saturating_add(1);
270
271 for attempt in 0..total_attempts {
272 let outcome = self.attempt_send(&client, send_url, &payload).await;
273
274 match outcome {
275 AttemptOutcome::Success => return Ok(()),
276 AttemptOutcome::Fatal(err) => return Err(err),
277 AttemptOutcome::RetryAfter(delay) => {
278 if attempt + 1 >= total_attempts {
279 bail!(
280 "Webhook send failed after {total_attempts} attempt(s); last error: rate limited / server error with Retry-After"
281 );
282 }
283 let capped = delay.min(Duration::from_millis(self.retry_max_delay_ms));
284 ::zeroclaw_log::record!(
285 WARN,
286 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
287 &format!(
288 "Webhook send: server requested retry after {}ms (attempt {}/{}), waiting...",
289 capped.as_millis(),
290 attempt + 1,
291 total_attempts
292 )
293 );
294 tokio::time::sleep(capped).await;
295 }
296 AttemptOutcome::Retry(err_msg) => {
297 if attempt + 1 >= total_attempts {
298 bail!(
299 "Webhook send failed after {total_attempts} attempt(s); last error: {err_msg}"
300 );
301 }
302 let delay = self.compute_backoff(attempt);
303 ::zeroclaw_log::record!(
304 WARN,
305 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
306 &format!(
307 "Webhook send failed (attempt {}/{}): {}; retrying in {}ms",
308 attempt + 1,
309 total_attempts,
310 err_msg,
311 delay.as_millis()
312 )
313 );
314 tokio::time::sleep(delay).await;
315 }
316 }
317 }
318
319 unreachable!("send loop exits via return or bail on the final attempt")
320 }
321
    /// Runs the inbound HTTP server until it stops or fails.
    ///
    /// Registers one POST route at `listen_path` on port `listen_port`. Each
    /// accepted request is converted into a `ChannelMessage` and forwarded on
    /// `tx`.
    ///
    /// Handler responses:
    /// * `401` — a secret is configured and the `x-webhook-signature` header is
    ///   missing or does not match the HMAC-SHA256 of the body.
    /// * `400` — the body is not valid `IncomingWebhook` JSON, or `content` is empty.
    /// * `503` — the receiving side of `tx` is closed.
    /// * `200` — the message was forwarded.
    ///
    /// # Errors
    ///
    /// Returns an error if the port cannot be bound or the server terminates
    /// with an error.
    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> Result<()> {
        use axum::{
            Router,
            body::Bytes,
            extract::State,
            http::{HeaderMap, StatusCode},
            routing::post,
        };
        use portable_atomic::{AtomicU64, Ordering};
        use std::sync::Arc;

        // Sequence number used to build message ids; starts at 0 on every
        // call to `listen`.
        let counter = Arc::new(AtomicU64::new(0));

        // State shared with every request handler invocation.
        struct WebhookState {
            tx: tokio::sync::mpsc::Sender<ChannelMessage>,
            secret: Option<String>,
            counter: Arc<AtomicU64>,
        }

        let state = Arc::new(WebhookState {
            tx: tx.clone(),
            secret: self.secret.clone(),
            counter: counter.clone(),
        });

        let listen_path = self.listen_path.clone();

        // Request handler: verify signature (if configured), parse, forward.
        async fn handle_webhook(
            State(state): State<Arc<WebhookState>>,
            headers: HeaderMap,
            body: Bytes,
        ) -> StatusCode {
            // Signature verification runs only when a secret is configured.
            if let Some(ref secret) = state.secret {
                use hmac::{Hmac, Mac};
                use sha2::Sha256;
                type HmacSha256 = Hmac<Sha256>;

                let signature = headers
                    .get("x-webhook-signature")
                    .and_then(|v| v.to_str().ok());

                let valid = if let Some(sig) = signature {
                    if let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) {
                        mac.update(&body);
                        // Undecodable hex becomes an empty slice here.
                        // NOTE(review): this relies on `verify_slice` rejecting a
                        // wrong-length tag — confirm against the `hmac` docs.
                        let expected =
                            hex::decode(sig.trim_start_matches("sha256=")).unwrap_or_default();
                        mac.verify_slice(&expected).is_ok()
                    } else {
                        false
                    }
                } else {
                    // Secret configured but no signature header supplied.
                    false
                };

                if !valid {
                    ::zeroclaw_log::record!(
                        WARN,
                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
                        "invalid signature, rejecting request"
                    );
                    return StatusCode::UNAUTHORIZED;
                }
            }

            let payload: IncomingWebhook = match serde_json::from_slice(&body) {
                Ok(p) => p,
                Err(e) => {
                    ::zeroclaw_log::record!(
                        WARN,
                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
                        "invalid JSON payload"
                    );
                    return StatusCode::BAD_REQUEST;
                }
            };

            // Messages without content are rejected (without logging).
            if payload.content.is_empty() {
                return StatusCode::BAD_REQUEST;
            }

            let seq = state.counter.fetch_add(1, Ordering::Relaxed);

            // Seconds since the Unix epoch; falls back to 0 if the system
            // clock reads earlier than the epoch.
            #[allow(clippy::cast_possible_truncation)]
            let timestamp = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();

            // Replies go to the thread when one was given, otherwise to the sender.
            let reply_target = payload
                .thread_id
                .clone()
                .unwrap_or_else(|| payload.sender.clone());

            // NOTE(review): `channel_alias` is always `None` even though the
            // channel has an `alias` — confirm this is intended.
            let msg = ChannelMessage {
                id: format!("webhook_{seq}"),
                sender: payload.sender,
                reply_target,
                content: payload.content,
                channel: "webhook".to_string(),
                channel_alias: None,
                timestamp,
                thread_ts: payload.thread_id,
                interruption_scope_id: None,
                attachments: vec![],
                subject: None,
            };

            // A send error means the receiver side of the channel is gone.
            if state.tx.send(msg).await.is_err() {
                return StatusCode::SERVICE_UNAVAILABLE;
            }

            StatusCode::OK
        }

        let app = Router::new()
            .route(&listen_path, post(handle_webhook))
            .with_state(state);

        // Binds on all IPv4 interfaces.
        let addr = std::net::SocketAddr::from(([0, 0, 0, 0], self.listen_port));
        // NOTE(review): this is logged before `bind` is attempted, so it can
        // appear even when binding subsequently fails.
        ::zeroclaw_log::record!(
            INFO,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
            &format!(
                "Webhook channel listening on http://0.0.0.0:{}{} ...",
                self.listen_port, self.listen_path
            )
        );

        let listener = tokio::net::TcpListener::bind(addr).await?;
        axum::serve(listener, app).await.map_err(|e| {
            ::zeroclaw_log::record!(
                ERROR,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
                "Webhook server error"
            );
            anyhow::Error::msg(format!("Webhook server error: {e}"))
        })?;

        Ok(())
    }
468
    /// Always reports the channel as healthy; no check is performed.
    async fn health_check(&self) -> bool {
        true
    }
474}
475
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Constructor inputs that vary between tests. Everything left at its
    /// default is passed to `WebhookChannel::new` as `None`.
    #[derive(Default)]
    struct Setup<'a> {
        listen_path: Option<&'a str>,
        send_url: Option<&'a str>,
        send_method: Option<&'a str>,
        secret: Option<&'a str>,
        /// `(max_retries, retry_base_delay_ms, retry_max_delay_ms)`.
        retry: Option<(u32, u64, u64)>,
    }

    impl Setup<'_> {
        /// Builds a channel named `test-hook` on port 8080 with no auth header.
        fn build(self) -> WebhookChannel {
            let (max_retries, base_ms, max_ms) = match self.retry {
                Some((retries, base, max)) => (Some(retries), Some(base), Some(max)),
                None => (None, None, None),
            };
            WebhookChannel::new(
                "test-hook".into(),
                8080,
                self.listen_path.map(str::to_owned),
                self.send_url.map(str::to_owned),
                self.send_method.map(str::to_owned),
                None,
                self.secret.map(str::to_owned),
                max_retries,
                base_ms,
                max_ms,
            )
        }
    }

    fn make_channel() -> WebhookChannel {
        Setup {
            listen_path: Some("/webhook"),
            send_url: Some("https://example.com/callback"),
            ..Setup::default()
        }
        .build()
    }

    fn make_channel_with_secret() -> WebhookChannel {
        Setup {
            send_url: Some("https://example.com/callback"),
            secret: Some("mysecret"),
            ..Setup::default()
        }
        .build()
    }

    /// Channel targeting `url` with fast retries (2 retries, 10 ms base, 100 ms cap).
    fn make_channel_to(url: &str) -> WebhookChannel {
        channel_with_retry(url, (2, 10, 100))
    }

    /// Channel targeting `url` with an explicit retry configuration.
    fn channel_with_retry(url: &str, retry: (u32, u64, u64)) -> WebhookChannel {
        Setup {
            send_url: Some(url),
            retry: Some(retry),
            ..Setup::default()
        }
        .build()
    }

    fn test_message() -> SendMessage {
        SendMessage::new("hello", "zeroclaw_user")
    }

    /// URL of the `/cb` endpoint on the given mock server.
    fn cb_url(server: &MockServer) -> String {
        format!("{}/cb", server.uri())
    }

    /// Mounts a `POST /cb` mock answering with `response`. `max_matches`
    /// limits how many requests the mock will serve; `expected_calls` is the
    /// exact call count verified by the mock server.
    async fn mount_cb(
        server: &MockServer,
        response: ResponseTemplate,
        max_matches: Option<u64>,
        expected_calls: u64,
    ) {
        let mock = Mock::given(method("POST"))
            .and(path("/cb"))
            .respond_with(response);
        let mock = match max_matches {
            Some(limit) => mock.up_to_n_times(limit),
            None => mock,
        };
        mock.expect(expected_calls).mount(server).await;
    }

    /// Answers the first request with `status` plus `Retry-After: 1`, the
    /// second with 200, and returns how long a successful `send` took.
    async fn elapsed_with_retry_after(status: u16) -> Duration {
        let mock = MockServer::start().await;
        let throttled = ResponseTemplate::new(status).insert_header("Retry-After", "1");
        mount_cb(&mock, throttled, Some(1), 1).await;
        mount_cb(&mock, ResponseTemplate::new(200), None, 1).await;

        let ch = channel_with_retry(&cb_url(&mock), (2, 10, 2_000));

        let start = Instant::now();
        ch.send(&test_message()).await.unwrap();
        start.elapsed()
    }

    #[test]
    fn default_path() {
        let ch = Setup::default().build();
        assert_eq!(ch.listen_path, "/webhook");
    }

    #[test]
    fn path_normalized() {
        let ch = Setup {
            listen_path: Some("hooks/incoming"),
            ..Setup::default()
        }
        .build();
        assert_eq!(ch.listen_path, "/hooks/incoming");
    }

    #[test]
    fn send_method_default() {
        assert_eq!(make_channel().send_method, "POST");
    }

    #[test]
    fn send_method_put() {
        let ch = Setup {
            send_url: Some("https://example.com"),
            send_method: Some("put"),
            ..Setup::default()
        }
        .build();
        assert_eq!(ch.send_method, "PUT");
    }

    #[test]
    fn retry_defaults_applied() {
        let ch = make_channel();
        assert_eq!(ch.max_retries, DEFAULT_MAX_RETRIES);
        assert_eq!(ch.retry_base_delay_ms, DEFAULT_RETRY_BASE_DELAY_MS);
        assert_eq!(ch.retry_max_delay_ms, DEFAULT_RETRY_MAX_DELAY_MS);
    }

    #[test]
    fn retry_overrides_applied() {
        let ch = channel_with_retry("https://example.com", (0, 50, 1_000));
        assert_eq!(ch.max_retries, 0);
        assert_eq!(ch.retry_base_delay_ms, 50);
        assert_eq!(ch.retry_max_delay_ms, 1_000);
    }

    #[test]
    fn backoff_capped_by_max_delay() {
        // 1000 ms * 2^10 is far above the 2000 ms cap even at minimum jitter.
        let ch = channel_with_retry("https://example.com", (5, 1_000, 2_000));
        let d = ch.compute_backoff(10);
        assert_eq!(d.as_millis(), 2_000);
    }

    #[test]
    fn backoff_never_exceeds_max_delay_near_cap() {
        // Attempt 1 gives exactly 2000 ms before jitter, so upward jitter must
        // be clamped. Repeat to cover the random factor.
        let ch = channel_with_retry("https://example.com", (5, 1_000, 2_000));
        for _ in 0..256 {
            let d = ch.compute_backoff(1);
            assert!(
                d.as_millis() <= 2_000,
                "compute_backoff exceeded retry_max_delay_ms: {}ms",
                d.as_millis()
            );
        }
    }

    #[test]
    fn parse_retry_after_integer_seconds() {
        assert_eq!(parse_retry_after_ms("5"), Some(5_000));
    }

    #[test]
    fn parse_retry_after_decimal_seconds() {
        assert_eq!(parse_retry_after_ms("2.9"), Some(2_000));
    }

    #[test]
    fn parse_retry_after_rejects_non_numeric() {
        assert_eq!(parse_retry_after_ms("later"), None);
    }

    #[test]
    fn parse_retry_after_empty() {
        assert_eq!(parse_retry_after_ms(" "), None);
    }

    #[test]
    fn parse_retry_after_zero() {
        assert_eq!(parse_retry_after_ms("0"), Some(0));
    }

    #[test]
    fn incoming_payload_deserializes_all_fields() {
        let json = r#"{"sender": "zeroclaw_user", "content": "hello", "thread_id": "t1"}"#;
        let IncomingWebhook {
            sender,
            content,
            thread_id,
        } = serde_json::from_str(json).unwrap();
        assert_eq!(sender, "zeroclaw_user");
        assert_eq!(content, "hello");
        assert_eq!(thread_id.as_deref(), Some("t1"));
    }

    #[test]
    fn incoming_payload_without_thread() {
        let json = r#"{"sender": "zeroclaw_user", "content": "hi"}"#;
        let IncomingWebhook {
            sender,
            content,
            thread_id,
        } = serde_json::from_str(json).unwrap();
        assert_eq!(sender, "zeroclaw_user");
        assert_eq!(content, "hi");
        assert!(thread_id.is_none());
    }

    #[test]
    fn outgoing_payload_serializes_content() {
        let json = serde_json::to_value(&OutgoingWebhook {
            content: "response".into(),
            thread_id: Some("t1".into()),
            recipient: Some("zeroclaw_user".into()),
        })
        .unwrap();
        assert_eq!(json["content"], "response");
        assert_eq!(json["thread_id"], "t1");
        assert_eq!(json["recipient"], "zeroclaw_user");
    }

    #[test]
    fn outgoing_payload_omits_none_fields() {
        let json = serde_json::to_value(&OutgoingWebhook {
            content: "response".into(),
            thread_id: None,
            recipient: None,
        })
        .unwrap();
        assert_eq!(json["content"], "response");
        assert!(json.get("thread_id").is_none());
        assert!(json.get("recipient").is_none());
    }

    #[test]
    fn verify_signature_no_secret() {
        assert!(make_channel().verify_signature(b"body", None));
    }

    #[test]
    fn verify_signature_missing_header() {
        assert!(!make_channel_with_secret().verify_signature(b"body", None));
    }

    #[test]
    fn verify_signature_valid() {
        use hmac::{Hmac, Mac};
        use sha2::Sha256;

        let body = b"test body";

        // Compute the reference signature with the same secret the channel uses.
        let mut reference = Hmac::<Sha256>::new_from_slice(b"mysecret").unwrap();
        reference.update(body);
        let sig = hex::encode(reference.finalize().into_bytes());

        let ch = make_channel_with_secret();
        assert!(ch.verify_signature(body, Some(&sig)));
    }

    #[test]
    fn verify_signature_invalid() {
        assert!(!make_channel_with_secret().verify_signature(b"body", Some("badhex")));
    }

    #[tokio::test]
    async fn send_happy_path_returns_ok() {
        let mock = MockServer::start().await;
        mount_cb(&mock, ResponseTemplate::new(200), None, 1).await;

        let ch = make_channel_to(&cb_url(&mock));
        ch.send(&test_message()).await.unwrap();
    }

    #[tokio::test]
    async fn send_retries_on_5xx_then_succeeds() {
        let mock = MockServer::start().await;
        // First request fails with 503, the retry succeeds.
        mount_cb(&mock, ResponseTemplate::new(503), Some(1), 1).await;
        mount_cb(&mock, ResponseTemplate::new(200), None, 1).await;

        let ch = make_channel_to(&cb_url(&mock));
        ch.send(&test_message()).await.unwrap();
    }

    #[tokio::test]
    async fn send_does_not_retry_on_4xx() {
        let mock = MockServer::start().await;
        // Exactly one request: a 400 is fatal.
        mount_cb(&mock, ResponseTemplate::new(400), None, 1).await;

        let ch = make_channel_to(&cb_url(&mock));
        let err = ch.send(&test_message()).await.unwrap_err();
        assert!(err.to_string().contains("400"));
    }

    #[tokio::test]
    async fn send_retries_on_429_then_exhausts() {
        let mock = MockServer::start().await;
        // One initial attempt plus two retries.
        mount_cb(&mock, ResponseTemplate::new(429), None, 3).await;

        let ch = make_channel_to(&cb_url(&mock));
        let err = ch.send(&test_message()).await.unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("3 attempt"),
            "expected attempt count in error: {msg}"
        );
    }

    #[tokio::test]
    async fn send_honors_retry_after_header() {
        let elapsed = elapsed_with_retry_after(429).await;
        assert!(
            elapsed >= Duration::from_millis(900),
            "expected to wait ~1s for Retry-After, elapsed = {:?}",
            elapsed
        );
    }

    #[tokio::test]
    async fn send_honors_retry_after_on_503() {
        let elapsed = elapsed_with_retry_after(503).await;
        assert!(
            elapsed >= Duration::from_millis(900),
            "expected to wait ~1s for Retry-After on 503, elapsed = {:?}",
            elapsed
        );
    }

    #[tokio::test]
    async fn send_max_retries_zero_disables_retry() {
        let mock = MockServer::start().await;
        // With retries disabled, only the initial attempt is made.
        mount_cb(&mock, ResponseTemplate::new(503), None, 1).await;

        let ch = channel_with_retry(&cb_url(&mock), (0, 10, 100));
        assert!(ch.send(&test_message()).await.is_err());
    }
}