Skip to main content

zeroclaw_gateway/
lib.rs

1#![allow(
2    clippy::to_string_in_format_args,
3    clippy::useless_format,
4    clippy::collapsible_if
5)]
6//! Axum-based HTTP gateway with proper HTTP/1.1 compliance, body limits, and timeouts.
7//!
8//! This module replaces the raw TCP implementation with axum for:
9//! - Proper HTTP/1.1 parsing and compliance
10//! - Content-Length validation (handled by hyper)
//! - Request body size limits (64 KiB max; see `MAX_BODY_SIZE`)
//! - Request timeouts (30s by default, configurable via `[gateway] request_timeout_secs`) to prevent slow-loris attacks
13//! - Header sanitization (handled by axum/hyper)
14
15pub mod acp;
16pub mod api;
17pub mod api_browse;
18pub mod api_config;
19pub mod api_logs;
20pub mod api_onboard;
21pub mod api_pairing;
22pub mod api_personality;
23#[cfg(feature = "plugins-wasm")]
24pub mod api_plugins;
25pub mod api_skills;
26#[cfg(feature = "webauthn")]
27pub mod api_webauthn;
28pub mod auth_rate_limit;
29pub mod canvas;
30pub mod hardware_context;
31pub mod node_tool;
32pub mod nodes;
33pub mod openapi;
34pub mod session_queue;
35pub mod sse;
36pub mod static_files;
37pub mod tls;
38#[cfg(feature = "gateway-voice-duplex")]
39pub mod voice_duplex;
40pub mod ws;
41pub mod ws_approval;
42
43use anyhow::{Context, Result};
44use axum::{
45    Router,
46    body::Bytes,
47    extract::{ConnectInfo, Query, State},
48    http::{HeaderMap, StatusCode, header},
49    response::{IntoResponse, Json},
50    routing::{delete, get, post},
51};
52use parking_lot::{Mutex, RwLock};
53use std::collections::HashMap;
54use std::net::{IpAddr, SocketAddr};
55use std::sync::Arc;
56use std::time::{Duration, Instant};
57use tower_http::limit::RequestBodyLimitLayer;
58use tower_http::timeout::TimeoutLayer;
59use uuid::Uuid;
60use zeroclaw_api::channel::{Channel, SendMessage};
61use zeroclaw_api::tool::ToolSpec;
62use zeroclaw_channels::{
63    gmail_push::GmailPushChannel, linq::LinqChannel, nextcloud_talk::NextcloudTalkChannel,
64    wati::WatiChannel, whatsapp::WhatsAppChannel,
65};
66use zeroclaw_config::policy::SecurityPolicy;
67use zeroclaw_config::schema::Config;
68use zeroclaw_infra::session_backend::SessionBackend;
69use zeroclaw_memory::{self, Memory, MemoryCategory};
70use zeroclaw_providers::{self, ModelProvider};
71use zeroclaw_runtime::cost::CostTracker;
72use zeroclaw_runtime::i18n;
73use zeroclaw_runtime::platform;
74use zeroclaw_runtime::security::pairing::{PairingGuard, constant_time_eq, is_public_bind};
75use zeroclaw_runtime::tools;
76use zeroclaw_runtime::tools::CanvasStore;
77
/// Largest request body the gateway accepts: 64 KiB. Capping the body keeps a
/// single request from exhausting memory.
pub const MAX_BODY_SIZE: usize = 64 * 1024;
/// Default per-request timeout, in seconds (30s) — guards against slow-loris clients.
pub const REQUEST_TIMEOUT_SECS: u64 = 30;

/// Default timeout, in seconds, for `POST /api/cron/{id}/run` (10 minutes).
///
/// A manual cron trigger executes the job synchronously inside the request
/// handler, and agent jobs in particular can spend minutes on a full reasoning
/// loop — far beyond the 30s gateway-wide default. Ten minutes lets realistic
/// workloads finish while still preventing the route from hanging forever.
pub const LONG_RUNNING_REQUEST_TIMEOUT_SECS: u64 = 10 * 60;
86/// and frequently exceed the 30s gateway-wide default — agent jobs in
87/// particular can take minutes to complete a full reasoning loop. Capping at
88/// 10 minutes keeps the route from hanging indefinitely while still allowing
89/// realistic workloads to finish.
90pub const LONG_RUNNING_REQUEST_TIMEOUT_SECS: u64 = 600;
91
92/// Gateway request timeout (seconds) for routes other than the long-running
93/// cron-trigger endpoint. Reads from typed config.
94pub fn gateway_request_timeout_secs(cfg: &zeroclaw_config::schema::GatewayConfig) -> u64 {
95    cfg.request_timeout_secs
96}
97
98/// Manual cron-trigger request timeout (seconds), exempt from the
99/// gateway-wide [`gateway_request_timeout_secs`] limit so synchronous agent
100/// jobs can run to completion. Reads from typed config.
101pub fn gateway_long_running_request_timeout_secs(
102    cfg: &zeroclaw_config::schema::GatewayConfig,
103) -> u64 {
104    cfg.long_running_request_timeout_secs
105}
/// Length, in seconds, of the sliding window the gateway rate limiter counts over.
pub const RATE_LIMIT_WINDOW_SECS: u64 = 60;
/// Fallback cap on the number of distinct client keys the gateway rate
/// limiter tracks at once.
pub const RATE_LIMIT_MAX_KEYS_DEFAULT: usize = 10_000;
/// Fallback cap on the number of distinct idempotency keys the gateway keeps
/// in memory.
pub const IDEMPOTENCY_MAX_KEYS_DEFAULT: usize = 10_000;
112
113fn webhook_memory_key() -> String {
114    format!("webhook_msg_{}", Uuid::new_v4())
115}
116
117fn whatsapp_memory_key(msg: &zeroclaw_api::channel::ChannelMessage) -> String {
118    format!("whatsapp_{}_{}", msg.sender, msg.id)
119}
120
121fn linq_memory_key(msg: &zeroclaw_api::channel::ChannelMessage) -> String {
122    format!("linq_{}_{}", msg.sender, msg.id)
123}
124
125fn wati_memory_key(msg: &zeroclaw_api::channel::ChannelMessage) -> String {
126    format!("wati_{}_{}", msg.sender, msg.id)
127}
128
129fn nextcloud_talk_memory_key(msg: &zeroclaw_api::channel::ChannelMessage) -> String {
130    format!("nextcloud_talk_{}_{}", msg.sender, msg.id)
131}
132
133fn sender_session_id(channel: &str, msg: &zeroclaw_api::channel::ChannelMessage) -> String {
134    match &msg.thread_ts {
135        Some(thread_id) => format!("{channel}_{thread_id}_{}", msg.sender),
136        None => format!("{channel}_{}", msg.sender),
137    }
138}
139
140fn webhook_session_id(headers: &HeaderMap) -> Option<String> {
141    const MAX_SESSION_ID_LEN: usize = 128;
142    headers
143        .get("X-Session-Id")
144        .and_then(|v| v.to_str().ok())
145        .map(str::trim)
146        .filter(|value| !value.is_empty())
147        .filter(|value| value.len() <= MAX_SESSION_ID_LEN)
148        .filter(|value| {
149            value
150                .bytes()
151                .all(|b| b.is_ascii_alphanumeric() || b == b'-' || b == b'_' || b == b'.')
152        })
153        .map(str::to_owned)
154}
155
156fn hash_webhook_secret(value: &str) -> String {
157    use sha2::{Digest, Sha256};
158
159    let digest = Sha256::digest(value.as_bytes());
160    hex::encode(digest)
161}
162
/// How often the rate limiter sweeps stale IP entries from its map.
const RATE_LIMITER_SWEEP_INTERVAL_SECS: u64 = 300; // 5 minutes

