Skip to main content

zeroclaw_channels/
webhook.rs

1use anyhow::{Result, bail};
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use std::time::Duration;
5use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
6
/// Retry count used when the caller does not supply `max_retries`.
const DEFAULT_MAX_RETRIES: u32 = 3;
/// Base backoff delay, in milliseconds, used when the caller does not supply one.
const DEFAULT_RETRY_BASE_DELAY_MS: u64 = 500;
/// Ceiling for a single retry delay, in milliseconds, used when the caller does not supply one.
const DEFAULT_RETRY_MAX_DELAY_MS: u64 = 30_000;
10
/// Generic webhook channel: accepts inbound messages over HTTP POST and delivers
/// replies to a configurable outbound URL. It is the "universal adapter" for any
/// system that supports webhooks.
pub struct WebhookChannel {
    /// Alias reported through the attribution API.
    alias: String,
    /// TCP port the inbound HTTP listener binds to.
    listen_port: u16,
    /// Route path for inbound POST requests; normalized to start with `/`.
    listen_path: String,
    /// Outbound URL for replies. When `None`, outbound messages are skipped.
    send_url: Option<String>,
    /// Upper-cased HTTP method used for outbound requests.
    send_method: String,
    /// Value placed in the `Authorization` header of outbound requests, if any.
    auth_header: Option<String>,
    /// Shared secret used for HMAC-SHA256 verification of inbound request bodies.
    secret: Option<String>,
    /// Number of retries performed after the first failed send attempt.
    max_retries: u32,
    /// Base delay for exponential backoff, in milliseconds.
    retry_base_delay_ms: u64,
    /// Upper bound applied to every retry delay, in milliseconds.
    retry_max_delay_ms: u64,
}
26
27/// Incoming webhook payload format.
28#[derive(Debug, Deserialize)]
29struct IncomingWebhook {
30    sender: String,
31    content: String,
32    #[serde(default)]
33    thread_id: Option<String>,
34}
35
36/// Outgoing webhook payload format.
37#[derive(Debug, Serialize)]
38struct OutgoingWebhook {
39    content: String,
40    #[serde(skip_serializing_if = "Option::is_none")]
41    thread_id: Option<String>,
42    #[serde(skip_serializing_if = "Option::is_none")]
43    recipient: Option<String>,
44}
45
46impl WebhookChannel {
47    #[allow(clippy::too_many_arguments)]
48    pub fn new(
49        alias: String,
50        listen_port: u16,
51        listen_path: Option<String>,
52        send_url: Option<String>,
53        send_method: Option<String>,
54        auth_header: Option<String>,
55        secret: Option<String>,
56        max_retries: Option<u32>,
57        retry_base_delay_ms: Option<u64>,
58        retry_max_delay_ms: Option<u64>,
59    ) -> Self {
60        let path = listen_path.unwrap_or_else(|| "/webhook".to_string());
61        // Ensure path starts with /
62        let listen_path = if path.starts_with('/') {
63            path
64        } else {
65            format!("/{path}")
66        };
67
68        Self {
69            alias,
70            listen_port,
71            listen_path,
72            send_url,
73            send_method: send_method
74                .unwrap_or_else(|| "POST".to_string())
75                .to_uppercase(),
76            auth_header,
77            secret,
78            max_retries: max_retries.unwrap_or(DEFAULT_MAX_RETRIES),
79            // Clamp delays to >=1ms so a misconfigured `0` does not busy-retry without yielding.
80            retry_base_delay_ms: retry_base_delay_ms
81                .unwrap_or(DEFAULT_RETRY_BASE_DELAY_MS)
82                .max(1),
83            retry_max_delay_ms: retry_max_delay_ms
84                .unwrap_or(DEFAULT_RETRY_MAX_DELAY_MS)
85                .max(1),
86        }
87    }
88
89    fn http_client(&self) -> reqwest::Client {
90        zeroclaw_config::schema::build_runtime_proxy_client("channel.webhook")
91    }
92
93    /// Compute the backoff delay for a given attempt, bounded by `retry_max_delay_ms`
94    /// and with ±25% jitter applied. Jitter is applied before the final cap, so the
95    /// returned delay is strictly `<= retry_max_delay_ms`.
96    fn compute_backoff(&self, attempt: u32) -> Duration {
97        let multiplier = 1_u64.checked_shl(attempt).unwrap_or(u64::MAX);
98        let base = self.retry_base_delay_ms.saturating_mul(multiplier);
99        let jittered = apply_jitter(base);
100        let capped = jittered.min(self.retry_max_delay_ms);
101        Duration::from_millis(capped)
102    }
103
104    /// Verify an incoming request's signature if a secret is configured.
105    #[cfg(test)]
106    fn verify_signature(&self, body: &[u8], signature: Option<&str>) -> bool {
107        let Some(ref secret) = self.secret else {
108            return true; // No secret configured, accept all
109        };
110
111        let Some(sig) = signature else {
112            return false; // Secret is set but no signature header provided
113        };
114
115        // HMAC-SHA256 verification
116        use hmac::{Hmac, Mac};
117        use sha2::Sha256;
118
119        type HmacSha256 = Hmac<Sha256>;
120
121        let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) else {
122            return false;
123        };
124        mac.update(body);
125
126        // Signature should be hex-encoded
127        let Ok(expected) = hex::decode(sig.trim_start_matches("sha256=")) else {
128            return false;
129        };
130
131        mac.verify_slice(&expected).is_ok()
132    }
133
134    async fn attempt_send(
135        &self,
136        client: &reqwest::Client,
137        send_url: &str,
138        payload: &OutgoingWebhook,
139    ) -> AttemptOutcome {
140        let mut request = match self.send_method.as_str() {
141            "PUT" => client.put(send_url),
142            _ => client.post(send_url),
143        };
144
145        if let Some(ref auth) = self.auth_header {
146            request = request.header("Authorization", auth);
147        }
148
149        let resp = match request.json(payload).send().await {
150            Ok(r) => r,
151            Err(e) => return AttemptOutcome::Retry(format!("network error: {e}")),
152        };
153
154        let status = resp.status();
155        if status.is_success() {
156            return AttemptOutcome::Success;
157        }
158
159        let code = status.as_u16();
160        let retry_after = resp
161            .headers()
162            .get(reqwest::header::RETRY_AFTER)
163            .and_then(|v| v.to_str().ok())
164            .and_then(parse_retry_after_ms);
165
166        // 429 and 503 may include Retry-After; honor it if present. 429 appears here
167        // *and* in the branch below: here we take the server-supplied delay, below we
168        // fall back to exponential backoff when no Retry-After header was sent.
169        // Reading the body is deferred until after this early-return so hot 429 loops
170        // against large pages don't pay the I/O cost.
171        if (code == 429 || code == 503)
172            && let Some(ms) = retry_after
173        {
174            return AttemptOutcome::RetryAfter(Duration::from_millis(ms));
175        }
176
177        let body = resp
178            .text()
179            .await
180            .unwrap_or_else(|e| format!("<failed to read response: {e}>"));
181
182        // Retry 429 (rate limit) and 5xx (server errors).
183        if code == 429 || (500..600).contains(&code) {
184            return AttemptOutcome::Retry(format!("Webhook send failed ({status}): {body}"));
185        }
186
187        // Other 4xx → do not retry.
188        AttemptOutcome::Fatal(anyhow::Error::msg(format!(
189            "Webhook send failed ({status}): {body}"
190        )))
191    }
192}
193
194impl ::zeroclaw_api::attribution::Attributable for WebhookChannel {
195    fn role(&self) -> ::zeroclaw_api::attribution::Role {
196        ::zeroclaw_api::attribution::Role::Channel(
197            ::zeroclaw_api::attribution::ChannelKind::Webhook,
198        )
199    }
200    fn alias(&self) -> &str {
201        &self.alias
202    }
203}
204
205/// Apply ±25% jitter to a delay so parallel senders do not thunder-herd.
206fn apply_jitter(delay_ms: u64) -> u64 {
207    if delay_ms == 0 {
208        return 0;
209    }
210    let jitter_factor = 0.75 + (rand::random::<f64>() * 0.5);
211    // Safe: jitter_factor > 0 keeps the product non-negative; f64→u64 cast saturates on overflow.
212    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
213    let jittered = ((delay_ms as f64) * jitter_factor) as u64;
214    jittered
215}
216
/// Parse a `Retry-After` header value into milliseconds.
///
/// Accepts integer seconds (`"5"`) and decimal seconds (`"2.9"`), the latter
/// truncated to whole seconds. The fractional part must consist solely of ASCII
/// digits, so malformed values such as `"2.x"` or `"1.2.3"` are rejected rather
/// than silently read as their integer prefix. The HTTP-date form is not
/// supported.
///
/// Returns `None` for empty, non-numeric, or malformed input. The result
/// saturates at `u64::MAX` instead of overflowing.
fn parse_retry_after_ms(value: &str) -> Option<u64> {
    let trimmed = value.trim();
    let (whole, fraction) = trimmed.split_once('.').unwrap_or((trimmed, ""));

    // The fraction is discarded, but it still has to look like a number.
    if !fraction.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }

    // An empty `whole` (empty input or ".5") fails to parse and yields `None`.
    whole
        .parse::<u64>()
        .ok()
        .map(|seconds| seconds.saturating_mul(1000))
}
233
234/// Outcome of a single send attempt.
235enum AttemptOutcome {
236    Success,
237    RetryAfter(Duration),
238    Retry(String),
239    Fatal(anyhow::Error),
240}
241
242#[async_trait]
243impl Channel for WebhookChannel {
244    fn name(&self) -> &str {
245        "webhook"
246    }
247
248    async fn send(&self, message: &SendMessage) -> Result<()> {
249        let Some(ref send_url) = self.send_url else {
250            ::zeroclaw_log::record!(
251                DEBUG,
252                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
253                "channel: no send_url configured, skipping outbound message"
254            );
255            return Ok(());
256        };
257
258        let client = self.http_client();
259        let payload = OutgoingWebhook {
260            content: message.content.clone(),
261            thread_id: message.thread_ts.clone(),
262            recipient: if message.recipient.is_empty() {
263                None
264            } else {
265                Some(message.recipient.clone())
266            },
267        };
268
269        let total_attempts = self.max_retries.saturating_add(1);
270
271        for attempt in 0..total_attempts {
272            let outcome = self.attempt_send(&client, send_url, &payload).await;
273
274            match outcome {
275                AttemptOutcome::Success => return Ok(()),
276                AttemptOutcome::Fatal(err) => return Err(err),
277                AttemptOutcome::RetryAfter(delay) => {
278                    if attempt + 1 >= total_attempts {
279                        bail!(
280                            "Webhook send failed after {total_attempts} attempt(s); last error: rate limited / server error with Retry-After"
281                        );
282                    }
283                    let capped = delay.min(Duration::from_millis(self.retry_max_delay_ms));
284                    ::zeroclaw_log::record!(
285                        WARN,
286                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
287                        &format!(
288                            "Webhook send: server requested retry after {}ms (attempt {}/{}), waiting...",
289                            capped.as_millis(),
290                            attempt + 1,
291                            total_attempts
292                        )
293                    );
294                    tokio::time::sleep(capped).await;
295                }
296                AttemptOutcome::Retry(err_msg) => {
297                    if attempt + 1 >= total_attempts {
298                        bail!(
299                            "Webhook send failed after {total_attempts} attempt(s); last error: {err_msg}"
300                        );
301                    }
302                    let delay = self.compute_backoff(attempt);
303                    ::zeroclaw_log::record!(
304                        WARN,
305                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
306                        &format!(
307                            "Webhook send failed (attempt {}/{}): {}; retrying in {}ms",
308                            attempt + 1,
309                            total_attempts,
310                            err_msg,
311                            delay.as_millis()
312                        )
313                    );
314                    tokio::time::sleep(delay).await;
315                }
316            }
317        }
318
319        unreachable!("send loop exits via return or bail on the final attempt")
320    }
321
322    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> Result<()> {
323        use axum::{
324            Router,
325            body::Bytes,
326            extract::State,
327            http::{HeaderMap, StatusCode},
328            routing::post,
329        };
330        use portable_atomic::{AtomicU64, Ordering};
331        use std::sync::Arc;
332
333        let counter = Arc::new(AtomicU64::new(0));
334
335        struct WebhookState {
336            tx: tokio::sync::mpsc::Sender<ChannelMessage>,
337            secret: Option<String>,
338            counter: Arc<AtomicU64>,
339        }
340
341        let state = Arc::new(WebhookState {
342            tx: tx.clone(),
343            secret: self.secret.clone(),
344            counter: counter.clone(),
345        });
346
347        let listen_path = self.listen_path.clone();
348
349        async fn handle_webhook(
350            State(state): State<Arc<WebhookState>>,
351            headers: HeaderMap,
352            body: Bytes,
353        ) -> StatusCode {
354            // Verify signature if secret is configured
355            if let Some(ref secret) = state.secret {
356                use hmac::{Hmac, Mac};
357                use sha2::Sha256;
358                type HmacSha256 = Hmac<Sha256>;
359
360                let signature = headers
361                    .get("x-webhook-signature")
362                    .and_then(|v| v.to_str().ok());
363
364                let valid = if let Some(sig) = signature {
365                    if let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) {
366                        mac.update(&body);
367                        let expected =
368                            hex::decode(sig.trim_start_matches("sha256=")).unwrap_or_default();
369                        mac.verify_slice(&expected).is_ok()
370                    } else {
371                        false
372                    }
373                } else {
374                    false
375                };
376
377                if !valid {
378                    ::zeroclaw_log::record!(
379                        WARN,
380                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
381                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
382                        "invalid signature, rejecting request"
383                    );
384                    return StatusCode::UNAUTHORIZED;
385                }
386            }
387
388            let payload: IncomingWebhook = match serde_json::from_slice(&body) {
389                Ok(p) => p,
390                Err(e) => {
391                    ::zeroclaw_log::record!(
392                        WARN,
393                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
394                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
395                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
396                        "invalid JSON payload"
397                    );
398                    return StatusCode::BAD_REQUEST;
399                }
400            };
401
402            if payload.content.is_empty() {
403                return StatusCode::BAD_REQUEST;
404            }
405
406            let seq = state.counter.fetch_add(1, Ordering::Relaxed);
407
408            #[allow(clippy::cast_possible_truncation)]
409            let timestamp = std::time::SystemTime::now()
410                .duration_since(std::time::UNIX_EPOCH)
411                .unwrap_or_default()
412                .as_secs();
413
414            let reply_target = payload
415                .thread_id
416                .clone()
417                .unwrap_or_else(|| payload.sender.clone());
418
419            let msg = ChannelMessage {
420                id: format!("webhook_{seq}"),
421                sender: payload.sender,
422                reply_target,
423                content: payload.content,
424                channel: "webhook".to_string(),
425                channel_alias: None,
426                timestamp,
427                thread_ts: payload.thread_id,
428                interruption_scope_id: None,
429                attachments: vec![],
430                subject: None,
431            };
432
433            if state.tx.send(msg).await.is_err() {
434                return StatusCode::SERVICE_UNAVAILABLE;
435            }
436
437            StatusCode::OK
438        }
439
440        let app = Router::new()
441            .route(&listen_path, post(handle_webhook))
442            .with_state(state);
443
444        let addr = std::net::SocketAddr::from(([0, 0, 0, 0], self.listen_port));
445        ::zeroclaw_log::record!(
446            INFO,
447            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
448            &format!(
449                "Webhook channel listening on http://0.0.0.0:{}{} ...",
450                self.listen_port, self.listen_path
451            )
452        );
453
454        let listener = tokio::net::TcpListener::bind(addr).await?;
455        axum::serve(listener, app).await.map_err(|e| {
456            ::zeroclaw_log::record!(
457                ERROR,
458                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
459                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
460                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
461                "Webhook server error"
462            );
463            anyhow::Error::msg(format!("Webhook server error: {e}"))
464        })?;
465
466        Ok(())
467    }
468
469    async fn health_check(&self) -> bool {
470        // Webhook channel is healthy if the port can be bound (basic check).
471        // In practice, once listen() starts the server is running.
472        true
473    }
474}
475
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a channel with alias `test-hook` on port 8080 and no auth header,
    /// varying only the options the tests care about. `retry` is
    /// `(max_retries, retry_base_delay_ms, retry_max_delay_ms)`.
    fn channel(
        listen_path: Option<&str>,
        send_url: Option<&str>,
        send_method: Option<&str>,
        secret: Option<&str>,
        retry: Option<(u32, u64, u64)>,
    ) -> WebhookChannel {
        let (max_retries, base_ms, max_ms) = match retry {
            Some((retries, base, max)) => (Some(retries), Some(base), Some(max)),
            None => (None, None, None),
        };
        WebhookChannel::new(
            "test-hook".into(),
            8080,
            listen_path.map(str::to_owned),
            send_url.map(str::to_owned),
            send_method.map(str::to_owned),
            None,
            secret.map(str::to_owned),
            max_retries,
            base_ms,
            max_ms,
        )
    }

    fn make_channel() -> WebhookChannel {
        channel(
            Some("/webhook"),
            Some("https://example.com/callback"),
            None,
            None,
            None,
        )
    }

    fn make_channel_with_secret() -> WebhookChannel {
        channel(
            None,
            Some("https://example.com/callback"),
            None,
            Some("mysecret"),
            None,
        )
    }

    fn make_channel_to(url: &str) -> WebhookChannel {
        // Fast retries to keep tests snappy.
        channel(None, Some(url), None, None, Some((2, 10, 100)))
    }

    /// Mount a POST `/cb` responder on `server` that must be hit exactly `hits`
    /// times. With `one_shot`, the responder stops matching after its first hit.
    async fn mount_cb(
        server: &wiremock::MockServer,
        response: wiremock::ResponseTemplate,
        one_shot: bool,
        hits: u64,
    ) {
        use wiremock::Mock;
        use wiremock::matchers::{method, path};

        let mock = Mock::given(method("POST"))
            .and(path("/cb"))
            .respond_with(response);
        let mock = if one_shot {
            mock.up_to_n_times(1)
        } else {
            mock
        };
        mock.expect(hits).mount(server).await;
    }

    /// Callback URL on the given mock server.
    fn cb_url(server: &wiremock::MockServer) -> String {
        format!("{}/cb", server.uri())
    }

    #[test]
    fn default_path() {
        let ch = channel(None, None, None, None, None);
        assert_eq!(ch.listen_path, "/webhook");
    }

    #[test]
    fn path_normalized() {
        let ch = channel(Some("hooks/incoming"), None, None, None, None);
        assert_eq!(ch.listen_path, "/hooks/incoming");
    }

    #[test]
    fn send_method_default() {
        assert_eq!(make_channel().send_method, "POST");
    }

    #[test]
    fn send_method_put() {
        let ch = channel(None, Some("https://example.com"), Some("put"), None, None);
        assert_eq!(ch.send_method, "PUT");
    }

    #[test]
    fn retry_defaults_applied() {
        let ch = make_channel();
        assert_eq!(ch.max_retries, DEFAULT_MAX_RETRIES);
        assert_eq!(ch.retry_base_delay_ms, DEFAULT_RETRY_BASE_DELAY_MS);
        assert_eq!(ch.retry_max_delay_ms, DEFAULT_RETRY_MAX_DELAY_MS);
    }

    #[test]
    fn retry_overrides_applied() {
        let ch = channel(
            None,
            Some("https://example.com"),
            None,
            None,
            Some((0, 50, 1_000)),
        );
        assert_eq!(ch.max_retries, 0);
        assert_eq!(ch.retry_base_delay_ms, 50);
        assert_eq!(ch.retry_max_delay_ms, 1_000);
    }

    #[test]
    fn backoff_capped_by_max_delay() {
        let ch = channel(
            None,
            Some("https://example.com"),
            None,
            None,
            Some((5, 1_000, 2_000)),
        );
        // For attempt=10 the un-jittered base is 1_000 * 2^10 = 1_024_000ms. Even
        // at the low end of the jitter range that is far above the 2_000ms cap,
        // so the result must be exactly `retry_max_delay_ms`.
        let d = ch.compute_backoff(10);
        assert_eq!(d.as_millis(), 2_000);
    }

    #[test]
    fn backoff_never_exceeds_max_delay_near_cap() {
        // With the un-capped base sitting right at `retry_max_delay_ms`, upward
        // jitter would overshoot the cap unless the cap is applied afterwards.
        // Every draw must therefore stay `<= retry_max_delay_ms`.
        let ch = channel(
            None,
            Some("https://example.com"),
            None,
            None,
            Some((5, 1_000, 2_000)),
        );
        for _ in 0..256 {
            let d = ch.compute_backoff(1); // base = 2_000ms, jitter ∈ [1_500, 2_500]
            assert!(
                d.as_millis() <= 2_000,
                "compute_backoff exceeded retry_max_delay_ms: {}ms",
                d.as_millis()
            );
        }
    }

    #[test]
    fn parse_retry_after_integer_seconds() {
        assert_eq!(parse_retry_after_ms("5"), Some(5_000));
    }

    #[test]
    fn parse_retry_after_decimal_seconds() {
        assert_eq!(parse_retry_after_ms("2.9"), Some(2_000));
    }

    #[test]
    fn parse_retry_after_rejects_non_numeric() {
        assert_eq!(parse_retry_after_ms("later"), None);
    }

    #[test]
    fn parse_retry_after_empty() {
        assert_eq!(parse_retry_after_ms("  "), None);
    }

    #[test]
    fn parse_retry_after_zero() {
        assert_eq!(parse_retry_after_ms("0"), Some(0));
    }

    #[test]
    fn incoming_payload_deserializes_all_fields() {
        let json = r#"{"sender": "zeroclaw_user", "content": "hello", "thread_id": "t1"}"#;
        let payload: IncomingWebhook = serde_json::from_str(json).unwrap();
        assert_eq!(payload.sender, "zeroclaw_user");
        assert_eq!(payload.content, "hello");
        assert_eq!(payload.thread_id.as_deref(), Some("t1"));
    }

    #[test]
    fn incoming_payload_without_thread() {
        let json = r#"{"sender": "zeroclaw_user", "content": "hi"}"#;
        let payload: IncomingWebhook = serde_json::from_str(json).unwrap();
        assert_eq!(payload.sender, "zeroclaw_user");
        assert_eq!(payload.content, "hi");
        assert!(payload.thread_id.is_none());
    }

    #[test]
    fn outgoing_payload_serializes_content() {
        let payload = OutgoingWebhook {
            content: "response".into(),
            thread_id: Some("t1".into()),
            recipient: Some("zeroclaw_user".into()),
        };
        let json = serde_json::to_value(&payload).unwrap();
        assert_eq!(json["content"], "response");
        assert_eq!(json["thread_id"], "t1");
        assert_eq!(json["recipient"], "zeroclaw_user");
    }

    #[test]
    fn outgoing_payload_omits_none_fields() {
        let payload = OutgoingWebhook {
            content: "response".into(),
            thread_id: None,
            recipient: None,
        };
        let json = serde_json::to_value(&payload).unwrap();
        assert_eq!(json["content"], "response");
        assert!(json.get("thread_id").is_none());
        assert!(json.get("recipient").is_none());
    }

    #[test]
    fn verify_signature_no_secret() {
        assert!(make_channel().verify_signature(b"body", None));
    }

    #[test]
    fn verify_signature_missing_header() {
        assert!(!make_channel_with_secret().verify_signature(b"body", None));
    }

    #[test]
    fn verify_signature_valid() {
        use hmac::{Hmac, Mac};
        use sha2::Sha256;
        type HmacSha256 = Hmac<Sha256>;

        let body = b"test body";
        let mut mac = HmacSha256::new_from_slice(b"mysecret").unwrap();
        mac.update(body);
        let sig = hex::encode(mac.finalize().into_bytes());

        let ch = make_channel_with_secret();
        assert!(ch.verify_signature(body, Some(&sig)));
    }

    #[test]
    fn verify_signature_invalid() {
        assert!(!make_channel_with_secret().verify_signature(b"body", Some("badhex")));
    }

    fn test_message() -> SendMessage {
        SendMessage::new("hello", "zeroclaw_user")
    }

    #[tokio::test]
    async fn send_happy_path_returns_ok() {
        use wiremock::{MockServer, ResponseTemplate};

        let mock = MockServer::start().await;
        mount_cb(&mock, ResponseTemplate::new(200), false, 1).await;

        let ch = make_channel_to(&cb_url(&mock));
        ch.send(&test_message()).await.unwrap();
    }

    #[tokio::test]
    async fn send_retries_on_5xx_then_succeeds() {
        use wiremock::{MockServer, ResponseTemplate};

        let mock = MockServer::start().await;
        mount_cb(&mock, ResponseTemplate::new(503), true, 1).await;
        mount_cb(&mock, ResponseTemplate::new(200), false, 1).await;

        let ch = make_channel_to(&cb_url(&mock));
        ch.send(&test_message()).await.unwrap();
    }

    #[tokio::test]
    async fn send_does_not_retry_on_4xx() {
        use wiremock::{MockServer, ResponseTemplate};

        let mock = MockServer::start().await;
        // exactly one call — must not retry
        mount_cb(&mock, ResponseTemplate::new(400), false, 1).await;

        let ch = make_channel_to(&cb_url(&mock));
        let err = ch.send(&test_message()).await.unwrap_err();
        assert!(err.to_string().contains("400"));
    }

    #[tokio::test]
    async fn send_retries_on_429_then_exhausts() {
        use wiremock::{MockServer, ResponseTemplate};

        let mock = MockServer::start().await;
        // max_retries=2 → 3 total attempts
        mount_cb(&mock, ResponseTemplate::new(429), false, 3).await;

        let ch = make_channel_to(&cb_url(&mock));
        let err = ch.send(&test_message()).await.unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("3 attempt"),
            "expected attempt count in error: {msg}"
        );
    }

    #[tokio::test]
    async fn send_honors_retry_after_header() {
        use std::time::Instant;
        use wiremock::{MockServer, ResponseTemplate};

        let mock = MockServer::start().await;
        mount_cb(
            &mock,
            ResponseTemplate::new(429).insert_header("Retry-After", "1"),
            true,
            1,
        )
        .await;
        mount_cb(&mock, ResponseTemplate::new(200), false, 1).await;

        // The 2s ceiling on retry delays is above the 1s Retry-After, so the
        // server's instruction is honored in full rather than being capped.
        let url = cb_url(&mock);
        let ch = channel(None, Some(&url), None, None, Some((2, 10, 2_000)));

        let start = Instant::now();
        ch.send(&test_message()).await.unwrap();
        let elapsed = start.elapsed();
        assert!(
            elapsed >= Duration::from_millis(900),
            "expected to wait ~1s for Retry-After, elapsed = {:?}",
            elapsed
        );
    }

    #[tokio::test]
    async fn send_honors_retry_after_on_503() {
        use std::time::Instant;
        use wiremock::{MockServer, ResponseTemplate};

        let mock = MockServer::start().await;
        mount_cb(
            &mock,
            ResponseTemplate::new(503).insert_header("Retry-After", "1"),
            true,
            1,
        )
        .await;
        mount_cb(&mock, ResponseTemplate::new(200), false, 1).await;

        let url = cb_url(&mock);
        let ch = channel(None, Some(&url), None, None, Some((2, 10, 2_000)));

        let start = Instant::now();
        ch.send(&test_message()).await.unwrap();
        let elapsed = start.elapsed();
        assert!(
            elapsed >= Duration::from_millis(900),
            "expected to wait ~1s for Retry-After on 503, elapsed = {:?}",
            elapsed
        );
    }

    #[tokio::test]
    async fn send_max_retries_zero_disables_retry() {
        use wiremock::{MockServer, ResponseTemplate};

        let mock = MockServer::start().await;
        // only one attempt when max_retries=0
        mount_cb(&mock, ResponseTemplate::new(503), false, 1).await;

        let url = cb_url(&mock);
        let ch = channel(None, Some(&url), None, None, Some((0, 10, 100)));
        assert!(ch.send(&test_message()).await.is_err());
    }
}