Skip to main content

zeroclaw_channels/
webhook.rs

1use anyhow::{Result, bail};
2use async_trait::async_trait;
3use chrono::{DateTime, NaiveDateTime, Utc};
4use serde::{Deserialize, Serialize};
5use std::time::Duration;
6use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
7
/// Number of retries (after the initial attempt) used when `WebhookChannel::new`
/// receives no `max_retries`.
const DEFAULT_MAX_RETRIES: u32 = 3;
/// Base delay for exponential backoff, in milliseconds, used when
/// `WebhookChannel::new` receives no `retry_base_delay_ms`.
const DEFAULT_RETRY_BASE_DELAY_MS: u64 = 500;
/// Upper bound for any single retry delay, in milliseconds, used when
/// `WebhookChannel::new` receives no `retry_max_delay_ms`.
const DEFAULT_RETRY_MAX_DELAY_MS: u64 = 30_000;
11
/// Generic Webhook channel — receives messages via HTTP POST and sends replies
/// to a configurable outbound URL. This is the "universal adapter" for any system
/// that supports webhooks.
///
/// Construct instances with `WebhookChannel::new`, which applies defaults and
/// normalizes `listen_path` and `send_method`.
pub struct WebhookChannel {
    /// Alias reported through the `Attributable` implementation.
    alias: String,
    /// TCP port the inbound HTTP listener binds to (on `0.0.0.0`).
    listen_port: u16,
    /// Route path accepting inbound POSTs; always starts with `/`.
    listen_path: String,
    /// Destination for outbound messages. When `None`, `send` logs and returns `Ok(())`.
    send_url: Option<String>,
    /// Upper-cased HTTP method for outbound requests: `"PUT"` issues a PUT,
    /// any other value issues a POST.
    send_method: String,
    /// Value sent as the `Authorization` header on outbound requests, if set.
    auth_header: Option<String>,
    /// HMAC-SHA256 key. When set, inbound requests must carry a matching
    /// hex signature in the `x-webhook-signature` header.
    secret: Option<String>,
    /// Retries allowed after the first outbound attempt.
    max_retries: u32,
    /// Base delay for exponential backoff, in milliseconds (at least 1).
    retry_base_delay_ms: u64,
    /// Ceiling for every retry delay, in milliseconds (at least 1).
    retry_max_delay_ms: u64,
}
27
/// Incoming webhook payload format.
///
/// Deserialized from the JSON body of an inbound POST.
#[derive(Debug, Deserialize)]
struct IncomingWebhook {
    /// Identifier of the message author; also used as the reply target when
    /// no `thread_id` is supplied.
    sender: String,
    /// Message text. Requests with empty content are rejected by the listener.
    content: String,
    /// Optional thread identifier; absent in the JSON means `None`.
    #[serde(default)]
    thread_id: Option<String>,
}
36
/// Outgoing webhook payload format.
///
/// Serialized as the JSON body of each outbound request. `None` fields are
/// omitted from the JSON entirely rather than emitted as `null`.
#[derive(Debug, Serialize)]
struct OutgoingWebhook {
    /// Message text to deliver.
    content: String,
    /// Thread the message belongs to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    thread_id: Option<String>,
    /// Intended recipient, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    recipient: Option<String>,
}
46
47impl WebhookChannel {
48    #[allow(clippy::too_many_arguments)]
49    pub fn new(
50        alias: String,
51        listen_port: u16,
52        listen_path: Option<String>,
53        send_url: Option<String>,
54        send_method: Option<String>,
55        auth_header: Option<String>,
56        secret: Option<String>,
57        max_retries: Option<u32>,
58        retry_base_delay_ms: Option<u64>,
59        retry_max_delay_ms: Option<u64>,
60    ) -> Self {
61        let path = listen_path.unwrap_or_else(|| "/webhook".to_string());
62        // Ensure path starts with /
63        let listen_path = if path.starts_with('/') {
64            path
65        } else {
66            format!("/{path}")
67        };
68
69        Self {
70            alias,
71            listen_port,
72            listen_path,
73            send_url,
74            send_method: send_method
75                .unwrap_or_else(|| "POST".to_string())
76                .to_uppercase(),
77            auth_header,
78            secret,
79            max_retries: max_retries.unwrap_or(DEFAULT_MAX_RETRIES),
80            // Clamp delays to >=1ms so a misconfigured `0` does not busy-retry without yielding.
81            retry_base_delay_ms: retry_base_delay_ms
82                .unwrap_or(DEFAULT_RETRY_BASE_DELAY_MS)
83                .max(1),
84            retry_max_delay_ms: retry_max_delay_ms
85                .unwrap_or(DEFAULT_RETRY_MAX_DELAY_MS)
86                .max(1),
87        }
88    }
89
90    fn http_client(&self) -> reqwest::Client {
91        zeroclaw_config::schema::build_runtime_proxy_client("channel.webhook")
92    }
93
94    /// Compute the backoff delay for a given attempt, bounded by `retry_max_delay_ms`
95    /// and with ±25% jitter applied. Jitter is applied before the final cap, so the
96    /// returned delay is strictly `<= retry_max_delay_ms`.
97    fn compute_backoff(&self, attempt: u32) -> Duration {
98        let multiplier = 1_u64.checked_shl(attempt).unwrap_or(u64::MAX);
99        let base = self.retry_base_delay_ms.saturating_mul(multiplier);
100        let jittered = apply_jitter(base);
101        let capped = jittered.min(self.retry_max_delay_ms);
102        Duration::from_millis(capped)
103    }
104
105    /// Verify an incoming request's signature if a secret is configured.
106    #[cfg(test)]
107    fn verify_signature(&self, body: &[u8], signature: Option<&str>) -> bool {
108        let Some(ref secret) = self.secret else {
109            return true; // No secret configured, accept all
110        };
111
112        let Some(sig) = signature else {
113            return false; // Secret is set but no signature header provided
114        };
115
116        // HMAC-SHA256 verification
117        use hmac::{Hmac, Mac};
118        use sha2::Sha256;
119
120        type HmacSha256 = Hmac<Sha256>;
121
122        let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) else {
123            return false;
124        };
125        mac.update(body);
126
127        // Signature should be hex-encoded
128        let Ok(expected) = hex::decode(sig.trim_start_matches("sha256=")) else {
129            return false;
130        };
131
132        mac.verify_slice(&expected).is_ok()
133    }
134
135    async fn attempt_send(
136        &self,
137        client: &reqwest::Client,
138        send_url: &str,
139        payload: &OutgoingWebhook,
140    ) -> AttemptOutcome {
141        let mut request = match self.send_method.as_str() {
142            "PUT" => client.put(send_url),
143            _ => client.post(send_url),
144        };
145
146        if let Some(ref auth) = self.auth_header {
147            request = request.header("Authorization", auth);
148        }
149
150        let resp = match request.json(payload).send().await {
151            Ok(r) => r,
152            Err(e) => return AttemptOutcome::Retry(format!("network error: {e}")),
153        };
154
155        let status = resp.status();
156        if status.is_success() {
157            return AttemptOutcome::Success;
158        }
159
160        let code = status.as_u16();
161        let retry_after = resp
162            .headers()
163            .get(reqwest::header::RETRY_AFTER)
164            .and_then(|v| v.to_str().ok())
165            .and_then(parse_retry_after_ms);
166
167        // 429 and 503 may include Retry-After; honor it if present. 429 appears here
168        // *and* in the branch below: here we take the server-supplied delay, below we
169        // fall back to exponential backoff when no Retry-After header was sent.
170        // Reading the body is deferred until after this early-return so hot 429 loops
171        // against large pages don't pay the I/O cost.
172        if (code == 429 || code == 503)
173            && let Some(ms) = retry_after
174        {
175            return AttemptOutcome::RetryAfter(Duration::from_millis(ms));
176        }
177
178        let body = resp
179            .text()
180            .await
181            .unwrap_or_else(|e| format!("<failed to read response: {e}>"));
182
183        // Retry 429 (rate limit) and 5xx (server errors).
184        if code == 429 || (500..600).contains(&code) {
185            return AttemptOutcome::Retry(format!("Webhook send failed ({status}): {body}"));
186        }
187
188        // Other 4xx → do not retry.
189        AttemptOutcome::Fatal(anyhow::Error::msg(format!(
190            "Webhook send failed ({status}): {body}"
191        )))
192    }
193}
194
195impl ::zeroclaw_api::attribution::Attributable for WebhookChannel {
196    fn role(&self) -> ::zeroclaw_api::attribution::Role {
197        ::zeroclaw_api::attribution::Role::Channel(
198            ::zeroclaw_api::attribution::ChannelKind::Webhook,
199        )
200    }
201    fn alias(&self) -> &str {
202        &self.alias
203    }
204}
205
206/// Apply ±25% jitter to a delay so parallel senders do not thunder-herd.
207fn apply_jitter(delay_ms: u64) -> u64 {
208    if delay_ms == 0 {
209        return 0;
210    }
211    let jitter_factor = 0.75 + (rand::random::<f64>() * 0.5);
212    // Safe: jitter_factor > 0 keeps the product non-negative; f64→u64 cast saturates on overflow.
213    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
214    let jittered = ((delay_ms as f64) * jitter_factor) as u64;
215    jittered
216}
217
218/// Parse a `Retry-After` header value. Supports integer seconds, decimal
219/// seconds (truncated to whole seconds), and HTTP-date values.
220fn parse_retry_after_ms(value: &str) -> Option<u64> {
221    parse_retry_after_ms_at(value, Utc::now())
222}
223
224fn parse_retry_after_ms_at(value: &str, now: DateTime<Utc>) -> Option<u64> {
225    let trimmed = value.trim();
226    if trimmed.is_empty() {
227        return None;
228    }
229    if let Ok(seconds) = trimmed.parse::<u64>() {
230        return Some(seconds.saturating_mul(1000));
231    }
232    let whole = trimmed
233        .split_once('.')
234        .map(|(whole, _)| whole)
235        .unwrap_or(trimmed);
236    if let Ok(seconds) = whole.parse::<u64>() {
237        return Some(seconds.saturating_mul(1000));
238    }
239
240    parse_retry_after_http_date(trimmed).map(|date| {
241        let delay_ms = date.signed_duration_since(now).num_milliseconds();
242        if delay_ms <= 0 {
243            0
244        } else {
245            u64::try_from(delay_ms).unwrap_or(u64::MAX)
246        }
247    })
248}
249
250fn parse_retry_after_http_date(value: &str) -> Option<DateTime<Utc>> {
251    if let Ok(date) = NaiveDateTime::parse_from_str(value, "%a, %d %b %Y %H:%M:%S GMT") {
252        return Some(DateTime::from_naive_utc_and_offset(date, Utc));
253    }
254    if let Ok(date) = NaiveDateTime::parse_from_str(value, "%A, %d-%b-%y %H:%M:%S GMT") {
255        return Some(DateTime::from_naive_utc_and_offset(date, Utc));
256    }
257    NaiveDateTime::parse_from_str(value, "%a %b %e %H:%M:%S %Y")
258        .ok()
259        .map(|date| DateTime::from_naive_utc_and_offset(date, Utc))
260}
261
/// Outcome of a single send attempt.
enum AttemptOutcome {
    /// The server answered with a success status; no further attempts are needed.
    Success,
    /// The server answered 429 or 503 with a parseable `Retry-After`; the
    /// payload is the delay it requested.
    RetryAfter(Duration),
    /// A transient failure (network error, 429, or 5xx) described by the
    /// message; the caller retries with its own backoff.
    Retry(String),
    /// A failure that must not be retried; the error is returned to the caller.
    Fatal(anyhow::Error),
}
269
270#[async_trait]
271impl Channel for WebhookChannel {
272    fn name(&self) -> &str {
273        "webhook"
274    }
275
276    async fn send(&self, message: &SendMessage) -> Result<()> {
277        let Some(ref send_url) = self.send_url else {
278            ::zeroclaw_log::record!(
279                DEBUG,
280                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
281                "channel: no send_url configured, skipping outbound message"
282            );
283            return Ok(());
284        };
285
286        let client = self.http_client();
287        let payload = OutgoingWebhook {
288            content: message.content.clone(),
289            thread_id: message.thread_ts.clone(),
290            recipient: if message.recipient.is_empty() {
291                None
292            } else {
293                Some(message.recipient.clone())
294            },
295        };
296
297        let total_attempts = self.max_retries.saturating_add(1);
298
299        for attempt in 0..total_attempts {
300            let outcome = self.attempt_send(&client, send_url, &payload).await;
301
302            match outcome {
303                AttemptOutcome::Success => return Ok(()),
304                AttemptOutcome::Fatal(err) => return Err(err),
305                AttemptOutcome::RetryAfter(delay) => {
306                    if attempt + 1 >= total_attempts {
307                        bail!(
308                            "Webhook send failed after {total_attempts} attempt(s); last error: rate limited / server error with Retry-After"
309                        );
310                    }
311                    let capped = delay.min(Duration::from_millis(self.retry_max_delay_ms));
312                    ::zeroclaw_log::record!(
313                        WARN,
314                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
315                        &format!(
316                            "Webhook send: server requested retry after {}ms (attempt {}/{}), waiting...",
317                            capped.as_millis(),
318                            attempt + 1,
319                            total_attempts
320                        )
321                    );
322                    tokio::time::sleep(capped).await;
323                }
324                AttemptOutcome::Retry(err_msg) => {
325                    if attempt + 1 >= total_attempts {
326                        bail!(
327                            "Webhook send failed after {total_attempts} attempt(s); last error: {err_msg}"
328                        );
329                    }
330                    let delay = self.compute_backoff(attempt);
331                    ::zeroclaw_log::record!(
332                        WARN,
333                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
334                        &format!(
335                            "Webhook send failed (attempt {}/{}): {}; retrying in {}ms",
336                            attempt + 1,
337                            total_attempts,
338                            err_msg,
339                            delay.as_millis()
340                        )
341                    );
342                    tokio::time::sleep(delay).await;
343                }
344            }
345        }
346
347        unreachable!("send loop exits via return or bail on the final attempt")
348    }
349
350    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> Result<()> {
351        use axum::{
352            Router,
353            body::Bytes,
354            extract::State,
355            http::{HeaderMap, StatusCode},
356            routing::post,
357        };
358        use portable_atomic::{AtomicU64, Ordering};
359        use std::sync::Arc;
360
361        let counter = Arc::new(AtomicU64::new(0));
362
363        struct WebhookState {
364            tx: tokio::sync::mpsc::Sender<ChannelMessage>,
365            secret: Option<String>,
366            counter: Arc<AtomicU64>,
367        }
368
369        let state = Arc::new(WebhookState {
370            tx: tx.clone(),
371            secret: self.secret.clone(),
372            counter: counter.clone(),
373        });
374
375        let listen_path = self.listen_path.clone();
376
377        async fn handle_webhook(
378            State(state): State<Arc<WebhookState>>,
379            headers: HeaderMap,
380            body: Bytes,
381        ) -> StatusCode {
382            // Verify signature if secret is configured
383            if let Some(ref secret) = state.secret {
384                use hmac::{Hmac, Mac};
385                use sha2::Sha256;
386                type HmacSha256 = Hmac<Sha256>;
387
388                let signature = headers
389                    .get("x-webhook-signature")
390                    .and_then(|v| v.to_str().ok());
391
392                let valid = if let Some(sig) = signature {
393                    if let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) {
394                        mac.update(&body);
395                        let expected =
396                            hex::decode(sig.trim_start_matches("sha256=")).unwrap_or_default();
397                        mac.verify_slice(&expected).is_ok()
398                    } else {
399                        false
400                    }
401                } else {
402                    false
403                };
404
405                if !valid {
406                    ::zeroclaw_log::record!(
407                        WARN,
408                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
409                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
410                        "invalid signature, rejecting request"
411                    );
412                    return StatusCode::UNAUTHORIZED;
413                }
414            }
415
416            let payload: IncomingWebhook = match serde_json::from_slice(&body) {
417                Ok(p) => p,
418                Err(e) => {
419                    ::zeroclaw_log::record!(
420                        WARN,
421                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
422                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
423                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
424                        "invalid JSON payload"
425                    );
426                    return StatusCode::BAD_REQUEST;
427                }
428            };
429
430            if payload.content.is_empty() {
431                return StatusCode::BAD_REQUEST;
432            }
433
434            let seq = state.counter.fetch_add(1, Ordering::Relaxed);
435
436            #[allow(clippy::cast_possible_truncation)]
437            let timestamp = std::time::SystemTime::now()
438                .duration_since(std::time::UNIX_EPOCH)
439                .unwrap_or_default()
440                .as_secs();
441
442            let reply_target = payload
443                .thread_id
444                .clone()
445                .unwrap_or_else(|| payload.sender.clone());
446
447            let msg = ChannelMessage {
448                id: format!("webhook_{seq}"),
449                sender: payload.sender,
450                reply_target,
451                content: payload.content,
452                channel: "webhook".to_string(),
453                channel_alias: None,
454                timestamp,
455                thread_ts: payload.thread_id,
456                interruption_scope_id: None,
457                attachments: vec![],
458                subject: None,
459            };
460
461            if state.tx.send(msg).await.is_err() {
462                return StatusCode::SERVICE_UNAVAILABLE;
463            }
464
465            StatusCode::OK
466        }
467
468        let app = Router::new()
469            .route(&listen_path, post(handle_webhook))
470            .with_state(state);
471
472        let addr = std::net::SocketAddr::from(([0, 0, 0, 0], self.listen_port));
473        ::zeroclaw_log::record!(
474            INFO,
475            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
476            &format!(
477                "Webhook channel listening on http://0.0.0.0:{}{} ...",
478                self.listen_port, self.listen_path
479            )
480        );
481
482        let listener = tokio::net::TcpListener::bind(addr).await?;
483        axum::serve(listener, app).await.map_err(|e| {
484            ::zeroclaw_log::record!(
485                ERROR,
486                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
487                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
488                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
489                "Webhook server error"
490            );
491            anyhow::Error::msg(format!("Webhook server error: {e}"))
492        })?;
493
494        Ok(())
495    }
496
497    async fn health_check(&self) -> bool {
498        // Webhook channel is healthy if the port can be bound (basic check).
499        // In practice, once listen() starts the server is running.
500        true
501    }
502}
503
504#[cfg(test)]
505mod tests {
506    use super::*;
507
508    fn make_channel() -> WebhookChannel {
509        WebhookChannel::new(
510            "test-hook".into(),
511            8080,
512            Some("/webhook".into()),
513            Some("https://example.com/callback".into()),
514            None,
515            None,
516            None,
517            None,
518            None,
519            None,
520        )
521    }
522
523    fn make_channel_with_secret() -> WebhookChannel {
524        WebhookChannel::new(
525            "test-hook".into(),
526            8080,
527            None,
528            Some("https://example.com/callback".into()),
529            None,
530            None,
531            Some("mysecret".into()),
532            None,
533            None,
534            None,
535        )
536    }
537
538    fn make_channel_to(url: &str) -> WebhookChannel {
539        // Fast retries to keep tests snappy.
540        WebhookChannel::new(
541            "test-hook".into(),
542            8080,
543            None,
544            Some(url.into()),
545            None,
546            None,
547            None,
548            Some(2),
549            Some(10),
550            Some(100),
551        )
552    }
553
554    #[test]
555    fn default_path() {
556        let ch = WebhookChannel::new(
557            "test-hook".into(),
558            8080,
559            None,
560            None,
561            None,
562            None,
563            None,
564            None,
565            None,
566            None,
567        );
568        assert_eq!(ch.listen_path, "/webhook");
569    }
570
571    #[test]
572    fn path_normalized() {
573        let ch = WebhookChannel::new(
574            "test-hook".into(),
575            8080,
576            Some("hooks/incoming".into()),
577            None,
578            None,
579            None,
580            None,
581            None,
582            None,
583            None,
584        );
585        assert_eq!(ch.listen_path, "/hooks/incoming");
586    }
587
588    #[test]
589    fn send_method_default() {
590        let ch = make_channel();
591        assert_eq!(ch.send_method, "POST");
592    }
593
594    #[test]
595    fn send_method_put() {
596        let ch = WebhookChannel::new(
597            "test-hook".into(),
598            8080,
599            None,
600            Some("https://example.com".into()),
601            Some("put".into()),
602            None,
603            None,
604            None,
605            None,
606            None,
607        );
608        assert_eq!(ch.send_method, "PUT");
609    }
610
611    #[test]
612    fn retry_defaults_applied() {
613        let ch = make_channel();
614        assert_eq!(ch.max_retries, DEFAULT_MAX_RETRIES);
615        assert_eq!(ch.retry_base_delay_ms, DEFAULT_RETRY_BASE_DELAY_MS);
616        assert_eq!(ch.retry_max_delay_ms, DEFAULT_RETRY_MAX_DELAY_MS);
617    }
618
619    #[test]
620    fn retry_overrides_applied() {
621        let ch = WebhookChannel::new(
622            "test-hook".into(),
623            8080,
624            None,
625            Some("https://example.com".into()),
626            None,
627            None,
628            None,
629            Some(0),
630            Some(50),
631            Some(1_000),
632        );
633        assert_eq!(ch.max_retries, 0);
634        assert_eq!(ch.retry_base_delay_ms, 50);
635        assert_eq!(ch.retry_max_delay_ms, 1_000);
636    }
637
638    #[test]
639    fn backoff_capped_by_max_delay() {
640        let ch = WebhookChannel::new(
641            "test-hook".into(),
642            8080,
643            None,
644            Some("https://example.com".into()),
645            None,
646            None,
647            None,
648            Some(5),
649            Some(1_000),
650            Some(2_000),
651        );
652        // Base for attempt=10 is 1_000 * 2^10 = 1_024_000ms. Jitter scales it by
653        // [0.75, 1.25] → still well above the 2_000ms cap. The strict cap clamps
654        // the jittered value, so the returned delay must equal `retry_max_delay_ms`.
655        let d = ch.compute_backoff(10);
656        assert_eq!(d.as_millis(), 2_000);
657    }
658
659    #[test]
660    fn backoff_never_exceeds_max_delay_near_cap() {
661        // When the un-capped base is close to `retry_max_delay_ms`, jitter could
662        // historically push the result above the cap. With strict capping the
663        // returned delay must stay `<= retry_max_delay_ms` on every draw.
664        let ch = WebhookChannel::new(
665            "test-hook".into(),
666            8080,
667            None,
668            Some("https://example.com".into()),
669            None,
670            None,
671            None,
672            Some(5),
673            Some(1_000),
674            Some(2_000),
675        );
676        for _ in 0..256 {
677            let d = ch.compute_backoff(1); // base = 2_000ms, jitter ∈ [1_500, 2_500]
678            assert!(
679                d.as_millis() <= 2_000,
680                "compute_backoff exceeded retry_max_delay_ms: {}ms",
681                d.as_millis()
682            );
683        }
684    }
685
686    #[test]
687    fn parse_retry_after_integer_seconds() {
688        assert_eq!(parse_retry_after_ms("5"), Some(5_000));
689    }
690
691    #[test]
692    fn parse_retry_after_decimal_seconds() {
693        assert_eq!(parse_retry_after_ms("2.9"), Some(2_000));
694    }
695
696    #[test]
697    fn parse_retry_after_http_date() {
698        let now = DateTime::parse_from_rfc2822("Sun, 06 Nov 1994 08:49:37 GMT")
699            .unwrap()
700            .with_timezone(&Utc);
701        assert_eq!(
702            parse_retry_after_ms_at("Sun, 06 Nov 1994 08:49:39 GMT", now),
703            Some(2_000)
704        );
705    }
706
707    #[test]
708    fn parse_retry_after_obsolete_http_dates() {
709        let now = DateTime::parse_from_rfc2822("Sun, 06 Nov 1994 08:49:37 GMT")
710            .unwrap()
711            .with_timezone(&Utc);
712        assert_eq!(
713            parse_retry_after_ms_at("Sunday, 06-Nov-94 08:49:39 GMT", now),
714            Some(2_000)
715        );
716        assert_eq!(
717            parse_retry_after_ms_at("Sun Nov  6 08:49:39 1994", now),
718            Some(2_000)
719        );
720    }
721
722    #[test]
723    fn parse_retry_after_past_http_date_as_zero() {
724        let now = DateTime::parse_from_rfc2822("Sun, 06 Nov 1994 08:49:39 GMT")
725            .unwrap()
726            .with_timezone(&Utc);
727        assert_eq!(
728            parse_retry_after_ms_at("Sun, 06 Nov 1994 08:49:37 GMT", now),
729            Some(0)
730        );
731    }
732
733    #[test]
734    fn parse_retry_after_rejects_non_numeric() {
735        assert_eq!(parse_retry_after_ms("later"), None);
736    }
737
738    #[test]
739    fn parse_retry_after_empty() {
740        assert_eq!(parse_retry_after_ms("  "), None);
741    }
742
743    #[test]
744    fn parse_retry_after_zero() {
745        assert_eq!(parse_retry_after_ms("0"), Some(0));
746    }
747
748    #[test]
749    fn incoming_payload_deserializes_all_fields() {
750        let json = r#"{"sender": "zeroclaw_user", "content": "hello", "thread_id": "t1"}"#;
751        let payload: IncomingWebhook = serde_json::from_str(json).unwrap();
752        assert_eq!(payload.sender, "zeroclaw_user");
753        assert_eq!(payload.content, "hello");
754        assert_eq!(payload.thread_id.as_deref(), Some("t1"));
755    }
756
757    #[test]
758    fn incoming_payload_without_thread() {
759        let json = r#"{"sender": "zeroclaw_user", "content": "hi"}"#;
760        let payload: IncomingWebhook = serde_json::from_str(json).unwrap();
761        assert_eq!(payload.sender, "zeroclaw_user");
762        assert_eq!(payload.content, "hi");
763        assert!(payload.thread_id.is_none());
764    }
765
766    #[test]
767    fn outgoing_payload_serializes_content() {
768        let payload = OutgoingWebhook {
769            content: "response".into(),
770            thread_id: Some("t1".into()),
771            recipient: Some("zeroclaw_user".into()),
772        };
773        let json = serde_json::to_value(&payload).unwrap();
774        assert_eq!(json["content"], "response");
775        assert_eq!(json["thread_id"], "t1");
776        assert_eq!(json["recipient"], "zeroclaw_user");
777    }
778
779    #[test]
780    fn outgoing_payload_omits_none_fields() {
781        let payload = OutgoingWebhook {
782            content: "response".into(),
783            thread_id: None,
784            recipient: None,
785        };
786        let json = serde_json::to_value(&payload).unwrap();
787        assert_eq!(json["content"], "response");
788        assert!(json.get("thread_id").is_none());
789        assert!(json.get("recipient").is_none());
790    }
791
792    #[test]
793    fn verify_signature_no_secret() {
794        let ch = make_channel();
795        assert!(ch.verify_signature(b"body", None));
796    }
797
798    #[test]
799    fn verify_signature_missing_header() {
800        let ch = make_channel_with_secret();
801        assert!(!ch.verify_signature(b"body", None));
802    }
803
804    #[test]
805    fn verify_signature_valid() {
806        use hmac::{Hmac, Mac};
807        use sha2::Sha256;
808        type HmacSha256 = Hmac<Sha256>;
809
810        let ch = make_channel_with_secret();
811        let body = b"test body";
812
813        let mut mac = HmacSha256::new_from_slice(b"mysecret").unwrap();
814        mac.update(body);
815        let sig = hex::encode(mac.finalize().into_bytes());
816
817        assert!(ch.verify_signature(body, Some(&sig)));
818    }
819
820    #[test]
821    fn verify_signature_invalid() {
822        let ch = make_channel_with_secret();
823        assert!(!ch.verify_signature(b"body", Some("badhex")));
824    }
825
826    fn test_message() -> SendMessage {
827        SendMessage::new("hello", "zeroclaw_user")
828    }
829
830    #[tokio::test]
831    async fn send_happy_path_returns_ok() {
832        use wiremock::matchers::{method, path};
833        use wiremock::{Mock, MockServer, ResponseTemplate};
834
835        let mock = MockServer::start().await;
836        Mock::given(method("POST"))
837            .and(path("/cb"))
838            .respond_with(ResponseTemplate::new(200))
839            .expect(1)
840            .mount(&mock)
841            .await;
842
843        let ch = make_channel_to(&format!("{}/cb", mock.uri()));
844        ch.send(&test_message()).await.unwrap();
845    }
846
847    #[tokio::test]
848    async fn send_retries_on_5xx_then_succeeds() {
849        use wiremock::matchers::{method, path};
850        use wiremock::{Mock, MockServer, ResponseTemplate};
851
852        let mock = MockServer::start().await;
853        Mock::given(method("POST"))
854            .and(path("/cb"))
855            .respond_with(ResponseTemplate::new(503))
856            .up_to_n_times(1)
857            .expect(1)
858            .mount(&mock)
859            .await;
860        Mock::given(method("POST"))
861            .and(path("/cb"))
862            .respond_with(ResponseTemplate::new(200))
863            .expect(1)
864            .mount(&mock)
865            .await;
866
867        let ch = make_channel_to(&format!("{}/cb", mock.uri()));
868        ch.send(&test_message()).await.unwrap();
869    }
870
871    #[tokio::test]
872    async fn send_does_not_retry_on_4xx() {
873        use wiremock::matchers::{method, path};
874        use wiremock::{Mock, MockServer, ResponseTemplate};
875
876        let mock = MockServer::start().await;
877        Mock::given(method("POST"))
878            .and(path("/cb"))
879            .respond_with(ResponseTemplate::new(400))
880            .expect(1) // exactly one call — must not retry
881            .mount(&mock)
882            .await;
883
884        let ch = make_channel_to(&format!("{}/cb", mock.uri()));
885        let err = ch.send(&test_message()).await.unwrap_err();
886        assert!(err.to_string().contains("400"));
887    }
888
889    #[tokio::test]
890    async fn send_retries_on_429_then_exhausts() {
891        use wiremock::matchers::{method, path};
892        use wiremock::{Mock, MockServer, ResponseTemplate};
893
894        let mock = MockServer::start().await;
895        Mock::given(method("POST"))
896            .and(path("/cb"))
897            .respond_with(ResponseTemplate::new(429))
898            .expect(3) // max_retries=2 → 3 total attempts
899            .mount(&mock)
900            .await;
901
902        let ch = make_channel_to(&format!("{}/cb", mock.uri()));
903        let err = ch.send(&test_message()).await.unwrap_err();
904        let msg = err.to_string();
905        assert!(
906            msg.contains("3 attempt"),
907            "expected attempt count in error: {msg}"
908        );
909    }
910
911    #[tokio::test]
912    async fn send_honors_retry_after_header() {
913        use std::time::Instant;
914        use wiremock::matchers::{method, path};
915        use wiremock::{Mock, MockServer, ResponseTemplate};
916
917        let mock = MockServer::start().await;
918        Mock::given(method("POST"))
919            .and(path("/cb"))
920            .respond_with(ResponseTemplate::new(429).insert_header("Retry-After", "1"))
921            .up_to_n_times(1)
922            .expect(1)
923            .mount(&mock)
924            .await;
925        Mock::given(method("POST"))
926            .and(path("/cb"))
927            .respond_with(ResponseTemplate::new(200))
928            .expect(1)
929            .mount(&mock)
930            .await;
931
932        // Use a channel whose retry_max_delay_ms is high enough to let us actually
933        // wait the full Retry-After (cap at 2s so we honor the 1s instruction).
934        let ch = WebhookChannel::new(
935            "test-hook".into(),
936            8080,
937            None,
938            Some(format!("{}/cb", mock.uri())),
939            None,
940            None,
941            None,
942            Some(2),
943            Some(10),
944            Some(2_000),
945        );
946
947        let start = Instant::now();
948        ch.send(&test_message()).await.unwrap();
949        let elapsed = start.elapsed();
950        assert!(
951            elapsed >= Duration::from_millis(900),
952            "expected to wait ~1s for Retry-After, elapsed = {:?}",
953            elapsed
954        );
955    }
956
957    #[tokio::test]
958    async fn send_honors_retry_after_http_date_header() {
959        use std::time::Instant;
960        use wiremock::matchers::{method, path};
961        use wiremock::{Mock, MockServer, ResponseTemplate};
962
963        let mock = MockServer::start().await;
964        let retry_at = (Utc::now() + chrono::Duration::seconds(60))
965            .format("%a, %d %b %Y %H:%M:%S GMT")
966            .to_string();
967        Mock::given(method("POST"))
968            .and(path("/cb"))
969            .respond_with(ResponseTemplate::new(429).insert_header("Retry-After", retry_at))
970            .up_to_n_times(1)
971            .expect(1)
972            .mount(&mock)
973            .await;
974        Mock::given(method("POST"))
975            .and(path("/cb"))
976            .respond_with(ResponseTemplate::new(200))
977            .expect(1)
978            .mount(&mock)
979            .await;
980
981        let ch = WebhookChannel::new(
982            "test-hook".into(),
983            8080,
984            None,
985            Some(format!("{}/cb", mock.uri())),
986            None,
987            None,
988            None,
989            Some(2),
990            Some(10),
991            Some(150),
992        );
993
994        let start = Instant::now();
995        ch.send(&test_message()).await.unwrap();
996        let elapsed = start.elapsed();
997        assert!(
998            elapsed >= Duration::from_millis(100),
999            "expected to wait for date-form Retry-After, elapsed = {:?}",
1000            elapsed
1001        );
1002    }
1003
1004    #[tokio::test]
1005    async fn send_honors_retry_after_on_503() {
1006        use std::time::Instant;
1007        use wiremock::matchers::{method, path};
1008        use wiremock::{Mock, MockServer, ResponseTemplate};
1009
1010        let mock = MockServer::start().await;
1011        Mock::given(method("POST"))
1012            .and(path("/cb"))
1013            .respond_with(ResponseTemplate::new(503).insert_header("Retry-After", "1"))
1014            .up_to_n_times(1)
1015            .expect(1)
1016            .mount(&mock)
1017            .await;
1018        Mock::given(method("POST"))
1019            .and(path("/cb"))
1020            .respond_with(ResponseTemplate::new(200))
1021            .expect(1)
1022            .mount(&mock)
1023            .await;
1024
1025        let ch = WebhookChannel::new(
1026            "test-hook".into(),
1027            8080,
1028            None,
1029            Some(format!("{}/cb", mock.uri())),
1030            None,
1031            None,
1032            None,
1033            Some(2),
1034            Some(10),
1035            Some(2_000),
1036        );
1037
1038        let start = Instant::now();
1039        ch.send(&test_message()).await.unwrap();
1040        let elapsed = start.elapsed();
1041        assert!(
1042            elapsed >= Duration::from_millis(900),
1043            "expected to wait ~1s for Retry-After on 503, elapsed = {:?}",
1044            elapsed
1045        );
1046    }
1047
1048    #[tokio::test]
1049    async fn send_max_retries_zero_disables_retry() {
1050        use wiremock::matchers::{method, path};
1051        use wiremock::{Mock, MockServer, ResponseTemplate};
1052
1053        let mock = MockServer::start().await;
1054        Mock::given(method("POST"))
1055            .and(path("/cb"))
1056            .respond_with(ResponseTemplate::new(503))
1057            .expect(1) // only one attempt when max_retries=0
1058            .mount(&mock)
1059            .await;
1060
1061        let ch = WebhookChannel::new(
1062            "test-hook".into(),
1063            8080,
1064            None,
1065            Some(format!("{}/cb", mock.uri())),
1066            None,
1067            None,
1068            None,
1069            Some(0),
1070            Some(10),
1071            Some(100),
1072        );
1073        assert!(ch.send(&test_message()).await.is_err());
1074    }
1075}