/// Per-key sliding-window request limiter.
///
/// Each key maps to the instants of its recent requests. A request is admitted
/// while the key has fewer than `limit_per_window` timestamps inside `window`.
/// The number of tracked keys is bounded by `max_keys`.
#[derive(Debug)]
struct SlidingWindowRateLimiter {
    /// Requests admitted per key within `window`; `0` disables limiting.
    limit_per_window: u32,
    /// Length of the sliding window.
    window: Duration,
    /// Upper bound on distinct keys tracked at once (clamped to at least 1 by `new`).
    max_keys: usize,
    /// Per-key request timestamps, paired with the instant of the last full
    /// stale-entry sweep.
    requests: Mutex<(HashMap<String, Vec<Instant>>, Instant)>,
}
173
174impl SlidingWindowRateLimiter {
175    fn new(limit_per_window: u32, window: Duration, max_keys: usize) -> Self {
176        Self {
177            limit_per_window,
178            window,
179            max_keys: max_keys.max(1),
180            requests: Mutex::new((HashMap::new(), Instant::now())),
181        }
182    }
183
184    fn prune_stale(requests: &mut HashMap<String, Vec<Instant>>, cutoff: Instant) {
185        requests.retain(|_, timestamps| {
186            timestamps.retain(|t| *t > cutoff);
187            !timestamps.is_empty()
188        });
189    }
190
191    fn allow(&self, key: &str) -> bool {
192        if self.limit_per_window == 0 {
193            return true;
194        }
195
196        let now = Instant::now();
197        let cutoff = now.checked_sub(self.window).unwrap_or_else(Instant::now);
198
199        let mut guard = self.requests.lock();
200        let (requests, last_sweep) = &mut *guard;
201
202        // Periodic sweep: remove keys with no recent requests
203        if last_sweep.elapsed() >= Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS) {
204            Self::prune_stale(requests, cutoff);
205            *last_sweep = now;
206        }
207
208        if !requests.contains_key(key) && requests.len() >= self.max_keys {
209            // Opportunistic stale cleanup before eviction under cardinality pressure.
210            Self::prune_stale(requests, cutoff);
211            *last_sweep = now;
212
213            if requests.len() >= self.max_keys {
214                let evict_key = requests
215                    .iter()
216                    .min_by_key(|(_, timestamps)| timestamps.last().copied().unwrap_or(cutoff))
217                    .map(|(k, _)| k.clone());
218                if let Some(evict_key) = evict_key {
219                    requests.remove(&evict_key);
220                }
221            }
222        }
223
224        let entry = requests.entry(key.to_owned()).or_default();
225        entry.retain(|instant| *instant > cutoff);
226
227        if entry.len() >= self.limit_per_window as usize {
228            return false;
229        }
230
231        entry.push(now);
232        true
233    }
234}
235
236#[derive(Debug)]
237pub struct GatewayRateLimiter {
238    pair: SlidingWindowRateLimiter,
239    webhook: SlidingWindowRateLimiter,
240}
241
242impl GatewayRateLimiter {
243    pub fn new(pair_per_minute: u32, webhook_per_minute: u32, max_keys: usize) -> Self {
244        let window = Duration::from_secs(RATE_LIMIT_WINDOW_SECS);
245        Self {
246            pair: SlidingWindowRateLimiter::new(pair_per_minute, window, max_keys),
247            webhook: SlidingWindowRateLimiter::new(webhook_per_minute, window, max_keys),
248        }
249    }
250
251    fn allow_pair(&self, key: &str) -> bool {
252        self.pair.allow(key)
253    }
254
255    fn allow_webhook(&self, key: &str) -> bool {
256        self.webhook.allow(key)
257    }
258}
259
260#[derive(Debug)]
261pub struct IdempotencyStore {
262    ttl: Duration,
263    max_keys: usize,
264    keys: Mutex<HashMap<String, Instant>>,
265}
266
267impl IdempotencyStore {
268    pub fn new(ttl: Duration, max_keys: usize) -> Self {
269        Self {
270            ttl,
271            max_keys: max_keys.max(1),
272            keys: Mutex::new(HashMap::new()),
273        }
274    }
275
276    /// Returns true if this key is new and is now recorded.
277    fn record_if_new(&self, key: &str) -> bool {
278        let now = Instant::now();
279        let mut keys = self.keys.lock();
280
281        keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl);
282
283        if keys.contains_key(key) {
284            return false;
285        }
286
287        if keys.len() >= self.max_keys {
288            let evict_key = keys
289                .iter()
290                .min_by_key(|(_, seen_at)| *seen_at)
291                .map(|(k, _)| k.clone());
292            if let Some(evict_key) = evict_key {
293                keys.remove(&evict_key);
294            }
295        }
296
297        keys.insert(key.to_owned(), now);
298        true
299    }
300}
301
/// Parses one client-address token from a forwarding header.
///
/// Surrounding whitespace and double quotes are stripped. The token is then
/// accepted as a bare IP (`10.0.0.1`, `::1`), a socket address
/// (`1.2.3.4:80`, `[::1]:8080`, where the port is discarded), or a bracketed
/// IP without a port (`[::1]`). Empty or unparseable input yields `None`.
fn parse_client_ip(value: &str) -> Option<IpAddr> {
    let cleaned = value.trim().trim_matches('"').trim();
    if cleaned.is_empty() {
        return None;
    }

    cleaned
        .parse::<IpAddr>()
        .ok()
        .or_else(|| cleaned.parse::<SocketAddr>().ok().map(|sock| sock.ip()))
        .or_else(|| cleaned.trim_matches(['[', ']']).parse::<IpAddr>().ok())
}
319
320fn dirs_data_local() -> Option<std::path::PathBuf> {
321    directories::BaseDirs::new().map(|d| d.data_local_dir().to_path_buf())
322}
323
324fn forwarded_client_ip(headers: &HeaderMap) -> Option<IpAddr> {
325    if let Some(xff) = headers.get("X-Forwarded-For").and_then(|v| v.to_str().ok()) {
326        for candidate in xff.split(',') {
327            if let Some(ip) = parse_client_ip(candidate) {
328                return Some(ip);
329            }
330        }
331    }
332
333    headers
334        .get("X-Real-IP")
335        .and_then(|v| v.to_str().ok())
336        .and_then(parse_client_ip)
337}
338
339fn client_key_from_request(
340    peer_addr: Option<SocketAddr>,
341    headers: &HeaderMap,
342    trust_forwarded_headers: bool,
343) -> String {
344    if trust_forwarded_headers && let Some(ip) = forwarded_client_ip(headers) {
345        return ip.to_string();
346    }
347
348    peer_addr
349        .map(|addr| addr.ip().to_string())
350        .unwrap_or_else(|| "unknown".to_string())
351}
352
/// Resolves a configured key cap, where `0` means "use the fallback".
///
/// The fallback itself is clamped to at least 1; any non-zero configured
/// value is returned unchanged.
fn normalize_max_keys(configured: usize, fallback: usize) -> usize {
    match configured {
        0 => fallback.max(1),
        explicit => explicit,
    }
}
360
/// Shared state for all axum handlers.
///
/// Derives `Clone` so it can be handed to axum's `State` extractor. Shared
/// services are held behind `Arc`s.
#[derive(Clone)]
pub struct AppState {
    /// Live configuration, shared and mutable behind a `RwLock`.
    pub config: Arc<RwLock<Config>>,
    /// Model provider shared by handlers.
    pub model_provider: Arc<dyn ModelProvider>,
    /// Default model name. Empty when no model is configured at boot; the
    /// chat-dispatch path is expected to check `model.is_empty()` and report
    /// `needs_onboarding` (per the model-resolution comments in `run_gateway`).
    pub model: String,
    /// `None` means "let the provider decide" — required for models
    /// (e.g. claude-opus-4-7) that reject the field. Always preserve
    /// `Option<f64>` end-to-end; never substitute a hardcoded default.
    pub temperature: Option<f64>,
    /// Memory backend shared by handlers. `run_gateway` installs a
    /// `NoneMemory` stub when no agents are configured.
    pub mem: Arc<dyn Memory>,
    /// NOTE(review): presumably toggles automatic saving of inbound messages
    /// into `mem` — confirm against the handlers that read it.
    pub auto_save: bool,
    /// SHA-256 hash of `X-Webhook-Secret` (hex-encoded), never plaintext.
    pub webhook_secret_hash: Option<Arc<str>>,
    /// Pairing guard from the runtime security module.
    pub pairing: Arc<PairingGuard>,
    /// Whether `X-Forwarded-For` / `X-Real-IP` may be used to identify the
    /// client. Presumably passed to `client_key_from_request` — confirm.
    pub trust_forwarded_headers: bool,
    /// Sliding-window limits for pairing and webhook requests.
    pub rate_limiter: Arc<GatewayRateLimiter>,
    /// Authentication rate limiter (`auth_rate_limit` module).
    pub auth_limiter: Arc<auth_rate_limit::AuthRateLimiter>,
    /// TTL-bounded record of seen idempotency keys, used to detect duplicate requests.
    pub idempotency_store: Arc<IdempotencyStore>,
    /// WhatsApp channel, when configured.
    pub whatsapp: Option<Arc<WhatsAppChannel>>,
    /// `WhatsApp` app secret for webhook signature verification (`X-Hub-Signature-256`)
    pub whatsapp_app_secret: Option<Arc<str>>,
    /// Linq channel, when configured.
    pub linq: Option<Arc<LinqChannel>>,
    /// Linq webhook signing secret for signature verification
    pub linq_signing_secret: Option<Arc<str>>,
    /// Nextcloud Talk channel, when configured.
    pub nextcloud_talk: Option<Arc<NextcloudTalkChannel>>,
    /// Nextcloud Talk webhook secret for signature verification
    pub nextcloud_talk_webhook_secret: Option<Arc<str>>,
    /// WATI channel, when configured.
    pub wati: Option<Arc<WatiChannel>>,
    /// Gmail Pub/Sub push notification channel
    pub gmail_push: Option<Arc<GmailPushChannel>>,
    /// Observability backend for metrics scraping
    pub observer: Arc<dyn zeroclaw_runtime::observability::Observer>,
    /// Registered tool specs (for web dashboard tools page)
    pub tools_registry: Arc<Vec<ToolSpec>>,
    /// Cost tracker (optional, for web dashboard cost page)
    pub cost_tracker: Option<Arc<CostTracker>>,
    /// SSE broadcast channel for real-time events
    pub event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
    /// Ring buffer of recent events for history replay
    pub event_buffer: Arc<sse::EventBuffer>,
    /// Shutdown signal sender for graceful shutdown
    pub shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Reload signal sender owned by the daemon. /admin/reload writes `true`
    /// here; the daemon's wait loop reacts and re-instantiates every
    /// subsystem in place. `None` when running standalone (`zeroclaw gateway start`)
    /// — reload then degrades to a 503 with a clear message.
    pub reload_tx: Option<tokio::sync::watch::Sender<bool>>,
    /// Registry of dynamically connected nodes
    pub node_registry: Arc<nodes::NodeRegistry>,
    /// Path prefix for reverse-proxy deployments (empty string = no prefix)
    pub path_prefix: String,
    /// Filesystem path to `web/dist/` for serving the dashboard (None = API-only)
    pub web_dist_dir: Option<std::path::PathBuf>,
    /// Session backend for persisting gateway WS chat sessions
    pub session_backend: Option<Arc<dyn SessionBackend>>,
    /// Per-session actor queue for serializing concurrent turns
    pub session_queue: Arc<session_queue::SessionActorQueue>,
    /// Device registry for paired device management
    pub device_registry: Option<Arc<api_pairing::DeviceRegistry>>,
    /// Pending pairing request store
    pub pending_pairings: Option<Arc<api_pairing::PairingStore>>,
    /// Shared canvas store for Live Canvas (A2UI) system
    pub canvas_store: CanvasStore,
    /// WebAuthn state for hardware key authentication (optional, requires `webauthn` feature)
    #[cfg(feature = "webauthn")]
    pub webauthn: Option<Arc<api_webauthn::WebAuthnState>>,
    /// Per-session cancellation tokens for aborting in-flight agent responses.
    /// Key is session_key (e.g. `gw_<session_id>`), value is the token for the
    /// current turn. Entries are inserted before each turn and removed after
    /// completion (normal or cancelled).
    pub cancel_tokens: Arc<
        std::sync::Mutex<std::collections::HashMap<String, tokio_util::sync::CancellationToken>>,
    >,
    /// Flag set whenever a config write (PATCH, init, map-key mutation) lands
    /// via `persist_and_swap`, cleared on `/admin/reload`. Distinct from disk
    /// drift (which fires only when an external editor touches the file): this
    /// signals "the operator changed config in this session, subsystems may
    /// need to be rebuilt to apply it." The dashboard polls
    /// `/api/config/reload-status` and surfaces a reload banner when true.
    pub pending_reload: Arc<std::sync::atomic::AtomicBool>,
}
443
444/// Run the HTTP gateway using axum with proper HTTP/1.1 compliance.
445#[allow(clippy::too_many_lines)]
446pub async fn run_gateway(
447    host: &str,
448    port: u16,
449    config: Config,
450    external_event_tx: Option<tokio::sync::broadcast::Sender<serde_json::Value>>,
451    // Reload sender owned by the daemon. /admin/reload writes `true` here;
452    // the daemon's wait loop reacts via `subscribe()` and tears down to
453    // re-init. Cross-platform replacement for the SIGUSR1 hack.
454    reload_tx: Option<tokio::sync::watch::Sender<bool>>,
455    canvas_store: Option<CanvasStore>,
456) -> Result<()> {
457    // ── Security: warn on public bind without tunnel or explicit opt-in ──
458    if is_public_bind(host)
459        && config.tunnel.tunnel_provider == "none"
460        && !config.gateway.allow_public_bind
461    {
462        ::zeroclaw_log::record!(
463            WARN,
464            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
465                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
466            "⚠️  Binding to {host} — gateway will be exposed to all network interfaces.\n\
467             Suggestion: use --host 127.0.0.1 (default), configure a tunnel, or set\n\
468             [gateway] allow_public_bind = true in config.toml to silence this warning.\n\n\
469             Docker/VM: if you are running inside a container or VM, this is expected."
470        );
471    }
472    let config_state = Arc::new(RwLock::new(config.clone()));
473
474    // ── Hooks ──────────────────────────────────────────────────────
475    let hooks: Option<std::sync::Arc<zeroclaw_runtime::hooks::HookRunner>> = if config.hooks.enabled
476    {
477        Some(std::sync::Arc::new(
478            zeroclaw_runtime::hooks::HookRunner::new(),
479        ))
480    } else {
481        None
482    };
483
484    let addr: SocketAddr = format!("{host}:{port}").parse()?;
485    let listener = tokio::net::TcpListener::bind(addr).await?;
486    let actual_port = listener.local_addr()?.port();
487    let display_addr = format!("{host}:{actual_port}");
488
489    let fallback = config.first_model_provider();
490    let model_provider_name = config
491        .first_model_provider_alias()
492        .unwrap_or_else(|| "openrouter".to_string());
493    let provider_runtime_options_base =
494        zeroclaw_providers::provider_runtime_options_from_config(&config);
495    let provider_runtime_options = zeroclaw_providers::options_for_provider_ref(
496        &config,
497        &model_provider_name,
498        &provider_runtime_options_base,
499    );
500    let model_provider: Arc<dyn ModelProvider> = Arc::from(
501        zeroclaw_providers::create_resilient_model_provider_from_ref(
502            &config,
503            &model_provider_name,
504            fallback.and_then(|e| e.api_key.as_deref()),
505            fallback.and_then(|e| e.uri.as_deref()),
506            &config.reliability,
507            &provider_runtime_options,
508        )?,
509    );
510    // Model resolution (1) the first-model_provider's `model`,
511    // (2) the first configured `[providers.models.<type>.<alias>]`
512    // model with a WARN naming what to set, (3) leave the model empty so
513    // the gateway boots and the dashboard can complete browser-based
514    // onboarding at /onboard. The chat-dispatch path checks
515    // `state.model.is_empty()` and returns a structured needs_onboarding
516    // error before any model_provider call, so the original "no silent
517    // vendor-default substitution" guarantee is preserved at request-time
518    // rather than at boot. V3 has no global fallback model_provider — every
519    // gateway request that needs agent context resolves through its
520    // `?agent=` parameter; this resolution is purely the seed value the
521    // gateway uses for boot-time logging and the AppState default model
522    // string.
523    let model = match fallback
524        .and_then(|e| e.model.as_deref())
525        .map(str::trim)
526        .filter(|m| !m.is_empty())
527    {
528        Some(m) => m.to_string(),
529        None => {
530            match config.resolve_default_model() {
531                Some(m) => {
532                    ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"model_provider": model_provider_name, "model": m})), "first model_provider has no `model` set; using first configured \
533                     providers.models entry as default. Set \
534                     [providers.models.<type>.<alias>] model = \"...\" to silence \
535                     this warning.");
536                    m
537                }
538                None => {
539                    ::zeroclaw_log::record!(
540                        WARN,
541                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
542                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
543                            .with_attrs(::serde_json::json!({"display_addr": display_addr})),
544                        "Gateway booting without a configured model. Visit http:///onboard to complete browser onboarding. Chat endpoints will return 503 needs_onboarding until at least one [providers.models.<type>.<alias>] model = \"...\" is set."
545                    );
546                    String::new()
547                }
548            }
549        }
550    };
551    // Preserve `Option<f64>` end-to-end. Substituting a hardcoded default
552    // here would clobber the "let the provider decide" intent for models
553    // (e.g. claude-opus-4-7) that reject `temperature`.
554    let temperature: Option<f64> = fallback.and_then(|e| e.temperature);
555    // Skip the install-wide memory backend init when zero agents are
556    // configured. Building a SQLite (or other) backend here would
557    // synthesize `<workspace_dir>/memory/brain.db` on a fresh install
558    // that has nothing to remember; per-agent memory factories under
559    // `agents/<alias>/workspace/memory/` are the only legitimate
560    // origin of memory state in v0.8.0. AppState gets a NoneMemory
561    // stub so endpoints that read `state.mem` keep working until an
562    // agent comes online.
563    let mem: Arc<dyn Memory> = if config.agents.is_empty() {
564        Arc::new(zeroclaw_memory::NoneMemory::new("none"))
565    } else {
566        Arc::from(zeroclaw_memory::create_memory_with_storage_and_routes(
567            &config.memory,
568            &config.embedding_routes,
569            config.resolve_active_storage(),
570            &config.data_dir,
571            fallback.and_then(|e| e.api_key.as_deref()),
572        )?)
573    };
574    let runtime: Arc<dyn platform::RuntimeAdapter> =
575        Arc::from(platform::create_runtime(&config.runtime)?);
576    // Gateway is infrastructure — it doesn't run as an agent. Endpoints
577    // that need an agent context (`/webhook?agent=`, `/ws/chat?agent=`,
578    // ACP `session/new`, agent-scoped tools/memory) take it from the
579    // request. The shared SecurityPolicy / risk_profile / tools_registry
580    // built here are vestiges driving the legacy single-agent
581    // `/api/tools` listing and the `run_gateway_chat_with_tools` test
582    // mock; per-request agent dispatch is tracked as a follow-up.
583    //
584    // Agent count is unconstrained at boot. Zero agents is a valid
585    // state — the gateway must come up so `/admin/reload` and
586    // `/onboard` can install one — and the legacy seed simply stays
587    // empty. With one or more enabled agents, any of them seeds the
588    // vestige; aliases are arbitrary so the iteration-order pick is
589    // load-bearing on nothing.
590    let canvas_store = canvas_store.unwrap_or_default();
591    let agent_alias_opt = config
592        .agents
593        .iter()
594        .find(|(_, a)| a.enabled)
595        .map(|(alias, _)| alias.clone());
596
597    let (composio_key, composio_entity_id) = if config.composio.enabled {
598        (
599            config.composio.api_key.as_deref(),
600            Some(config.composio.entity_id.as_str()),
601        )
602    } else {
603        (None, None)
604    };
605
606    // The seeded `risk_profile` + `SecurityPolicy` here drive the legacy
607    // single-agent `/api/tools` listing and the `run_gateway_chat_with_tools`
608    // test mock — they are not load-bearing for per-request agent dispatch.
609    // When the seed agent's `risk_profile` (or any related per-agent
610    // validation) fails to resolve, the gateway must still boot so the
611    // operator can fix the config via `/admin/reload` or `/onboard`
612    // instead of crash-looping the daemon supervisor. Degraded boot:
613    // log a warning and fall through to the empty-tools-registry branch.
614    let agent_setup: Option<(
615        zeroclaw_config::schema::RiskProfileConfig,
616        Arc<SecurityPolicy>,
617    )> = agent_alias_opt.as_ref().and_then(|agent_alias| {
618        let Some(risk_profile) = config.risk_profile_for_agent(agent_alias) else {
619            ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"agent": agent_alias, "agent_alias": agent_alias})), "Gateway: agents..risk_profile does not name a configured risk_profiles entry; booting with empty tools registry. Fix via /admin/reload or /onboard.");
620            return None;
621        };
622        let risk_profile = risk_profile.clone();
623        let security = match SecurityPolicy::for_agent(&config, agent_alias) {
624            Ok(s) => Arc::new(s),
625            Err(e) => {
626                ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"agent": agent_alias, "error": format!("{}", e), "agent_alias": agent_alias})), "Gateway: agent SecurityPolicy failed to build; booting with empty tools registry. Fix [agents.] via /admin/reload or /onboard.");
627                return None;
628            }
629        };
630        Some((risk_profile, security))
631    });
632
633    let (mut tools_registry_raw, delegate_handle_gw) = match (&agent_alias_opt, agent_setup) {
634        (Some(agent_alias), Some((risk_profile, security))) => {
635            let all_tools_result = tools::all_tools_with_runtime(
636                Arc::new(config.clone()),
637                &security,
638                &risk_profile,
639                agent_alias,
640                runtime,
641                Arc::clone(&mem),
642                composio_key,
643                composio_entity_id,
644                &config.browser,
645                &config.http_request,
646                &config.web_fetch,
647                &config.data_dir,
648                &config.agents,
649                config
650                    .first_model_provider()
651                    .and_then(|e| e.api_key.as_deref()),
652                &config,
653                Some(canvas_store.clone()),
654                false,
655            );
656            // Wire channel-driven tool handles so the dashboard agent can
657            // deliver messages to configured channels (same pattern as
658            // orchestrator::start_channels).
659            // reaction_handle_gw is PerToolChannelHandle (not Option);
660            // register_channels_for_tools expects &Option for all handles.
661            let reaction_handle_gw_opt = Some(all_tools_result.reaction_handle.clone());
662            let channel_names = zeroclaw_channels::orchestrator::register_channels_for_tools(
663                &config,
664                &all_tools_result.ask_user_handle,
665                &reaction_handle_gw_opt,
666                &all_tools_result.poll_handle,
667                &all_tools_result.escalate_handle,
668                &all_tools_result.channel_send_handle,
669            );
670            if !channel_names.is_empty() {
671                ::zeroclaw_log::record!(
672                    INFO,
673                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
674                        .with_attrs(::serde_json::json!({"count": channel_names.len()})),
675                    &format!(
676                        "Registered {} channel(s) for dashboard agent",
677                        channel_names.len()
678                    ),
679                );
680            }
681            (all_tools_result.tools, all_tools_result.delegate_handle)
682        }
683        (Some(_), None) => {
684            // Agent existed but its config failed to resolve. Warned
685            // above; fall through to the empty-registry shape.
686            (Vec::new(), None)
687        }
688        (None, _) => {
689            ::zeroclaw_log::record!(
690                INFO,
691                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
692                    .with_attrs(::serde_json::json!({"display_addr": display_addr})),
693                "Gateway: no [agents.<alias>] configured — booting with empty tools registry. Visit http:///onboard to add an agent."
694            );
695            (Vec::new(), None)
696        }
697    };
698
699    // ── Wire MCP tools into the gateway tool registry (non-fatal) ───
700    // Without this, the `/api/tools` endpoint misses MCP tools.
701    if config.mcp.enabled && !config.mcp.servers.is_empty() {
702        ::zeroclaw_log::record!(
703            INFO,
704            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
705            &format!(
706                "Gateway: initializing MCP client — {} server(s) configured",
707                config.mcp.servers.len()
708            )
709        );
710        match tools::McpRegistry::connect_all(&config.mcp.servers).await {
711            Ok(registry) => {
712                let registry = std::sync::Arc::new(registry);
713                if config.mcp.deferred_loading {
714                    let deferred_set =
715                        tools::DeferredMcpToolSet::from_registry(std::sync::Arc::clone(&registry))
716                            .await;
717                    ::zeroclaw_log::record!(
718                        INFO,
719                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
720                        &format!(
721                            "Gateway MCP deferred: {} tool stub(s) from {} server(s)",
722                            deferred_set.len(),
723                            registry.server_count()
724                        )
725                    );
726                    let activated =
727                        std::sync::Arc::new(std::sync::Mutex::new(tools::ActivatedToolSet::new()));
728                    tools_registry_raw.push(Box::new(tools::ToolSearchTool::new(
729                        deferred_set,
730                        activated,
731                    )));
732                } else {
733                    let names = registry.tool_names();
734                    let mut registered = 0usize;
735                    for name in names {
736                        if let Some(def) = registry.get_tool_def(&name).await {
737                            let wrapper: std::sync::Arc<dyn tools::Tool> =
738                                std::sync::Arc::new(tools::McpToolWrapper::new(
739                                    name,
740                                    def,
741                                    std::sync::Arc::clone(&registry),
742                                ));
743                            if let Some(ref handle) = delegate_handle_gw {
744                                handle.write().push(std::sync::Arc::clone(&wrapper));
745                            }
746                            tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
747                            registered += 1;
748                        }
749                    }
750                    ::zeroclaw_log::record!(
751                        INFO,
752                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
753                        &format!(
754                            "Gateway MCP: {} tool(s) registered from {} server(s)",
755                            registered,
756                            registry.server_count()
757                        )
758                    );
759                }
760            }
761            Err(e) => {
762                ::zeroclaw_log::record!(
763                    ERROR,
764                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
765                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
766                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
767                    "MCP registry failed to initialize"
768                );
769            }
770        }
771    }
772
773    let tools_registry: Arc<Vec<ToolSpec>> =
774        Arc::new(tools_registry_raw.iter().map(|t| t.spec()).collect());
775
776    // Cost tracker — process-global singleton so channels share the same instance
777    let cost_tracker = CostTracker::get_or_init_global(config.cost.clone(), &config.data_dir);
778
779    // SSE broadcast channel for real-time events.
780    // Use an externally provided sender (e.g. from the daemon) so that other
781    // components (cron, heartbeat) can publish events to the same bus.
782    let event_tx = external_event_tx.unwrap_or_else(|| {
783        let (tx, _rx) = tokio::sync::broadcast::channel::<serde_json::Value>(256);
784        tx
785    });
786    let event_buffer = Arc::new(sse::EventBuffer::new(500));
787    // Extract webhook secret for authentication
788    let webhook_secret_hash: Option<Arc<str>> =
789        config.channels.webhook.values().next().and_then(|webhook| {
790            webhook.secret.as_ref().and_then(|raw_secret| {
791                let trimmed_secret = raw_secret.trim();
792                (!trimmed_secret.is_empty())
793                    .then(|| Arc::<str>::from(hash_webhook_secret(trimmed_secret)))
794            })
795        });
796
797    // WhatsApp channel (if configured)
798    let whatsapp_channel: Option<Arc<WhatsAppChannel>> = config
799        .channels
800        .whatsapp
801        .get("default")
802        .filter(|wa| wa.is_cloud_config())
803        .map(|wa| {
804            let alias = "default".to_string();
805            let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
806                let cfg_arc = config_state.clone();
807                let alias = alias.clone();
808                Arc::new(move || cfg_arc.read().channel_external_peers("whatsapp", &alias))
809            };
810            Arc::new(WhatsAppChannel::new(
811                wa.access_token.clone().unwrap_or_default(),
812                wa.phone_number_id.clone().unwrap_or_default(),
813                wa.verify_token.clone().unwrap_or_default(),
814                alias,
815                peer_resolver,
816            ))
817        });
818
819    // WhatsApp app secret for webhook signature verification.
820    let whatsapp_app_secret: Option<Arc<str>> = config
821        .channels
822        .whatsapp
823        .values()
824        .next()
825        .and_then(|wa| {
826            wa.app_secret
827                .as_deref()
828                .map(str::trim)
829                .filter(|secret| !secret.is_empty())
830                .map(ToOwned::to_owned)
831        })
832        .map(Arc::from);
833
834    // Linq channel (if configured)
835    let linq_channel: Option<Arc<LinqChannel>> = config.channels.linq.values().next().map(|lq| {
836        let alias = "default".to_string();
837        let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
838            let cfg_arc = config_state.clone();
839            let alias = alias.clone();
840            Arc::new(move || cfg_arc.read().channel_external_peers("linq", &alias))
841        };
842        Arc::new(LinqChannel::new(
843            lq.api_token.clone(),
844            lq.from_phone.clone(),
845            alias,
846            peer_resolver,
847        ))
848    });
849
850    // Linq signing secret for webhook signature verification.
851    let linq_signing_secret: Option<Arc<str>> = config
852        .channels
853        .linq
854        .values()
855        .next()
856        .and_then(|lq| {
857            lq.signing_secret
858                .as_deref()
859                .map(str::trim)
860                .filter(|secret| !secret.is_empty())
861                .map(ToOwned::to_owned)
862        })
863        .map(Arc::from);
864
865    // WATI channel (if configured)
866    let wati_channel: Option<Arc<WatiChannel>> =
867        config.channels.wati.values().next().map(|wati_cfg| {
868            let alias = "default".to_string();
869            let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
870                let cfg_arc = config_state.clone();
871                let alias = alias.clone();
872                Arc::new(move || cfg_arc.read().channel_external_peers("wati", &alias))
873            };
874            Arc::new(
875                WatiChannel::new(
876                    wati_cfg.api_token.clone(),
877                    wati_cfg.api_url.clone(),
878                    wati_cfg.tenant_id.clone(),
879                    alias,
880                    peer_resolver,
881                )
882                .with_transcription(config.transcription.clone()),
883            )
884        });
885
886    // Nextcloud Talk channel (if configured)
887    let nextcloud_talk_channel: Option<Arc<NextcloudTalkChannel>> =
888        config.channels.nextcloud_talk.values().next().map(|nc| {
889            let alias = "default".to_string();
890            let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
891                let cfg_arc = config_state.clone();
892                let alias = alias.clone();
893                Arc::new(move || {
894                    cfg_arc
895                        .read()
896                        .channel_external_peers("nextcloud_talk", &alias)
897                })
898            };
899            Arc::new(NextcloudTalkChannel::new(
900                nc.base_url.clone(),
901                nc.app_token.clone(),
902                nc.bot_name.clone().unwrap_or_default(),
903                alias,
904                peer_resolver,
905            ))
906        });
907
908    // Nextcloud Talk webhook secret for signature verification.
909    let nextcloud_talk_webhook_secret: Option<Arc<str>> = config
910        .channels
911        .nextcloud_talk
912        .get("default")
913        .and_then(|nc| {
914            nc.webhook_secret
915                .as_deref()
916                .map(str::trim)
917                .filter(|secret| !secret.is_empty())
918                .map(ToOwned::to_owned)
919        })
920        .map(Arc::from);
921
922    // Gmail Push channel (if configured and referenced by an enabled agent)
923    let gmail_push_channel: Option<Arc<GmailPushChannel>> = {
924        let active: std::collections::HashSet<String> = config
925            .agents
926            .values()
927            .filter(|a| a.enabled)
928            .flat_map(|a| a.channels.iter().map(|c| c.as_str().to_string()))
929            .collect();
930        config
931            .channels
932            .gmail_push
933            .iter()
934            .find(|(alias, _)| active.contains(&format!("gmail_push.{alias}")))
935            .map(|(alias, gp)| {
936                let alias = alias.clone();
937                let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
938                    let cfg_arc = config_state.clone();
939                    let alias = alias.clone();
940                    Arc::new(move || cfg_arc.read().channel_external_peers("gmail_push", &alias))
941                };
942                Arc::new(GmailPushChannel::new(gp.clone(), alias, peer_resolver))
943            })
944    };
945
946    // ── Session persistence for WS chat ─────────────────────
947    // Routes through `make_session_backend` so `[channels].session_backend`
948    // is the single source of truth for which backend stores sessions.
949    // Picking `"jsonl"` would otherwise leave gateway WS sessions writing
950    // to SQLite while channel + tool reads went to JSONL — the original
951    // #5769 split, just on a different backend pairing.
952    let session_backend: Option<Arc<dyn SessionBackend>> = if config.gateway.session_persistence {
953        match zeroclaw_infra::make_session_backend(
954            &config.data_dir,
955            &config.channels.session_backend,
956        ) {
957            Ok(backend) => {
958                ::zeroclaw_log::record!(
959                    INFO,
960                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
961                    &format!(
962                        "Gateway session persistence enabled (backend={})",
963                        config.channels.session_backend
964                    )
965                );
966                if config.gateway.session_ttl_hours > 0
967                    && let Ok(cleaned) = backend.cleanup_stale(config.gateway.session_ttl_hours)
968                    && cleaned > 0
969                {
970                    ::zeroclaw_log::record!(
971                        INFO,
972                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
973                            .with_attrs(::serde_json::json!({"cleaned": cleaned})),
974                        "Cleaned up stale gateway sessions"
975                    );
976                }
977                Some(backend)
978            }
979            Err(e) => {
980                ::zeroclaw_log::record!(
981                    WARN,
982                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
983                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
984                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
985                    "Session persistence disabled"
986                );
987                None
988            }
989        }
990    } else {
991        None
992    };
993
994    // ── Pairing guard ──────────────────────────────────────
995    let pairing = Arc::new(PairingGuard::new(
996        config.gateway.require_pairing,
997        &config.gateway.paired_tokens,
998    ));
999    let rate_limit_max_keys = normalize_max_keys(
1000        config.gateway.rate_limit_max_keys,
1001        RATE_LIMIT_MAX_KEYS_DEFAULT,
1002    );
1003    let rate_limiter = Arc::new(GatewayRateLimiter::new(
1004        config.gateway.pair_rate_limit_per_minute,
1005        config.gateway.webhook_rate_limit_per_minute,
1006        rate_limit_max_keys,
1007    ));
1008    let idempotency_max_keys = normalize_max_keys(
1009        config.gateway.idempotency_max_keys,
1010        IDEMPOTENCY_MAX_KEYS_DEFAULT,
1011    );
1012    let idempotency_store = Arc::new(IdempotencyStore::new(
1013        Duration::from_secs(config.gateway.idempotency_ttl_secs.max(1)),
1014        idempotency_max_keys,
1015    ));
1016
1017    // Resolve optional path prefix for reverse-proxy deployments.
1018    let path_prefix: Option<&str> = config
1019        .gateway
1020        .path_prefix
1021        .as_deref()
1022        .filter(|p| !p.is_empty());
1023
1024    // ── Tunnel ────────────────────────────────────────────────
1025    let tunnel = zeroclaw_runtime::tunnel::create_tunnel(&config.tunnel)?;
1026    let mut tunnel_url: Option<String> = None;
1027
1028    if let Some(ref tun) = tunnel {
1029        println!("🔗 Starting {} tunnel...", tun.name());
1030        match tun.start(host, actual_port).await {
1031            Ok(url) => {
1032                println!("🌐 Tunnel active: {url}");
1033                tunnel_url = Some(url);
1034            }
1035            Err(e) => {
1036                println!("⚠️  Tunnel failed to start: {e}");
1037                println!("   Falling back to local-only mode.");
1038            }
1039        }
1040    }
1041
1042    // Resolve web_dist_dir: explicit config (when valid) → auto-detect.
1043    // Treat the configured path as advisory — if it doesn't contain
1044    // index.html on this machine (stale/leaked path from another host,
1045    // typo, missing build), fall back to auto-detect rather than hard-
1046    // failing every dashboard request. We log the demotion so the
1047    // operator can spot a misconfigured path.
1048    let auto_detect_web_dist = || -> Option<std::path::PathBuf> {
1049        let mut candidates = vec![
1050            // Relative to CWD (development: running from repo root)
1051            std::path::PathBuf::from("web/dist"),
1052            // Relative to binary (installed alongside binary)
1053            std::env::current_exe()
1054                .ok()
1055                .and_then(|p| p.parent().map(|d| d.join("web/dist")))
1056                .unwrap_or_default(),
1057            // Docker / packaged layout
1058            std::path::PathBuf::from("/zeroclaw-data/web/dist"),
1059            // AUR / system package
1060            std::path::PathBuf::from("/usr/share/zeroclawlabs/web/dist"),
1061        ];
1062        // XDG data home (prebuilt binary installer)
1063        if let Some(data_dir) = dirs_data_local() {
1064            candidates.push(data_dir.join("zeroclaw/web/dist"));
1065        }
1066        candidates
1067            .into_iter()
1068            .find(|p| !p.as_os_str().is_empty() && p.join("index.html").is_file())
1069    };
1070
1071    let web_dist_dir: Option<std::path::PathBuf> = match config
1072        .gateway
1073        .web_dist_dir
1074        .as_ref()
1075        .map(std::path::PathBuf::from)
1076    {
1077        Some(explicit) if explicit.join("index.html").is_file() => Some(explicit),
1078        Some(stale) => {
1079            ::zeroclaw_log::record!(
1080                WARN,
1081                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1082                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1083                    .with_attrs(::serde_json::json!({"configured": stale.display().to_string()})),
1084                "gateway.web_dist_dir points at a path that doesn't contain index.html on \
1085                 this machine; falling back to auto-detect. Update or remove the setting in \
1086                 config.toml to silence this warning."
1087            );
1088            auto_detect_web_dist()
1089        }
1090        None => auto_detect_web_dist(),
1091    };
1092
1093    if let Some(ref dir) = web_dist_dir {
1094        ::zeroclaw_log::record!(
1095            INFO,
1096            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1097            &format!("Web dashboard: serving from {}", dir.display().to_string())
1098        );
1099    } else if config.gateway.web_dist_dir.is_some() {
1100        ::zeroclaw_log::record!(
1101            INFO,
1102            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1103            "Web dashboard: not available — configured gateway.web_dist_dir is missing on \
1104             this machine and no fallback location was found. Build with `cargo web build` \
1105             and point gateway.web_dist_dir at the resulting web/dist directory."
1106        );
1107    } else {
1108        ::zeroclaw_log::record!(
1109            INFO,
1110            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1111            "Web dashboard: not available — no web/dist found. Build with `cargo web build` \
1112             and point gateway.web_dist_dir at the resulting web/dist directory."
1113        );
1114    }
1115
1116    let pfx = path_prefix.unwrap_or("");
1117    println!("🦀 ZeroClaw Gateway listening on http://{display_addr}{pfx}");
1118    if let Some(ref url) = tunnel_url {
1119        println!("  🌐 Public URL: {url}");
1120    }
1121    println!("  🌐 Web Dashboard: http://{display_addr}{pfx}/");
1122    if let Some(code) = pairing.pairing_code() {
1123        println!();
1124        println!("  🔐 PAIRING REQUIRED — use this one-time code:");
1125        println!("     ┌──────────────┐");
1126        println!("     │  {code}  │");
1127        println!("     └──────────────┘");
1128        println!("     Send: POST {pfx}/pair with header X-Pairing-Code: {code}");
1129    } else if pairing.require_pairing() {
1130        println!("  🔒 Pairing: ACTIVE (bearer token required)");
1131        println!(
1132            "     To pair a new device: {}",
1133            format_paircode_recovery_command(host, actual_port)
1134        );
1135        println!(
1136            "     Fallback: {}",
1137            format_paircode_recovery_curl(host, actual_port, pfx)
1138        );
1139        println!();
1140    } else {
1141        println!("  ⚠️  Pairing: DISABLED (all requests accepted)");
1142        println!();
1143    }
1144    println!("  POST {pfx}/pair      — pair a new client (X-Pairing-Code header)");
1145    println!("  POST {pfx}/webhook   — {{\"message\": \"your prompt\"}}");
1146    if whatsapp_channel.is_some() {
1147        println!("  GET  {pfx}/whatsapp  — Meta webhook verification");
1148        println!("  POST {pfx}/whatsapp  — WhatsApp message webhook");
1149    }
1150    if linq_channel.is_some() {
1151        println!("  POST {pfx}/linq      — Linq message webhook (iMessage/RCS/SMS)");
1152    }
1153    if wati_channel.is_some() {
1154        println!("  GET  {pfx}/wati      — WATI webhook verification");
1155        println!("  POST {pfx}/wati      — WATI message webhook");
1156    }
1157    if nextcloud_talk_channel.is_some() {
1158        println!("  POST {pfx}/nextcloud-talk — Nextcloud Talk bot webhook");
1159    }
1160    println!("  GET  {pfx}/api/*     — REST API (bearer token required)");
1161    println!("  GET  {pfx}/ws/chat   — WebSocket agent chat");
1162    if config.nodes.enabled {
1163        println!("  GET  {pfx}/ws/nodes  — WebSocket node discovery");
1164    }
1165    println!("  GET  {pfx}/health    — health check");
1166    println!("  GET  {pfx}/metrics   — Prometheus metrics");
1167    println!("  Press Ctrl+C to stop.\n");
1168
1169    zeroclaw_runtime::health::mark_component_ok("gateway");
1170
1171    // Fire gateway start hook
1172    if let Some(ref hooks) = hooks {
1173        hooks.fire_gateway_start(host, actual_port).await;
1174    }
1175
1176    // Install the SSE broadcast hook before building any observer so that
1177    // events emitted by the agent's per-call observer (built inside
1178    // `process_message`) also reach `/api/events`. The state-level observer
1179    // is just the configured backend — `TeeObserver` (created by
1180    // `create_observer`) tees its events into the hook automatically.
1181    let broadcast_layer: Arc<dyn zeroclaw_runtime::observability::Observer> = Arc::new(
1182        sse::BroadcastObserver::new(event_tx.clone(), event_buffer.clone()),
1183    );
1184    let broadcast_hook_guard =
1185        zeroclaw_runtime::observability::set_scoped_broadcast_hook(broadcast_layer);
1186
1187    // Install the same broadcast sender as zeroclaw-log's canonical
1188    // hook so that every event emitted through `record!` / `record_event`
1189    // also reaches `/api/events`. The Observer-trait hook above stays
1190    // wired for legacy `observer.record_event(ObserverEvent::...)`
1191    // callers that haven't migrated to `record!` yet.
1192    zeroclaw_log::set_broadcast_hook(event_tx.clone());
1193
1194    // Bound into AppState. Not a broadcaster — the broadcaster is the
1195    // `broadcast_layer` installed above as the global hook. This is the
1196    // configured backend (Log/Prometheus/...) wrapped by `TeeObserver`,
1197    // which tees events into the hook on every record.
1198    let state_observer: Arc<dyn zeroclaw_runtime::observability::Observer> = Arc::from(
1199        zeroclaw_runtime::observability::create_observer(&config.observability),
1200    );
1201
1202    let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
1203
1204    // Node registry for dynamic node discovery
1205    let node_registry = Arc::new(nodes::NodeRegistry::new(config.nodes.max_nodes));
1206
1207    // Device registry and pairing store (only when pairing is required)
1208    let device_registry = if config.gateway.require_pairing {
1209        Some(Arc::new(api_pairing::DeviceRegistry::new(&config.data_dir)))
1210    } else {
1211        None
1212    };
1213    let pending_pairings = if config.gateway.require_pairing {
1214        Some(Arc::new(api_pairing::PairingStore::new()))
1215    } else {
1216        None
1217    };
1218
1219    let state = AppState {
1220        config: config_state,
1221        model_provider,
1222        model,
1223        temperature,
1224        mem,
1225        auto_save: config.memory.auto_save,
1226        webhook_secret_hash,
1227        pairing,
1228        trust_forwarded_headers: config.gateway.trust_forwarded_headers,
1229        rate_limiter,
1230        auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
1231        idempotency_store,
1232        whatsapp: whatsapp_channel,
1233        whatsapp_app_secret,
1234        linq: linq_channel,
1235        linq_signing_secret,
1236        nextcloud_talk: nextcloud_talk_channel,
1237        nextcloud_talk_webhook_secret,
1238        wati: wati_channel,
1239        gmail_push: gmail_push_channel,
1240        observer: state_observer,
1241        tools_registry,
1242        cost_tracker,
1243        event_tx,
1244        event_buffer,
1245        shutdown_tx,
1246        reload_tx,
1247        node_registry,
1248        session_backend,
1249        session_queue: Arc::new(session_queue::SessionActorQueue::new(8, 30, 600)),
1250        device_registry,
1251        pending_pairings,
1252        path_prefix: path_prefix.unwrap_or("").to_string(),
1253        web_dist_dir,
1254        canvas_store,
1255        cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
1256        pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
1257        #[cfg(feature = "webauthn")]
1258        webauthn: if config.security.webauthn.enabled {
1259            let secret_store = Arc::new(zeroclaw_runtime::security::SecretStore::new(
1260                &config.data_dir,
1261                true,
1262            ));
1263            let wa_config = zeroclaw_runtime::security::webauthn::WebAuthnConfig {
1264                enabled: true,
1265                rp_id: config.security.webauthn.rp_id.clone(),
1266                rp_origin: config.security.webauthn.rp_origin.clone(),
1267                rp_name: config.security.webauthn.rp_name.clone(),
1268            };
1269            Some(Arc::new(api_webauthn::WebAuthnState {
1270                manager: zeroclaw_runtime::security::webauthn::WebAuthnManager::new(
1271                    wa_config,
1272                    secret_store,
1273                    &config.data_dir,
1274                ),
1275                pending_registrations: parking_lot::Mutex::new(std::collections::HashMap::new()),
1276                pending_authentications: parking_lot::Mutex::new(std::collections::HashMap::new()),
1277            }))
1278        } else {
1279            None
1280        },
1281    };
1282
1283    // Build router with middleware
1284    let inner = Router::new()
1285        // ── Admin routes (for CLI management) ──
1286        .route("/admin/shutdown", post(handle_admin_shutdown))
1287        .route("/admin/reload", post(handle_admin_reload))
1288        .route("/admin/paircode", get(handle_admin_paircode))
1289        .route("/admin/paircode/new", post(handle_admin_paircode_new))
1290        // ── Existing routes ──
1291        .route("/health", get(handle_health))
1292        .route("/metrics", get(handle_metrics))
1293        .route("/pair", post(handle_pair))
1294        .route("/pair/code", get(handle_pair_code))
1295        .route("/webhook", post(handle_webhook))
1296        .route("/whatsapp", get(handle_whatsapp_verify))
1297        .route("/whatsapp", post(handle_whatsapp_message))
1298        .route("/linq", post(handle_linq_webhook))
1299        .route("/wati", get(handle_wati_verify))
1300        .route("/wati", post(handle_wati_webhook))
1301        .route("/nextcloud-talk", post(handle_nextcloud_talk_webhook))
1302        .route("/webhook/gmail", post(handle_gmail_push_webhook))
1303        // ── Claude Code runner hooks ──
1304        .route("/hooks/claude-code", post(api::handle_claude_code_hook))
1305        // ── Web Dashboard API routes ──
1306        .route("/api/status", get(api::handle_api_status))
1307        .route("/api/logs", get(api_logs::handle_api_logs))
1308        .route(
1309            "/api/config",
1310            get(api_config::handle_config_get)
1311                .patch(api_config::handle_patch)
1312                .options(api_config::handle_options_config),
1313        )
1314        .route(
1315            "/api/config/prop",
1316            get(api_config::handle_prop_get)
1317                .put(api_config::handle_prop_put)
1318                .delete(api_config::handle_prop_delete)
1319                .options(api_config::handle_options_prop),
1320        )
1321        .route("/api/config/list", get(api_config::handle_list))
1322        .route("/api/config/drift", get(api_config::handle_drift))
1323        .route(
1324            "/api/config/reload-status",
1325            get(api_config::handle_reload_status),
1326        )
1327        .route("/api/config/templates", get(api_config::handle_templates))
1328        .route("/api/config/map-keys", get(api_config::handle_get_map_keys))
1329        .route(
1330            "/api/config/map-key",
1331            post(api_config::handle_map_key).delete(api_config::handle_delete_map_key),
1332        )
1333        .route("/api/config/rename-map-key", post(api_config::handle_rename_map_key))
1334        .route("/api/onboard/catalog", get(api_onboard::handle_catalog))
1335        .route(
1336            "/api/onboard/catalog/models",
1337            get(api_onboard::handle_catalog_models),
1338        )
1339        .route("/api/onboard/status", get(api_onboard::handle_onboard_status))
1340        .route(
1341            "/api/onboard/agent-options",
1342            get(api_onboard::handle_agent_options),
1343        )
1344        .route("/api/onboard/sections", get(api_onboard::handle_sections))
1345        .route(
1346            "/api/onboard/sections/{section}",
1347            get(api_onboard::handle_section_picker),
1348        )
1349        .route(
1350            "/api/onboard/sections/{section}/items/{key}",
1351            post(api_onboard::handle_section_select),
1352        )
1353        .route("/api/personality", get(api_personality::handle_index))
1354        .route(
1355            "/api/personality/templates",
1356            get(api_personality::handle_templates),
1357        )
1358        .route(
1359            "/api/personality/{filename}",
1360            get(api_personality::handle_get).put(api_personality::handle_put),
1361        )
1362        .route("/api/browse", get(api_browse::handle_browse))
1363        .route("/api/browse/mkdir", post(api_browse::handle_browse_mkdir))
1364        .route("/api/browse/rmdir", delete(api_browse::handle_browse_rmdir))
1365        .route(
1366            "/api/agents/{alias}/workspace/list",
1367            get(api_browse::handle_agent_workspace_list),
1368        )
1369        .route(
1370            "/api/agents/{alias}/workspace/read",
1371            get(api_browse::handle_agent_workspace_read),
1372        )
1373        .route(
1374            "/api/agents/{alias}/workspace/path",
1375            delete(api_browse::handle_agent_workspace_delete),
1376        )
1377        .route(
1378            "/api/agents/{alias}/workspace/move",
1379            post(api_browse::handle_agent_workspace_move),
1380        )
1381        .route(
1382            "/api/agents/{alias}/workspace/mkdir",
1383            post(api_browse::handle_agent_workspace_mkdir),
1384        )
1385        .route("/api/skills/bundles", get(api_skills::handle_list_bundles))
1386        .route(
1387            "/api/skills/bundles/{alias}/skills",
1388            get(api_skills::handle_list_skills).post(api_skills::handle_create_skill),
1389        )
1390        .route(
1391            "/api/skills/bundles/{alias}/skills/{name}",
1392            get(api_skills::handle_read_skill)
1393                .put(api_skills::handle_write_skill)
1394                .delete(api_skills::handle_delete_skill),
1395        )
1396        .route("/api/config/init", post(api_config::handle_init))
1397        .route("/api/config/migrate", post(api_config::handle_migrate))
1398        .route("/api/openapi.json", get(openapi::handle_openapi_json))
1399        .route("/api/docs", get(openapi::handle_docs))
1400        .route("/api/tools", get(api::handle_api_tools))
1401        .route("/api/cron", get(api::handle_api_cron_list))
1402        .route("/api/cron", post(api::handle_api_cron_add))
1403        .route(
1404            "/api/cron/settings",
1405            get(api::handle_api_cron_settings_get).patch(api::handle_api_cron_settings_patch),
1406        )
1407        .route(
1408            "/api/cron/{id}",
1409            delete(api::handle_api_cron_delete).patch(api::handle_api_cron_patch),
1410        )
1411        .route("/api/cron/{id}/runs", get(api::handle_api_cron_runs))
1412        // Note: `/api/cron/{id}/run` is registered on a separate router below
1413        // with a longer TimeoutLayer — manual cron triggers run the job
1414        // synchronously and routinely exceed the 30s gateway-wide default.
1415        .route("/api/integrations", get(api::handle_api_integrations))
1416        .route(
1417            "/api/integrations/settings",
1418            get(api::handle_api_integrations_settings),
1419        )
1420        .route(
1421            "/api/doctor",
1422            get(api::handle_api_doctor).post(api::handle_api_doctor),
1423        )
1424        .route("/api/memory", get(api::handle_api_memory_list))
1425        .route("/api/memory", post(api::handle_api_memory_store))
1426        .route("/api/memory/{key}", delete(api::handle_api_memory_delete))
1427        .route("/api/cost", get(api::handle_api_cost))
1428        .route("/api/cli-tools", get(api::handle_api_cli_tools))
1429        .route("/api/channels", get(api::handle_api_channels))
1430        .route("/api/health", get(api::handle_api_health))
1431        .route("/api/sessions", get(api::handle_api_sessions_list))
1432        .route("/api/sessions/running", get(api::handle_api_sessions_running))
1433        .route(
1434            "/api/sessions/{id}/messages",
1435            get(api::handle_api_session_messages).post(api::handle_api_session_message_post),
1436        )
1437        .route("/api/sessions/{id}", delete(api::handle_api_session_delete).put(api::handle_api_session_rename))
1438        .route("/api/sessions/{id}/state", get(api::handle_api_session_state))
1439        .route("/api/sessions/{id}/abort", post(api::handle_api_session_abort))
1440        // ── Pairing + Device management API ──
1441        .route("/api/pairing/initiate", post(api_pairing::initiate_pairing))
1442        .route("/api/pair", post(api_pairing::submit_pairing_enhanced))
1443        .route("/api/devices", get(api_pairing::list_devices))
1444        .route(
1445            "/api/devices/me/capabilities",
1446            post(api_pairing::update_my_capabilities),
1447        )
1448        .route("/api/devices/{id}", delete(api_pairing::revoke_device))
1449        .route(
1450            "/api/devices/{id}/token/rotate",
1451            post(api_pairing::rotate_token),
1452        )
1453        // ── Live Canvas (A2UI) routes ──
1454        .route("/api/canvas", get(canvas::handle_canvas_list))
1455        .route(
1456            "/api/canvas/{id}",
1457            get(canvas::handle_canvas_get)
1458                .post(canvas::handle_canvas_post)
1459                .delete(canvas::handle_canvas_clear),
1460        )
1461        .route(
1462            "/api/canvas/{id}/history",
1463            get(canvas::handle_canvas_history),
1464        );
1465
1466    // ── WebAuthn hardware key authentication API (requires webauthn feature) ──
1467    #[cfg(feature = "webauthn")]
1468    let inner = inner
1469        .route(
1470            "/api/webauthn/register/start",
1471            post(api_webauthn::handle_register_start),
1472        )
1473        .route(
1474            "/api/webauthn/register/finish",
1475            post(api_webauthn::handle_register_finish),
1476        )
1477        .route(
1478            "/api/webauthn/auth/start",
1479            post(api_webauthn::handle_auth_start),
1480        )
1481        .route(
1482            "/api/webauthn/auth/finish",
1483            post(api_webauthn::handle_auth_finish),
1484        )
1485        .route(
1486            "/api/webauthn/credentials",
1487            get(api_webauthn::handle_list_credentials),
1488        )
1489        .route(
1490            "/api/webauthn/credentials/{id}",
1491            delete(api_webauthn::handle_delete_credential),
1492        );
1493
1494    // ── Plugin management API (requires plugins-wasm feature) ──
1495    #[cfg(feature = "plugins-wasm")]
1496    let inner = inner.route(
1497        "/api/plugins",
1498        get(api_plugins::plugin_routes::list_plugins),
1499    );
1500
1501    let inner = inner
1502        // ── SSE event stream ──
1503        .route("/api/events", get(sse::handle_sse_events))
1504        .route("/api/events/history", get(sse::handle_events_history))
1505        // ── ACP client bridge ──
1506        .route("/acp", get(acp::handle_ws_acp))
1507        // ── WebSocket agent chat ──
1508        .route("/ws/chat", get(ws::handle_ws_chat))
1509        // ── WebSocket canvas updates ──
1510        .route("/ws/canvas/{id}", get(canvas::handle_ws_canvas))
1511        // ── WebSocket node discovery ──
1512        .route("/ws/nodes", get(nodes::handle_ws_nodes))
1513        // ── Static assets (web dashboard) ──
1514        .route("/_app/{*path}", get(static_files::handle_static))
1515        // ── SPA fallback: non-API GET requests serve index.html ──
1516        .fallback(get(static_files::handle_spa_fallback))
1517        .with_state(state.clone())
1518        .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1519        .layer(TimeoutLayer::with_status_code(
1520            StatusCode::REQUEST_TIMEOUT,
1521            Duration::from_secs(gateway_request_timeout_secs(&config.gateway)),
1522        ));
1523
1524    // Manual cron-trigger route lives on its own sub-router so it can opt out
1525    // of the 30s gateway-wide TimeoutLayer. Layers attached here travel with
1526    // the route through `merge`, so only this endpoint sees the longer
1527    // timeout.
1528    let cron_run_router: Router = Router::new()
1529        .route("/api/cron/{id}/run", post(api::handle_api_cron_run))
1530        .with_state(state)
1531        .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1532        .layer(TimeoutLayer::with_status_code(
1533            StatusCode::REQUEST_TIMEOUT,
1534            Duration::from_secs(gateway_long_running_request_timeout_secs(&config.gateway)),
1535        ));
1536
1537    let inner = inner.merge(cron_run_router);
1538
1539    // Nest under path prefix when configured (axum strips prefix before routing).
1540    // nest() at "/prefix" handles both "/prefix" and "/prefix/*" but not "/prefix/"
1541    // with a trailing slash, so we add a fallback redirect for that case.
1542    let app = if let Some(prefix) = path_prefix {
1543        let redirect_target = prefix.to_string();
1544        Router::new().nest(prefix, inner).route(
1545            &format!("{prefix}/"),
1546            get(|| async move { axum::response::Redirect::permanent(&redirect_target) }),
1547        )
1548    } else {
1549        inner
1550    };
1551
1552    // ── TLS / mTLS setup ───────────────────────────────────────────
1553    let tls_acceptor = match &config.gateway.tls {
1554        Some(tls_cfg) if tls_cfg.enabled => {
1555            let has_mtls = tls_cfg.client_auth.as_ref().is_some_and(|ca| ca.enabled);
1556            if has_mtls {
1557                ::zeroclaw_log::record!(
1558                    INFO,
1559                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1560                    "TLS enabled with mutual TLS (mTLS) client verification"
1561                );
1562            } else {
1563                ::zeroclaw_log::record!(
1564                    INFO,
1565                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1566                    "TLS enabled (no client certificate requirement)"
1567                );
1568            }
1569            Some(tls::build_tls_acceptor(tls_cfg)?)
1570        }
1571        _ => None,
1572    };
1573
1574    if let Some(tls_acceptor) = tls_acceptor {
1575        // Manual TLS accept loop — serves each connection via hyper.
1576        let app = app.into_make_service_with_connect_info::<SocketAddr>();
1577        let mut app = app;
1578
1579        let mut shutdown_signal = shutdown_rx;
1580        loop {
1581            tokio::select! {
1582                conn = listener.accept() => {
1583                    let (tcp_stream, remote_addr) = conn?;
1584                    let tls_acceptor = tls_acceptor.clone();
1585                    let svc = tower::MakeService::<
1586                        SocketAddr,
1587                        hyper::Request<hyper::body::Incoming>,
1588                    >::make_service(&mut app, remote_addr)
1589                    .await
1590                    .expect("infallible make_service");
1591
1592                    tokio::spawn(async move {
1593                        let tls_stream = match tls_acceptor.accept(tcp_stream).await {
1594                            Ok(s) => s,
1595                            Err(e) => {
1596                                ::zeroclaw_log::record!(DEBUG, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"error": format!("{}", e), "remote_addr": remote_addr})), "TLS handshake failed from");
1597                                return;
1598                            }
1599                        };
1600                        let io = hyper_util::rt::TokioIo::new(tls_stream);
1601                        let hyper_svc = hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
1602                            let mut svc = svc.clone();
1603                            async move {
1604                                tower::Service::call(&mut svc, req).await
1605                            }
1606                        });
1607                        if let Err(e) = hyper_util::server::conn::auto::Builder::new(
1608                            hyper_util::rt::TokioExecutor::new(),
1609                        )
1610                        .serve_connection(io, hyper_svc)
1611                        .await
1612                        {
1613                            ::zeroclaw_log::record!(DEBUG, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"error": format!("{}", e), "remote_addr": remote_addr})), "connection error from");
1614                        }
1615                    });
1616                }
1617                _ = shutdown_signal.changed() => {
1618                    ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "ZeroClaw Gateway shutting down");
1619                    break;
1620                }
1621            }
1622        }
1623    } else {
1624        // Plain TCP — use axum's built-in serve.
1625        axum::serve(
1626            listener,
1627            app.into_make_service_with_connect_info::<SocketAddr>(),
1628        )
1629        .with_graceful_shutdown(async move {
1630            let _ = shutdown_rx.changed().await;
1631            ::zeroclaw_log::record!(
1632                INFO,
1633                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1634                "ZeroClaw Gateway shutting down"
1635            );
1636        })
1637        .await?;
1638    }
1639
1640    drop(broadcast_hook_guard);
1641
1642    Ok(())
1643}
1644
/// Builds the `zeroclaw gateway get-paircode --new` command an operator can run
/// to mint a fresh pairing code for the gateway listening on `port`.
///
/// A `--host` flag is appended only when `paircode_recovery_host_arg` says the
/// bind host needs to be spelled out.
fn format_paircode_recovery_command(host: &str, port: u16) -> String {
    let base = format!("zeroclaw gateway get-paircode --new --port {port}");
    match paircode_recovery_host_arg(host) {
        Some(explicit_host) => format!("{base} --host {explicit_host}"),
        None => base,
    }
}

/// Returns `Some(host)` when the recovery command should carry an explicit
/// `--host`, or `None` for the loopback / wildcard bind addresses listed below,
/// for which the flag is omitted.
fn paircode_recovery_host_arg(host: &str) -> Option<&str> {
    const OMITTED_HOSTS: [&str; 5] = ["127.0.0.1", "localhost", "::1", "0.0.0.0", "::"];
    if OMITTED_HOSTS.contains(&host) {
        None
    } else {
        Some(host)
    }
}
1660
/// Builds a `curl` command that POSTs to the gateway's
/// `{path_prefix}/admin/paircode/new` endpoint on `host:port`.
///
/// IPv6 literals (any host containing `:` that is not already bracketed) are
/// wrapped in `[...]`. Inside a URL authority an unbracketed IPv6 address is
/// ambiguous with the `:port` separator, and `http://:::8080/...` is not a
/// valid URL.
fn format_paircode_recovery_curl(host: &str, port: u16, path_prefix: &str) -> String {
    let authority_host = if host.contains(':') && !host.starts_with('[') {
        format!("[{host}]")
    } else {
        host.to_string()
    };
    format!("curl -s -X POST http://{authority_host}:{port}{path_prefix}/admin/paircode/new")
}
1664
1665// ══════════════════════════════════════════════════════════════════════════════
1666// AXUM HANDLERS
1667// ══════════════════════════════════════════════════════════════════════════════
1668
1669/// GET /health — always public (no secrets leaked)
1670async fn handle_health(State(state): State<AppState>) -> impl IntoResponse {
1671    let body = serde_json::json!({
1672        "status": "ok",
1673        "paired": state.pairing.is_paired(),
1674        "require_pairing": state.pairing.require_pairing(),
1675        "runtime": zeroclaw_runtime::health::snapshot_json(),
1676    });
1677    Json(body)
1678}
1679
/// Prometheus content type for text exposition format.
const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";

/// Body served by `/metrics` when no Prometheus observer is available: a single
/// exposition-format comment line telling the operator how to enable it.
fn prometheus_disabled_hint() -> String {
    "# Prometheus backend not enabled. Set [observability] backend = \"prometheus\" in config.\n"
        .to_owned()
}
1688
#[cfg(feature = "observability-prometheus")]
/// Returns the `PrometheusObserver` behind `observer`, or `None` when the
/// active observer backend is something else.
fn prometheus_observer_from_state(
    observer: &dyn zeroclaw_runtime::observability::Observer,
) -> Option<&zeroclaw_runtime::observability::PrometheusObserver> {
    use zeroclaw_runtime::observability::PrometheusObserver;

    // `TeeObserver::as_any` exposes its primary observer, so one downcast is
    // enough whether the state holds the raw backend or the factory tee.
    let erased = observer.as_any();
    erased.downcast_ref::<PrometheusObserver>()
}
1700
1701/// GET /metrics — Prometheus text exposition format
1702async fn handle_metrics(State(state): State<AppState>) -> impl IntoResponse {
1703    let body = {
1704        #[cfg(feature = "observability-prometheus")]
1705        {
1706            if let Some(prom) = prometheus_observer_from_state(state.observer.as_ref()) {
1707                prom.encode()
1708            } else {
1709                prometheus_disabled_hint()
1710            }
1711        }
1712        #[cfg(not(feature = "observability-prometheus"))]
1713        {
1714            let _ = &state;
1715            prometheus_disabled_hint()
1716        }
1717    };
1718
1719    (
1720        StatusCode::OK,
1721        [(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)],
1722        body,
1723    )
1724}
1725
/// POST /pair — exchange one-time code for bearer token
///
/// The one-time code is read from the `X-Pairing-Code` header; a missing or
/// non-UTF-8 header is treated as an empty code. The per-client key comes from
/// `client_key_from_request` (with `state.trust_forwarded_headers` passed
/// through). Before the code is checked, the request must pass two gates:
/// 1. `state.rate_limiter.allow_pair`. On failure: `429` with
///    `retry_after = RATE_LIMIT_WINDOW_SECS`.
/// 2. `state.auth_limiter.check_rate_limit`. On failure: `429` with the
///    limiter's `retry_after_secs`.
///
/// Outcomes of `state.pairing.try_pair`:
/// - `Ok(Some(token))` → `200` with the token. The code then attempts to
///   persist tokens to `config.toml`. If that fails, the pairing still holds
///   for this process and the body reports `"persisted": false`.
/// - `Ok(None)` → `403`. The failed attempt is recorded on `auth_limiter`.
/// - `Err(lockout_secs)` → `429` with `retry_after = lockout_secs`.
#[axum::debug_handler]
async fn handle_pair(
    State(state): State<AppState>,
    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
    headers: HeaderMap,
) -> impl IntoResponse {
    let rate_key =
        client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
    // Gate 1: request-rate limit for /pair, independent of code validity.
    if !state.rate_limiter.allow_pair(&rate_key) {
        ::zeroclaw_log::record!(
            WARN,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
            "/pair rate limit exceeded"
        );
        let err = serde_json::json!({
            "error": "Too many pairing requests. Please retry later.",
            "retry_after": RATE_LIMIT_WINDOW_SECS,
        });
        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
    }

    // ── Auth rate limiting (brute-force protection) ──
    // Gate 2: trips after too many failed attempts recorded below (Ok(None) branch).
    if let Err(e) = state.auth_limiter.check_rate_limit(&rate_key) {
        ::zeroclaw_log::record!(
            WARN,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
                .with_attrs(::serde_json::json!({"rate_key": rate_key})),
            "pairing auth rate limit exceeded"
        );
        let err = serde_json::json!({
            "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
            "retry_after": e.retry_after_secs,
        });
        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
    }

    // Missing or non-UTF-8 header → empty code, which `try_pair` presumably
    // rejects as invalid (Ok(None)) — confirm in PairingGuard.
    let code = headers
        .get("X-Pairing-Code")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("");

    match state.pairing.try_pair(code, &rate_key).await {
        Ok(Some(token)) => {
            ::zeroclaw_log::record!(
                INFO,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
                "new client paired successfully"
            );
            // Persistence failure is reported to the client, not treated as a
            // pairing failure: the token is already valid in this process.
            if let Err(err) =
                Box::pin(persist_pairing_tokens(state.config.clone(), &state.pairing)).await
            {
                ::zeroclaw_log::record!(
                    ERROR,
                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
                        .with_attrs(::serde_json::json!({"error": format!("{}", err)})),
                    "pairing succeeded but token persistence failed"
                );
                let body = serde_json::json!({
                    "paired": true,
                    "persisted": false,
                    "token": token,
                    "message": "Paired for this process, but failed to persist token to config.toml. Check config path and write permissions.",
                });
                return (StatusCode::OK, Json(body));
            }

            let body = serde_json::json!({
                "paired": true,
                "persisted": true,
                "token": token,
                "message": "Save this token — use it as Authorization: Bearer <token>"
            });
            (StatusCode::OK, Json(body))
        }
        Ok(None) => {
            // Only invalid codes count toward auth_limiter; the lockout case
            // below is reported by the pairing guard itself.
            state.auth_limiter.record_attempt(&rate_key);
            ::zeroclaw_log::record!(
                WARN,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
                "pairing attempt with invalid code"
            );
            let err = serde_json::json!({"error": "Invalid pairing code"});
            (StatusCode::FORBIDDEN, Json(err))
        }
        Err(lockout_secs) => {
            ::zeroclaw_log::record!(
                WARN,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
                    .with_attrs(::serde_json::json!({"lockout_secs": lockout_secs})),
                "pairing locked out; too many failed attempts"
            );
            let err = serde_json::json!({
                "error": format!("Too many failed attempts. Try again in {lockout_secs}s."),
                "retry_after": lockout_secs
            });
            (StatusCode::TOO_MANY_REQUESTS, Json(err))
        }
    }
}
1831
1832async fn persist_pairing_tokens(config: Arc<RwLock<Config>>, pairing: &PairingGuard) -> Result<()> {
1833    let paired_tokens = pairing.tokens();
1834    // This is needed because parking_lot's guard is not Send so we clone the inner
1835    // this should be removed once async mutexes are used everywhere
1836    let mut updated_cfg = { config.read().clone() };
1837    updated_cfg.gateway.paired_tokens = paired_tokens;
1838    updated_cfg.mark_dirty("gateway.paired-tokens");
1839    updated_cfg
1840        .save_dirty()
1841        .await
1842        .context("Failed to persist paired tokens to config.toml")?;
1843
1844    // Keep shared runtime config in sync with persisted tokens.
1845    *config.write() = updated_cfg;
1846    Ok(())
1847}
1848
/// Result of a gateway chat turn. Carries the response text plus per-turn
/// token / cost totals captured from the cost-tracking scope (when present)
/// so callers can populate observer-event annotations without racing
/// concurrent webhook traffic that shares the same `CostTracker`.
///
/// Derives `Debug` so outcomes can be logged or inspected in diagnostics.
#[derive(Debug)]
struct GatewayChatOutcome {
    /// Response text produced by the chat turn.
    response: String,
    /// Input tokens consumed by the turn; `None` when usage was not captured.
    input_tokens: Option<u64>,
    /// Output tokens produced by the turn; `None` when usage was not captured.
    output_tokens: Option<u64>,
    /// Cost of the turn in USD; `None` when usage was not captured.
    cost_usd: Option<f64>,
}
1859
1860/// Returns a structured `needs_onboarding` error when `model` is empty
1861/// or whitespace-only, otherwise `None`. Empty model means the gateway
1862/// booted with nothing configured (fresh install). Callers refuse the
1863/// dispatch with this marker instead of calling the provider with an
1864/// empty model id. Mirrors `agent::Agent::from_config` at
1865/// request-time so `/onboard` stays reachable.
1866fn needs_onboarding_for(model: &str) -> Option<anyhow::Error> {
1867    if model.trim().is_empty() {
1868        ::zeroclaw_log::record!(
1869            WARN,
1870            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
1871                .with_outcome(::zeroclaw_log::EventOutcome::Failure),
1872            "gateway dispatch refused: no model configured (browser onboarding incomplete)"
1873        );
1874        Some(anyhow::Error::msg(
1875            "needs_onboarding: gateway has no model configured. Complete \
1876             browser onboarding at /onboard, or set [providers.models.<type>.<alias>] \
1877             model = \"...\" before sending messages.",
1878        ))
1879    } else {
1880        None
1881    }
1882}
1883
1884/// True when `e` carries the marker produced by `needs_onboarding_for`.
1885/// Used by chat-dispatch error paths to map the marker to a 503
1886/// `needs_onboarding` HTTP response or a more accurate channel-side
1887/// reply, instead of the generic 500 / "sorry" catch-all.
1888fn is_needs_onboarding_err(e: &anyhow::Error) -> bool {
1889    e.to_string().contains("needs_onboarding")
1890}
1891
1892/// Reply text sent over a channel SDK when chat dispatch refuses
1893/// because the gateway has no model configured. Resolved through the
1894/// shared Fluent catalog (`channel-needs-onboarding-reply` in
1895/// `crates/zeroclaw-runtime/locales/<locale>/cli.ftl`) so non-English
1896/// operators see localized text instead of a Rust-side English literal.
1897fn needs_onboarding_channel_reply() -> String {
1898    i18n::get_required_cli_string("channel-needs-onboarding-reply")
1899}
1900
/// Full-featured chat with tools for channel and webhook handlers.
///
/// Refuses up front with the `needs_onboarding` marker error (see
/// `needs_onboarding_for`) when `state.model` is empty or whitespace-only.
///
/// In non-test builds the turn goes through
/// `zeroclaw_runtime::agent::process_message`, using a snapshot of the shared
/// config and an agent alias chosen as described inline. When
/// `state.cost_tracker` is set, the call runs inside a cost-tracking scope
/// and the outcome carries that turn's token / cost totals. Otherwise, or when
/// the scope recorded zero input and zero output tokens, the usage fields are
/// `None`.
///
/// # Errors
/// Returns:
/// - the `needs_onboarding` error above;
/// - an error when no agent alias can be chosen;
/// - any error propagated from `process_message`, or in test builds from the
///   mock provider's `chat_with_system`.
async fn run_gateway_chat_with_tools(
    state: &AppState,
    message: &str,
    session_id: Option<&str>,
) -> anyhow::Result<GatewayChatOutcome> {
    if let Some(err) = needs_onboarding_for(&state.model) {
        return Err(err);
    }

    // Tests exercise webhook infrastructure (idempotency, auth, autosave)
    // through handle_webhook, so dispatch to the mock model_provider directly
    // instead of bootstrapping the full agent runtime. The mock path
    // doesn't go through the cost-tracking scope, so usage stays None.
    // In test builds this block is the function's tail expression; the
    // cfg(not(test)) block below is compiled out.
    #[cfg(test)]
    {
        let _ = session_id;
        let response = state
            .model_provider
            .chat_with_system(None, message, &state.model, state.temperature)
            .await?;
        Ok(GatewayChatOutcome {
            response,
            input_tokens: None,
            output_tokens: None,
            cost_usd: None,
        })
    }

    #[cfg(not(test))]
    {
        // Snapshot the config; the read guard is dropped at the end of this
        // statement, before any `.await`.
        let config = state.config.read().clone();
        // Legacy: webhook chat / SSE / pairing endpoints don't yet
        // accept an explicit agent in the request payload. Pick the
        // migration-synthesized "default" agent (or first enabled) until
        // the per-request agent dispatch refactor lands.
        // NOTE(review): a key named "default" is chosen even if that agent is
        // disabled; the `enabled` check only applies to the fallback. If
        // `config.agents` is a hash map, "first enabled" depends on its
        // iteration order — confirm the map type if determinism matters.
        let agent_alias = config
            .agents
            .keys()
            .find(|k| k.as_str() == "default")
            .or_else(|| {
                config
                    .agents
                    .iter()
                    .find(|(_, a)| a.enabled)
                    .map(|(alias, _)| alias)
            })
            .ok_or_else(|| {
                ::zeroclaw_log::record!(
                    WARN,
                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
                        .with_outcome(::zeroclaw_log::EventOutcome::Failure),
                    "webhook chat rejected: no configured [agents.<alias>] entry"
                );
                anyhow::Error::msg(
                    "webhook chat requires at least one configured [agents.<alias>] entry",
                )
            })?
            .clone();

        // Scope the cost tracking context so per-LLM-call usage flows into the
        // gateway's cost tracker and costs.jsonl. Without this scope, the
        // tracker exists on AppState but never receives any records from the
        // runtime tool loop. The context's per-scope `turn_usage` accumulator
        // also lets us read out this turn's tokens / cost after the scope
        // exits without racing concurrent webhook traffic that shares the
        // same tracker. Pricing comes from the V3 per-provider shape
        // (`config.providers.models[*][*].pricing`), keyed as
        // `<type>.<alias>` to match how the channels orchestrator builds
        // its `ModelProviderPricing`. Entries with empty pricing are skipped.
        let cost_tracking_context = state.cost_tracker.as_ref().map(|tracker| {
            let pricing: zeroclaw_runtime::agent::cost::ModelProviderPricing = config
                .providers
                .models
                .iter_entries()
                .filter(|(_, _, base)| !base.pricing.is_empty())
                .map(|(type_k, alias_k, base)| {
                    (format!("{type_k}.{alias_k}"), base.pricing.clone())
                })
                .collect();
            zeroclaw_runtime::agent::cost::ToolLoopCostTrackingContext::new(
                tracker.clone(),
                std::sync::Arc::new(pricing),
            )
            .with_agent_alias(&agent_alias)
        });
        // Keep a handle to the per-turn accumulator; the context itself is
        // moved into the scope below.
        let captured_usage = cost_tracking_context
            .as_ref()
            .map(|ctx| ctx.turn_usage.clone());
        // Boxed, presumably to keep the large `process_message` future off
        // this function's own future — confirm.
        let response = Box::pin(
            zeroclaw_runtime::agent::cost::TOOL_LOOP_COST_TRACKING_CONTEXT.scope(
                cost_tracking_context,
                zeroclaw_runtime::agent::process_message(config, &agent_alias, message, session_id),
            ),
        )
        .await?;
        // Read the accumulated usage after the scope has exited. A turn that
        // recorded zero input and zero output tokens is reported as no usage.
        let usage = captured_usage
            .map(|cell| *cell.lock())
            .filter(|u| u.input_tokens > 0 || u.output_tokens > 0);
        let (input_tokens, output_tokens, cost_usd) = match usage {
            Some(u) => (
                Some(u.input_tokens),
                Some(u.output_tokens),
                Some(u.cost_usd),
            ),
            None => (None, None, None),
        };
        Ok(GatewayChatOutcome {
            response,
            input_tokens,
            output_tokens,
            cost_usd,
        })
    }
}
2016
/// Webhook request body
///
/// JSON shape accepted by `POST /webhook`: `{"message": "..."}`.
#[derive(serde::Deserialize)]
pub struct WebhookBody {
    /// Message text dispatched to the gateway chat turn for this request.
    pub message: String,
}
2022
2023/// POST /webhook — main webhook endpoint
2024async fn handle_webhook(
2025    State(state): State<AppState>,
2026    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
2027    headers: HeaderMap,
2028    body: Result<Json<WebhookBody>, axum::extract::rejection::JsonRejection>,
2029) -> impl IntoResponse {
2030    let rate_key =
2031        client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
2032    if !state.rate_limiter.allow_webhook(&rate_key) {
2033        ::zeroclaw_log::record!(
2034            WARN,
2035            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2036                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2037            "/webhook rate limit exceeded"
2038        );
2039        let err = serde_json::json!({
2040            "error": "Too many webhook requests. Please retry later.",
2041            "retry_after": RATE_LIMIT_WINDOW_SECS,
2042        });
2043        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
2044    }
2045
2046    // ── Bearer token auth (pairing) with auth rate limiting ──
2047    if state.pairing.require_pairing() {
2048        if let Err(e) = state.auth_limiter.check_rate_limit(&rate_key) {
2049            ::zeroclaw_log::record!(
2050                WARN,
2051                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2052                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2053                    .with_attrs(::serde_json::json!({"rate_key": rate_key})),
2054                "webhook: auth rate limit exceeded for"
2055            );
2056            let err = serde_json::json!({
2057                "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
2058                "retry_after": e.retry_after_secs,
2059            });
2060            return (StatusCode::TOO_MANY_REQUESTS, Json(err));
2061        }
2062        let auth = headers
2063            .get(header::AUTHORIZATION)
2064            .and_then(|v| v.to_str().ok())
2065            .unwrap_or("");
2066        let token = auth.strip_prefix("Bearer ").unwrap_or("");
2067        if !state.pairing.is_authenticated(token) {
2068            state.auth_limiter.record_attempt(&rate_key);
2069            ::zeroclaw_log::record!(
2070                WARN,
2071                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2072                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2073                "webhook: rejected — not paired / invalid bearer token"
2074            );
2075            let err = serde_json::json!({
2076                "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer <token>"
2077            });
2078            return (StatusCode::UNAUTHORIZED, Json(err));
2079        }
2080    }
2081
2082    // ── Webhook secret auth (optional, additional layer) ──
2083    if let Some(ref secret_hash) = state.webhook_secret_hash {
2084        let header_hash = headers
2085            .get("X-Webhook-Secret")
2086            .and_then(|v| v.to_str().ok())
2087            .map(str::trim)
2088            .filter(|value| !value.is_empty())
2089            .map(hash_webhook_secret);
2090        match header_hash {
2091            Some(val) if constant_time_eq(&val, secret_hash.as_ref()) => {}
2092            _ => {
2093                ::zeroclaw_log::record!(
2094                    WARN,
2095                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2096                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2097                    "webhook: rejected request — invalid or missing X-Webhook-Secret"
2098                );
2099                let err = serde_json::json!({"error": "Unauthorized — invalid or missing X-Webhook-Secret header"});
2100                return (StatusCode::UNAUTHORIZED, Json(err));
2101            }
2102        }
2103    }
2104
2105    // ── Parse body ──
2106    let Json(webhook_body) = match body {
2107        Ok(b) => b,
2108        Err(e) => {
2109            ::zeroclaw_log::record!(
2110                WARN,
2111                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2112                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2113                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2114                "webhook JSON parse error"
2115            );
2116            let err = serde_json::json!({
2117                "error": "Invalid JSON body. Expected: {\"message\": \"...\"}"
2118            });
2119            return (StatusCode::BAD_REQUEST, Json(err));
2120        }
2121    };
2122
2123    // ── Idempotency (optional) ──
2124    if let Some(idempotency_key) = headers
2125        .get("X-Idempotency-Key")
2126        .and_then(|v| v.to_str().ok())
2127        .map(str::trim)
2128        .filter(|value| !value.is_empty())
2129        && !state.idempotency_store.record_if_new(idempotency_key)
2130    {
2131        ::zeroclaw_log::record!(
2132            INFO,
2133            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2134                .with_attrs(::serde_json::json!({"idempotency_key": idempotency_key})),
2135            "webhook duplicate ignored"
2136        );
2137        let body = serde_json::json!({
2138            "status": "duplicate",
2139            "idempotent": true,
2140            "message": "Request already processed for this idempotency key"
2141        });
2142        return (StatusCode::OK, Json(body));
2143    }
2144
2145    let message = &webhook_body.message;
2146    let session_id = webhook_session_id(&headers);
2147
2148    if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(message) {
2149        let key = webhook_memory_key();
2150        let _ = state
2151            .mem
2152            .store(
2153                &key,
2154                message,
2155                MemoryCategory::Conversation,
2156                session_id.as_deref(),
2157            )
2158            .await;
2159    }
2160
2161    let provider_label = state
2162        .config
2163        .read()
2164        .first_model_provider_type()
2165        .unwrap_or("unknown")
2166        .to_string();
2167    let model_label = state.model.clone();
2168    let started_at = Instant::now();
2169
2170    state.observer.record_event(
2171        &zeroclaw_runtime::observability::ObserverEvent::AgentStart {
2172            model_provider: provider_label.clone(),
2173            model: model_label.clone(),
2174        },
2175    );
2176    state.observer.record_event(
2177        &zeroclaw_runtime::observability::ObserverEvent::LlmRequest {
2178            model_provider: provider_label.clone(),
2179            model: model_label.clone(),
2180            messages_count: 1,
2181        },
2182    );
2183
2184    match run_gateway_chat_with_tools(&state, message, session_id.as_deref()).await {
2185        Ok(GatewayChatOutcome {
2186            response,
2187            input_tokens,
2188            output_tokens,
2189            cost_usd,
2190        }) => {
2191            let duration = started_at.elapsed();
2192            // Per-turn token / cost annotation captured from the cost-tracking
2193            // scope inside `run_gateway_chat_with_tools` (None outside of test
2194            // / when no LLM call recorded). Cost is also persisted to
2195            // /api/cost and costs.jsonl via the same scope.
2196            let tokens_used = input_tokens
2197                .zip(output_tokens)
2198                .map(|(i, o)| i + o)
2199                .or(input_tokens)
2200                .or(output_tokens);
2201            state.observer.record_event(
2202                &zeroclaw_runtime::observability::ObserverEvent::LlmResponse {
2203                    model_provider: provider_label.clone(),
2204                    model: model_label.clone(),
2205                    duration,
2206                    success: true,
2207                    error_message: None,
2208                    input_tokens: None,
2209                    output_tokens: None,
2210                },
2211            );
2212            state.observer.record_metric(
2213                &zeroclaw_runtime::observability::traits::ObserverMetric::RequestLatency(duration),
2214            );
2215            state.observer.record_event(
2216                &zeroclaw_runtime::observability::ObserverEvent::AgentEnd {
2217                    model_provider: provider_label,
2218                    model: model_label,
2219                    duration,
2220                    tokens_used,
2221                    cost_usd,
2222                },
2223            );
2224
2225            let body = serde_json::json!({"response": response, "model": state.model});
2226            (StatusCode::OK, Json(body))
2227        }
2228        Err(e) => {
2229            let duration = started_at.elapsed();
2230            let sanitized = zeroclaw_providers::sanitize_api_error(&e.to_string());
2231
2232            state.observer.record_event(
2233                &zeroclaw_runtime::observability::ObserverEvent::LlmResponse {
2234                    model_provider: provider_label.clone(),
2235                    model: model_label.clone(),
2236                    duration,
2237                    success: false,
2238                    error_message: Some(sanitized.clone()),
2239                    input_tokens: None,
2240                    output_tokens: None,
2241                },
2242            );
2243            state.observer.record_metric(
2244                &zeroclaw_runtime::observability::traits::ObserverMetric::RequestLatency(duration),
2245            );
2246            state
2247                .observer
2248                .record_event(&zeroclaw_runtime::observability::ObserverEvent::Error {
2249                    component: "gateway".to_string(),
2250                    message: sanitized.clone(),
2251                });
2252            state.observer.record_event(
2253                &zeroclaw_runtime::observability::ObserverEvent::AgentEnd {
2254                    model_provider: provider_label,
2255                    model: model_label,
2256                    duration,
2257                    tokens_used: None,
2258                    cost_usd: None,
2259                },
2260            );
2261
2262            if is_needs_onboarding_err(&e) {
2263                ::zeroclaw_log::record!(
2264                    WARN,
2265                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2266                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2267                    "Webhook chat refused: gateway has no model configured; \
2268                     visit /onboard"
2269                );
2270                let body = serde_json::json!({
2271                    "error": "needs_onboarding",
2272                    "url": "/onboard"
2273                });
2274                (StatusCode::SERVICE_UNAVAILABLE, Json(body))
2275            } else {
2276                ::zeroclaw_log::record!(
2277                    ERROR,
2278                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2279                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2280                        .with_attrs(::serde_json::json!({"error": sanitized})),
2281                    "webhook model_provider error"
2282                );
2283                let err = serde_json::json!({"error": "LLM request failed"});
2284                (StatusCode::INTERNAL_SERVER_ERROR, Json(err))
2285            }
2286        }
2287    }
2288}
2289
/// `WhatsApp` verification query params.
///
/// Deserialized from the query string of `GET /whatsapp`. Each field maps a
/// dotted `hub.*` parameter name onto a Rust field. Every field is optional, so
/// a request with a partial query still deserializes and the handler decides
/// how to answer.
///
/// NOTE(review): intentionally no `Debug` derive — `verify_token` carries a
/// caller-supplied value that equals the configured secret on success.
#[derive(serde::Deserialize)]
pub struct WhatsAppVerifyQuery {
    /// `hub.mode` — the handler accepts only `"subscribe"`.
    #[serde(rename = "hub.mode")]
    pub mode: Option<String>,
    /// `hub.verify_token` — compared (constant-time) against the channel's verify token.
    #[serde(rename = "hub.verify_token")]
    pub verify_token: Option<String>,
    /// `hub.challenge` — echoed back verbatim when verification succeeds.
    #[serde(rename = "hub.challenge")]
    pub challenge: Option<String>,
}
2300
2301/// GET /whatsapp — Meta webhook verification
2302async fn handle_whatsapp_verify(
2303    State(state): State<AppState>,
2304    Query(params): Query<WhatsAppVerifyQuery>,
2305) -> impl IntoResponse {
2306    let Some(ref wa) = state.whatsapp else {
2307        return (StatusCode::NOT_FOUND, "WhatsApp not configured".to_string());
2308    };
2309
2310    // Verify the token matches (constant-time comparison to prevent timing attacks)
2311    let token_matches = params
2312        .verify_token
2313        .as_deref()
2314        .is_some_and(|t| constant_time_eq(t, wa.verify_token()));
2315    if params.mode.as_deref() == Some("subscribe") && token_matches {
2316        if let Some(ch) = params.challenge {
2317            ::zeroclaw_log::record!(
2318                INFO,
2319                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2320                    .with_attrs(::serde_json::json!({"channel": "whatsapp"})),
2321                "webhook verified successfully"
2322            );
2323            return (StatusCode::OK, ch);
2324        }
2325        return (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string());
2326    }
2327
2328    ::zeroclaw_log::record!(
2329        WARN,
2330        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2331            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2332            .with_attrs(::serde_json::json!({"channel": "whatsapp"})),
2333        "webhook verification failed — token mismatch"
2334    );
2335    (StatusCode::FORBIDDEN, "Forbidden".to_string())
2336}
2337
2338/// Verify `WhatsApp` webhook signature (`X-Hub-Signature-256`).
2339/// Returns true if the signature is valid, false otherwise.
2340/// See: <https://developers.facebook.com/docs/graph-api/webhooks/getting-started#verification-requests>
2341pub fn verify_whatsapp_signature(app_secret: &str, body: &[u8], signature_header: &str) -> bool {
2342    use hmac::{Hmac, Mac};
2343    use sha2::Sha256;
2344
2345    // Signature format: "sha256=<hex_signature>"
2346    let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
2347        return false;
2348    };
2349
2350    // Decode hex signature
2351    let Ok(expected) = hex::decode(hex_sig) else {
2352        return false;
2353    };
2354
2355    // Compute HMAC-SHA256
2356    let Ok(mut mac) = Hmac::<Sha256>::new_from_slice(app_secret.as_bytes()) else {
2357        return false;
2358    };
2359    mac.update(body);
2360
2361    // Constant-time comparison
2362    mac.verify_slice(&expected).is_ok()
2363}
2364
2365/// POST /whatsapp — incoming message webhook
2366async fn handle_whatsapp_message(
2367    State(state): State<AppState>,
2368    headers: HeaderMap,
2369    body: Bytes,
2370) -> impl IntoResponse {
2371    let Some(ref wa) = state.whatsapp else {
2372        return (
2373            StatusCode::NOT_FOUND,
2374            Json(serde_json::json!({"error": "WhatsApp not configured"})),
2375        );
2376    };
2377
2378    // ── Security: Verify X-Hub-Signature-256 if app_secret is configured ──
2379    if let Some(ref app_secret) = state.whatsapp_app_secret {
2380        let signature = headers
2381            .get("X-Hub-Signature-256")
2382            .and_then(|v| v.to_str().ok())
2383            .unwrap_or("");
2384
2385        if !verify_whatsapp_signature(app_secret, &body, signature) {
2386            ::zeroclaw_log::record!(
2387                WARN,
2388                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2389                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2390                    .with_attrs(::serde_json::json!({"channel": "whatsapp"})),
2391                &format!(
2392                    "webhook signature verification failed (signature: {})",
2393                    if signature.is_empty() {
2394                        "missing"
2395                    } else {
2396                        "invalid"
2397                    }
2398                )
2399            );
2400            return (
2401                StatusCode::UNAUTHORIZED,
2402                Json(serde_json::json!({"error": "Invalid signature"})),
2403            );
2404        }
2405    }
2406
2407    // Parse JSON body
2408    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2409        return (
2410            StatusCode::BAD_REQUEST,
2411            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2412        );
2413    };
2414
2415    // Parse messages from the webhook payload
2416    let messages = wa.parse_webhook_payload(&payload);
2417
2418    if messages.is_empty() {
2419        // Acknowledge the webhook even if no messages (could be status updates)
2420        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2421    }
2422
2423    // Process each message
2424    for msg in &messages {
2425        ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"channel": "whatsapp", "sender": msg.sender, "content": msg.content})), "inbound webhook message");
2426
2427        // Route approval replies to pending approval requests before dispatching to agent
2428        if let Some((token, response)) = zeroclaw_channels::util::parse_approval_reply(&msg.content)
2429        {
2430            let mut map = wa.pending_approvals().lock().await;
2431            if let Some(sender) = map.remove(&token) {
2432                let _ = sender.send(response);
2433                continue;
2434            }
2435        }
2436
2437        let session_id = sender_session_id("whatsapp", msg);
2438
2439        // Auto-save to memory
2440        if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(&msg.content) {
2441            let key = whatsapp_memory_key(msg);
2442            let _ = state
2443                .mem
2444                .store(
2445                    &key,
2446                    &msg.content,
2447                    MemoryCategory::Conversation,
2448                    Some(&session_id),
2449                )
2450                .await;
2451        }
2452
2453        match Box::pin(run_gateway_chat_with_tools(
2454            &state,
2455            &msg.content,
2456            Some(&session_id),
2457        ))
2458        .await
2459        {
2460            Ok(GatewayChatOutcome { response, .. }) => {
2461                // Send reply via WhatsApp
2462                if let Err(e) = wa
2463                    .send(&SendMessage::new(response, &msg.reply_target))
2464                    .await
2465                {
2466                    ::zeroclaw_log::record!(
2467                        ERROR,
2468                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2469                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2470                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2471                        "Failed to send WhatsApp reply"
2472                    );
2473                }
2474            }
2475            Err(e) => {
2476                let reply = if is_needs_onboarding_err(&e) {
2477                    ::zeroclaw_log::record!(
2478                        WARN,
2479                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2480                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2481                        "WhatsApp chat refused: gateway has no model configured; \
2482                         visit /onboard"
2483                    );
2484                    needs_onboarding_channel_reply()
2485                } else {
2486                    ::zeroclaw_log::record!(
2487                        ERROR,
2488                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2489                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2490                            .with_attrs(
2491                                ::serde_json::json!({"channel": "whatsapp", "error": format!("{}", e)})
2492                            ),
2493                        "LLM error"
2494                    );
2495                    "Sorry, I couldn't process your message right now.".to_string()
2496                };
2497                let _ = wa.send(&SendMessage::new(reply, &msg.reply_target)).await;
2498            }
2499        }
2500    }
2501
2502    // Acknowledge the webhook
2503    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2504}
2505
2506/// POST /linq — incoming message webhook (iMessage/RCS/SMS via Linq)
2507async fn handle_linq_webhook(
2508    State(state): State<AppState>,
2509    headers: HeaderMap,
2510    body: Bytes,
2511) -> impl IntoResponse {
2512    let Some(ref linq) = state.linq else {
2513        return (
2514            StatusCode::NOT_FOUND,
2515            Json(serde_json::json!({"error": "Linq not configured"})),
2516        );
2517    };
2518
2519    let body_str = String::from_utf8_lossy(&body);
2520
2521    // ── Security: Verify X-Webhook-Signature if signing_secret is configured ──
2522    if let Some(ref signing_secret) = state.linq_signing_secret {
2523        let timestamp = headers
2524            .get("X-Webhook-Timestamp")
2525            .and_then(|v| v.to_str().ok())
2526            .unwrap_or("");
2527
2528        let signature = headers
2529            .get("X-Webhook-Signature")
2530            .and_then(|v| v.to_str().ok())
2531            .unwrap_or("");
2532
2533        if !zeroclaw_channels::linq::verify_linq_signature(
2534            signing_secret,
2535            &body_str,
2536            timestamp,
2537            signature,
2538        ) {
2539            ::zeroclaw_log::record!(
2540                WARN,
2541                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2542                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2543                &format!(
2544                    "Linq webhook signature verification failed (signature: {})",
2545                    if signature.is_empty() {
2546                        "missing"
2547                    } else {
2548                        "invalid"
2549                    }
2550                )
2551            );
2552            return (
2553                StatusCode::UNAUTHORIZED,
2554                Json(serde_json::json!({"error": "Invalid signature"})),
2555            );
2556        }
2557    }
2558
2559    // Parse JSON body
2560    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2561        return (
2562            StatusCode::BAD_REQUEST,
2563            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2564        );
2565    };
2566
2567    // Parse messages from the webhook payload
2568    let messages = linq.parse_webhook_payload(&payload);
2569
2570    if messages.is_empty() {
2571        // Acknowledge the webhook even if no messages (could be status/delivery events)
2572        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2573    }
2574
2575    // Process each message
2576    for msg in &messages {
2577        ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"channel": "linq", "sender": msg.sender, "content": msg.content})), "inbound webhook message");
2578        let session_id = sender_session_id("linq", msg);
2579
2580        // Auto-save to memory
2581        if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(&msg.content) {
2582            let key = linq_memory_key(msg);
2583            let _ = state
2584                .mem
2585                .store(
2586                    &key,
2587                    &msg.content,
2588                    MemoryCategory::Conversation,
2589                    Some(&session_id),
2590                )
2591                .await;
2592        }
2593
2594        // Call the LLM
2595        match Box::pin(run_gateway_chat_with_tools(
2596            &state,
2597            &msg.content,
2598            Some(&session_id),
2599        ))
2600        .await
2601        {
2602            Ok(GatewayChatOutcome { response, .. }) => {
2603                // Send reply via Linq
2604                if let Err(e) = linq
2605                    .send(&SendMessage::new(response, &msg.reply_target))
2606                    .await
2607                {
2608                    ::zeroclaw_log::record!(
2609                        ERROR,
2610                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2611                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2612                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2613                        "Failed to send Linq reply"
2614                    );
2615                }
2616            }
2617            Err(e) => {
2618                let reply = if is_needs_onboarding_err(&e) {
2619                    ::zeroclaw_log::record!(
2620                        WARN,
2621                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2622                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2623                        "Linq chat refused: gateway has no model configured; \
2624                         visit /onboard"
2625                    );
2626                    needs_onboarding_channel_reply()
2627                } else {
2628                    ::zeroclaw_log::record!(
2629                        ERROR,
2630                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2631                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2632                            .with_attrs(
2633                                ::serde_json::json!({"channel": "linq", "error": format!("{}", e)})
2634                            ),
2635                        "LLM error"
2636                    );
2637                    "Sorry, I couldn't process your message right now.".to_string()
2638                };
2639                let _ = linq.send(&SendMessage::new(reply, &msg.reply_target)).await;
2640            }
2641        }
2642    }
2643
2644    // Acknowledge the webhook
2645    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2646}
2647
2648/// GET /wati — WATI webhook verification (echoes hub.challenge)
2649async fn handle_wati_verify(
2650    State(state): State<AppState>,
2651    Query(params): Query<WatiVerifyQuery>,
2652) -> impl IntoResponse {
2653    if state.wati.is_none() {
2654        return (StatusCode::NOT_FOUND, "WATI not configured".to_string());
2655    }
2656
2657    // WATI may use Meta-style webhook verification; echo the challenge
2658    if let Some(challenge) = params.challenge {
2659        ::zeroclaw_log::record!(
2660            INFO,
2661            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2662                .with_attrs(::serde_json::json!({"channel": "wati"})),
2663            "webhook verified successfully"
2664        );
2665        return (StatusCode::OK, challenge);
2666    }
2667
2668    (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string())
2669}
2670
/// Query params for `GET /wati` verification.
///
/// Only the Meta-style `hub.challenge` parameter is read. It is optional, so a
/// request without it still deserializes and the handler can answer 400.
#[derive(Debug, serde::Deserialize)]
pub struct WatiVerifyQuery {
    /// `hub.challenge` — echoed back verbatim by the verification handler.
    #[serde(rename = "hub.challenge")]
    pub challenge: Option<String>,
}
2676
2677/// POST /wati — incoming WATI WhatsApp message webhook
2678async fn handle_wati_webhook(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
2679    let Some(ref wati) = state.wati else {
2680        return (
2681            StatusCode::NOT_FOUND,
2682            Json(serde_json::json!({"error": "WATI not configured"})),
2683        );
2684    };
2685
2686    // Parse JSON body
2687    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2688        return (
2689            StatusCode::BAD_REQUEST,
2690            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2691        );
2692    };
2693
2694    // Detect audio before the synchronous parse
2695    let msg_type = payload.get("type").and_then(|v| v.as_str()).unwrap_or("");
2696
2697    let messages = if matches!(msg_type, "audio" | "voice") {
2698        // Build a synthetic ChannelMessage from the audio transcript
2699        if let Some(transcript) = wati.try_transcribe_audio(&payload).await {
2700            wati.parse_audio_as_message(&payload, transcript)
2701        } else {
2702            vec![]
2703        }
2704    } else {
2705        wati.parse_webhook_payload(&payload)
2706    };
2707
2708    if messages.is_empty() {
2709        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2710    }
2711
2712    // Process each message
2713    for msg in &messages {
2714        ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"channel": "wati", "sender": msg.sender, "content": msg.content})), "inbound webhook message");
2715        let session_id = sender_session_id("wati", msg);
2716
2717        // Auto-save to memory
2718        if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(&msg.content) {
2719            let key = wati_memory_key(msg);
2720            let _ = state
2721                .mem
2722                .store(
2723                    &key,
2724                    &msg.content,
2725                    MemoryCategory::Conversation,
2726                    Some(&session_id),
2727                )
2728                .await;
2729        }
2730
2731        // Call the LLM
2732        match Box::pin(run_gateway_chat_with_tools(
2733            &state,
2734            &msg.content,
2735            Some(&session_id),
2736        ))
2737        .await
2738        {
2739            Ok(GatewayChatOutcome { response, .. }) => {
2740                // Send reply via WATI
2741                if let Err(e) = wati
2742                    .send(&SendMessage::new(response, &msg.reply_target))
2743                    .await
2744                {
2745                    ::zeroclaw_log::record!(
2746                        ERROR,
2747                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2748                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2749                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2750                        "Failed to send WATI reply"
2751                    );
2752                }
2753            }
2754            Err(e) => {
2755                let reply = if is_needs_onboarding_err(&e) {
2756                    ::zeroclaw_log::record!(
2757                        WARN,
2758                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2759                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2760                        "WATI chat refused: gateway has no model configured; \
2761                         visit /onboard"
2762                    );
2763                    needs_onboarding_channel_reply()
2764                } else {
2765                    ::zeroclaw_log::record!(
2766                        ERROR,
2767                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2768                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2769                            .with_attrs(
2770                                ::serde_json::json!({"channel": "wati", "error": format!("{}", e)})
2771                            ),
2772                        "LLM error"
2773                    );
2774                    "Sorry, I couldn't process your message right now.".to_string()
2775                };
2776                let _ = wati.send(&SendMessage::new(reply, &msg.reply_target)).await;
2777            }
2778        }
2779    }
2780
2781    // Acknowledge the webhook
2782    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2783}
2784
2785/// POST /nextcloud-talk — incoming message webhook (Nextcloud Talk bot API)
2786async fn handle_nextcloud_talk_webhook(
2787    State(state): State<AppState>,
2788    headers: HeaderMap,
2789    body: Bytes,
2790) -> impl IntoResponse {
2791    let Some(ref nextcloud_talk) = state.nextcloud_talk else {
2792        return (
2793            StatusCode::NOT_FOUND,
2794            Json(serde_json::json!({"error": "Nextcloud Talk not configured"})),
2795        );
2796    };
2797
2798    let body_str = String::from_utf8_lossy(&body);
2799
2800    // ── Security: Verify Nextcloud Talk HMAC signature if secret is configured ──
2801    if let Some(ref webhook_secret) = state.nextcloud_talk_webhook_secret {
2802        let random = headers
2803            .get("X-Nextcloud-Talk-Random")
2804            .and_then(|v| v.to_str().ok())
2805            .unwrap_or("");
2806
2807        let signature = headers
2808            .get("X-Nextcloud-Talk-Signature")
2809            .and_then(|v| v.to_str().ok())
2810            .unwrap_or("");
2811
2812        if !zeroclaw_channels::nextcloud_talk::verify_nextcloud_talk_signature(
2813            webhook_secret,
2814            random,
2815            &body_str,
2816            signature,
2817        ) {
2818            ::zeroclaw_log::record!(
2819                WARN,
2820                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2821                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2822                &format!(
2823                    "Nextcloud Talk webhook signature verification failed (signature: {})",
2824                    if signature.is_empty() {
2825                        "missing"
2826                    } else {
2827                        "invalid"
2828                    }
2829                )
2830            );
2831            return (
2832                StatusCode::UNAUTHORIZED,
2833                Json(serde_json::json!({"error": "Invalid signature"})),
2834            );
2835        }
2836    }
2837
2838    // Parse JSON body
2839    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2840        return (
2841            StatusCode::BAD_REQUEST,
2842            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2843        );
2844    };
2845
2846    // Parse messages from webhook payload
2847    let messages = nextcloud_talk.parse_webhook_payload(&payload);
2848    if messages.is_empty() {
2849        // Acknowledge webhook even if payload does not contain actionable user messages.
2850        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2851    }
2852
2853    // Spawn per-message processing so the webhook returns 200 quickly.
2854    // Nextcloud Talk cancels webhook requests that don't complete within ~5s
2855    // (see #6156); slow local models routinely exceed that. Each message gets
2856    // its own task — the LLM call and reply are independent of the ack.
2857    for msg in messages {
2858        let state = state.clone();
2859        let nextcloud_talk = Arc::clone(nextcloud_talk);
2860        tokio::spawn(async move {
2861            ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"channel": "nextcloud_talk", "sender": msg.sender, "content": msg.content})), "inbound webhook message");
2862            let session_id = sender_session_id("nextcloud_talk", &msg);
2863
2864            if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(&msg.content) {
2865                let key = nextcloud_talk_memory_key(&msg);
2866                let _ = state
2867                    .mem
2868                    .store(
2869                        &key,
2870                        &msg.content,
2871                        MemoryCategory::Conversation,
2872                        Some(&session_id),
2873                    )
2874                    .await;
2875            }
2876
2877            match Box::pin(run_gateway_chat_with_tools(
2878                &state,
2879                &msg.content,
2880                Some(&session_id),
2881            ))
2882            .await
2883            {
2884                Ok(GatewayChatOutcome { response, .. }) => {
2885                    if let Err(e) = nextcloud_talk
2886                        .send(&SendMessage::new(response, &msg.reply_target))
2887                        .await
2888                    {
2889                        ::zeroclaw_log::record!(
2890                            ERROR,
2891                            ::zeroclaw_log::Event::new(
2892                                module_path!(),
2893                                ::zeroclaw_log::Action::Fail
2894                            )
2895                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2896                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2897                            "Failed to send Nextcloud Talk reply"
2898                        );
2899                    }
2900                }
2901                Err(e) => {
2902                    let reply = if is_needs_onboarding_err(&e) {
2903                        ::zeroclaw_log::record!(
2904                            WARN,
2905                            ::zeroclaw_log::Event::new(
2906                                module_path!(),
2907                                ::zeroclaw_log::Action::Note
2908                            )
2909                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2910                            "Nextcloud Talk chat refused: gateway has no model configured; \
2911                             visit /onboard"
2912                        );
2913                        needs_onboarding_channel_reply()
2914                    } else {
2915                        ::zeroclaw_log::record!(ERROR, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail).with_outcome(::zeroclaw_log::EventOutcome::Failure).with_attrs(::serde_json::json!({"channel": "nextcloud_talk", "error": format!("{}", e)})), "LLM error");
2916                        "Sorry, I couldn't process your message right now.".to_string()
2917                    };
2918                    let _ = nextcloud_talk
2919                        .send(&SendMessage::new(reply, &msg.reply_target))
2920                        .await;
2921                }
2922            }
2923        });
2924    }
2925
2926    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2927}
2928
/// Largest request body accepted by the Gmail webhook endpoint: 1 MiB (2^20 bytes).
///
/// Google Pub/Sub push payloads are typically under 10 KB, so this leaves ample
/// headroom; anything bigger is answered with `413 Payload Too Large`.
const GMAIL_WEBHOOK_MAX_BODY: usize = 1 << 20;
2932
2933/// POST /webhook/gmail — incoming Gmail Pub/Sub push notification
2934async fn handle_gmail_push_webhook(
2935    State(state): State<AppState>,
2936    headers: HeaderMap,
2937    body: Bytes,
2938) -> impl IntoResponse {
2939    let Some(ref gmail_push) = state.gmail_push else {
2940        return (
2941            StatusCode::NOT_FOUND,
2942            Json(serde_json::json!({"error": "Gmail push not configured"})),
2943        );
2944    };
2945
2946    // Enforce body size limit.
2947    if body.len() > GMAIL_WEBHOOK_MAX_BODY {
2948        return (
2949            StatusCode::PAYLOAD_TOO_LARGE,
2950            Json(serde_json::json!({"error": "Request body too large"})),
2951        );
2952    }
2953
2954    // Authenticate the webhook request using a shared secret.
2955    let secret = gmail_push.config.webhook_secret.clone();
2956    if !secret.is_empty() {
2957        let provided = headers
2958            .get(axum::http::header::AUTHORIZATION)
2959            .and_then(|v| v.to_str().ok())
2960            .and_then(|auth| auth.strip_prefix("Bearer "))
2961            .unwrap_or("");
2962
2963        if provided != secret {
2964            ::zeroclaw_log::record!(
2965                WARN,
2966                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2967                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2968                    .with_attrs(::serde_json::json!({"channel": "gmail_push"})),
2969                "webhook: unauthorized request"
2970            );
2971            return (
2972                StatusCode::UNAUTHORIZED,
2973                Json(serde_json::json!({"error": "Unauthorized"})),
2974            );
2975        }
2976    }
2977
2978    let body_str = String::from_utf8_lossy(&body);
2979    let envelope: zeroclaw_channels::gmail_push::PubSubEnvelope =
2980        match serde_json::from_str(&body_str) {
2981            Ok(e) => e,
2982            Err(e) => {
2983                ::zeroclaw_log::record!(
2984                WARN,
2985                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2986                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2987                    .with_attrs(
2988                        ::serde_json::json!({"error": format!("{}", e), "channel": "gmail_push"})
2989                    ),
2990                "webhook: invalid payload"
2991            );
2992                return (
2993                    StatusCode::BAD_REQUEST,
2994                    Json(serde_json::json!({"error": "Invalid Pub/Sub envelope"})),
2995                );
2996            }
2997        };
2998
2999    // Process the notification asynchronously (non-blocking for the webhook response)
3000    let channel = Arc::clone(gmail_push);
3001    tokio::spawn(async move {
3002        if let Err(e) = channel.handle_notification(&envelope).await {
3003            ::zeroclaw_log::record!(
3004                ERROR,
3005                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
3006                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
3007                    .with_attrs(
3008                        ::serde_json::json!({"channel": "gmail_push", "error": format!("{}", e)})
3009                    ),
3010                "push notification processing failed"
3011            );
3012        }
3013    });
3014
3015    // Acknowledge immediately — Google Pub/Sub requires a 2xx within ~10s
3016    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
3017}
3018
3019// ══════════════════════════════════════════════════════════════════════════════
3020// ADMIN HANDLERS (for CLI management)
3021// ══════════════════════════════════════════════════════════════════════════════
3022
/// JSON body returned by the admin endpoints (`/admin/shutdown`, `/admin/reload`).
///
/// Serialized as `{"success": <bool>, "message": <string>}`.
#[derive(serde::Serialize)]
struct AdminResponse {
    /// Whether the requested admin action was accepted.
    success: bool,
    /// Human-readable description of what was initiated.
    message: String,
}
3029
3030/// Reject requests that do not originate from a loopback address.
3031fn require_localhost(peer: &SocketAddr) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
3032    if peer.ip().is_loopback() {
3033        Ok(())
3034    } else {
3035        Err((
3036            StatusCode::FORBIDDEN,
3037            Json(serde_json::json!({
3038                "error": "Admin endpoints are restricted to localhost"
3039            })),
3040        ))
3041    }
3042}
3043
3044/// POST /admin/shutdown — graceful shutdown from CLI (localhost only)
3045async fn handle_admin_shutdown(
3046    State(state): State<AppState>,
3047    ConnectInfo(peer): ConnectInfo<SocketAddr>,
3048) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
3049    require_localhost(&peer)?;
3050    ::zeroclaw_log::record!(
3051        INFO,
3052        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
3053        "admin shutdown request received; initiating graceful shutdown"
3054    );
3055
3056    let body = AdminResponse {
3057        success: true,
3058        message: "Gateway shutdown initiated".to_string(),
3059    };
3060
3061    let _ = state.shutdown_tx.send(true);
3062
3063    Ok((StatusCode::OK, Json(body)))
3064}
3065
3066/// POST /admin/reload — reload the daemon in place (localhost only).
3067///
3068/// Sends `true` on the reload channel the daemon owns. The daemon's main
3069/// wait loop sees the change, returns `DaemonExit::Reload`, and the outer
3070/// loop in `src/main.rs` re-reads config from disk and re-runs
3071/// `daemon::run` — re-instantiating every subsystem (gateway / channels /
3072/// heartbeat / scheduler / mqtt) with the fresh config.
3073///
3074/// Same PID throughout. Brief HTTP downtime while the gateway listener
3075/// rebinds — typically sub-second. Clients should poll `/health` to detect
3076/// when the new instance is ready.
3077///
3078/// Cross-platform — works identically on Linux, macOS, and Windows because
3079/// the channel is in-process tokio, not an OS signal. The gateway-only
3080/// `zeroclaw gateway start` (no daemon supervisor) returns 503 with a
3081/// clear message because there's nothing to signal.
3082async fn handle_admin_reload(
3083    State(state): State<AppState>,
3084    ConnectInfo(peer): ConnectInfo<SocketAddr>,
3085) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
3086    require_localhost(&peer)?;
3087
3088    let Some(reload_tx) = state.reload_tx.clone() else {
3089        return Err((
3090            StatusCode::SERVICE_UNAVAILABLE,
3091            Json(serde_json::json!({
3092                "error": "no daemon supervisor — running as standalone gateway. \
3093                          Restart the process to pick up config changes."
3094            })),
3095        ));
3096    };
3097
3098    ::zeroclaw_log::record!(
3099        INFO,
3100        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
3101        "admin reload request received"
3102    );
3103    // Clear the pending-reload flag before the daemon supervisor brings up
3104    // the new gateway instance. The fresh instance starts with the flag
3105    // already false, matching its "subsystems just-loaded, no pending
3106    // changes" state.
3107    state
3108        .pending_reload
3109        .store(false, std::sync::atomic::Ordering::Relaxed);
3110    // Trigger graceful shutdown of THIS gateway instance's axum::serve so
3111    // its TcpListener releases the port before the daemon supervisor
3112    // spawns the new instance. Without this, daemon::run aborts the
3113    // gateway tokio task at the next await point — but the OLD listener
3114    // can stay bound briefly, racing the NEW gateway's bind. The new
3115    // bind then fails and spawn_component_supervisor backs off; in the
3116    // meantime the OLD gateway keeps serving requests with stale
3117    // in-memory config, and `/api/config/drift` reports drift against
3118    // disk because in-memory hasn't been replaced yet. Cold restart
3119    // (process exit + start) hits this path differently because the OS
3120    // fully releases the listener — that's why the user observes "shut
3121    // down + bring up = correct" but "/admin/reload = stale".
3122    let shutdown_tx = state.shutdown_tx.clone();
3123    // Brief delay so the HTTP response flushes before tear-down begins.
3124    tokio::spawn(async move {
3125        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
3126        // Drain axum first so the listener releases.
3127        let _ = shutdown_tx.send(true);
3128        // Then signal the daemon to re-read disk and re-spawn subsystems.
3129        let _ = reload_tx.send(true);
3130    });
3131
3132    Ok((
3133        StatusCode::OK,
3134        Json(AdminResponse {
3135            success: true,
3136            message: "Daemon reload initiated".to_string(),
3137        }),
3138    ))
3139}
3140
3141/// GET /admin/paircode — fetch current pairing code (localhost only)
3142async fn handle_admin_paircode(
3143    State(state): State<AppState>,
3144    ConnectInfo(peer): ConnectInfo<SocketAddr>,
3145) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
3146    require_localhost(&peer)?;
3147    let code = state.pairing.pairing_code();
3148
3149    let body = if let Some(c) = code {
3150        serde_json::json!({
3151            "success": true,
3152            "pairing_required": state.pairing.require_pairing(),
3153            "pairing_code": c,
3154            "message": "Use this one-time code to pair"
3155        })
3156    } else {
3157        serde_json::json!({
3158            "success": true,
3159            "pairing_required": state.pairing.require_pairing(),
3160            "pairing_code": null,
3161            "message": if state.pairing.require_pairing() {
3162                "Pairing is active but no new code available (already paired or code expired)"
3163            } else {
3164                "Pairing is disabled for this gateway"
3165            }
3166        })
3167    };
3168
3169    Ok((StatusCode::OK, Json(body)))
3170}
3171
3172/// POST /admin/paircode/new — generate a new pairing code (localhost only)
3173async fn handle_admin_paircode_new(
3174    State(state): State<AppState>,
3175    ConnectInfo(peer): ConnectInfo<SocketAddr>,
3176) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
3177    require_localhost(&peer)?;
3178    match state.pairing.generate_new_pairing_code() {
3179        Some(code) => {
3180            ::zeroclaw_log::record!(
3181                INFO,
3182                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
3183                "new pairing code generated via admin endpoint"
3184            );
3185            let body = serde_json::json!({
3186                "success": true,
3187                "pairing_required": state.pairing.require_pairing(),
3188                "pairing_code": code,
3189                "message": "New pairing code generated — use this one-time code to pair"
3190            });
3191            Ok((StatusCode::OK, Json(body)))
3192        }
3193        None => {
3194            let body = serde_json::json!({
3195                "success": false,
3196                "pairing_required": false,
3197                "pairing_code": null,
3198                "message": "Pairing is disabled for this gateway"
3199            });
3200            Ok((StatusCode::BAD_REQUEST, Json(body)))
3201        }
3202    }
3203}
3204
3205/// GET /pair/code — fetch the initial pairing code (no auth, no localhost restriction).
3206///
3207/// This endpoint is intentionally public so that Docker and remote users can see
3208/// the pairing code on the web dashboard without needing terminal access. It only
3209/// returns a code when the gateway is in its initial un-paired state (no devices
3210/// paired yet and a pairing code exists). Once the first device pairs, this
3211/// endpoint stops returning a code.
3212async fn handle_pair_code(State(state): State<AppState>) -> impl IntoResponse {
3213    let require = state.pairing.require_pairing();
3214    let is_paired = state.pairing.is_paired();
3215
3216    // Only expose the code during initial setup (before first pairing)
3217    let code = if require && !is_paired {
3218        state.pairing.pairing_code()
3219    } else {
3220        None
3221    };
3222
3223    let body = serde_json::json!({
3224        "success": true,
3225        "pairing_required": require,
3226        "pairing_code": code,
3227    });
3228
3229    (StatusCode::OK, Json(body))
3230}
3231
3232#[cfg(test)]
3233mod tests {
3234    use super::*;
3235    use async_trait::async_trait;
3236    use axum::http::HeaderValue;
3237    use axum::response::IntoResponse;
3238    use http_body_util::BodyExt;
3239    use parking_lot::{Mutex, RwLock};
3240    use std::sync::atomic::{AtomicUsize, Ordering};
3241    use zeroclaw_api::channel::ChannelMessage;
3242    use zeroclaw_memory::{Memory, MemoryCategory, MemoryEntry};
3243    use zeroclaw_providers::ModelProvider;
3244
3245    /// Generate a random hex secret at runtime to avoid hard-coded cryptographic values.
3246    fn generate_test_secret() -> String {
3247        let bytes: [u8; 32] = rand::random();
3248        hex::encode(bytes)
3249    }
3250
3251    #[test]
3252    fn security_body_limit_is_64kb() {
3253        assert_eq!(MAX_BODY_SIZE, 65_536);
3254    }
3255
3256    #[test]
3257    fn security_timeout_default_is_30_seconds() {
3258        assert_eq!(REQUEST_TIMEOUT_SECS, 30);
3259    }
3260
3261    #[test]
3262    fn gateway_timeout_uses_typed_config_default() {
3263        let cfg = zeroclaw_config::schema::GatewayConfig::default();
3264        assert_eq!(gateway_request_timeout_secs(&cfg), 30);
3265    }
3266
3267    #[test]
3268    fn paircode_recovery_command_includes_alternate_port() {
3269        assert_eq!(
3270            format_paircode_recovery_command("127.0.0.1", 42617),
3271            "zeroclaw gateway get-paircode --new --port 42617"
3272        );
3273    }
3274
3275    #[test]
3276    fn paircode_recovery_command_includes_specific_host_when_needed() {
3277        assert_eq!(
3278            format_paircode_recovery_command("192.168.1.20", 42617),
3279            "zeroclaw gateway get-paircode --new --port 42617 --host 192.168.1.20"
3280        );
3281    }
3282
3283    #[test]
3284    fn paircode_recovery_curl_targets_running_instance() {
3285        assert_eq!(
3286            format_paircode_recovery_curl("127.0.0.1", 42617, ""),
3287            "curl -s -X POST http://127.0.0.1:42617/admin/paircode/new"
3288        );
3289    }
3290
3291    #[test]
3292    fn paircode_recovery_curl_preserves_path_prefix() {
3293        assert_eq!(
3294            format_paircode_recovery_curl("127.0.0.1", 42617, "/gw"),
3295            "curl -s -X POST http://127.0.0.1:42617/gw/admin/paircode/new"
3296        );
3297    }
3298
3299    #[test]
3300    fn long_running_request_timeout_default_is_ten_minutes() {
3301        assert_eq!(LONG_RUNNING_REQUEST_TIMEOUT_SECS, 600);
3302    }
3303
3304    #[test]
3305    fn long_running_request_timeout_uses_typed_config_default() {
3306        let cfg = zeroclaw_config::schema::GatewayConfig::default();
3307        assert_eq!(gateway_long_running_request_timeout_secs(&cfg), 600);
3308    }
3309
3310    #[test]
3311    fn webhook_body_requires_message_field() {
3312        let valid = r#"{"message": "hello"}"#;
3313        let parsed: Result<WebhookBody, _> = serde_json::from_str(valid);
3314        assert!(parsed.is_ok());
3315        assert_eq!(parsed.unwrap().message, "hello");
3316
3317        let missing = r#"{"other": "field"}"#;
3318        let parsed: Result<WebhookBody, _> = serde_json::from_str(missing);
3319        assert!(parsed.is_err());
3320    }
3321
3322    #[test]
3323    fn whatsapp_query_fields_are_optional() {
3324        let q = WhatsAppVerifyQuery {
3325            mode: None,
3326            verify_token: None,
3327            challenge: None,
3328        };
3329        assert!(q.mode.is_none());
3330    }
3331
3332    #[test]
3333    fn app_state_is_clone() {
3334        fn assert_clone<T: Clone>() {}
3335        assert_clone::<AppState>();
3336    }
3337
3338    /// Regression: the gateway must boot with zero configured agents so
3339    /// a fresh install can reach `/admin/reload` and `/onboard` to add
3340    /// one. Earlier the boot path returned
3341    /// `gateway start requires at least one configured [agents.<alias>]
3342    /// entry`, which crashed the daemon supervisor before the reload
3343    /// channel could be exercised.
3344    #[tokio::test]
3345    async fn run_gateway_starts_with_zero_agents() {
3346        // Default Config has no [agents.*] entries — the exact shape
3347        // a fresh install presents on first daemon boot.
3348        let config = Config::default();
3349        assert!(
3350            config.agents.is_empty(),
3351            "regression assumes default Config has no agents",
3352        );
3353
3354        // Bind to an ephemeral port on loopback. If the boot path
3355        // erred on the agents-required check, the join would resolve
3356        // immediately with that Err. We race a short delay against
3357        // the spawn: a still-running task at the deadline means boot
3358        // got far enough to start serving.
3359        let handle =
3360            tokio::spawn(
3361                async move { run_gateway("127.0.0.1", 0, config, None, None, None).await },
3362            );
3363
3364        match tokio::time::timeout(
3365            std::time::Duration::from_millis(750),
3366            &mut Box::pin(async {
3367                // We cannot await `handle` directly because the gateway
3368                // never returns under normal operation; instead, peek at
3369                // whether it has finished by polling join with a tiny
3370                // budget.
3371                let _ = tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3372            }),
3373        )
3374        .await
3375        {
3376            Ok(()) => {}
3377            Err(_) => panic!("test setup timed out before checking gateway state"),
3378        }
3379
3380        // If the boot path errored, the task is finished and join
3381        // returns the error. If it's still running, abort and accept
3382        // boot reached the serving stage.
3383        if handle.is_finished() {
3384            let result = handle.await.expect("task did not panic");
3385            panic!(
3386                "gateway exited during boot with zero agents — must stay up for reload/onboard: {:?}",
3387                result
3388            );
3389        }
3390        handle.abort();
3391    }
3392
3393    /// Regression: the gateway must boot even when an enabled agent's
3394    /// `risk_profile` does not name a configured `risk_profiles` entry.
3395    /// Earlier the boot path used `config.risk_profile_for_agent(...).with_context(...)?`
3396    /// which propagated up through the daemon supervisor and crash-looped
3397    /// the gateway component, locking the operator out of `/admin/reload`
3398    /// and `/onboard` — the exact endpoints they need to fix the broken
3399    /// risk_profile reference. The fix degrades gracefully: warn,
3400    /// fall through to an empty tools registry, keep serving.
3401    #[tokio::test]
3402    async fn run_gateway_starts_with_unresolved_agent_risk_profile() {
3403        use zeroclaw_config::schema::AliasedAgentConfig;
3404
3405        let mut config = Config::default();
3406        // Enabled agent whose `risk_profile` does not resolve. No
3407        // matching [risk_profiles.<key>] entry exists.
3408        let agent = AliasedAgentConfig {
3409            enabled: true,
3410            risk_profile: "definitely_not_configured".to_string(),
3411            ..AliasedAgentConfig::default()
3412        };
3413        config.agents.insert("fake123".to_string(), agent);
3414
3415        let handle =
3416            tokio::spawn(
3417                async move { run_gateway("127.0.0.1", 0, config, None, None, None).await },
3418            );
3419
3420        match tokio::time::timeout(
3421            std::time::Duration::from_millis(750),
3422            &mut Box::pin(async {
3423                let _ = tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3424            }),
3425        )
3426        .await
3427        {
3428            Ok(()) => {}
3429            Err(_) => panic!("test setup timed out before checking gateway state"),
3430        }
3431
3432        if handle.is_finished() {
3433            let result = handle.await.expect("task did not panic");
3434            panic!(
3435                "gateway exited during boot when agent.risk_profile was unresolved \
3436                 — must stay up so operator can fix via /admin/reload or /onboard: {:?}",
3437                result
3438            );
3439        }
3440        handle.abort();
3441    }
3442
3443    #[tokio::test]
3444    async fn metrics_endpoint_returns_hint_when_prometheus_is_disabled() {
3445        let state = AppState {
3446            config: Arc::new(RwLock::new(Config::default())),
3447            model_provider: Arc::new(MockModelProvider::default()),
3448            model: "test-model".into(),
3449            temperature: None,
3450            mem: Arc::new(MockMemory),
3451            auto_save: false,
3452            webhook_secret_hash: None,
3453            pairing: Arc::new(PairingGuard::new(false, &[])),
3454            trust_forwarded_headers: false,
3455            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3456            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3457            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3458            whatsapp: None,
3459            whatsapp_app_secret: None,
3460            linq: None,
3461            linq_signing_secret: None,
3462            nextcloud_talk: None,
3463            nextcloud_talk_webhook_secret: None,
3464            wati: None,
3465            gmail_push: None,
3466            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
3467            tools_registry: Arc::new(Vec::new()),
3468            cost_tracker: None,
3469            event_tx: tokio::sync::broadcast::channel(16).0,
3470            event_buffer: Arc::new(sse::EventBuffer::new(16)),
3471            shutdown_tx: tokio::sync::watch::channel(false).0,
3472            reload_tx: None,
3473            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3474            path_prefix: String::new(),
3475            web_dist_dir: None,
3476            session_backend: None,
3477            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
3478                8, 30, 600,
3479            )),
3480            device_registry: None,
3481            pending_pairings: None,
3482            canvas_store: CanvasStore::new(),
3483            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
3484            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
3485            #[cfg(feature = "webauthn")]
3486            webauthn: None,
3487        };
3488
3489        let response = handle_metrics(State(state)).await.into_response();
3490        assert_eq!(response.status(), StatusCode::OK);
3491        assert_eq!(
3492            response
3493                .headers()
3494                .get(header::CONTENT_TYPE)
3495                .and_then(|value| value.to_str().ok()),
3496            Some(PROMETHEUS_CONTENT_TYPE)
3497        );
3498
3499        let body = response.into_body().collect().await.unwrap().to_bytes();
3500        let text = String::from_utf8(body.to_vec()).unwrap();
3501        assert!(text.contains("Prometheus backend not enabled"));
3502    }
3503
3504    #[cfg(feature = "observability-prometheus")]
3505    #[tokio::test]
3506    async fn metrics_endpoint_renders_prometheus_output() {
3507        let event_tx = tokio::sync::broadcast::channel(16).0;
3508        let prom = zeroclaw_runtime::observability::PrometheusObserver::new();
3509        zeroclaw_runtime::observability::Observer::record_event(
3510            &prom,
3511            &zeroclaw_runtime::observability::ObserverEvent::HeartbeatTick,
3512        );
3513
3514        let observer: Arc<dyn zeroclaw_runtime::observability::Observer> = Arc::new(prom);
3515        let state = AppState {
3516            config: Arc::new(RwLock::new(Config::default())),
3517            model_provider: Arc::new(MockModelProvider::default()),
3518            model: "test-model".into(),
3519            temperature: None,
3520            mem: Arc::new(MockMemory),
3521            auto_save: false,
3522            webhook_secret_hash: None,
3523            pairing: Arc::new(PairingGuard::new(false, &[])),
3524            trust_forwarded_headers: false,
3525            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3526            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3527            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3528            whatsapp: None,
3529            whatsapp_app_secret: None,
3530            linq: None,
3531            linq_signing_secret: None,
3532            nextcloud_talk: None,
3533            nextcloud_talk_webhook_secret: None,
3534            wati: None,
3535            gmail_push: None,
3536            observer,
3537            tools_registry: Arc::new(Vec::new()),
3538            cost_tracker: None,
3539            event_tx,
3540            event_buffer: Arc::new(sse::EventBuffer::new(16)),
3541            shutdown_tx: tokio::sync::watch::channel(false).0,
3542            reload_tx: None,
3543            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3544            path_prefix: String::new(),
3545            web_dist_dir: None,
3546            session_backend: None,
3547            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
3548                8, 30, 600,
3549            )),
3550            device_registry: None,
3551            pending_pairings: None,
3552            canvas_store: CanvasStore::new(),
3553            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
3554            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
3555            #[cfg(feature = "webauthn")]
3556            webauthn: None,
3557        };
3558
3559        let response = handle_metrics(State(state)).await.into_response();
3560        assert_eq!(response.status(), StatusCode::OK);
3561
3562        let body = response.into_body().collect().await.unwrap().to_bytes();
3563        let text = String::from_utf8(body.to_vec()).unwrap();
3564        assert!(text.contains("zeroclaw_heartbeat_ticks_total 1"));
3565    }
3566
3567    #[test]
3568    fn gateway_rate_limiter_blocks_after_limit() {
3569        let limiter = GatewayRateLimiter::new(2, 2, 100);
3570        assert!(limiter.allow_pair("127.0.0.1"));
3571        assert!(limiter.allow_pair("127.0.0.1"));
3572        assert!(!limiter.allow_pair("127.0.0.1"));
3573    }
3574
3575    #[test]
3576    fn rate_limiter_sweep_removes_stale_entries() {
3577        let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 100);
3578        // Add entries for multiple IPs
3579        assert!(limiter.allow("ip-1"));
3580        assert!(limiter.allow("ip-2"));
3581        assert!(limiter.allow("ip-3"));
3582
3583        {
3584            let guard = limiter.requests.lock();
3585            assert_eq!(guard.0.len(), 3);
3586        }
3587
3588        // Force a sweep by backdating last_sweep
3589        {
3590            let mut guard = limiter.requests.lock();
3591            guard.1 = Instant::now()
3592                .checked_sub(Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS + 1))
3593                .unwrap();
3594            // Clear timestamps for ip-2 and ip-3 to simulate stale entries
3595            guard.0.get_mut("ip-2").unwrap().clear();
3596            guard.0.get_mut("ip-3").unwrap().clear();
3597        }
3598
3599        // Next allow() call should trigger sweep and remove stale entries
3600        assert!(limiter.allow("ip-1"));
3601
3602        {
3603            let guard = limiter.requests.lock();
3604            assert_eq!(guard.0.len(), 1, "Stale entries should have been swept");
3605            assert!(guard.0.contains_key("ip-1"));
3606        }
3607    }
3608
3609    #[test]
3610    fn rate_limiter_zero_limit_always_allows() {
3611        let limiter = SlidingWindowRateLimiter::new(0, Duration::from_secs(60), 10);
3612        for _ in 0..100 {
3613            assert!(limiter.allow("any-key"));
3614        }
3615    }
3616
3617    #[test]
3618    fn idempotency_store_rejects_duplicate_key() {
3619        let store = IdempotencyStore::new(Duration::from_secs(30), 10);
3620        assert!(store.record_if_new("req-1"));
3621        assert!(!store.record_if_new("req-1"));
3622        assert!(store.record_if_new("req-2"));
3623    }
3624
3625    #[test]
3626    fn rate_limiter_bounded_cardinality_evicts_oldest_key() {
3627        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 2);
3628        assert!(limiter.allow("ip-1"));
3629        assert!(limiter.allow("ip-2"));
3630        assert!(limiter.allow("ip-3"));
3631
3632        let guard = limiter.requests.lock();
3633        assert_eq!(guard.0.len(), 2);
3634        assert!(guard.0.contains_key("ip-2"));
3635        assert!(guard.0.contains_key("ip-3"));
3636    }
3637
3638    #[test]
3639    fn idempotency_store_bounded_cardinality_evicts_oldest_key() {
3640        let store = IdempotencyStore::new(Duration::from_secs(300), 2);
3641        assert!(store.record_if_new("k1"));
3642        std::thread::sleep(Duration::from_millis(2));
3643        assert!(store.record_if_new("k2"));
3644        std::thread::sleep(Duration::from_millis(2));
3645        assert!(store.record_if_new("k3"));
3646
3647        let keys = store.keys.lock();
3648        assert_eq!(keys.len(), 2);
3649        assert!(!keys.contains_key("k1"));
3650        assert!(keys.contains_key("k2"));
3651        assert!(keys.contains_key("k3"));
3652    }
3653
3654    #[test]
3655    fn client_key_defaults_to_peer_addr_when_untrusted_proxy_mode() {
3656        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3657        let mut headers = HeaderMap::new();
3658        headers.insert(
3659            "X-Forwarded-For",
3660            HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
3661        );
3662
3663        let key = client_key_from_request(Some(peer), &headers, false);
3664        assert_eq!(key, "10.0.0.5");
3665    }
3666
3667    #[test]
3668    fn client_key_uses_forwarded_ip_only_in_trusted_proxy_mode() {
3669        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3670        let mut headers = HeaderMap::new();
3671        headers.insert(
3672            "X-Forwarded-For",
3673            HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
3674        );
3675
3676        let key = client_key_from_request(Some(peer), &headers, true);
3677        assert_eq!(key, "198.51.100.10");
3678    }
3679
3680    #[test]
3681    fn client_key_falls_back_to_peer_when_forwarded_header_invalid() {
3682        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3683        let mut headers = HeaderMap::new();
3684        headers.insert("X-Forwarded-For", HeaderValue::from_static("garbage-value"));
3685
3686        let key = client_key_from_request(Some(peer), &headers, true);
3687        assert_eq!(key, "10.0.0.5");
3688    }
3689
3690    #[test]
3691    fn normalize_max_keys_uses_fallback_for_zero() {
3692        assert_eq!(normalize_max_keys(0, 10_000), 10_000);
3693        assert_eq!(normalize_max_keys(0, 0), 1);
3694    }
3695
3696    #[test]
3697    fn normalize_max_keys_preserves_nonzero_values() {
3698        assert_eq!(normalize_max_keys(2_048, 10_000), 2_048);
3699        assert_eq!(normalize_max_keys(1, 10_000), 1);
3700    }
3701
3702    #[tokio::test]
3703    async fn persist_pairing_tokens_writes_config_tokens() {
3704        let temp = tempfile::tempdir().unwrap();
3705        let config_path = temp.path().join("config.toml");
3706        let workspace_path = temp.path().join("workspace");
3707
3708        let config = Config {
3709            config_path: config_path.clone(),
3710            data_dir: workspace_path,
3711            ..Default::default()
3712        };
3713        config.save().await.unwrap();
3714
3715        let guard = PairingGuard::new(true, &[]);
3716        let code = guard.pairing_code().unwrap();
3717        let token = guard.try_pair(&code, "test_client").await.unwrap().unwrap();
3718        assert!(guard.is_authenticated(&token));
3719
3720        let shared_config = Arc::new(RwLock::new(config));
3721        Box::pin(persist_pairing_tokens(shared_config.clone(), &guard))
3722            .await
3723            .unwrap();
3724
3725        // In-memory tokens should remain as plaintext 64-char hex hashes.
3726        let plaintext = {
3727            let in_memory = shared_config.read();
3728            assert_eq!(in_memory.gateway.paired_tokens.len(), 1);
3729            in_memory.gateway.paired_tokens[0].clone()
3730        };
3731        assert_eq!(plaintext.len(), 64);
3732        assert!(plaintext.chars().all(|c: char| c.is_ascii_hexdigit()));
3733
3734        // On disk, the token should be encrypted (secrets.encrypt defaults to true).
3735        let saved = tokio::fs::read_to_string(config_path).await.unwrap();
3736        let raw_parsed: Config = toml::from_str(&saved).unwrap();
3737        assert_eq!(raw_parsed.gateway.paired_tokens.len(), 1);
3738        let on_disk = &raw_parsed.gateway.paired_tokens[0];
3739        assert!(
3740            zeroclaw_runtime::security::SecretStore::is_encrypted(on_disk),
3741            "paired_token should be encrypted on disk"
3742        );
3743    }
3744
3745    #[test]
3746    fn webhook_memory_key_is_unique() {
3747        let key1 = webhook_memory_key();
3748        let key2 = webhook_memory_key();
3749
3750        assert!(key1.starts_with("webhook_msg_"));
3751        assert!(key2.starts_with("webhook_msg_"));
3752        assert_ne!(key1, key2);
3753    }
3754
3755    #[test]
3756    fn webhook_session_id_accepts_valid() {
3757        let mut headers = HeaderMap::new();
3758        headers.insert("X-Session-Id", HeaderValue::from_static("abc-DEF_123.foo"));
3759        assert_eq!(webhook_session_id(&headers), Some("abc-DEF_123.foo".into()));
3760    }
3761
3762    #[test]
3763    fn webhook_session_id_trims_whitespace() {
3764        let mut headers = HeaderMap::new();
3765        headers.insert("X-Session-Id", HeaderValue::from_static("  my-session  "));
3766        assert_eq!(webhook_session_id(&headers), Some("my-session".into()));
3767    }
3768
3769    #[test]
3770    fn webhook_session_id_rejects_empty() {
3771        let mut headers = HeaderMap::new();
3772        headers.insert("X-Session-Id", HeaderValue::from_static(""));
3773        assert_eq!(webhook_session_id(&headers), None);
3774
3775        headers.insert("X-Session-Id", HeaderValue::from_static("   "));
3776        assert_eq!(webhook_session_id(&headers), None);
3777    }
3778
3779    #[test]
3780    fn webhook_session_id_rejects_missing() {
3781        let headers = HeaderMap::new();
3782        assert_eq!(webhook_session_id(&headers), None);
3783    }
3784
3785    #[test]
3786    fn webhook_session_id_rejects_oversized() {
3787        let mut headers = HeaderMap::new();
3788        let long = "a".repeat(129);
3789        headers.insert("X-Session-Id", HeaderValue::from_str(&long).unwrap());
3790        assert_eq!(webhook_session_id(&headers), None);
3791
3792        let at_limit = "b".repeat(128);
3793        headers.insert("X-Session-Id", HeaderValue::from_str(&at_limit).unwrap());
3794        assert!(webhook_session_id(&headers).is_some());
3795    }
3796
3797    #[test]
3798    fn webhook_session_id_rejects_invalid_chars() {
3799        let mut headers = HeaderMap::new();
3800        for bad in &[
3801            "has/slash",
3802            "has:colon",
3803            "has space",
3804            "has@at",
3805            "emoji\u{1f600}",
3806        ] {
3807            if let Ok(val) = HeaderValue::from_str(bad) {
3808                headers.insert("X-Session-Id", val);
3809                assert_eq!(webhook_session_id(&headers), None, "should reject: {bad}");
3810            }
3811        }
3812    }
3813
3814    #[test]
3815    fn whatsapp_memory_key_includes_sender_and_message_id() {
3816        let msg = ChannelMessage {
3817            id: "wamid-123".into(),
3818            sender: "+1234567890".into(),
3819            reply_target: "+1234567890".into(),
3820            content: "hello".into(),
3821            channel: "whatsapp".into(),
3822            channel_alias: None,
3823            timestamp: 1,
3824            thread_ts: None,
3825            interruption_scope_id: None,
3826            attachments: vec![],
3827            subject: None,
3828        };
3829
3830        let key = whatsapp_memory_key(&msg);
3831        assert_eq!(key, "whatsapp_+1234567890_wamid-123");
3832    }
3833
    /// `Memory` stand-in for handler tests: its `Memory` impl accepts every
    /// write without keeping it and returns empty results for every read.
    #[derive(Default)]
    struct MockMemory;
3836
3837    #[async_trait]
3838    impl Memory for MockMemory {
3839        fn name(&self) -> &str {
3840            "mock"
3841        }
3842
3843        async fn store(
3844            &self,
3845            _key: &str,
3846            _content: &str,
3847            _category: MemoryCategory,
3848            _session_id: Option<&str>,
3849        ) -> anyhow::Result<()> {
3850            Ok(())
3851        }
3852
3853        async fn recall(
3854            &self,
3855            _query: &str,
3856            _limit: usize,
3857            _session_id: Option<&str>,
3858            _since: Option<&str>,
3859            _until: Option<&str>,
3860        ) -> anyhow::Result<Vec<MemoryEntry>> {
3861            Ok(Vec::new())
3862        }
3863
3864        async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3865            Ok(None)
3866        }
3867
3868        async fn list(
3869            &self,
3870            _category: Option<&MemoryCategory>,
3871            _session_id: Option<&str>,
3872        ) -> anyhow::Result<Vec<MemoryEntry>> {
3873            Ok(Vec::new())
3874        }
3875
3876        async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
3877            Ok(false)
3878        }
3879
3880        async fn forget_for_agent(&self, _key: &str, _agent_id: &str) -> anyhow::Result<bool> {
3881            Ok(false)
3882        }
3883
3884        async fn count(&self) -> anyhow::Result<usize> {
3885            Ok(0)
3886        }
3887
3888        async fn health_check(&self) -> bool {
3889            true
3890        }
3891
3892        async fn store_with_agent(
3893            &self,
3894            _key: &str,
3895            _content: &str,
3896            _category: MemoryCategory,
3897            _session_id: Option<&str>,
3898            _namespace: Option<&str>,
3899            _importance: Option<f64>,
3900            _agent_id: Option<&str>,
3901        ) -> anyhow::Result<()> {
3902            Ok(())
3903        }
3904
3905        async fn recall_for_agents(
3906            &self,
3907            _allowed_agent_ids: &[&str],
3908            _query: &str,
3909            _limit: usize,
3910            _session_id: Option<&str>,
3911            _since: Option<&str>,
3912            _until: Option<&str>,
3913        ) -> anyhow::Result<Vec<MemoryEntry>> {
3914            Ok(Vec::new())
3915        }
3916    }
3917    impl ::zeroclaw_api::attribution::Attributable for MockMemory {
3918        fn role(&self) -> ::zeroclaw_api::attribution::Role {
3919            ::zeroclaw_api::attribution::Role::Memory(
3920                ::zeroclaw_api::attribution::MemoryKind::InMemory,
3921            )
3922        }
3923        fn alias(&self) -> &str {
3924            "MockMemory"
3925        }
3926    }
3927
    /// Model provider stand-in that answers every chat with `"ok"` and counts
    /// how many times it was invoked.
    #[derive(Default)]
    struct MockModelProvider {
        /// Number of `chat_with_system` calls received so far.
        calls: AtomicUsize,
    }
3932
3933    #[async_trait]
3934    impl ModelProvider for MockModelProvider {
3935        async fn chat_with_system(
3936            &self,
3937            _system_prompt: Option<&str>,
3938            _message: &str,
3939            _model: &str,
3940            _temperature: Option<f64>,
3941        ) -> anyhow::Result<String> {
3942            self.calls.fetch_add(1, Ordering::SeqCst);
3943            Ok("ok".into())
3944        }
3945    }
3946    impl ::zeroclaw_api::attribution::Attributable for MockModelProvider {
3947        fn role(&self) -> ::zeroclaw_api::attribution::Role {
3948            ::zeroclaw_api::attribution::Role::Provider(
3949                ::zeroclaw_api::attribution::ProviderKind::Model(
3950                    ::zeroclaw_api::attribution::ModelProviderKind::Custom,
3951                ),
3952            )
3953        }
3954        fn alias(&self) -> &str {
3955            "MockModelProvider"
3956        }
3957    }
3958
    /// `Memory` stand-in that records every key passed to `store`, so tests
    /// can inspect which keys a handler tried to save.
    #[derive(Default)]
    struct TrackingMemory {
        /// Keys received by `store` (and by `store_with_agent`, which
        /// delegates to it), appended in call order.
        keys: Mutex<Vec<String>>,
    }
3963
3964    #[async_trait]
3965    impl Memory for TrackingMemory {
3966        fn name(&self) -> &str {
3967            "tracking"
3968        }
3969
3970        async fn store(
3971            &self,
3972            key: &str,
3973            _content: &str,
3974            _category: MemoryCategory,
3975            _session_id: Option<&str>,
3976        ) -> anyhow::Result<()> {
3977            self.keys.lock().push(key.to_string());
3978            Ok(())
3979        }
3980
3981        async fn recall(
3982            &self,
3983            _query: &str,
3984            _limit: usize,
3985            _session_id: Option<&str>,
3986            _since: Option<&str>,
3987            _until: Option<&str>,
3988        ) -> anyhow::Result<Vec<MemoryEntry>> {
3989            Ok(Vec::new())
3990        }
3991
3992        async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3993            Ok(None)
3994        }
3995
3996        async fn list(
3997            &self,
3998            _category: Option<&MemoryCategory>,
3999            _session_id: Option<&str>,
4000        ) -> anyhow::Result<Vec<MemoryEntry>> {
4001            Ok(Vec::new())
4002        }
4003
4004        async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
4005            Ok(false)
4006        }
4007
4008        async fn forget_for_agent(&self, _key: &str, _agent_id: &str) -> anyhow::Result<bool> {
4009            Ok(false)
4010        }
4011
4012        async fn count(&self) -> anyhow::Result<usize> {
4013            let size = self.keys.lock().len();
4014            Ok(size)
4015        }
4016
4017        async fn health_check(&self) -> bool {
4018            true
4019        }
4020
4021        async fn store_with_agent(
4022            &self,
4023            key: &str,
4024            content: &str,
4025            category: MemoryCategory,
4026            session_id: Option<&str>,
4027            _namespace: Option<&str>,
4028            _importance: Option<f64>,
4029            _agent_id: Option<&str>,
4030        ) -> anyhow::Result<()> {
4031            self.store(key, content, category, session_id).await
4032        }
4033
4034        async fn recall_for_agents(
4035            &self,
4036            _allowed_agent_ids: &[&str],
4037            _query: &str,
4038            _limit: usize,
4039            _session_id: Option<&str>,
4040            _since: Option<&str>,
4041            _until: Option<&str>,
4042        ) -> anyhow::Result<Vec<MemoryEntry>> {
4043            Ok(Vec::new())
4044        }
4045    }
4046    impl ::zeroclaw_api::attribution::Attributable for TrackingMemory {
4047        fn role(&self) -> ::zeroclaw_api::attribution::Role {
4048            ::zeroclaw_api::attribution::Role::Memory(
4049                ::zeroclaw_api::attribution::MemoryKind::InMemory,
4050            )
4051        }
4052        fn alias(&self) -> &str {
4053            "TrackingMemory"
4054        }
4055    }
4056
4057    fn test_connect_info() -> ConnectInfo<SocketAddr> {
4058        ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 30_300)))
4059    }
4060
4061    #[tokio::test]
4062    async fn webhook_idempotency_skips_duplicate_provider_calls() {
4063        let provider_impl = Arc::new(MockModelProvider::default());
4064        let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
4065        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
4066
4067        let state = AppState {
4068            config: Arc::new(RwLock::new(Config::default())),
4069            model_provider,
4070            model: "test-model".into(),
4071            temperature: None,
4072            mem: memory,
4073            auto_save: false,
4074            webhook_secret_hash: None,
4075            pairing: Arc::new(PairingGuard::new(false, &[])),
4076            trust_forwarded_headers: false,
4077            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4078            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4079            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4080            whatsapp: None,
4081            whatsapp_app_secret: None,
4082            linq: None,
4083            linq_signing_secret: None,
4084            nextcloud_talk: None,
4085            nextcloud_talk_webhook_secret: None,
4086            wati: None,
4087            gmail_push: None,
4088            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4089            tools_registry: Arc::new(Vec::new()),
4090            cost_tracker: None,
4091            event_tx: tokio::sync::broadcast::channel(16).0,
4092            event_buffer: Arc::new(sse::EventBuffer::new(16)),
4093            shutdown_tx: tokio::sync::watch::channel(false).0,
4094            reload_tx: None,
4095            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4096            path_prefix: String::new(),
4097            web_dist_dir: None,
4098            session_backend: None,
4099            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4100                8, 30, 600,
4101            )),
4102            device_registry: None,
4103            pending_pairings: None,
4104            canvas_store: CanvasStore::new(),
4105            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4106            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4107            #[cfg(feature = "webauthn")]
4108            webauthn: None,
4109        };
4110
4111        let mut headers = HeaderMap::new();
4112        headers.insert("X-Idempotency-Key", HeaderValue::from_static("abc-123"));
4113
4114        let body = Ok(Json(WebhookBody {
4115            message: "hello".into(),
4116        }));
4117        let first = handle_webhook(
4118            State(state.clone()),
4119            test_connect_info(),
4120            headers.clone(),
4121            body,
4122        )
4123        .await
4124        .into_response();
4125        assert_eq!(first.status(), StatusCode::OK);
4126
4127        let body = Ok(Json(WebhookBody {
4128            message: "hello".into(),
4129        }));
4130        let second = handle_webhook(State(state), test_connect_info(), headers, body)
4131            .await
4132            .into_response();
4133        assert_eq!(second.status(), StatusCode::OK);
4134
4135        let payload = second.into_body().collect().await.unwrap().to_bytes();
4136        let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
4137        assert_eq!(parsed["status"], "duplicate");
4138        assert_eq!(parsed["idempotent"], true);
4139        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
4140    }
4141
4142    #[tokio::test]
4143    async fn webhook_autosave_stores_distinct_keys_per_request() {
4144        let provider_impl = Arc::new(MockModelProvider::default());
4145        let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
4146
4147        let tracking_impl = Arc::new(TrackingMemory::default());
4148        let memory: Arc<dyn Memory> = tracking_impl.clone();
4149
4150        let state = AppState {
4151            config: Arc::new(RwLock::new(Config::default())),
4152            model_provider,
4153            model: "test-model".into(),
4154            temperature: None,
4155            mem: memory,
4156            auto_save: true,
4157            webhook_secret_hash: None,
4158            pairing: Arc::new(PairingGuard::new(false, &[])),
4159            trust_forwarded_headers: false,
4160            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4161            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4162            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4163            whatsapp: None,
4164            whatsapp_app_secret: None,
4165            linq: None,
4166            linq_signing_secret: None,
4167            nextcloud_talk: None,
4168            nextcloud_talk_webhook_secret: None,
4169            wati: None,
4170            gmail_push: None,
4171            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4172            tools_registry: Arc::new(Vec::new()),
4173            cost_tracker: None,
4174            event_tx: tokio::sync::broadcast::channel(16).0,
4175            event_buffer: Arc::new(sse::EventBuffer::new(16)),
4176            shutdown_tx: tokio::sync::watch::channel(false).0,
4177            reload_tx: None,
4178            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4179            path_prefix: String::new(),
4180            web_dist_dir: None,
4181            session_backend: None,
4182            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4183                8, 30, 600,
4184            )),
4185            device_registry: None,
4186            pending_pairings: None,
4187            canvas_store: CanvasStore::new(),
4188            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4189            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4190            #[cfg(feature = "webauthn")]
4191            webauthn: None,
4192        };
4193
4194        let headers = HeaderMap::new();
4195
4196        let body1 = Ok(Json(WebhookBody {
4197            message: "hello one".into(),
4198        }));
4199        let first = handle_webhook(
4200            State(state.clone()),
4201            test_connect_info(),
4202            headers.clone(),
4203            body1,
4204        )
4205        .await
4206        .into_response();
4207        assert_eq!(first.status(), StatusCode::OK);
4208
4209        let body2 = Ok(Json(WebhookBody {
4210            message: "hello two".into(),
4211        }));
4212        let second = handle_webhook(State(state), test_connect_info(), headers, body2)
4213            .await
4214            .into_response();
4215        assert_eq!(second.status(), StatusCode::OK);
4216
4217        let keys = tracking_impl.keys.lock().clone();
4218        assert_eq!(keys.len(), 2);
4219        assert_ne!(keys[0], keys[1]);
4220        assert!(keys[0].starts_with("webhook_msg_"));
4221        assert!(keys[1].starts_with("webhook_msg_"));
4222        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 2);
4223    }
4224
4225    #[test]
4226    fn webhook_secret_hash_is_deterministic_and_nonempty() {
4227        let secret_a = generate_test_secret();
4228        let secret_b = generate_test_secret();
4229        let one = hash_webhook_secret(&secret_a);
4230        let two = hash_webhook_secret(&secret_a);
4231        let other = hash_webhook_secret(&secret_b);
4232
4233        assert_eq!(one, two);
4234        assert_ne!(one, other);
4235        assert_eq!(one.len(), 64);
4236    }
4237
4238    #[tokio::test]
4239    async fn webhook_secret_hash_rejects_missing_header() {
4240        let provider_impl = Arc::new(MockModelProvider::default());
4241        let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
4242        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
4243        let secret = generate_test_secret();
4244
4245        let state = AppState {
4246            config: Arc::new(RwLock::new(Config::default())),
4247            model_provider,
4248            model: "test-model".into(),
4249            temperature: None,
4250            mem: memory,
4251            auto_save: false,
4252            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
4253            pairing: Arc::new(PairingGuard::new(false, &[])),
4254            trust_forwarded_headers: false,
4255            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4256            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4257            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4258            whatsapp: None,
4259            whatsapp_app_secret: None,
4260            linq: None,
4261            linq_signing_secret: None,
4262            nextcloud_talk: None,
4263            nextcloud_talk_webhook_secret: None,
4264            wati: None,
4265            gmail_push: None,
4266            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4267            tools_registry: Arc::new(Vec::new()),
4268            cost_tracker: None,
4269            event_tx: tokio::sync::broadcast::channel(16).0,
4270            event_buffer: Arc::new(sse::EventBuffer::new(16)),
4271            shutdown_tx: tokio::sync::watch::channel(false).0,
4272            reload_tx: None,
4273            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4274            path_prefix: String::new(),
4275            web_dist_dir: None,
4276            session_backend: None,
4277            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4278                8, 30, 600,
4279            )),
4280            device_registry: None,
4281            pending_pairings: None,
4282            canvas_store: CanvasStore::new(),
4283            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4284            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4285            #[cfg(feature = "webauthn")]
4286            webauthn: None,
4287        };
4288
4289        let response = handle_webhook(
4290            State(state),
4291            test_connect_info(),
4292            HeaderMap::new(),
4293            Ok(Json(WebhookBody {
4294                message: "hello".into(),
4295            })),
4296        )
4297        .await
4298        .into_response();
4299
4300        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
4301        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
4302    }
4303
4304    #[tokio::test]
4305    async fn webhook_secret_hash_rejects_invalid_header() {
4306        let provider_impl = Arc::new(MockModelProvider::default());
4307        let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
4308        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
4309        let valid_secret = generate_test_secret();
4310        let wrong_secret = generate_test_secret();
4311
4312        let state = AppState {
4313            config: Arc::new(RwLock::new(Config::default())),
4314            model_provider,
4315            model: "test-model".into(),
4316            temperature: None,
4317            mem: memory,
4318            auto_save: false,
4319            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&valid_secret))),
4320            pairing: Arc::new(PairingGuard::new(false, &[])),
4321            trust_forwarded_headers: false,
4322            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4323            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4324            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4325            whatsapp: None,
4326            whatsapp_app_secret: None,
4327            linq: None,
4328            linq_signing_secret: None,
4329            nextcloud_talk: None,
4330            nextcloud_talk_webhook_secret: None,
4331            wati: None,
4332            gmail_push: None,
4333            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4334            tools_registry: Arc::new(Vec::new()),
4335            cost_tracker: None,
4336            event_tx: tokio::sync::broadcast::channel(16).0,
4337            event_buffer: Arc::new(sse::EventBuffer::new(16)),
4338            shutdown_tx: tokio::sync::watch::channel(false).0,
4339            reload_tx: None,
4340            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4341            path_prefix: String::new(),
4342            web_dist_dir: None,
4343            session_backend: None,
4344            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4345                8, 30, 600,
4346            )),
4347            device_registry: None,
4348            pending_pairings: None,
4349            canvas_store: CanvasStore::new(),
4350            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4351            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4352            #[cfg(feature = "webauthn")]
4353            webauthn: None,
4354        };
4355
4356        let mut headers = HeaderMap::new();
4357        headers.insert(
4358            "X-Webhook-Secret",
4359            HeaderValue::from_str(&wrong_secret).unwrap(),
4360        );
4361
4362        let response = handle_webhook(
4363            State(state),
4364            test_connect_info(),
4365            headers,
4366            Ok(Json(WebhookBody {
4367                message: "hello".into(),
4368            })),
4369        )
4370        .await
4371        .into_response();
4372
4373        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
4374        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
4375    }
4376
4377    #[tokio::test]
4378    async fn webhook_secret_hash_accepts_valid_header() {
4379        let provider_impl = Arc::new(MockModelProvider::default());
4380        let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
4381        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
4382        let secret = generate_test_secret();
4383
4384        let state = AppState {
4385            config: Arc::new(RwLock::new(Config::default())),
4386            model_provider,
4387            model: "test-model".into(),
4388            temperature: None,
4389            mem: memory,
4390            auto_save: false,
4391            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
4392            pairing: Arc::new(PairingGuard::new(false, &[])),
4393            trust_forwarded_headers: false,
4394            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4395            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4396            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4397            whatsapp: None,
4398            whatsapp_app_secret: None,
4399            linq: None,
4400            linq_signing_secret: None,
4401            nextcloud_talk: None,
4402            nextcloud_talk_webhook_secret: None,
4403            wati: None,
4404            gmail_push: None,
4405            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4406            tools_registry: Arc::new(Vec::new()),
4407            cost_tracker: None,
4408            event_tx: tokio::sync::broadcast::channel(16).0,
4409            event_buffer: Arc::new(sse::EventBuffer::new(16)),
4410            shutdown_tx: tokio::sync::watch::channel(false).0,
4411            reload_tx: None,
4412            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4413            path_prefix: String::new(),
4414            web_dist_dir: None,
4415            session_backend: None,
4416            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4417                8, 30, 600,
4418            )),
4419            device_registry: None,
4420            pending_pairings: None,
4421            canvas_store: CanvasStore::new(),
4422            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4423            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4424            #[cfg(feature = "webauthn")]
4425            webauthn: None,
4426        };
4427
4428        let mut headers = HeaderMap::new();
4429        headers.insert("X-Webhook-Secret", HeaderValue::from_str(&secret).unwrap());
4430
4431        let response = handle_webhook(
4432            State(state),
4433            test_connect_info(),
4434            headers,
4435            Ok(Json(WebhookBody {
4436                message: "hello".into(),
4437            })),
4438        )
4439        .await
4440        .into_response();
4441
4442        assert_eq!(response.status(), StatusCode::OK);
4443        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
4444    }
4445
4446    fn compute_nextcloud_signature_hex(secret: &str, random: &str, body: &str) -> String {
4447        use hmac::{Hmac, Mac};
4448        use sha2::Sha256;
4449
4450        let payload = format!("{random}{body}");
4451        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
4452        mac.update(payload.as_bytes());
4453        hex::encode(mac.finalize().into_bytes())
4454    }
4455
4456    #[tokio::test]
4457    async fn nextcloud_talk_webhook_returns_not_found_when_not_configured() {
4458        let model_provider: Arc<dyn ModelProvider> = Arc::new(MockModelProvider::default());
4459        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
4460
4461        let state = AppState {
4462            config: Arc::new(RwLock::new(Config::default())),
4463            model_provider,
4464            model: "test-model".into(),
4465            temperature: None,
4466            mem: memory,
4467            auto_save: false,
4468            webhook_secret_hash: None,
4469            pairing: Arc::new(PairingGuard::new(false, &[])),
4470            trust_forwarded_headers: false,
4471            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4472            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4473            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4474            whatsapp: None,
4475            whatsapp_app_secret: None,
4476            linq: None,
4477            linq_signing_secret: None,
4478            nextcloud_talk: None,
4479            nextcloud_talk_webhook_secret: None,
4480            wati: None,
4481            gmail_push: None,
4482            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4483            tools_registry: Arc::new(Vec::new()),
4484            cost_tracker: None,
4485            event_tx: tokio::sync::broadcast::channel(16).0,
4486            event_buffer: Arc::new(sse::EventBuffer::new(16)),
4487            shutdown_tx: tokio::sync::watch::channel(false).0,
4488            reload_tx: None,
4489            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4490            path_prefix: String::new(),
4491            web_dist_dir: None,
4492            session_backend: None,
4493            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4494                8, 30, 600,
4495            )),
4496            device_registry: None,
4497            pending_pairings: None,
4498            canvas_store: CanvasStore::new(),
4499            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4500            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4501            #[cfg(feature = "webauthn")]
4502            webauthn: None,
4503        };
4504
4505        let response = Box::pin(handle_nextcloud_talk_webhook(
4506            State(state),
4507            HeaderMap::new(),
4508            Bytes::from_static(br#"{"type":"message"}"#),
4509        ))
4510        .await
4511        .into_response();
4512
4513        assert_eq!(response.status(), StatusCode::NOT_FOUND);
4514    }
4515
4516    #[tokio::test]
4517    async fn nextcloud_talk_webhook_rejects_invalid_signature() {
4518        let provider_impl = Arc::new(MockModelProvider::default());
4519        let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
4520        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
4521
4522        let alias = "nextcloud_talk_test_alias";
4523        let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = Arc::new(Vec::new);
4524        let channel = Arc::new(NextcloudTalkChannel::new(
4525            "https://cloud.example.com".into(),
4526            "app-token".into(),
4527            String::new(),
4528            alias,
4529            peer_resolver,
4530        ));
4531
4532        let secret = "nextcloud-test-secret";
4533        let random = "seed-value";
4534        let body = r#"{"type":"message","object":{"token":"room-token"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#;
4535        let _valid_signature = compute_nextcloud_signature_hex(secret, random, body);
4536        let invalid_signature = "deadbeef";
4537
4538        let state = AppState {
4539            config: Arc::new(RwLock::new(Config::default())),
4540            model_provider,
4541            model: "test-model".into(),
4542            temperature: None,
4543            mem: memory,
4544            auto_save: false,
4545            webhook_secret_hash: None,
4546            pairing: Arc::new(PairingGuard::new(false, &[])),
4547            trust_forwarded_headers: false,
4548            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4549            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4550            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4551            whatsapp: None,
4552            whatsapp_app_secret: None,
4553            linq: None,
4554            linq_signing_secret: None,
4555            nextcloud_talk: Some(channel),
4556            nextcloud_talk_webhook_secret: Some(Arc::from(secret)),
4557            wati: None,
4558            gmail_push: None,
4559            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4560            tools_registry: Arc::new(Vec::new()),
4561            cost_tracker: None,
4562            event_tx: tokio::sync::broadcast::channel(16).0,
4563            event_buffer: Arc::new(sse::EventBuffer::new(16)),
4564            shutdown_tx: tokio::sync::watch::channel(false).0,
4565            reload_tx: None,
4566            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4567            path_prefix: String::new(),
4568            web_dist_dir: None,
4569            session_backend: None,
4570            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4571                8, 30, 600,
4572            )),
4573            device_registry: None,
4574            pending_pairings: None,
4575            canvas_store: CanvasStore::new(),
4576            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4577            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4578            #[cfg(feature = "webauthn")]
4579            webauthn: None,
4580        };
4581
4582        let mut headers = HeaderMap::new();
4583        headers.insert(
4584            "X-Nextcloud-Talk-Random",
4585            HeaderValue::from_str(random).unwrap(),
4586        );
4587        headers.insert(
4588            "X-Nextcloud-Talk-Signature",
4589            HeaderValue::from_str(invalid_signature).unwrap(),
4590        );
4591
4592        let response = Box::pin(handle_nextcloud_talk_webhook(
4593            State(state),
4594            headers,
4595            Bytes::from(body),
4596        ))
4597        .await
4598        .into_response();
4599        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
4600        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
4601    }
4602
4603    // Regression for #6156: handler must return 200 OK before the (potentially
4604    // slow) LLM call completes, so Nextcloud Talk doesn't cancel the webhook
4605    // request at its ~5s timeout.
4606    #[derive(Default)]
4607    struct SlowProvider {
4608        calls: AtomicUsize,
4609        started_tx: Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
4610    }
4611
4612    #[async_trait]
4613    impl ModelProvider for SlowProvider {
4614        async fn chat_with_system(
4615            &self,
4616            _system_prompt: Option<&str>,
4617            _message: &str,
4618            _model: &str,
4619            _temperature: Option<f64>,
4620        ) -> anyhow::Result<String> {
4621            self.calls.fetch_add(1, Ordering::SeqCst);
4622            if let Some(tx) = self.started_tx.lock().take() {
4623                let _ = tx.send(());
4624            }
4625            tokio::time::sleep(Duration::from_secs(30)).await;
4626            Ok("slow ok".into())
4627        }
4628    }
4629    impl ::zeroclaw_api::attribution::Attributable for SlowProvider {
4630        fn role(&self) -> ::zeroclaw_api::attribution::Role {
4631            ::zeroclaw_api::attribution::Role::Provider(
4632                ::zeroclaw_api::attribution::ProviderKind::Model(
4633                    ::zeroclaw_api::attribution::ModelProviderKind::Custom,
4634                ),
4635            )
4636        }
4637        fn alias(&self) -> &str {
4638            "SlowProvider"
4639        }
4640    }
4641
4642    #[tokio::test]
4643    async fn nextcloud_talk_webhook_returns_before_llm_call_completes() {
4644        let (started_tx, started_rx) = tokio::sync::oneshot::channel();
4645        let provider_impl = Arc::new(SlowProvider {
4646            calls: AtomicUsize::new(0),
4647            started_tx: Mutex::new(Some(started_tx)),
4648        });
4649        let provider: Arc<dyn ModelProvider> = provider_impl.clone();
4650        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
4651
4652        let channel = Arc::new(NextcloudTalkChannel::new(
4653            "https://cloud.example.com".into(),
4654            "app-token".into(),
4655            String::new(),
4656            "default",
4657            Arc::new(|| vec!["*".to_string()]),
4658        ));
4659
4660        let body = r#"{"type":"message","object":{"token":"room-token"},"actor":{"id":"user_a","name":"User A"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#;
4661
4662        let state = AppState {
4663            config: Arc::new(RwLock::new(Config::default())),
4664            model_provider: provider,
4665            model: "test-model".into(),
4666            temperature: None,
4667            mem: memory,
4668            auto_save: false,
4669            webhook_secret_hash: None,
4670            pairing: Arc::new(PairingGuard::new(false, &[])),
4671            trust_forwarded_headers: false,
4672            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4673            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4674            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4675            whatsapp: None,
4676            whatsapp_app_secret: None,
4677            linq: None,
4678            linq_signing_secret: None,
4679            nextcloud_talk: Some(channel),
4680            nextcloud_talk_webhook_secret: None,
4681            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4682            wati: None,
4683            gmail_push: None,
4684            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4685            tools_registry: Arc::new(Vec::new()),
4686            cost_tracker: None,
4687            event_tx: tokio::sync::broadcast::channel(16).0,
4688            event_buffer: Arc::new(sse::EventBuffer::new(16)),
4689            shutdown_tx: tokio::sync::watch::channel(false).0,
4690            reload_tx: None,
4691            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4692            path_prefix: String::new(),
4693            web_dist_dir: None,
4694            session_backend: None,
4695            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4696                8, 30, 600,
4697            )),
4698            device_registry: None,
4699            pending_pairings: None,
4700            canvas_store: CanvasStore::new(),
4701            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4702            #[cfg(feature = "webauthn")]
4703            webauthn: None,
4704        };
4705
4706        let start = std::time::Instant::now();
4707        let response = tokio::time::timeout(
4708            Duration::from_secs(2),
4709            Box::pin(handle_nextcloud_talk_webhook(
4710                State(state),
4711                HeaderMap::new(),
4712                Bytes::from(body),
4713            )),
4714        )
4715        .await
4716        .expect("webhook must return before 2s deadline (regression #6156)")
4717        .into_response();
4718
4719        let elapsed = start.elapsed();
4720        assert_eq!(response.status(), StatusCode::OK);
4721        assert!(
4722            elapsed < Duration::from_secs(2),
4723            "handler returned after {elapsed:?}; expected fast return for #6156"
4724        );
4725
4726        // Confirm the spawned task actually started the LLM call (i.e., the
4727        // ack didn't just skip processing). The 30s sleep is still in flight.
4728        tokio::time::timeout(Duration::from_secs(2), started_rx)
4729            .await
4730            .expect("spawned LLM call did not start within 2s")
4731            .expect("started_tx sender was dropped");
4732        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
4733    }
4734
4735    // ══════════════════════════════════════════════════════════
4736    // WhatsApp Signature Verification Tests (CWE-345 Prevention)
4737    // ══════════════════════════════════════════════════════════
4738
4739    fn compute_whatsapp_signature_hex(secret: &str, body: &[u8]) -> String {
4740        use hmac::{Hmac, Mac};
4741        use sha2::Sha256;
4742
4743        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
4744        mac.update(body);
4745        hex::encode(mac.finalize().into_bytes())
4746    }
4747
4748    fn compute_whatsapp_signature_header(secret: &str, body: &[u8]) -> String {
4749        format!("sha256={}", compute_whatsapp_signature_hex(secret, body))
4750    }
4751
4752    #[test]
4753    fn whatsapp_signature_valid() {
4754        let app_secret = generate_test_secret();
4755        let body = b"test body content";
4756
4757        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4758
4759        assert!(verify_whatsapp_signature(
4760            &app_secret,
4761            body,
4762            &signature_header
4763        ));
4764    }
4765
4766    #[test]
4767    fn whatsapp_signature_invalid_wrong_secret() {
4768        let app_secret = generate_test_secret();
4769        let wrong_secret = generate_test_secret();
4770        let body = b"test body content";
4771
4772        let signature_header = compute_whatsapp_signature_header(&wrong_secret, body);
4773
4774        assert!(!verify_whatsapp_signature(
4775            &app_secret,
4776            body,
4777            &signature_header
4778        ));
4779    }
4780
4781    #[test]
4782    fn whatsapp_signature_invalid_wrong_body() {
4783        let app_secret = generate_test_secret();
4784        let original_body = b"original body";
4785        let tampered_body = b"tampered body";
4786
4787        let signature_header = compute_whatsapp_signature_header(&app_secret, original_body);
4788
4789        // Verify with tampered body should fail
4790        assert!(!verify_whatsapp_signature(
4791            &app_secret,
4792            tampered_body,
4793            &signature_header
4794        ));
4795    }
4796
4797    #[test]
4798    fn whatsapp_signature_missing_prefix() {
4799        let app_secret = generate_test_secret();
4800        let body = b"test body";
4801
4802        // Signature without "sha256=" prefix
4803        let signature_header = "abc123def456";
4804
4805        assert!(!verify_whatsapp_signature(
4806            &app_secret,
4807            body,
4808            signature_header
4809        ));
4810    }
4811
4812    #[test]
4813    fn whatsapp_signature_empty_header() {
4814        let app_secret = generate_test_secret();
4815        let body = b"test body";
4816
4817        assert!(!verify_whatsapp_signature(&app_secret, body, ""));
4818    }
4819
4820    #[test]
4821    fn whatsapp_signature_invalid_hex() {
4822        let app_secret = generate_test_secret();
4823        let body = b"test body";
4824
4825        // Invalid hex characters
4826        let signature_header = "sha256=not_valid_hex_zzz";
4827
4828        assert!(!verify_whatsapp_signature(
4829            &app_secret,
4830            body,
4831            signature_header
4832        ));
4833    }
4834
4835    #[test]
4836    fn whatsapp_signature_empty_body() {
4837        let app_secret = generate_test_secret();
4838        let body = b"";
4839
4840        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4841
4842        assert!(verify_whatsapp_signature(
4843            &app_secret,
4844            body,
4845            &signature_header
4846        ));
4847    }
4848
4849    #[test]
4850    fn whatsapp_signature_unicode_body() {
4851        let app_secret = generate_test_secret();
4852        let body = "Hello 🦀 World".as_bytes();
4853
4854        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4855
4856        assert!(verify_whatsapp_signature(
4857            &app_secret,
4858            body,
4859            &signature_header
4860        ));
4861    }
4862
4863    #[test]
4864    fn whatsapp_signature_json_payload() {
4865        let app_secret = generate_test_secret();
4866        let body = br#"{"entry":[{"changes":[{"value":{"messages":[{"from":"1234567890","text":{"body":"Hello"}}]}}]}]}"#;
4867
4868        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4869
4870        assert!(verify_whatsapp_signature(
4871            &app_secret,
4872            body,
4873            &signature_header
4874        ));
4875    }
4876
4877    #[test]
4878    fn whatsapp_signature_case_sensitive_prefix() {
4879        let app_secret = generate_test_secret();
4880        let body = b"test body";
4881
4882        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
4883
4884        // Wrong case prefix should fail
4885        let wrong_prefix = format!("SHA256={hex_sig}");
4886        assert!(!verify_whatsapp_signature(&app_secret, body, &wrong_prefix));
4887
4888        // Correct prefix should pass
4889        let correct_prefix = format!("sha256={hex_sig}");
4890        assert!(verify_whatsapp_signature(
4891            &app_secret,
4892            body,
4893            &correct_prefix
4894        ));
4895    }
4896
4897    #[test]
4898    fn whatsapp_signature_truncated_hex() {
4899        let app_secret = generate_test_secret();
4900        let body = b"test body";
4901
4902        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
4903        let truncated = &hex_sig[..32]; // Only half the signature
4904        let signature_header = format!("sha256={truncated}");
4905
4906        assert!(!verify_whatsapp_signature(
4907            &app_secret,
4908            body,
4909            &signature_header
4910        ));
4911    }
4912
4913    #[test]
4914    fn whatsapp_signature_extra_bytes() {
4915        let app_secret = generate_test_secret();
4916        let body = b"test body";
4917
4918        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
4919        let extended = format!("{hex_sig}deadbeef");
4920        let signature_header = format!("sha256={extended}");
4921
4922        assert!(!verify_whatsapp_signature(
4923            &app_secret,
4924            body,
4925            &signature_header
4926        ));
4927    }
4928
4929    // ══════════════════════════════════════════════════════════
4930    // IdempotencyStore Edge-Case Tests
4931    // ══════════════════════════════════════════════════════════
4932
4933    #[test]
4934    fn idempotency_store_allows_different_keys() {
4935        let store = IdempotencyStore::new(Duration::from_secs(60), 100);
4936        assert!(store.record_if_new("key-a"));
4937        assert!(store.record_if_new("key-b"));
4938        assert!(store.record_if_new("key-c"));
4939        assert!(store.record_if_new("key-d"));
4940    }
4941
4942    #[test]
4943    fn idempotency_store_max_keys_clamped_to_one() {
4944        let store = IdempotencyStore::new(Duration::from_secs(60), 0);
4945        assert!(store.record_if_new("only-key"));
4946        assert!(!store.record_if_new("only-key"));
4947    }
4948
4949    #[test]
4950    fn idempotency_store_rapid_duplicate_rejected() {
4951        let store = IdempotencyStore::new(Duration::from_secs(300), 100);
4952        assert!(store.record_if_new("rapid"));
4953        assert!(!store.record_if_new("rapid"));
4954    }
4955
4956    #[test]
4957    fn idempotency_store_accepts_after_ttl_expires() {
4958        let store = IdempotencyStore::new(Duration::from_millis(1), 100);
4959        assert!(store.record_if_new("ttl-key"));
4960        std::thread::sleep(Duration::from_millis(10));
4961        assert!(store.record_if_new("ttl-key"));
4962    }
4963
4964    #[test]
4965    fn idempotency_store_eviction_preserves_newest() {
4966        let store = IdempotencyStore::new(Duration::from_secs(300), 1);
4967        assert!(store.record_if_new("old-key"));
4968        std::thread::sleep(Duration::from_millis(2));
4969        assert!(store.record_if_new("new-key"));
4970
4971        let keys = store.keys.lock();
4972        assert_eq!(keys.len(), 1);
4973        assert!(!keys.contains_key("old-key"));
4974        assert!(keys.contains_key("new-key"));
4975    }
4976
4977    #[test]
4978    fn rate_limiter_allows_after_window_expires() {
4979        let window = Duration::from_millis(50);
4980        let limiter = SlidingWindowRateLimiter::new(2, window, 100);
4981        assert!(limiter.allow("ip-1"));
4982        assert!(limiter.allow("ip-1"));
4983        assert!(!limiter.allow("ip-1")); // blocked
4984
4985        // Wait for window to expire
4986        std::thread::sleep(Duration::from_millis(60));
4987
4988        // Should be allowed again
4989        assert!(limiter.allow("ip-1"));
4990    }
4991
4992    #[test]
4993    fn rate_limiter_independent_keys_tracked_separately() {
4994        let limiter = SlidingWindowRateLimiter::new(2, Duration::from_secs(60), 100);
4995        assert!(limiter.allow("ip-1"));
4996        assert!(limiter.allow("ip-1"));
4997        assert!(!limiter.allow("ip-1")); // ip-1 blocked
4998
4999        // ip-2 should still work
5000        assert!(limiter.allow("ip-2"));
5001        assert!(limiter.allow("ip-2"));
5002        assert!(!limiter.allow("ip-2")); // ip-2 now blocked
5003    }
5004
5005    #[test]
5006    fn rate_limiter_exact_boundary_at_max_keys() {
5007        let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 3);
5008        assert!(limiter.allow("ip-1"));
5009        assert!(limiter.allow("ip-2"));
5010        assert!(limiter.allow("ip-3"));
5011        // At capacity now
5012        assert!(limiter.allow("ip-4")); // should evict ip-1
5013
5014        let guard = limiter.requests.lock();
5015        assert_eq!(guard.0.len(), 3);
5016        assert!(
5017            !guard.0.contains_key("ip-1"),
5018            "ip-1 should have been evicted"
5019        );
5020        assert!(guard.0.contains_key("ip-2"));
5021        assert!(guard.0.contains_key("ip-3"));
5022        assert!(guard.0.contains_key("ip-4"));
5023    }
5024
5025    #[test]
5026    fn gateway_rate_limiter_pair_and_webhook_are_independent() {
5027        let limiter = GatewayRateLimiter::new(2, 3, 100);
5028
5029        // Exhaust pair limit
5030        assert!(limiter.allow_pair("ip-1"));
5031        assert!(limiter.allow_pair("ip-1"));
5032        assert!(!limiter.allow_pair("ip-1")); // pair blocked
5033
5034        // Webhook should still work
5035        assert!(limiter.allow_webhook("ip-1"));
5036        assert!(limiter.allow_webhook("ip-1"));
5037        assert!(limiter.allow_webhook("ip-1"));
5038        assert!(!limiter.allow_webhook("ip-1")); // webhook now blocked
5039    }
5040
5041    #[test]
5042    fn rate_limiter_single_key_max_allows_one_request() {
5043        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 1);
5044        assert!(limiter.allow("ip-1"));
5045        assert!(limiter.allow("ip-2")); // evicts ip-1
5046
5047        let guard = limiter.requests.lock();
5048        assert_eq!(guard.0.len(), 1);
5049        assert!(guard.0.contains_key("ip-2"));
5050        assert!(!guard.0.contains_key("ip-1"));
5051    }
5052
5053    #[test]
5054    fn rate_limiter_concurrent_access_safe() {
5055        use std::sync::Arc;
5056
5057        let limiter = Arc::new(SlidingWindowRateLimiter::new(
5058            1000,
5059            Duration::from_secs(60),
5060            1000,
5061        ));
5062        let mut handles = Vec::new();
5063
5064        for i in 0..10 {
5065            let limiter = limiter.clone();
5066            handles.push(std::thread::spawn(move || {
5067                for j in 0..100 {
5068                    limiter.allow(&format!("thread-{i}-req-{j}"));
5069                }
5070            }));
5071        }
5072
5073        for handle in handles {
5074            handle.join().unwrap();
5075        }
5076
5077        // Should not panic or deadlock
5078        let guard = limiter.requests.lock();
5079        assert!(guard.0.len() <= 1000, "should respect max_keys");
5080    }
5081
5082    #[test]
5083    fn idempotency_store_concurrent_access_safe() {
5084        use std::sync::Arc;
5085
5086        let store = Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000));
5087        let mut handles = Vec::new();
5088
5089        for i in 0..10 {
5090            let store = store.clone();
5091            handles.push(std::thread::spawn(move || {
5092                for j in 0..100 {
5093                    store.record_if_new(&format!("thread-{i}-key-{j}"));
5094                }
5095            }));
5096        }
5097
5098        for handle in handles {
5099            handle.join().unwrap();
5100        }
5101
5102        let keys = store.keys.lock();
5103        assert!(keys.len() <= 1000, "should respect max_keys");
5104    }
5105
5106    #[test]
5107    fn rate_limiter_rapid_burst_then_cooldown() {
5108        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_millis(50), 100);
5109
5110        // Burst: use all 5 requests
5111        for _ in 0..5 {
5112            assert!(limiter.allow("burst-ip"));
5113        }
5114        assert!(!limiter.allow("burst-ip")); // 6th should fail
5115
5116        // Cooldown
5117        std::thread::sleep(Duration::from_millis(60));
5118
5119        // Should be allowed again
5120        assert!(limiter.allow("burst-ip"));
5121    }
5122
5123    #[test]
5124    fn require_localhost_accepts_ipv4_loopback() {
5125        let peer = SocketAddr::from(([127, 0, 0, 1], 12345));
5126        assert!(require_localhost(&peer).is_ok());
5127    }
5128
5129    #[test]
5130    fn require_localhost_accepts_ipv6_loopback() {
5131        let peer = SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, 12345));
5132        assert!(require_localhost(&peer).is_ok());
5133    }
5134
5135    #[test]
5136    fn require_localhost_rejects_non_loopback_ipv4() {
5137        let peer = SocketAddr::from(([192, 168, 1, 100], 12345));
5138        let err = require_localhost(&peer).unwrap_err();
5139        assert_eq!(err.0, StatusCode::FORBIDDEN);
5140    }
5141
5142    #[test]
5143    fn require_localhost_rejects_non_loopback_ipv6() {
5144        let peer = SocketAddr::from((
5145            std::net::Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1),
5146            12345,
5147        ));
5148        let err = require_localhost(&peer).unwrap_err();
5149        assert_eq!(err.0, StatusCode::FORBIDDEN);
5150    }
5151
5152    #[test]
5153    fn needs_onboarding_for_flags_empty_model() {
5154        let err =
5155            needs_onboarding_for("").expect("empty model must produce a needs_onboarding error");
5156        let msg = err.to_string();
5157        assert!(
5158            msg.contains("needs_onboarding"),
5159            "error must carry the needs_onboarding marker for callers to map to 503; got: {msg}"
5160        );
5161        assert!(
5162            msg.contains("/onboard"),
5163            "error must point the user at /onboard; got: {msg}"
5164        );
5165    }
5166
5167    #[test]
5168    fn needs_onboarding_for_flags_whitespace_only_model() {
5169        assert!(
5170            needs_onboarding_for("   ").is_some(),
5171            "whitespace-only model must be treated as empty"
5172        );
5173        assert!(
5174            needs_onboarding_for("\n\t ").is_some(),
5175            "tabs and newlines count as empty too"
5176        );
5177    }
5178
5179    #[test]
5180    fn needs_onboarding_for_passes_real_model() {
5181        assert!(
5182            needs_onboarding_for("anthropic/claude-sonnet-4").is_none(),
5183            "a real model id must not be flagged"
5184        );
5185        assert!(
5186            needs_onboarding_for("  gpt-4  ").is_none(),
5187            "leading/trailing whitespace around a real model id must not be flagged"
5188        );
5189    }
5190
5191    #[test]
5192    fn is_needs_onboarding_err_detects_marker_from_helper() {
5193        let err = needs_onboarding_for("").expect("empty model produces marker");
5194        assert!(
5195            is_needs_onboarding_err(&err),
5196            "the marker emitted by needs_onboarding_for must be detected"
5197        );
5198    }
5199
5200    #[test]
5201    fn is_needs_onboarding_err_ignores_unrelated_errors() {
5202        let err = anyhow::Error::msg("upstream timeout: provider returned 504");
5203        assert!(
5204            !is_needs_onboarding_err(&err),
5205            "unrelated errors must not be misclassified as needs_onboarding"
5206        );
5207        let err = anyhow::Error::msg("invalid api key");
5208        assert!(!is_needs_onboarding_err(&err));
5209    }
5210
5211    #[test]
5212    fn is_needs_onboarding_err_detects_via_substring() {
5213        // Defends the contract that the substring marker is the
5214        // detection key — not the exact string. Wrappers (e.g.
5215        // anyhow::Error::context) must not break the check.
5216        let err =
5217            anyhow::Error::msg("provider call failed").context("needs_onboarding: empty model");
5218        assert!(is_needs_onboarding_err(&err));
5219    }
5220
5221    #[test]
5222    fn needs_onboarding_channel_reply_resolves_via_fluent() {
5223        // The Fluent key channel-needs-onboarding-reply must resolve
5224        // to real text from the embedded en/cli.ftl, not the missing-
5225        // key fallback `{channel-needs-onboarding-reply}` that
5226        // `missing_cli_string` produces. Guarding this in a test
5227        // keeps the i18n contract from quietly drifting if the key
5228        // gets renamed in lib.rs without a matching ftl edit.
5229        let reply = needs_onboarding_channel_reply();
5230        assert!(
5231            !reply.starts_with('{') && !reply.ends_with('}'),
5232            "fluent missing-key fallback leaked into channel reply: {reply:?}"
5233        );
5234        assert!(
5235            reply.to_lowercase().contains("onboarding"),
5236            "channel reply must mention onboarding so users know what's missing: {reply:?}"
5237        );
5238    }
5239}