Skip to main content

zeroclaw_gateway/
lib.rs

1#![allow(
2    clippy::to_string_in_format_args,
3    clippy::useless_format,
4    clippy::collapsible_if
5)]
//! Axum-based HTTP gateway with HTTP/1.1 compliance, request body limits, and timeouts.
//!
//! The gateway is built on axum (replacing an earlier raw TCP implementation) to get:
//! - HTTP/1.1 parsing and compliance
//! - Content-Length validation (handled by hyper)
//! - Request body size limits (64 KiB max, see `MAX_BODY_SIZE`)
//! - Request timeouts (30 s by default, configurable via `[gateway] request_timeout_secs`)
//!   to mitigate slow-loris attacks
//! - Header sanitization (handled by axum/hyper)
14
15pub mod acp;
16pub mod api;
17pub mod api_browse;
18pub mod api_config;
19pub mod api_logs;
20pub mod api_pairing;
21pub mod api_personality;
22#[cfg(feature = "plugins-wasm")]
23pub mod api_plugins;
24pub mod api_quickstart;
25pub mod api_sections;
26pub mod api_skills;
27#[cfg(feature = "webauthn")]
28pub mod api_webauthn;
29pub mod auth_rate_limit;
30pub mod canvas;
31pub mod hardware_context;
32pub mod node_tool;
33pub mod nodes;
34pub mod openapi;
35pub mod session_queue;
36pub mod sse;
37pub mod static_files;
38pub mod tls;
39#[cfg(feature = "gateway-voice-duplex")]
40pub mod voice_duplex;
41pub mod ws;
42pub mod ws_approval;
43
44use anyhow::{Context, Result};
45#[cfg(any(
46    feature = "channel-email",
47    feature = "channel-linq",
48    feature = "channel-nextcloud",
49    feature = "channel-wati",
50    feature = "channel-whatsapp-cloud"
51))]
52use axum::body::Bytes;
53#[cfg(feature = "channel-linq")]
54use axum::extract::Path;
55use axum::{
56    Router,
57    extract::{ConnectInfo, Query, State},
58    http::{HeaderMap, StatusCode, header},
59    response::{IntoResponse, Json},
60    routing::{delete, get, post},
61};
62use parking_lot::{Mutex, RwLock};
63use std::collections::HashMap;
64use std::net::{IpAddr, SocketAddr};
65use std::sync::Arc;
66use std::time::{Duration, Instant};
67
/// Backoff after a transient `accept()` error so the serve loop does not
/// hot-spin while the condition (e.g. fd exhaustion) clears.
const ACCEPT_ERROR_BACKOFF_MS: u64 = 50;

/// File-descriptor exhaustion errno values, stable across the Unix targets
/// we support (Linux, macOS, BSD).
#[cfg(unix)]
const EMFILE: i32 = 24; // too many open files (this process)
#[cfg(unix)]
const ENFILE: i32 = 23; // too many open files (system-wide)

/// Returns `true` when an error from a stream listener's `accept()` is
/// transient and the listener itself remains usable. In that case the serve
/// loop should log and keep running rather than terminate the daemon.
///
/// Recoverable cases:
/// - per-connection failures: `ConnectionAborted`, `ConnectionReset` and
///   `ConnectionRefused`. This is the same set `axum::serve` skips on the
///   plain-TCP path, so TLS and plain listeners behave alike.
/// - `Interrupted` / `WouldBlock`, which only ask the caller to retry.
/// - on Unix, file-descriptor exhaustion (`EMFILE`/`ENFILE`, see #7042).
///
/// Every other error is treated as fatal for the listener.
fn is_recoverable_accept_error(e: &std::io::Error) -> bool {
    use std::io::ErrorKind;
    if matches!(
        e.kind(),
        ErrorKind::ConnectionAborted
            | ErrorKind::ConnectionReset
            | ErrorKind::ConnectionRefused
            | ErrorKind::Interrupted
            | ErrorKind::WouldBlock
    ) {
        return true;
    }
    // errno values are only meaningful (and only defined above) on Unix.
    #[cfg(unix)]
    if matches!(e.raw_os_error(), Some(EMFILE | ENFILE)) {
        return true;
    }
    false
}
99use tower_http::limit::RequestBodyLimitLayer;
100use tower_http::timeout::TimeoutLayer;
101use uuid::Uuid;
102#[cfg(any(
103    feature = "channel-linq",
104    feature = "channel-nextcloud",
105    feature = "channel-wati",
106    feature = "channel-whatsapp-cloud"
107))]
108use zeroclaw_api::channel::{Channel, SendMessage};
109use zeroclaw_api::memory_traits::MemoryStrategy;
110use zeroclaw_api::tool::ToolSpec;
111#[cfg(feature = "channel-email")]
112use zeroclaw_channels::gmail_push::GmailPushChannel;
113#[cfg(feature = "channel-linq")]
114use zeroclaw_channels::linq::LinqChannel;
115#[cfg(feature = "channel-nextcloud")]
116use zeroclaw_channels::nextcloud_talk::NextcloudTalkChannel;
117#[cfg(feature = "channel-wati")]
118use zeroclaw_channels::wati::WatiChannel;
119#[cfg(feature = "channel-whatsapp-cloud")]
120use zeroclaw_channels::whatsapp::WhatsAppChannel;
121use zeroclaw_config::policy::SecurityPolicy;
122use zeroclaw_config::schema::Config;
123use zeroclaw_infra::session_backend::SessionBackend;
124use zeroclaw_memory::{self, Memory, MemoryCategory};
125use zeroclaw_providers::{self, ModelProvider};
126use zeroclaw_runtime::agent::memory_strategy::DefaultMemoryStrategy;
127use zeroclaw_runtime::cost::CostTracker;
128use zeroclaw_runtime::i18n;
129use zeroclaw_runtime::platform;
130use zeroclaw_runtime::security::pairing::{PairingGuard, constant_time_eq, is_public_bind};
131use zeroclaw_runtime::tools;
132use zeroclaw_runtime::tools::CanvasStore;
133
/// Largest request body the gateway accepts: 64 KiB. The cap keeps a single
/// request from exhausting memory.
pub const MAX_BODY_SIZE: usize = 64 * 1024;
/// Default per-request timeout in seconds. It is short enough to cut off
/// slow-loris clients.
pub const REQUEST_TIMEOUT_SECS: u64 = 30;

/// Default timeout, in seconds, for `POST /api/cron/{id}/run`: ten minutes.
///
/// A manually triggered cron job runs synchronously inside the request
/// handler. Agent jobs in particular can spend minutes in a full reasoning
/// loop, far beyond the 30-second gateway-wide default. Ten minutes lets
/// realistic workloads finish while still preventing the route from hanging
/// forever.
pub const LONG_RUNNING_REQUEST_TIMEOUT_SECS: u64 = 10 * 60;
147
148/// Gateway request timeout (seconds) for routes other than the long-running
149/// cron-trigger endpoint. Reads from typed config.
150pub fn gateway_request_timeout_secs(cfg: &zeroclaw_config::schema::GatewayConfig) -> u64 {
151    cfg.request_timeout_secs
152}
153
154/// Manual cron-trigger request timeout (seconds), exempt from the
155/// gateway-wide [`gateway_request_timeout_secs`] limit so synchronous agent
156/// jobs can run to completion. Reads from typed config.
157pub fn gateway_long_running_request_timeout_secs(
158    cfg: &zeroclaw_config::schema::GatewayConfig,
159) -> u64 {
160    cfg.long_running_request_timeout_secs
161}
/// Length, in seconds, of the sliding window used by gateway rate limiting.
pub const RATE_LIMIT_WINDOW_SECS: u64 = 60;
/// Fallback cap on distinct client keys tracked by the gateway rate limiter.
pub const RATE_LIMIT_MAX_KEYS_DEFAULT: usize = 10 * 1_000;
/// Fallback cap on distinct idempotency keys kept in gateway memory.
pub const IDEMPOTENCY_MAX_KEYS_DEFAULT: usize = 10 * 1_000;
168
169fn webhook_memory_key() -> String {
170    format!("webhook_msg_{}", Uuid::new_v4())
171}
172
/// Memory key for an inbound channel message, shaped
/// `<channel>_<sender>_<message id>`.
#[cfg(any(
    feature = "channel-linq",
    feature = "channel-nextcloud",
    feature = "channel-wati",
    feature = "channel-whatsapp-cloud"
))]
fn channel_message_memory_key(
    channel: &str,
    msg: &zeroclaw_api::channel::ChannelMessage,
) -> String {
    format!("{channel}_{}_{}", msg.sender, msg.id)
}

/// Memory key for a WhatsApp Cloud message (`whatsapp_<sender>_<id>`).
#[cfg(feature = "channel-whatsapp-cloud")]
fn whatsapp_memory_key(msg: &zeroclaw_api::channel::ChannelMessage) -> String {
    channel_message_memory_key("whatsapp", msg)
}

/// Memory key for a Linq message (`linq_<sender>_<id>`).
#[cfg(feature = "channel-linq")]
fn linq_memory_key(msg: &zeroclaw_api::channel::ChannelMessage) -> String {
    channel_message_memory_key("linq", msg)
}

/// Memory key for a WATI message (`wati_<sender>_<id>`).
#[cfg(feature = "channel-wati")]
fn wati_memory_key(msg: &zeroclaw_api::channel::ChannelMessage) -> String {
    channel_message_memory_key("wati", msg)
}

/// Memory key for a Nextcloud Talk message (`nextcloud_talk_<sender>_<id>`).
#[cfg(feature = "channel-nextcloud")]
fn nextcloud_talk_memory_key(msg: &zeroclaw_api::channel::ChannelMessage) -> String {
    channel_message_memory_key("nextcloud_talk", msg)
}
192
/// Session id for a channel message sender.
///
/// Threaded messages get `<channel>_<thread>_<sender>`, so each thread keeps
/// its own session. Unthreaded messages fall back to `<channel>_<sender>`.
#[cfg(any(
    feature = "channel-linq",
    feature = "channel-nextcloud",
    feature = "channel-wati",
    feature = "channel-whatsapp-cloud"
))]
fn sender_session_id(channel: &str, msg: &zeroclaw_api::channel::ChannelMessage) -> String {
    if let Some(thread_id) = msg.thread_ts.as_ref() {
        format!("{channel}_{thread_id}_{}", msg.sender)
    } else {
        format!("{channel}_{}", msg.sender)
    }
}
205
206fn webhook_session_id(headers: &HeaderMap) -> Option<String> {
207    const MAX_SESSION_ID_LEN: usize = 128;
208    headers
209        .get("X-Session-Id")
210        .and_then(|v| v.to_str().ok())
211        .map(str::trim)
212        .filter(|value| !value.is_empty())
213        .filter(|value| value.len() <= MAX_SESSION_ID_LEN)
214        .filter(|value| {
215            value
216                .bytes()
217                .all(|b| b.is_ascii_alphanumeric() || b == b'-' || b == b'_' || b == b'.')
218        })
219        .map(str::to_owned)
220}
221
222fn hash_webhook_secret(value: &str) -> String {
223    use sha2::{Digest, Sha256};
224
225    let digest = Sha256::digest(value.as_bytes());
226    hex::encode(digest)
227}
228
229/// How often the rate limiter sweeps stale IP entries from its map.
230const RATE_LIMITER_SWEEP_INTERVAL_SECS: u64 = 300; // 5 minutes
231
232#[derive(Debug)]
233struct SlidingWindowRateLimiter {
234    limit_per_window: u32,
235    window: Duration,
236    max_keys: usize,
237    requests: Mutex<(HashMap<String, Vec<Instant>>, Instant)>,
238}
239
240impl SlidingWindowRateLimiter {
241    fn new(limit_per_window: u32, window: Duration, max_keys: usize) -> Self {
242        Self {
243            limit_per_window,
244            window,
245            max_keys: max_keys.max(1),
246            requests: Mutex::new((HashMap::new(), Instant::now())),
247        }
248    }
249
250    fn prune_stale(requests: &mut HashMap<String, Vec<Instant>>, cutoff: Instant) {
251        requests.retain(|_, timestamps| {
252            timestamps.retain(|t| *t > cutoff);
253            !timestamps.is_empty()
254        });
255    }
256
257    fn allow(&self, key: &str) -> bool {
258        if self.limit_per_window == 0 {
259            return true;
260        }
261
262        let now = Instant::now();
263        let cutoff = now.checked_sub(self.window).unwrap_or_else(Instant::now);
264
265        let mut guard = self.requests.lock();
266        let (requests, last_sweep) = &mut *guard;
267
268        // Periodic sweep: remove keys with no recent requests
269        if last_sweep.elapsed() >= Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS) {
270            Self::prune_stale(requests, cutoff);
271            *last_sweep = now;
272        }
273
274        if !requests.contains_key(key) && requests.len() >= self.max_keys {
275            // Opportunistic stale cleanup before eviction under cardinality pressure.
276            Self::prune_stale(requests, cutoff);
277            *last_sweep = now;
278
279            if requests.len() >= self.max_keys {
280                let evict_key = requests
281                    .iter()
282                    .min_by_key(|(_, timestamps)| timestamps.last().copied().unwrap_or(cutoff))
283                    .map(|(k, _)| k.clone());
284                if let Some(evict_key) = evict_key {
285                    requests.remove(&evict_key);
286                }
287            }
288        }
289
290        let entry = requests.entry(key.to_owned()).or_default();
291        entry.retain(|instant| *instant > cutoff);
292
293        if entry.len() >= self.limit_per_window as usize {
294            return false;
295        }
296
297        entry.push(now);
298        true
299    }
300}
301
302#[derive(Debug)]
303pub struct GatewayRateLimiter {
304    pair: SlidingWindowRateLimiter,
305    webhook: SlidingWindowRateLimiter,
306}
307
308impl GatewayRateLimiter {
309    pub fn new(pair_per_minute: u32, webhook_per_minute: u32, max_keys: usize) -> Self {
310        let window = Duration::from_secs(RATE_LIMIT_WINDOW_SECS);
311        Self {
312            pair: SlidingWindowRateLimiter::new(pair_per_minute, window, max_keys),
313            webhook: SlidingWindowRateLimiter::new(webhook_per_minute, window, max_keys),
314        }
315    }
316
317    fn allow_pair(&self, key: &str) -> bool {
318        self.pair.allow(key)
319    }
320
321    fn allow_webhook(&self, key: &str) -> bool {
322        self.webhook.allow(key)
323    }
324}
325
326#[derive(Debug)]
327pub struct IdempotencyStore {
328    ttl: Duration,
329    max_keys: usize,
330    keys: Mutex<HashMap<String, Instant>>,
331}
332
333impl IdempotencyStore {
334    pub fn new(ttl: Duration, max_keys: usize) -> Self {
335        Self {
336            ttl,
337            max_keys: max_keys.max(1),
338            keys: Mutex::new(HashMap::new()),
339        }
340    }
341
342    /// Returns true if this key is new and is now recorded.
343    fn record_if_new(&self, key: &str) -> bool {
344        let now = Instant::now();
345        let mut keys = self.keys.lock();
346
347        keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl);
348
349        if keys.contains_key(key) {
350            return false;
351        }
352
353        if keys.len() >= self.max_keys {
354            let evict_key = keys
355                .iter()
356                .min_by_key(|(_, seen_at)| *seen_at)
357                .map(|(k, _)| k.clone());
358            if let Some(evict_key) = evict_key {
359                keys.remove(&evict_key);
360            }
361        }
362
363        keys.insert(key.to_owned(), now);
364        true
365    }
366}
367
/// Parses one client-address token into an IP.
///
/// Surrounding whitespace and double quotes are stripped first. The token may
/// be any of:
/// - a bare IPv4/IPv6 address;
/// - an address with a port (`1.2.3.4:80`, `[::1]:80`);
/// - a bracketed IPv6 address without a port (`[::1]`).
///
/// Returns `None` for empty or unparseable input.
fn parse_client_ip(value: &str) -> Option<IpAddr> {
    let token = value.trim().trim_matches('"').trim();
    if token.is_empty() {
        return None;
    }

    token
        .parse::<IpAddr>()
        .ok()
        .or_else(|| token.parse::<SocketAddr>().ok().map(|sock| sock.ip()))
        .or_else(|| token.trim_matches(['[', ']']).parse::<IpAddr>().ok())
}
385
386fn dirs_data_local() -> Option<std::path::PathBuf> {
387    directories::BaseDirs::new().map(|d| d.data_local_dir().to_path_buf())
388}
389
390fn forwarded_client_ip(headers: &HeaderMap) -> Option<IpAddr> {
391    if let Some(xff) = headers.get("X-Forwarded-For").and_then(|v| v.to_str().ok()) {
392        for candidate in xff.split(',') {
393            if let Some(ip) = parse_client_ip(candidate) {
394                return Some(ip);
395            }
396        }
397    }
398
399    headers
400        .get("X-Real-IP")
401        .and_then(|v| v.to_str().ok())
402        .and_then(parse_client_ip)
403}
404
405fn client_key_from_request(
406    peer_addr: Option<SocketAddr>,
407    headers: &HeaderMap,
408    trust_forwarded_headers: bool,
409) -> String {
410    if trust_forwarded_headers && let Some(ip) = forwarded_client_ip(headers) {
411        return ip.to_string();
412    }
413
414    peer_addr
415        .map(|addr| addr.ip().to_string())
416        .unwrap_or_else(|| "unknown".to_string())
417}
418
/// Resolves a configured key cap. A configured `0` means "use the fallback",
/// and that fallback is clamped to at least 1. Any other value is returned
/// as-is.
fn normalize_max_keys(configured: usize, fallback: usize) -> usize {
    match configured {
        0 => fallback.max(1),
        explicit => explicit,
    }
}
426
427/// Shared state for all axum handlers
428#[derive(Clone)]
429pub struct AppState {
430    pub config: Arc<RwLock<Config>>,
431    pub model_provider: Arc<dyn ModelProvider>,
432    pub model: String,
433    /// `None` means "let the provider decide" — required for models
434    /// (e.g. claude-opus-4-7) that reject the field. Always preserve
435    /// `Option<f64>` end-to-end; never substitute a hardcoded default.
436    pub temperature: Option<f64>,
437    pub mem: Arc<dyn Memory>,
438    pub memory_strategy: Arc<dyn MemoryStrategy>,
439    pub auto_save: bool,
440    /// SHA-256 hash of `X-Webhook-Secret` (hex-encoded), never plaintext.
441    pub webhook_secret_hash: Option<Arc<str>>,
442    pub pairing: Arc<PairingGuard>,
443    pub trust_forwarded_headers: bool,
444    pub rate_limiter: Arc<GatewayRateLimiter>,
445    pub auth_limiter: Arc<auth_rate_limit::AuthRateLimiter>,
446    pub idempotency_store: Arc<IdempotencyStore>,
447    #[cfg(feature = "channel-whatsapp-cloud")]
448    pub whatsapp: Option<Arc<WhatsAppChannel>>,
449    /// `WhatsApp` app secret for webhook signature verification (`X-Hub-Signature-256`)
450    #[cfg(feature = "channel-whatsapp-cloud")]
451    pub whatsapp_app_secret: Option<Arc<str>>,
452    #[cfg(feature = "channel-linq")]
453    pub linq: HashMap<String, Arc<LinqChannel>>,
454    /// Linq webhook signing secrets per alias
455    #[cfg(feature = "channel-linq")]
456    pub linq_signing_secrets: HashMap<String, Arc<str>>,
457    #[cfg(feature = "channel-nextcloud")]
458    pub nextcloud_talk: Option<Arc<NextcloudTalkChannel>>,
459    /// Nextcloud Talk webhook secret for signature verification
460    #[cfg(feature = "channel-nextcloud")]
461    pub nextcloud_talk_webhook_secret: Option<Arc<str>>,
462    #[cfg(feature = "channel-wati")]
463    pub wati: Option<Arc<WatiChannel>>,
464    /// Gmail Pub/Sub push notification channel
465    #[cfg(feature = "channel-email")]
466    pub gmail_push: Option<Arc<GmailPushChannel>>,
467    /// Observability backend for metrics scraping
468    pub observer: Arc<dyn zeroclaw_runtime::observability::Observer>,
469    /// Registered tool specs (for web dashboard tools page)
470    pub tools_registry: Arc<Vec<ToolSpec>>,
471    /// Cost tracker (optional, for web dashboard cost page)
472    pub cost_tracker: Option<Arc<CostTracker>>,
473    /// SSE broadcast channel for real-time events
474    pub event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
475    /// Ring buffer of recent events for history replay
476    pub event_buffer: Arc<sse::EventBuffer>,
477    /// Shutdown signal sender for graceful shutdown
478    pub shutdown_tx: tokio::sync::watch::Sender<bool>,
479    /// Reload signal sender owned by the daemon. /admin/reload writes `true`
480    /// here; the daemon's wait loop reacts and re-instantiates every
481    /// subsystem in place. `None` when running standalone (`zeroclaw gateway start`)
482    /// — reload then degrades to a 503 with a clear message.
483    pub reload_tx: Option<tokio::sync::watch::Sender<bool>>,
484    /// Registry of dynamically connected nodes
485    pub node_registry: Arc<nodes::NodeRegistry>,
486    /// Path prefix for reverse-proxy deployments (empty string = no prefix)
487    pub path_prefix: String,
488    /// Filesystem path to `web/dist/` for serving the dashboard (None = API-only)
489    pub web_dist_dir: Option<std::path::PathBuf>,
490    /// Session backend for persisting gateway WS chat sessions
491    pub session_backend: Option<Arc<dyn SessionBackend>>,
492    /// Per-session actor queue for serializing concurrent turns
493    pub session_queue: Arc<session_queue::SessionActorQueue>,
494    /// Device registry for paired device management
495    pub device_registry: Option<Arc<api_pairing::DeviceRegistry>>,
496    /// Pending pairing request store
497    pub pending_pairings: Option<Arc<api_pairing::PairingStore>>,
498    /// Shared canvas store for Live Canvas (A2UI) system
499    pub canvas_store: CanvasStore,
500    /// WebAuthn state for hardware key authentication (optional, requires `webauthn` feature)
501    #[cfg(feature = "webauthn")]
502    pub webauthn: Option<Arc<api_webauthn::WebAuthnState>>,
503    /// Per-session cancellation tokens for aborting in-flight agent responses.
504    /// Key is session_key (e.g. `gw_<session_id>`), value is the token for the
505    /// current turn. Entries are inserted before each turn and removed after
506    /// completion (normal or cancelled).
507    pub cancel_tokens: Arc<
508        std::sync::Mutex<std::collections::HashMap<String, tokio_util::sync::CancellationToken>>,
509    >,
510    /// Flag set whenever a config write (PATCH, init, map-key mutation) lands
511    /// via `persist_and_swap`, cleared on `/admin/reload`. Distinct from disk
512    /// drift (which fires only when an external editor touches the file): this
513    /// signals "the operator changed config in this session, subsystems may
514    /// need to be rebuilt to apply it." The dashboard polls
515    /// `/api/config/reload-status` and surfaces a reload banner when true.
516    pub pending_reload: Arc<std::sync::atomic::AtomicBool>,
517    /// TUI session registry from the daemon (for /api/tuis endpoint).
518    /// `None` when the gateway runs standalone without a daemon.
519    pub tui_registry: Option<Arc<zeroclaw_runtime::rpc::tui_identity::TuiRegistry>>,
520}
521
522/// Run the HTTP gateway using axum with proper HTTP/1.1 compliance.
523#[allow(clippy::too_many_lines)]
524pub async fn run_gateway(
525    host: &str,
526    port: u16,
527    config: Config,
528    external_event_tx: Option<tokio::sync::broadcast::Sender<serde_json::Value>>,
529    // Reload sender owned by the daemon. /admin/reload writes `true` here;
530    // the daemon's wait loop reacts via `subscribe()` and tears down to
531    // re-init. Cross-platform replacement for the SIGUSR1 hack.
532    reload_tx: Option<tokio::sync::watch::Sender<bool>>,
533    // TUI session registry from the daemon for the /api/tuis endpoint.
534    tui_registry: Option<Arc<zeroclaw_runtime::rpc::tui_identity::TuiRegistry>>,
535    canvas_store: Option<CanvasStore>,
536) -> Result<()> {
537    // ── Security: warn on public bind without tunnel or explicit opt-in ──
538    if is_public_bind(host)
539        && config.tunnel.tunnel_provider == "none"
540        && !config.gateway.allow_public_bind
541    {
542        ::zeroclaw_log::record!(
543            WARN,
544            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
545                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
546            "⚠️  Binding to {host} — gateway will be exposed to all network interfaces.\n\
547             Suggestion: use --host 127.0.0.1 (default), configure a tunnel, or set\n\
548             [gateway] allow_public_bind = true in config.toml to silence this warning.\n\n\
549             Docker/VM: if you are running inside a container or VM, this is expected."
550        );
551    }
552    let config_state = Arc::new(RwLock::new(config.clone()));
553
554    // ── Hooks ──────────────────────────────────────────────────────
555    let hooks: Option<std::sync::Arc<zeroclaw_runtime::hooks::HookRunner>> = if config.hooks.enabled
556    {
557        Some(std::sync::Arc::new(
558            zeroclaw_runtime::hooks::HookRunner::new(),
559        ))
560    } else {
561        None
562    };
563
564    let addr: SocketAddr = match format!("{host}:{port}").parse() {
565        Ok(a) => a,
566        Err(e) => {
567            ::zeroclaw_log::record!(
568                WARN,
569                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
570                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
571                    .with_attrs(::serde_json::json!({
572                        "host": host,
573                        "port": port,
574                        "error": format!("{e}"),
575                    })),
576                "Gateway: host:port did not parse as a SocketAddr; falling back to \
577                 127.0.0.1 so the gateway can still boot. Fix [gateway] host and \
578                 POST /admin/reload."
579            );
580            SocketAddr::from(([127, 0, 0, 1], port))
581        }
582    };
583    let listener = tokio::net::TcpListener::bind(addr).await?;
584    let actual_port = listener.local_addr()?.port();
585    let display_addr = format!("{host}:{actual_port}");
586
587    let (boot_family, boot_alias, boot_entry) = config
588        .providers
589        .models
590        .iter_entries()
591        .next()
592        .map(|(f, a, e)| (f.to_string(), a.to_string(), Some(e)))
593        .unwrap_or_else(|| ("openrouter".to_string(), "default".to_string(), None));
594    let fallback = boot_entry;
595    let model_provider_name = boot_family.as_str();
596    let (model_provider, boot_provider_failed): (Arc<dyn ModelProvider>, bool) =
597        match zeroclaw_providers::create_resilient_model_provider_from_ref(
598            &config,
599            model_provider_name,
600            fallback.and_then(|e| e.api_key.as_deref()),
601            fallback.and_then(|e| e.uri.as_deref()),
602            &config.reliability,
603            &zeroclaw_providers::provider_runtime_options_for_alias(
604                &config,
605                &boot_family,
606                &boot_alias,
607            ),
608        ) {
609            Ok(p) => (Arc::from(p), false),
610            Err(e) => {
611                ::zeroclaw_log::record!(
612                    WARN,
613                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note,)
614                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
615                        .with_attrs(::serde_json::json!({
616                            "model_provider": model_provider_name,
617                            "alias": boot_alias,
618                            "error": format!("{e}"),
619                        })),
620                    "Gateway: seed model_provider failed to construct; booting in \
621                     needs_quickstart mode so /quickstart and /admin/reload stay \
622                     reachable. Fix the [providers.models.<type>.<alias>] entry \
623                     and POST /admin/reload."
624                );
625                (
626                    Arc::new(UnconfiguredModelProvider) as Arc<dyn ModelProvider>,
627                    true,
628                )
629            }
630        };
631    // Model resolution (1) the first-model_provider's `model`,
632    // (2) the first configured `[providers.models.<type>.<alias>]`
633    // model with a WARN naming what to set, (3) leave the model empty so
634    // the gateway boots and the dashboard can complete browser-based
635    // quickstart at /quickstart. The chat-dispatch path checks
636    // `state.model.is_empty()` and returns a structured needs_quickstart
637    // error before any model_provider call, so the original "no silent
638    // vendor-default substitution" guarantee is preserved at request-time
639    // rather than at boot. V3 has no global fallback model_provider — every
640    // gateway request that needs agent context resolves through its
641    // `?agent=` parameter; this resolution is purely the seed value the
642    // gateway uses for boot-time logging and the AppState default model
643    // string.
644    let model = if boot_provider_failed {
645        String::new()
646    } else {
647        match fallback
648            .and_then(|e| e.model.as_deref())
649            .map(str::trim)
650            .filter(|m| !m.is_empty())
651        {
652            Some(m) => m.to_string(),
653            None => match config.resolve_default_model() {
654                Some(m) => {
655                    ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"model_provider": model_provider_name, "model": m})), "first model_provider has no `model` set; using first configured \
656                     providers.models entry as default. Set \
657                     [providers.models.<type>.<alias>] model = \"...\" to silence \
658                     this warning.");
659                    m
660                }
661                None => {
662                    ::zeroclaw_log::record!(
663                        WARN,
664                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
665                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
666                            .with_attrs(::serde_json::json!({"display_addr": display_addr})),
667                        &format!(
668                            "Gateway booting without a configured model. Visit http://{display_addr}/quickstart to complete browser quickstart. Chat endpoints will return 503 needs_quickstart until at least one [providers.models.<type>.<alias>] model = \"...\" is set."
669                        )
670                    );
671                    String::new()
672                }
673            },
674        }
675    };
676    // Preserve `Option<f64>` end-to-end. Substituting a hardcoded default
677    // here would clobber the "let the provider decide" intent for models
678    // (e.g. claude-opus-4-7) that reject `temperature`.
679    let temperature: Option<f64> = fallback.and_then(|e| e.temperature);
680    // Skip the install-wide memory backend init when zero agents are
681    // configured. Building a SQLite (or other) backend here would
682    // synthesize `<workspace_dir>/memory/brain.db` on a fresh install
683    // that has nothing to remember; per-agent memory factories under
684    // `agents/<alias>/workspace/memory/` are the only legitimate
685    // origin of memory state. AppState gets a NoneMemory
686    // stub so endpoints that read `state.mem` keep working until an
687    // agent comes online.
688    let mem: Arc<dyn Memory> = if config.agents.is_empty() {
689        Arc::new(zeroclaw_memory::NoneMemory::new("none"))
690    } else {
691        match zeroclaw_memory::create_memory_with_storage_and_routes(
692            &config.memory,
693            &config.embedding_routes,
694            config.resolve_active_storage(),
695            &config.data_dir,
696            fallback.and_then(|e| e.api_key.as_deref()),
697        ) {
698            Ok(m) => Arc::from(m),
699            Err(e) => {
700                ::zeroclaw_log::record!(
701                    WARN,
702                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note,)
703                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
704                        .with_attrs(::serde_json::json!({"error": format!("{e}")})),
705                    "Gateway: memory backend failed to construct; falling back to \
706                     NoneMemory so the gateway can still boot. Fix [memory] and \
707                     POST /admin/reload."
708                );
709                Arc::new(zeroclaw_memory::NoneMemory::new("none"))
710            }
711        }
712    };
713    let runtime: Arc<dyn platform::RuntimeAdapter> = match platform::create_runtime(&config.runtime)
714    {
715        Ok(r) => Arc::from(r),
716        Err(e) => {
717            ::zeroclaw_log::record!(
718                WARN,
719                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note,)
720                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
721                    .with_attrs(::serde_json::json!({
722                        "runtime_kind": config.runtime.kind,
723                        "error": format!("{e}"),
724                    })),
725                "Gateway: runtime adapter failed to construct; falling back to \
726                     NativeRuntime so the gateway can still boot. Fix [runtime] and \
727                     POST /admin/reload."
728            );
729            Arc::new(platform::NativeRuntime::new())
730        }
731    };
732    let memory_strategy: Arc<dyn MemoryStrategy> = Arc::new(DefaultMemoryStrategy::with_config(
733        mem.clone(),
734        config.memory.clone(),
735        config.data_dir.clone(),
736    ));
737    // Gateway is infrastructure — it doesn't run as an agent. Endpoints
738    // that need an agent context (`/webhook?agent=`, `/ws/chat?agent=`,
739    // ACP `session/new`, agent-scoped tools/memory) take it from the
740    // request. The shared SecurityPolicy / risk_profile / tools_registry
741    // built here are vestiges driving the legacy single-agent
742    // `/api/tools` listing and the `run_gateway_chat_with_tools` test
743    // mock; `/webhook` honors `?agent=` per-request (validated against
744    // `config.agents`), while SSE / pairing per-request dispatch is still
745    // tracked as a follow-up.
746    //
747    // Agent count is unconstrained at boot. Zero agents is a valid
748    // state — the gateway must come up so `/admin/reload` and
749    // `/quickstart` can install one — and the legacy seed simply stays
750    // empty. With one or more enabled agents, any of them seeds the
751    // vestige; aliases are arbitrary so the iteration-order pick is
752    // load-bearing on nothing.
753    let canvas_store = canvas_store.unwrap_or_default();
754    let agent_alias_opt = config
755        .agents
756        .iter()
757        .find(|(_, a)| a.enabled)
758        .map(|(alias, _)| alias.clone());
759
760    let (composio_key, composio_entity_id) = if config.composio.enabled {
761        (
762            config.composio.api_key.as_deref(),
763            Some(config.composio.entity_id.as_str()),
764        )
765    } else {
766        (None, None)
767    };
768
769    // The seeded `risk_profile` + `SecurityPolicy` here drive the legacy
770    // single-agent `/api/tools` listing and the `run_gateway_chat_with_tools`
771    // test mock — they are not load-bearing for per-request agent dispatch.
772    // When the seed agent's `risk_profile` (or any related per-agent
773    // validation) fails to resolve, the gateway must still boot so the
774    // operator can fix the config via `/admin/reload` or `/quickstart`
775    // instead of crash-looping the daemon supervisor. Degraded boot:
776    // log a warning and fall through to the empty-tools-registry branch.
777    let agent_setup: Option<(
778        zeroclaw_config::schema::RiskProfileConfig,
779        Arc<SecurityPolicy>,
780    )> = agent_alias_opt.as_ref().and_then(|agent_alias| {
781        let Some(risk_profile) = config.risk_profile_for_agent(agent_alias) else {
782            ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"agent": agent_alias, "agent_alias": agent_alias})), "Gateway: agents..risk_profile does not name a configured risk_profiles entry; booting with empty tools registry. Fix via /admin/reload or /quickstart.");
783            return None;
784        };
785        let risk_profile = risk_profile.clone();
786        let security = match SecurityPolicy::for_agent(&config, agent_alias) {
787            Ok(s) => Arc::new(s),
788            Err(e) => {
789                ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"agent": agent_alias, "error": format!("{}", e), "agent_alias": agent_alias})), "Gateway: agent SecurityPolicy failed to build; booting with empty tools registry. Fix [agents.] via /admin/reload or /quickstart.");
790                return None;
791            }
792        };
793        Some((risk_profile, security))
794    });
795
796    let (mut tools_registry_raw, delegate_handle_gw) = match (&agent_alias_opt, agent_setup) {
797        (Some(agent_alias), Some((risk_profile, security))) => {
798            let all_tools_result = tools::all_tools_with_runtime(
799                Arc::new(config.clone()),
800                &security,
801                &risk_profile,
802                agent_alias,
803                runtime,
804                Arc::clone(&mem),
805                composio_key,
806                composio_entity_id,
807                &config.browser,
808                &config.http_request,
809                &config.web_fetch,
810                &config.data_dir,
811                &config.agents,
812                config
813                    .model_provider_for_agent(agent_alias)
814                    .and_then(|e| e.api_key.as_deref()),
815                &config,
816                Some(canvas_store.clone()),
817                false,
818                None,
819            );
820            // Wire channel-driven tool handles so the dashboard agent can
821            // deliver messages to configured channels (same pattern as
822            // orchestrator::start_channels).
823            // reaction_handle_gw is PerToolChannelHandle (not Option);
824            // register_channels_for_tools expects &Option for all handles.
825            let reaction_handle_gw_opt = Some(all_tools_result.reaction_handle.clone());
826            let channel_names = zeroclaw_channels::orchestrator::register_channels_for_tools(
827                &config,
828                &all_tools_result.ask_user_handle,
829                &reaction_handle_gw_opt,
830                &all_tools_result.poll_handle,
831                &all_tools_result.escalate_handle,
832            );
833            if !channel_names.is_empty() {
834                ::zeroclaw_log::record!(
835                    INFO,
836                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
837                        .with_attrs(::serde_json::json!({"count": channel_names.len()})),
838                    &format!(
839                        "Registered {} channel(s) for dashboard agent",
840                        channel_names.len()
841                    ),
842                );
843            }
844            (all_tools_result.tools, all_tools_result.delegate_handle)
845        }
846        (Some(_), None) => {
847            // Agent existed but its config failed to resolve. Warned
848            // above; fall through to the empty-registry shape.
849            (Vec::new(), None)
850        }
851        (None, _) => {
852            ::zeroclaw_log::record!(
853                INFO,
854                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
855                    .with_attrs(::serde_json::json!({"display_addr": display_addr})),
856                &format!(
857                    "Gateway: no [agents.<alias>] configured — booting with empty tools registry. Visit http://{display_addr}/quickstart to add an agent."
858                )
859            );
860            (Vec::new(), None)
861        }
862    };
863
864    // ── Wire MCP tools into the gateway tool registry (non-fatal) ───
865    // Without this, the `/api/tools` endpoint misses MCP tools.
866    if config.mcp.enabled && !config.mcp.servers.is_empty() {
867        ::zeroclaw_log::record!(
868            INFO,
869            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
870            &format!(
871                "Gateway: initializing MCP client — {} server(s) configured",
872                config.mcp.servers.len()
873            )
874        );
875        match tools::McpRegistry::connect_all(&config.mcp.servers).await {
876            Ok(registry) => {
877                let registry = std::sync::Arc::new(registry);
878                if config.mcp.deferred_loading {
879                    let deferred_set =
880                        tools::DeferredMcpToolSet::from_registry(std::sync::Arc::clone(&registry))
881                            .await;
882                    ::zeroclaw_log::record!(
883                        INFO,
884                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
885                        &format!(
886                            "Gateway MCP deferred: {} tool stub(s) from {} server(s)",
887                            deferred_set.len(),
888                            registry.server_count()
889                        )
890                    );
891                    let activated =
892                        std::sync::Arc::new(std::sync::Mutex::new(tools::ActivatedToolSet::new()));
893                    tools_registry_raw.push(Box::new(tools::ToolSearchTool::new(
894                        deferred_set,
895                        activated,
896                    )));
897                } else {
898                    let names = registry.tool_names();
899                    let mut registered = 0usize;
900                    for name in names {
901                        if let Some(def) = registry.get_tool_def(&name).await {
902                            let wrapper: std::sync::Arc<dyn tools::Tool> =
903                                std::sync::Arc::new(tools::McpToolWrapper::new(
904                                    name,
905                                    def,
906                                    std::sync::Arc::clone(&registry),
907                                ));
908                            if let Some(ref handle) = delegate_handle_gw {
909                                handle.write().push(std::sync::Arc::clone(&wrapper));
910                            }
911                            tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
912                            registered += 1;
913                        }
914                    }
915                    ::zeroclaw_log::record!(
916                        INFO,
917                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
918                        &format!(
919                            "Gateway MCP: {} tool(s) registered from {} server(s)",
920                            registered,
921                            registry.server_count()
922                        )
923                    );
924                }
925            }
926            Err(e) => {
927                ::zeroclaw_log::record!(
928                    ERROR,
929                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
930                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
931                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
932                    "MCP registry failed to initialize"
933                );
934            }
935        }
936    }
937
938    let tools_registry: Arc<Vec<ToolSpec>> =
939        Arc::new(tools_registry_raw.iter().map(|t| t.spec()).collect());
940
941    // Cost tracker — process-global singleton so channels share the same instance
942    let cost_tracker = CostTracker::get_or_init_global(config.cost.clone(), &config.data_dir);
943
944    // SSE broadcast channel for real-time events.
945    // Use an externally provided sender (e.g. from the daemon) so that other
946    // components (cron, heartbeat) can publish events to the same bus.
947    let event_tx = external_event_tx.unwrap_or_else(|| {
948        let (tx, _rx) = tokio::sync::broadcast::channel::<serde_json::Value>(256);
949        tx
950    });
951    let event_buffer = Arc::new(sse::EventBuffer::new(500));
952    // Extract webhook secret for authentication
953    let webhook_secret_hash: Option<Arc<str>> =
954        config.channels.webhook.values().next().and_then(|webhook| {
955            webhook.secret.as_ref().and_then(|raw_secret| {
956                let trimmed_secret = raw_secret.trim();
957                (!trimmed_secret.is_empty())
958                    .then(|| Arc::<str>::from(hash_webhook_secret(trimmed_secret)))
959            })
960        });
961
962    // WhatsApp channel (if configured)
963    #[cfg(feature = "channel-whatsapp-cloud")]
964    let whatsapp_channel: Option<Arc<WhatsAppChannel>> = config
965        .channels
966        .whatsapp
967        .get("default")
968        .filter(|wa| wa.is_cloud_config())
969        .map(|wa| {
970            let alias = "default".to_string();
971            let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
972                let cfg_arc = config_state.clone();
973                let alias = alias.clone();
974                Arc::new(move || cfg_arc.read().channel_external_peers("whatsapp", &alias))
975            };
976            Arc::new(WhatsAppChannel::new(
977                wa.access_token.clone().unwrap_or_default(),
978                wa.phone_number_id.clone().unwrap_or_default(),
979                wa.verify_token.clone().unwrap_or_default(),
980                alias,
981                peer_resolver,
982            ))
983        });
984
985    // WhatsApp app secret for webhook signature verification.
986    #[cfg(feature = "channel-whatsapp-cloud")]
987    let whatsapp_app_secret: Option<Arc<str>> = config
988        .channels
989        .whatsapp
990        .values()
991        .next()
992        .and_then(|wa| {
993            wa.app_secret
994                .as_deref()
995                .map(str::trim)
996                .filter(|secret| !secret.is_empty())
997                .map(ToOwned::to_owned)
998        })
999        .map(Arc::from);
1000
1001    // Linq channel instances (multi-tenant: one per alias)
1002    #[cfg(feature = "channel-linq")]
1003    let linq_channels: HashMap<String, Arc<LinqChannel>> = config
1004        .channels
1005        .linq
1006        .iter()
1007        .filter(|(_, lq)| lq.enabled)
1008        .map(|(alias, lq)| {
1009            let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
1010                let cfg_arc = config_state.clone();
1011                let alias = alias.clone();
1012                Arc::new(move || cfg_arc.read().channel_external_peers("linq", &alias))
1013            };
1014            (
1015                alias.clone(),
1016                Arc::new(LinqChannel::new(
1017                    lq.api_token.clone(),
1018                    lq.from_phone.clone(),
1019                    alias.clone(),
1020                    peer_resolver,
1021                )),
1022            )
1023        })
1024        .collect();
1025
1026    // Linq signing secrets per alias.
1027    #[cfg(feature = "channel-linq")]
1028    let linq_signing_secrets: HashMap<String, Arc<str>> = config
1029        .channels
1030        .linq
1031        .iter()
1032        .filter_map(|(alias, lq)| {
1033            let secret = lq
1034                .signing_secret
1035                .as_deref()
1036                .map(str::trim)
1037                .filter(|s| !s.is_empty())
1038                .map(ToOwned::to_owned)?;
1039            Some((alias.clone(), Arc::from(secret)))
1040        })
1041        .collect();
1042
1043    // WATI channel (if configured)
1044    #[cfg(feature = "channel-wati")]
1045    let wati_channel: Option<Arc<WatiChannel>> =
1046        config.channels.wati.values().next().map(|wati_cfg| {
1047            let alias = "default".to_string();
1048            let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
1049                let cfg_arc = config_state.clone();
1050                let alias = alias.clone();
1051                Arc::new(move || cfg_arc.read().channel_external_peers("wati", &alias))
1052            };
1053            Arc::new(
1054                WatiChannel::new(
1055                    wati_cfg.api_token.clone(),
1056                    wati_cfg.api_url.clone(),
1057                    wati_cfg.tenant_id.clone(),
1058                    alias,
1059                    peer_resolver,
1060                )
1061                .with_transcription(config.transcription.clone()),
1062            )
1063        });
1064
1065    // Nextcloud Talk channel (if configured)
1066    #[cfg(feature = "channel-nextcloud")]
1067    let nextcloud_talk_channel: Option<Arc<NextcloudTalkChannel>> =
1068        config.channels.nextcloud_talk.values().next().map(|nc| {
1069            let alias = "default".to_string();
1070            let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
1071                let cfg_arc = config_state.clone();
1072                let alias = alias.clone();
1073                Arc::new(move || {
1074                    cfg_arc
1075                        .read()
1076                        .channel_external_peers("nextcloud_talk", &alias)
1077                })
1078            };
1079            Arc::new(NextcloudTalkChannel::new(
1080                nc.base_url.clone(),
1081                nc.app_token.clone(),
1082                nc.bot_name.clone().unwrap_or_default(),
1083                alias,
1084                peer_resolver,
1085            ))
1086        });
1087
1088    // Nextcloud Talk webhook secret for signature verification.
1089    #[cfg(feature = "channel-nextcloud")]
1090    let nextcloud_talk_webhook_secret: Option<Arc<str>> = config
1091        .channels
1092        .nextcloud_talk
1093        .get("default")
1094        .and_then(|nc| {
1095            nc.webhook_secret
1096                .as_deref()
1097                .map(str::trim)
1098                .filter(|secret| !secret.is_empty())
1099                .map(ToOwned::to_owned)
1100        })
1101        .map(Arc::from);
1102
1103    // Gmail Push channel (if configured and referenced by an enabled agent)
1104    #[cfg(feature = "channel-email")]
1105    let gmail_push_channel: Option<Arc<GmailPushChannel>> = {
1106        let active: std::collections::HashSet<String> = config
1107            .agents
1108            .values()
1109            .filter(|a| a.enabled)
1110            .flat_map(|a| a.channels.iter().map(|c| c.as_str().to_string()))
1111            .collect();
1112        config
1113            .channels
1114            .gmail_push
1115            .iter()
1116            .find(|(alias, _)| active.contains(&format!("gmail_push.{alias}")))
1117            .map(|(alias, gp)| {
1118                let alias = alias.clone();
1119                let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
1120                    let cfg_arc = config_state.clone();
1121                    let alias = alias.clone();
1122                    Arc::new(move || cfg_arc.read().channel_external_peers("gmail_push", &alias))
1123                };
1124                Arc::new(GmailPushChannel::new(gp.clone(), alias, peer_resolver))
1125            })
1126    };
1127
1128    // ── Session persistence for WS chat ─────────────────────
1129    // Routes through `make_session_backend` so `[channels].session_backend`
1130    // is the single source of truth for which backend stores sessions.
1131    // Picking `"jsonl"` would otherwise leave gateway WS sessions writing
1132    // to SQLite while channel + tool reads went to JSONL — the original
1133    // #5769 split, just on a different backend pairing.
1134    let session_backend: Option<Arc<dyn SessionBackend>> = if config.gateway.session_persistence {
1135        match zeroclaw_infra::make_session_backend(
1136            &config.data_dir,
1137            &config.channels.session_backend,
1138        ) {
1139            Ok(backend) => {
1140                ::zeroclaw_log::record!(
1141                    INFO,
1142                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1143                    &format!(
1144                        "Gateway session persistence enabled (backend={})",
1145                        config.channels.session_backend
1146                    )
1147                );
1148                if config.gateway.session_ttl_hours > 0
1149                    && let Ok(cleaned) = backend.cleanup_stale(config.gateway.session_ttl_hours)
1150                    && cleaned > 0
1151                {
1152                    ::zeroclaw_log::record!(
1153                        INFO,
1154                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1155                            .with_attrs(::serde_json::json!({"cleaned": cleaned})),
1156                        "Cleaned up stale gateway sessions"
1157                    );
1158                }
1159                Some(backend)
1160            }
1161            Err(e) => {
1162                ::zeroclaw_log::record!(
1163                    WARN,
1164                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1165                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1166                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1167                    "Session persistence disabled"
1168                );
1169                None
1170            }
1171        }
1172    } else {
1173        None
1174    };
1175
1176    // ── Pairing guard ──────────────────────────────────────
1177    let pairing = Arc::new(PairingGuard::new(
1178        config.gateway.require_pairing,
1179        &config.gateway.paired_tokens,
1180    ));
1181    let rate_limit_max_keys = normalize_max_keys(
1182        config.gateway.rate_limit_max_keys,
1183        RATE_LIMIT_MAX_KEYS_DEFAULT,
1184    );
1185    let rate_limiter = Arc::new(GatewayRateLimiter::new(
1186        config.gateway.pair_rate_limit_per_minute,
1187        config.gateway.webhook_rate_limit_per_minute,
1188        rate_limit_max_keys,
1189    ));
1190    let idempotency_max_keys = normalize_max_keys(
1191        config.gateway.idempotency_max_keys,
1192        IDEMPOTENCY_MAX_KEYS_DEFAULT,
1193    );
1194    let idempotency_store = Arc::new(IdempotencyStore::new(
1195        Duration::from_secs(config.gateway.idempotency_ttl_secs.max(1)),
1196        idempotency_max_keys,
1197    ));
1198
1199    // Resolve optional path prefix for reverse-proxy deployments.
1200    let path_prefix: Option<&str> = config
1201        .gateway
1202        .path_prefix
1203        .as_deref()
1204        .filter(|p| !p.is_empty());
1205
1206    // ── Tunnel ────────────────────────────────────────────────
1207    let tunnel = match zeroclaw_runtime::tunnel::create_tunnel(&config.tunnel) {
1208        Ok(t) => t,
1209        Err(e) => {
1210            ::zeroclaw_log::record!(
1211                WARN,
1212                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1213                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1214                    .with_attrs(::serde_json::json!({
1215                        "tunnel_provider": config.tunnel.tunnel_provider,
1216                        "error": format!("{e}"),
1217                    })),
1218                "Gateway: tunnel adapter failed to construct; booting without a \
1219                 tunnel. Fix [tunnel] and POST /admin/reload."
1220            );
1221            None
1222        }
1223    };
1224    let mut tunnel_url: Option<String> = None;
1225
1226    if let Some(ref tun) = tunnel {
1227        println!("🔗 Starting {} tunnel...", tun.name());
1228        match tun.start(host, actual_port).await {
1229            Ok(url) => {
1230                println!("🌐 Tunnel active: {url}");
1231                tunnel_url = Some(url);
1232            }
1233            Err(e) => {
1234                println!("⚠️  Tunnel failed to start: {e}");
1235                println!("   Falling back to local-only mode.");
1236            }
1237        }
1238    }
1239
1240    // Resolve web_dist_dir: explicit config (when valid) → auto-detect.
1241    // Treat the configured path as advisory — if it doesn't contain
1242    // index.html on this machine (stale/leaked path from another host,
1243    // typo, missing build), fall back to auto-detect rather than hard-
1244    // failing every dashboard request. We log the demotion so the
1245    // operator can spot a misconfigured path.
1246    let auto_detect_web_dist = || -> Option<std::path::PathBuf> {
1247        let mut candidates = vec![
1248            // Relative to CWD (development: running from repo root)
1249            std::path::PathBuf::from("web/dist"),
1250            // Relative to binary (installed alongside binary)
1251            std::env::current_exe()
1252                .ok()
1253                .and_then(|p| p.parent().map(|d| d.join("web/dist")))
1254                .unwrap_or_default(),
1255            // Docker / packaged layout
1256            std::path::PathBuf::from("/zeroclaw-data/web/dist"),
1257            // AUR / system package
1258            std::path::PathBuf::from("/usr/share/zeroclawlabs/web/dist"),
1259        ];
1260        // XDG data home (prebuilt binary installer)
1261        if let Some(data_dir) = dirs_data_local() {
1262            candidates.push(data_dir.join("zeroclaw/web/dist"));
1263        }
1264        candidates
1265            .into_iter()
1266            .find(|p| !p.as_os_str().is_empty() && p.join("index.html").is_file())
1267    };
1268
1269    let web_dist_dir: Option<std::path::PathBuf> = match config
1270        .gateway
1271        .web_dist_dir
1272        .as_ref()
1273        .map(std::path::PathBuf::from)
1274    {
1275        Some(explicit) if explicit.join("index.html").is_file() => Some(explicit),
1276        Some(stale) => {
1277            ::zeroclaw_log::record!(
1278                WARN,
1279                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1280                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1281                    .with_attrs(::serde_json::json!({"configured": stale.display().to_string()})),
1282                "gateway.web_dist_dir points at a path that doesn't contain index.html on \
1283                 this machine; falling back to auto-detect. Update or remove the setting in \
1284                 config.toml to silence this warning."
1285            );
1286            auto_detect_web_dist()
1287        }
1288        None => auto_detect_web_dist(),
1289    };
1290
1291    if let Some(ref dir) = web_dist_dir {
1292        ::zeroclaw_log::record!(
1293            INFO,
1294            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1295            &format!("Web dashboard: serving from {}", dir.display().to_string())
1296        );
1297    } else if config.gateway.web_dist_dir.is_some() {
1298        ::zeroclaw_log::record!(
1299            INFO,
1300            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1301            "Web dashboard: not available — configured gateway.web_dist_dir is missing on \
1302             this machine and no fallback location was found. Build with `cargo web build` \
1303             and point gateway.web_dist_dir at the resulting web/dist directory."
1304        );
1305    } else {
1306        ::zeroclaw_log::record!(
1307            INFO,
1308            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1309            "Web dashboard: not available — no web/dist found. Build with `cargo web build` \
1310             and point gateway.web_dist_dir at the resulting web/dist directory."
1311        );
1312    }
1313
1314    let pfx = path_prefix.unwrap_or("");
1315    println!("🦀 ZeroClaw Gateway listening on http://{display_addr}{pfx}");
1316    if let Some(ref url) = tunnel_url {
1317        println!("  🌐 Public URL: {url}");
1318    }
1319    println!("  🌐 Web Dashboard: http://{display_addr}{pfx}/");
1320    if let Some(code) = pairing.pairing_code() {
1321        println!();
1322        println!("  🔐 PAIRING REQUIRED — use this one-time code:");
1323        println!("     ┌──────────────┐");
1324        println!("     │  {code}  │");
1325        println!("     └──────────────┘");
1326        println!("     Send: POST {pfx}/pair with header X-Pairing-Code: {code}");
1327    } else if pairing.require_pairing() {
1328        println!("  🔒 Pairing: ACTIVE (bearer token required)");
1329        println!(
1330            "     To pair a new device: {}",
1331            format_paircode_recovery_command(host, actual_port)
1332        );
1333        println!(
1334            "     Fallback: {}",
1335            format_paircode_recovery_curl(host, actual_port, pfx)
1336        );
1337        println!();
1338    } else {
1339        println!("  ⚠️  Pairing: DISABLED (all requests accepted)");
1340        println!();
1341    }
1342    println!("  POST {pfx}/pair      — pair a new client (X-Pairing-Code header)");
1343    println!("  POST {pfx}/webhook   — {{\"message\": \"your prompt\"}}");
1344    #[cfg(feature = "channel-whatsapp-cloud")]
1345    if whatsapp_channel.is_some() {
1346        println!("  GET  {pfx}/whatsapp  — Meta webhook verification");
1347        println!("  POST {pfx}/whatsapp  — WhatsApp message webhook");
1348    }
1349    #[cfg(feature = "channel-linq")]
1350    if !linq_channels.is_empty() {
1351        for alias in linq_channels.keys() {
1352            println!("  POST {pfx}/linq/{alias} — Linq SMS webhook ({alias})");
1353        }
1354    }
1355    #[cfg(feature = "channel-wati")]
1356    if wati_channel.is_some() {
1357        println!("  GET  {pfx}/wati      — WATI webhook verification");
1358        println!("  POST {pfx}/wati      — WATI message webhook");
1359    }
1360    #[cfg(feature = "channel-nextcloud")]
1361    if nextcloud_talk_channel.is_some() {
1362        println!("  POST {pfx}/nextcloud-talk — Nextcloud Talk bot webhook");
1363    }
1364    println!("  GET  {pfx}/api/*     — REST API (bearer token required)");
1365    println!("  GET  {pfx}/ws/chat   — WebSocket agent chat");
1366    if config.nodes.enabled {
1367        println!("  GET  {pfx}/ws/nodes  — WebSocket node discovery");
1368    }
1369    println!("  GET  {pfx}/health    — health check");
1370    println!("  GET  {pfx}/metrics   — Prometheus metrics");
1371    println!("  Press Ctrl+C to stop.\n");
1372
1373    zeroclaw_runtime::health::mark_component_ok("gateway");
1374
1375    // Fire gateway start hook
1376    if let Some(ref hooks) = hooks {
1377        hooks.fire_gateway_start(host, actual_port).await;
1378    }
1379
1380    // Install the SSE broadcast hook before building any observer so that
1381    // events emitted by the agent's per-call observer (built inside
1382    // `process_message`) also reach `/api/events`. The state-level observer
1383    // is just the configured backend — `TeeObserver` (created by
1384    // `create_observer`) tees its events into the hook automatically.
1385    let broadcast_layer: Arc<dyn zeroclaw_runtime::observability::Observer> = Arc::new(
1386        sse::BroadcastObserver::new(event_tx.clone(), event_buffer.clone()),
1387    );
1388    let broadcast_hook_guard =
1389        zeroclaw_runtime::observability::set_scoped_broadcast_hook(broadcast_layer);
1390
1391    // Install the same broadcast sender as zeroclaw-log's canonical
1392    // hook so that every event emitted through `record!` / `record_event`
1393    // also reaches `/api/events`. The Observer-trait hook above stays
1394    // wired for legacy `observer.record_event(ObserverEvent::...)`
1395    // callers that haven't migrated to `record!` yet.
1396    zeroclaw_log::set_broadcast_hook(event_tx.clone());
1397
1398    // Bound into AppState. Not a broadcaster — the broadcaster is the
1399    // `broadcast_layer` installed above as the global hook. This is the
1400    // configured backend (Log/Prometheus/...) wrapped by `TeeObserver`,
1401    // which tees events into the hook on every record.
1402    let state_observer: Arc<dyn zeroclaw_runtime::observability::Observer> = Arc::from(
1403        zeroclaw_runtime::observability::create_observer(&config.observability),
1404    );
1405
1406    let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
1407
1408    // Node registry for dynamic node discovery
1409    let node_registry = Arc::new(nodes::NodeRegistry::new(config.nodes.max_nodes));
1410
1411    // Device registry and pairing store (only when pairing is required)
1412    let device_registry = if config.gateway.require_pairing {
1413        Some(Arc::new(api_pairing::DeviceRegistry::new(&config.data_dir)))
1414    } else {
1415        None
1416    };
1417    let pending_pairings = if config.gateway.require_pairing {
1418        Some(Arc::new(api_pairing::PairingStore::new()))
1419    } else {
1420        None
1421    };
1422
1423    let state = AppState {
1424        config: config_state,
1425        model_provider,
1426        model,
1427        temperature,
1428        mem,
1429        memory_strategy,
1430        auto_save: config.memory.auto_save,
1431        webhook_secret_hash,
1432        pairing,
1433        trust_forwarded_headers: config.gateway.trust_forwarded_headers,
1434        rate_limiter,
1435        auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
1436        idempotency_store,
1437        #[cfg(feature = "channel-whatsapp-cloud")]
1438        whatsapp: whatsapp_channel,
1439        #[cfg(feature = "channel-whatsapp-cloud")]
1440        whatsapp_app_secret,
1441        #[cfg(feature = "channel-linq")]
1442        linq: linq_channels,
1443        #[cfg(feature = "channel-linq")]
1444        linq_signing_secrets,
1445        #[cfg(feature = "channel-nextcloud")]
1446        nextcloud_talk: nextcloud_talk_channel,
1447        #[cfg(feature = "channel-nextcloud")]
1448        nextcloud_talk_webhook_secret,
1449        #[cfg(feature = "channel-wati")]
1450        wati: wati_channel,
1451        #[cfg(feature = "channel-email")]
1452        gmail_push: gmail_push_channel,
1453        observer: state_observer,
1454        tools_registry,
1455        cost_tracker,
1456        event_tx,
1457        event_buffer,
1458        shutdown_tx,
1459        reload_tx,
1460        node_registry,
1461        session_backend,
1462        session_queue: Arc::new(session_queue::SessionActorQueue::new(8, 30, 600)),
1463        device_registry,
1464        pending_pairings,
1465        path_prefix: path_prefix.unwrap_or("").to_string(),
1466        web_dist_dir,
1467        canvas_store,
1468        cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
1469        pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
1470        tui_registry,
1471        #[cfg(feature = "webauthn")]
1472        webauthn: if config.security.webauthn.enabled {
1473            let secret_store = Arc::new(zeroclaw_runtime::security::SecretStore::new(
1474                &config.data_dir,
1475                true,
1476            ));
1477            let wa_config = zeroclaw_runtime::security::webauthn::WebAuthnConfig {
1478                enabled: true,
1479                rp_id: config.security.webauthn.rp_id.clone(),
1480                rp_origin: config.security.webauthn.rp_origin.clone(),
1481                rp_name: config.security.webauthn.rp_name.clone(),
1482            };
1483            Some(Arc::new(api_webauthn::WebAuthnState {
1484                manager: zeroclaw_runtime::security::webauthn::WebAuthnManager::new(
1485                    wa_config,
1486                    secret_store,
1487                    &config.data_dir,
1488                ),
1489                pending_registrations: parking_lot::Mutex::new(std::collections::HashMap::new()),
1490                pending_authentications: parking_lot::Mutex::new(std::collections::HashMap::new()),
1491            }))
1492        } else {
1493            None
1494        },
1495    };
1496
1497    // Build router with middleware
1498    let inner = Router::new()
1499        // ── Admin routes (for CLI management) ──
1500        .route("/admin/shutdown", post(handle_admin_shutdown))
1501        .route("/admin/reload", post(handle_admin_reload))
1502        .route("/admin/paircode", get(handle_admin_paircode))
1503        .route("/admin/paircode/new", post(handle_admin_paircode_new))
1504        // ── Existing routes ──
1505        .route("/health", get(handle_health))
1506        .route("/metrics", get(handle_metrics))
1507        .route("/pair", post(handle_pair))
1508        .route("/pair/code", get(handle_pair_code))
1509        .route("/webhook", post(handle_webhook))
1510        .merge(optional_channel_routes())
1511        // ── Claude Code runner hooks ──
1512        .route("/hooks/claude-code", post(api::handle_claude_code_hook))
1513        // ── Web Dashboard API routes ──
1514        .route("/api/status", get(api::handle_api_status))
1515        .route("/api/logs", get(api_logs::handle_api_logs))
1516        .route(
1517            "/api/config",
1518            get(api_config::handle_config_get)
1519                .patch(api_config::handle_patch)
1520                .options(api_config::handle_options_config),
1521        )
1522        .route(
1523            "/api/config/prop",
1524            get(api_config::handle_prop_get)
1525                .put(api_config::handle_prop_put)
1526                .delete(api_config::handle_prop_delete)
1527                .options(api_config::handle_options_prop),
1528        )
1529        .route("/api/config/list", get(api_config::handle_list))
1530        .route("/api/config/drift", get(api_config::handle_drift))
1531        .route(
1532            "/api/config/reload-status",
1533            get(api_config::handle_reload_status),
1534        )
1535        .route("/api/config/templates", get(api_config::handle_templates))
1536        .route("/api/config/map-keys", get(api_config::handle_get_map_keys))
1537        .route(
1538            "/api/config/map-key",
1539            post(api_config::handle_map_key).delete(api_config::handle_delete_map_key),
1540        )
1541        .route("/api/config/rename-map-key", post(api_config::handle_rename_map_key))
1542        .route("/api/config/catalog", get(api_sections::handle_catalog))
1543        .route(
1544            "/api/config/catalog/models",
1545            get(api_sections::handle_catalog_models),
1546        )
1547        .route("/api/config/status", get(api_sections::handle_section_status))
1548        .route(
1549            "/api/config/agent-options",
1550            get(api_sections::handle_agent_options),
1551        )
1552        .route("/api/config/sections", get(api_sections::handle_sections))
1553        .route(
1554            "/api/config/sections/{section}",
1555            get(api_sections::handle_section_picker),
1556        )
1557        .route(
1558            "/api/config/sections/{section}/items/{key}",
1559            post(api_sections::handle_section_select),
1560        )
1561        .route("/api/personality", get(api_personality::handle_index))
1562        .route(
1563            "/api/quickstart/state",
1564            get(api_quickstart::handle_state),
1565        )
1566        .route(
1567            "/api/quickstart/fields",
1568            post(api_quickstart::handle_fields),
1569        )
1570        .route(
1571            "/api/quickstart/validate",
1572            post(api_quickstart::handle_validate),
1573        )
1574        .route(
1575            "/api/quickstart/apply",
1576            post(api_quickstart::handle_apply),
1577        )
1578        .route(
1579            "/api/quickstart/dismiss",
1580            post(api_quickstart::handle_dismiss),
1581        )
1582        .route(
1583            "/api/personality/templates",
1584            get(api_personality::handle_templates),
1585        )
1586        .route(
1587            "/api/personality/{filename}",
1588            get(api_personality::handle_get).put(api_personality::handle_put),
1589        )
1590        .route("/api/browse", get(api_browse::handle_browse))
1591        .route("/api/browse/mkdir", post(api_browse::handle_browse_mkdir))
1592        .route("/api/browse/rmdir", delete(api_browse::handle_browse_rmdir))
1593        .route(
1594            "/api/agents/{alias}/workspace/list",
1595            get(api_browse::handle_agent_workspace_list),
1596        )
1597        .route(
1598            "/api/agents/{alias}/workspace/read",
1599            get(api_browse::handle_agent_workspace_read),
1600        )
1601        .route(
1602            "/api/agents/{alias}/workspace/path",
1603            delete(api_browse::handle_agent_workspace_delete),
1604        )
1605        .route(
1606            "/api/agents/{alias}/workspace/move",
1607            post(api_browse::handle_agent_workspace_move),
1608        )
1609        .route(
1610            "/api/agents/{alias}/workspace/mkdir",
1611            post(api_browse::handle_agent_workspace_mkdir),
1612        )
1613        .route("/api/skills/bundles", get(api_skills::handle_list_bundles))
1614        .route(
1615            "/api/skills/bundles/{alias}/skills",
1616            get(api_skills::handle_list_skills).post(api_skills::handle_create_skill),
1617        )
1618        .route(
1619            "/api/skills/bundles/{alias}/skills/{name}",
1620            get(api_skills::handle_read_skill)
1621                .put(api_skills::handle_write_skill)
1622                .delete(api_skills::handle_delete_skill),
1623        )
1624        .route("/api/config/init", post(api_config::handle_init))
1625        .route("/api/config/migrate", post(api_config::handle_migrate))
1626        .route("/api/openapi.json", get(openapi::handle_openapi_json))
1627        .route("/api/docs", get(openapi::handle_docs))
1628        .route("/api/tools", get(api::handle_api_tools))
1629        .route("/api/cron", get(api::handle_api_cron_list))
1630        .route("/api/cron", post(api::handle_api_cron_add))
1631        .route(
1632            "/api/cron/settings",
1633            get(api::handle_api_cron_settings_get).patch(api::handle_api_cron_settings_patch),
1634        )
1635        .route(
1636            "/api/cron/{id}",
1637            delete(api::handle_api_cron_delete).patch(api::handle_api_cron_patch),
1638        )
1639        .route("/api/cron/{id}/runs", get(api::handle_api_cron_runs))
1640        // Note: `/api/cron/{id}/run` is registered on a separate router below
1641        // with a longer TimeoutLayer — manual cron triggers run the job
1642        // synchronously and routinely exceed the 30s gateway-wide default.
1643        .route("/api/integrations", get(api::handle_api_integrations))
1644        .route(
1645            "/api/integrations/settings",
1646            get(api::handle_api_integrations_settings),
1647        )
1648        .route(
1649            "/api/doctor",
1650            get(api::handle_api_doctor).post(api::handle_api_doctor),
1651        )
1652        .route("/api/memory", get(api::handle_api_memory_list))
1653        .route("/api/memory", post(api::handle_api_memory_store))
1654        .route("/api/memory/{key}", delete(api::handle_api_memory_delete))
1655        .route("/api/cost", get(api::handle_api_cost))
1656        .route("/api/cli-tools", get(api::handle_api_cli_tools))
1657        .route("/api/channels", get(api::handle_api_channels))
1658        .route("/api/health", get(api::handle_api_health))
1659        .route("/api/tuis", get(api::handle_api_tuis))
1660        .route("/api/sessions", get(api::handle_api_sessions_list))
1661        .route("/api/sessions/running", get(api::handle_api_sessions_running))
1662        .route(
1663            "/api/sessions/{id}/messages",
1664            get(api::handle_api_session_messages).post(api::handle_api_session_message_post),
1665        )
1666        .route("/api/sessions/{id}", delete(api::handle_api_session_delete).put(api::handle_api_session_rename))
1667        .route("/api/sessions/{id}/state", get(api::handle_api_session_state))
1668        .route("/api/sessions/{id}/abort", post(api::handle_api_session_abort))
1669        // ── Pairing + Device management API ──
1670        .route("/api/pairing/initiate", post(api_pairing::initiate_pairing))
1671        .route("/api/pair", post(api_pairing::submit_pairing_enhanced))
1672        .route("/api/devices", get(api_pairing::list_devices))
1673        .route(
1674            "/api/devices/me/capabilities",
1675            post(api_pairing::update_my_capabilities),
1676        )
1677        .route("/api/devices/{id}", delete(api_pairing::revoke_device))
1678        .route(
1679            "/api/devices/{id}/token/rotate",
1680            post(api_pairing::rotate_token),
1681        )
1682        // ── Live Canvas (A2UI) routes ──
1683        .route("/api/canvas", get(canvas::handle_canvas_list))
1684        .route(
1685            "/api/canvas/{id}",
1686            get(canvas::handle_canvas_get)
1687                .post(canvas::handle_canvas_post)
1688                .delete(canvas::handle_canvas_clear),
1689        )
1690        .route(
1691            "/api/canvas/{id}/history",
1692            get(canvas::handle_canvas_history),
1693        );
1694
1695    // ── WebAuthn hardware key authentication API (requires webauthn feature) ──
1696    #[cfg(feature = "webauthn")]
1697    let inner = inner
1698        .route(
1699            "/api/webauthn/register/start",
1700            post(api_webauthn::handle_register_start),
1701        )
1702        .route(
1703            "/api/webauthn/register/finish",
1704            post(api_webauthn::handle_register_finish),
1705        )
1706        .route(
1707            "/api/webauthn/auth/start",
1708            post(api_webauthn::handle_auth_start),
1709        )
1710        .route(
1711            "/api/webauthn/auth/finish",
1712            post(api_webauthn::handle_auth_finish),
1713        )
1714        .route(
1715            "/api/webauthn/credentials",
1716            get(api_webauthn::handle_list_credentials),
1717        )
1718        .route(
1719            "/api/webauthn/credentials/{id}",
1720            delete(api_webauthn::handle_delete_credential),
1721        );
1722
1723    // ── Plugin management API (requires plugins-wasm feature) ──
1724    #[cfg(feature = "plugins-wasm")]
1725    let inner = inner.route(
1726        "/api/plugins",
1727        get(api_plugins::plugin_routes::list_plugins),
1728    );
1729
1730    let inner = inner
1731        // ── SSE event stream ──
1732        .route("/api/events", get(sse::handle_sse_events))
1733        .route("/api/events/history", get(sse::handle_events_history))
1734        // ── ACP client bridge ──
1735        .route("/acp", get(acp::handle_ws_acp))
1736        // ── WebSocket agent chat ──
1737        .route("/ws/chat", get(ws::handle_ws_chat))
1738        // ── WebSocket canvas updates ──
1739        .route("/ws/canvas/{id}", get(canvas::handle_ws_canvas))
1740        // ── WebSocket node discovery ──
1741        .route("/ws/nodes", get(nodes::handle_ws_nodes))
1742        // ── Static assets (web dashboard) ──
1743        .route("/_app/{*path}", get(static_files::handle_static))
1744        // ── SPA fallback: non-API GET requests serve index.html ──
1745        .fallback(get(static_files::handle_spa_fallback))
1746        .with_state(state.clone())
1747        .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1748        .layer(TimeoutLayer::with_status_code(
1749            StatusCode::REQUEST_TIMEOUT,
1750            Duration::from_secs(gateway_request_timeout_secs(&config.gateway)),
1751        ));
1752
1753    // Manual cron-trigger route lives on its own sub-router so it can opt out
1754    // of the 30s gateway-wide TimeoutLayer. Layers attached here travel with
1755    // the route through `merge`, so only this endpoint sees the longer
1756    // timeout.
1757    let cron_run_router: Router = Router::new()
1758        .route("/api/cron/{id}/run", post(api::handle_api_cron_run))
1759        .with_state(state)
1760        .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1761        .layer(TimeoutLayer::with_status_code(
1762            StatusCode::REQUEST_TIMEOUT,
1763            Duration::from_secs(gateway_long_running_request_timeout_secs(&config.gateway)),
1764        ));
1765
1766    let inner = inner.merge(cron_run_router);
1767
1768    // Nest under path prefix when configured (axum strips prefix before routing).
1769    // nest() at "/prefix" handles both "/prefix" and "/prefix/*" but not "/prefix/"
1770    // with a trailing slash, so we add a fallback redirect for that case.
1771    let app = if let Some(prefix) = path_prefix {
1772        let redirect_target = prefix.to_string();
1773        Router::new().nest(prefix, inner).route(
1774            &format!("{prefix}/"),
1775            get(|| async move { axum::response::Redirect::permanent(&redirect_target) }),
1776        )
1777    } else {
1778        inner
1779    };
1780
1781    // ── TLS / mTLS setup ───────────────────────────────────────────
1782    let tls_acceptor = match &config.gateway.tls {
1783        Some(tls_cfg) if tls_cfg.enabled => {
1784            let has_mtls = tls_cfg.client_auth.as_ref().is_some_and(|ca| ca.enabled);
1785            if has_mtls {
1786                ::zeroclaw_log::record!(
1787                    INFO,
1788                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1789                    "TLS enabled with mutual TLS (mTLS) client verification"
1790                );
1791            } else {
1792                ::zeroclaw_log::record!(
1793                    INFO,
1794                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1795                    "TLS enabled (no client certificate requirement)"
1796                );
1797            }
1798            Some(tls::build_tls_acceptor(tls_cfg)?)
1799        }
1800        _ => None,
1801    };
1802
1803    if let Some(tls_acceptor) = tls_acceptor {
1804        // Manual TLS accept loop — serves each connection via hyper.
1805        let app = app.into_make_service_with_connect_info::<SocketAddr>();
1806        let mut app = app;
1807
1808        let mut shutdown_signal = shutdown_rx;
1809        loop {
1810            tokio::select! {
1811                conn = listener.accept() => {
1812                    let (tcp_stream, remote_addr) = match conn {
1813                        Ok(pair) => pair,
1814                        Err(e) => {
1815                            if is_recoverable_accept_error(&e) {
1816                                // Transient (e.g. EMFILE under fd pressure):
1817                                // the listener is still valid. Back off
1818                                // briefly to avoid hot-spinning, then keep
1819                                // serving rather than killing the daemon (#7042).
1820                                ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"error": format!("{}", e)})), "gateway accept() failed with a transient error; backing off and continuing");
1821                                tokio::time::sleep(Duration::from_millis(ACCEPT_ERROR_BACKOFF_MS)).await;
1822                                continue;
1823                            }
1824                            return Err(e.into());
1825                        }
1826                    };
1827                    let tls_acceptor = tls_acceptor.clone();
1828                    let svc = tower::MakeService::<
1829                        SocketAddr,
1830                        hyper::Request<hyper::body::Incoming>,
1831                    >::make_service(&mut app, remote_addr)
1832                    .await
1833                    .expect("infallible make_service");
1834
1835                    zeroclaw_spawn::spawn!(async move {
1836                        let tls_stream = match tls_acceptor.accept(tcp_stream).await {
1837                            Ok(s) => s,
1838                            Err(e) => {
1839                                ::zeroclaw_log::record!(DEBUG, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"error": format!("{}", e), "remote_addr": remote_addr})), "TLS handshake failed from");
1840                                return;
1841                            }
1842                        };
1843                        let io = hyper_util::rt::TokioIo::new(tls_stream);
1844                        let hyper_svc = hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
1845                            let mut svc = svc.clone();
1846                            async move {
1847                                tower::Service::call(&mut svc, req).await
1848                            }
1849                        });
1850                        if let Err(e) = hyper_util::server::conn::auto::Builder::new(
1851                            hyper_util::rt::TokioExecutor::new(),
1852                        )
1853                        .serve_connection(io, hyper_svc)
1854                        .await
1855                        {
1856                            ::zeroclaw_log::record!(DEBUG, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"error": format!("{}", e), "remote_addr": remote_addr})), "connection error from");
1857                        }
1858                    });
1859                }
1860                _ = shutdown_signal.changed() => {
1861                    ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "ZeroClaw Gateway shutting down");
1862                    break;
1863                }
1864            }
1865        }
1866    } else {
1867        // Plain TCP — use axum's built-in serve.
1868        axum::serve(
1869            listener,
1870            app.into_make_service_with_connect_info::<SocketAddr>(),
1871        )
1872        .with_graceful_shutdown(async move {
1873            let _ = shutdown_rx.changed().await;
1874            ::zeroclaw_log::record!(
1875                INFO,
1876                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1877                "ZeroClaw Gateway shutting down"
1878            );
1879        })
1880        .await?;
1881    }
1882
1883    drop(broadcast_hook_guard);
1884
1885    Ok(())
1886}
1887
1888fn format_paircode_recovery_command(host: &str, port: u16) -> String {
1889    let mut cmd = format!("zeroclaw gateway get-paircode --new --port {port}");
1890    if let Some(host_arg) = paircode_recovery_host_arg(host) {
1891        cmd.push_str(" --host ");
1892        cmd.push_str(host_arg);
1893    }
1894    cmd
1895}
1896
/// Returns the host to pass via `--host` in the recovery command, or `None`
/// when `host` is a loopback (`127.0.0.1`, `localhost`, `::1`) or wildcard
/// (`0.0.0.0`, `::`) address, for which the CLI default suffices.
fn paircode_recovery_host_arg(host: &str) -> Option<&str> {
    let is_default_local = matches!(host, "127.0.0.1" | "localhost" | "::1" | "0.0.0.0" | "::");
    (!is_default_local).then_some(host)
}
1903
/// Builds a `curl` command that POSTs to `/admin/paircode/new` on this
/// gateway, as an alternative to the CLI recovery command.
///
/// `path_prefix` is inserted verbatim between the port and the admin path
/// (presumably empty or starting with `/` — confirm against callers).
///
/// IPv6 literals (e.g. `::1`) are wrapped in brackets, because a URL
/// authority requires `[::1]:PORT`; without brackets `http://::1:8080/` is
/// not a valid URL. Hosts that are already bracketed are left alone.
fn format_paircode_recovery_curl(host: &str, port: u16, path_prefix: &str) -> String {
    // Hostnames and IPv4 literals never contain ':', so a colon means IPv6.
    let authority_host = if host.contains(':') && !host.starts_with('[') {
        format!("[{host}]")
    } else {
        host.to_string()
    };
    format!("curl -s -X POST http://{authority_host}:{port}{path_prefix}/admin/paircode/new")
}
1907
1908// ══════════════════════════════════════════════════════════════════════════════
1909// AXUM HANDLERS
1910// ══════════════════════════════════════════════════════════════════════════════
1911
1912/// GET /health — always public (no secrets leaked)
1913async fn handle_health(State(state): State<AppState>) -> impl IntoResponse {
1914    let body = serde_json::json!({
1915        "status": "ok",
1916        "paired": state.pairing.is_paired(),
1917        "require_pairing": state.pairing.require_pairing(),
1918        "runtime": zeroclaw_runtime::health::snapshot_json(),
1919    });
1920    Json(body)
1921}
1922
/// Content type for the Prometheus text exposition format (version 0.0.4).
const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";

/// Body served by `/metrics` when no Prometheus observer is available: a
/// single `#`-comment line telling the operator how to enable the backend.
fn prometheus_disabled_hint() -> String {
    "# Prometheus backend not enabled. Set [observability] backend = \"prometheus\" in config.\n"
        .to_owned()
}
1931
#[cfg(feature = "observability-prometheus")]
/// Downcasts the state observer to a `PrometheusObserver`, if that is what it is.
fn prometheus_observer_from_state(
    observer: &dyn zeroclaw_runtime::observability::Observer,
) -> Option<&zeroclaw_runtime::observability::PrometheusObserver> {
    use zeroclaw_runtime::observability::PrometheusObserver;

    // `TeeObserver::as_any` exposes the primary observer, so one direct
    // downcast works whether the state observer is the raw backend or the
    // factory's tee wrapper.
    let erased = observer.as_any();
    erased.downcast_ref::<PrometheusObserver>()
}
1943
1944/// GET /metrics — Prometheus text exposition format
1945async fn handle_metrics(State(state): State<AppState>) -> impl IntoResponse {
1946    let body = {
1947        #[cfg(feature = "observability-prometheus")]
1948        {
1949            if let Some(prom) = prometheus_observer_from_state(state.observer.as_ref()) {
1950                prom.encode()
1951            } else {
1952                prometheus_disabled_hint()
1953            }
1954        }
1955        #[cfg(not(feature = "observability-prometheus"))]
1956        {
1957            let _ = &state;
1958            prometheus_disabled_hint()
1959        }
1960    };
1961
1962    (
1963        StatusCode::OK,
1964        [(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)],
1965        body,
1966    )
1967}
1968
1969/// POST /pair — exchange one-time code for bearer token
1970#[axum::debug_handler]
1971async fn handle_pair(
1972    State(state): State<AppState>,
1973    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
1974    headers: HeaderMap,
1975) -> impl IntoResponse {
1976    let rate_key =
1977        client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
1978    if !state.rate_limiter.allow_pair(&rate_key) {
1979        ::zeroclaw_log::record!(
1980            WARN,
1981            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1982                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1983            "/pair rate limit exceeded"
1984        );
1985        let err = serde_json::json!({
1986            "error": "Too many pairing requests. Please retry later.",
1987            "retry_after": RATE_LIMIT_WINDOW_SECS,
1988        });
1989        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1990    }
1991
1992    // ── Auth rate limiting (brute-force protection) ──
1993    if let Err(e) = state.auth_limiter.check_rate_limit(&rate_key) {
1994        ::zeroclaw_log::record!(
1995            WARN,
1996            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1997                .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1998                .with_attrs(::serde_json::json!({"rate_key": rate_key})),
1999            "pairing auth rate limit exceeded"
2000        );
2001        let err = serde_json::json!({
2002            "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
2003            "retry_after": e.retry_after_secs,
2004        });
2005        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
2006    }
2007
2008    let code = headers
2009        .get("X-Pairing-Code")
2010        .and_then(|v| v.to_str().ok())
2011        .unwrap_or("");
2012
2013    match state.pairing.try_pair(code, &rate_key).await {
2014        Ok(Some(token)) => {
2015            ::zeroclaw_log::record!(
2016                INFO,
2017                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
2018                "new client paired successfully"
2019            );
2020            if let Err(err) =
2021                Box::pin(persist_pairing_tokens(state.config.clone(), &state.pairing)).await
2022            {
2023                ::zeroclaw_log::record!(
2024                    ERROR,
2025                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2026                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2027                        .with_attrs(::serde_json::json!({"error": format!("{}", err)})),
2028                    "pairing succeeded but token persistence failed"
2029                );
2030                let body = serde_json::json!({
2031                    "paired": true,
2032                    "persisted": false,
2033                    "token": token,
2034                    "message": "Paired for this process, but failed to persist token to config.toml. Check config path and write permissions.",
2035                });
2036                return (StatusCode::OK, Json(body));
2037            }
2038
2039            let body = serde_json::json!({
2040                "paired": true,
2041                "persisted": true,
2042                "token": token,
2043                "message": "Save this token — use it as Authorization: Bearer <token>"
2044            });
2045            (StatusCode::OK, Json(body))
2046        }
2047        Ok(None) => {
2048            state.auth_limiter.record_attempt(&rate_key);
2049            ::zeroclaw_log::record!(
2050                WARN,
2051                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2052                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2053                "pairing attempt with invalid code"
2054            );
2055            let err = serde_json::json!({"error": "Invalid pairing code"});
2056            (StatusCode::FORBIDDEN, Json(err))
2057        }
2058        Err(lockout_secs) => {
2059            ::zeroclaw_log::record!(
2060                WARN,
2061                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2062                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2063                    .with_attrs(::serde_json::json!({"lockout_secs": lockout_secs})),
2064                "pairing locked out; too many failed attempts"
2065            );
2066            let err = serde_json::json!({
2067                "error": format!("Too many failed attempts. Try again in {lockout_secs}s."),
2068                "retry_after": lockout_secs
2069            });
2070            (StatusCode::TOO_MANY_REQUESTS, Json(err))
2071        }
2072    }
2073}
2074
2075pub(crate) async fn persist_pairing_tokens(
2076    config: Arc<RwLock<Config>>,
2077    pairing: &PairingGuard,
2078) -> Result<()> {
2079    let paired_tokens = pairing.tokens();
2080    // This is needed because parking_lot's guard is not Send so we clone the inner
2081    // this should be removed once async mutexes are used everywhere
2082    let mut updated_cfg = { config.read().clone() };
2083    updated_cfg.gateway.paired_tokens = paired_tokens;
2084    // Snake-case to match the prop-field name emitted by the `Configurable`
2085    // derive. Until #7156 the string used here was `gateway.paired-tokens`
2086    // (kebab); it kept working only thanks to the `-`→`_` fallback in
2087    // `resolve_dirty_segments`. Aligning all references to the snake form
2088    // removes that fallback dependency and keeps the codebase consistent.
2089    updated_cfg.mark_dirty("gateway.paired_tokens");
2090    updated_cfg
2091        .save_dirty()
2092        .await
2093        .context("Failed to persist paired tokens to config.toml")?;
2094
2095    // Keep shared runtime config in sync with persisted tokens.
2096    *config.write() = updated_cfg;
2097    Ok(())
2098}
2099
/// Result of a gateway chat turn. Carries the response text plus per-turn
/// token / cost totals captured from the cost-tracking scope (when present)
/// so callers can populate observer-event annotations without racing
/// concurrent webhook traffic that shares the same `CostTracker`.
struct GatewayChatOutcome {
    /// Response text produced for this turn.
    response: String,
    /// Per-turn input token total; `None` when no cost-tracking scope captured it.
    input_tokens: Option<u64>,
    /// Per-turn output token total; `None` when no cost-tracking scope captured it.
    output_tokens: Option<u64>,
    /// Per-turn cost total; `None` when no cost-tracking scope captured it.
    cost_usd: Option<f64>,
}
2110
2111struct UnconfiguredModelProvider;
2112
2113#[async_trait::async_trait]
2114impl ModelProvider for UnconfiguredModelProvider {
2115    async fn chat_with_system(
2116        &self,
2117        _system_prompt: Option<&str>,
2118        _message: &str,
2119        _model: &str,
2120        _temperature: Option<f64>,
2121    ) -> anyhow::Result<String> {
2122        anyhow::bail!(
2123            "needs_quickstart: gateway booted without a working model_provider. \
2124             Complete browser quickstart at /quickstart, or fix \
2125             [providers.models.<type>.<alias>] and POST /admin/reload."
2126        )
2127    }
2128}
2129
2130impl ::zeroclaw_api::attribution::Attributable for UnconfiguredModelProvider {
2131    fn role(&self) -> ::zeroclaw_api::attribution::Role {
2132        ::zeroclaw_api::attribution::Role::Provider(
2133            ::zeroclaw_api::attribution::ProviderKind::Model(
2134                ::zeroclaw_api::attribution::ModelProviderKind::Custom,
2135            ),
2136        )
2137    }
2138    fn alias(&self) -> &str {
2139        "unconfigured"
2140    }
2141}
2142
2143/// Returns a structured `needs_quickstart` error when `model` is empty
2144/// or whitespace-only, otherwise `None`. Empty model means the gateway
2145/// booted with nothing configured (fresh install). Callers refuse the
2146/// dispatch with this marker instead of calling the provider with an
2147/// empty model id. Mirrors `agent::Agent::from_config` at
2148/// request-time so `/quickstart` stays reachable.
2149fn needs_quickstart_for(model: &str) -> Option<anyhow::Error> {
2150    if model.trim().is_empty() {
2151        ::zeroclaw_log::record!(
2152            WARN,
2153            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
2154                .with_outcome(::zeroclaw_log::EventOutcome::Failure),
2155            "gateway dispatch refused: no model configured (browser quickstart incomplete)"
2156        );
2157        Some(anyhow::Error::msg(
2158            "needs_quickstart: gateway has no model configured. Complete \
2159             browser quickstart at /quickstart, or set [providers.models.<type>.<alias>] \
2160             model = \"...\" before sending messages.",
2161        ))
2162    } else {
2163        None
2164    }
2165}
2166
2167/// True when `e` carries the marker produced by `needs_quickstart_for`.
2168/// Used by chat-dispatch error paths to map the marker to a 503
2169/// `needs_quickstart` HTTP response or a more accurate channel-side
2170/// reply, instead of the generic 500 / "sorry" catch-all.
2171fn is_needs_quickstart_err(e: &anyhow::Error) -> bool {
2172    e.to_string().contains("needs_quickstart")
2173}
2174
2175/// Reply text sent over a channel SDK when chat dispatch refuses
2176/// because the gateway has no model configured. Resolved through the
2177/// shared Fluent catalog (`channel-needs-quickstart-reply` in
2178/// `crates/zeroclaw-runtime/locales/<locale>/cli.ftl`) so non-English
2179/// operators see localized text instead of a Rust-side English literal.
2180fn needs_quickstart_channel_reply() -> String {
2181    i18n::get_required_cli_string("channel-needs-quickstart-reply")
2182}
2183
2184/// Full-featured chat with tools for channel and webhook handlers.
2185///
2186/// `agent_override` is the caller-requested agent alias (`/webhook?agent=`),
2187/// already validated against `config.agents` by the handler. `None` keeps the
2188/// legacy default pick (migration-synthesized "default", else first enabled).
2189async fn run_gateway_chat_with_tools(
2190    state: &AppState,
2191    message: &str,
2192    session_id: Option<&str>,
2193    agent_override: Option<&str>,
2194) -> anyhow::Result<GatewayChatOutcome> {
2195    if let Some(err) = needs_quickstart_for(&state.model) {
2196        return Err(err);
2197    }
2198
2199    // Tests exercise webhook infrastructure (idempotency, auth, autosave)
2200    // through handle_webhook, so dispatch to the mock model_provider directly
2201    // instead of bootstrapping the full agent runtime. The mock path
2202    // doesn't go through the cost-tracking scope, so usage stays None.
2203    #[cfg(test)]
2204    {
2205        let _ = (session_id, agent_override);
2206        let response = state
2207            .model_provider
2208            .chat_with_system(None, message, &state.model, state.temperature)
2209            .await?;
2210        Ok(GatewayChatOutcome {
2211            response,
2212            input_tokens: None,
2213            output_tokens: None,
2214            cost_usd: None,
2215        })
2216    }
2217
2218    #[cfg(not(test))]
2219    {
2220        let config = state.config.read().clone();
2221        let agent_alias = require_gateway_chat_agent_alias(&config, agent_override)?;
2222
2223        // Scope the cost tracking context so per-LLM-call usage flows into the
2224        // gateway's cost tracker and costs.jsonl. Without this scope, the
2225        // tracker exists on AppState but never receives any records from the
2226        // runtime tool loop. The context's per-scope `turn_usage` accumulator
2227        // also lets us read out this turn's tokens / cost after the scope
2228        // exits without racing concurrent webhook traffic that shares the
2229        // same tracker. Pricing comes from the V3 per-provider shape
2230        // (`config.providers.models[*][*].pricing`), keyed as
2231        // `<type>.<alias>` to match how the channels orchestrator builds
2232        // its `ModelProviderPricing`.
2233        let cost_tracking_context = state.cost_tracker.as_ref().map(|tracker| {
2234            let pricing: zeroclaw_runtime::agent::cost::ModelProviderPricing = config
2235                .providers
2236                .models
2237                .iter_entries()
2238                .filter(|(_, _, base)| !base.pricing.is_empty())
2239                .map(|(type_k, alias_k, base)| {
2240                    (format!("{type_k}.{alias_k}"), base.pricing.clone())
2241                })
2242                .collect();
2243            zeroclaw_runtime::agent::cost::ToolLoopCostTrackingContext::new(
2244                tracker.clone(),
2245                std::sync::Arc::new(pricing),
2246            )
2247            .with_agent_alias(&agent_alias)
2248        });
2249        let captured_usage = cost_tracking_context
2250            .as_ref()
2251            .map(|ctx| ctx.turn_usage.clone());
2252        let response = Box::pin(
2253            zeroclaw_runtime::agent::cost::TOOL_LOOP_COST_TRACKING_CONTEXT.scope(
2254                cost_tracking_context,
2255                zeroclaw_runtime::agent::process_message(config, &agent_alias, message, session_id),
2256            ),
2257        )
2258        .await?;
2259        let usage = captured_usage
2260            .map(|cell| *cell.lock())
2261            .filter(|u| u.input_tokens > 0 || u.output_tokens > 0);
2262        let (input_tokens, output_tokens, cost_usd) = match usage {
2263            Some(u) => (
2264                Some(u.input_tokens),
2265                Some(u.output_tokens),
2266                Some(u.cost_usd),
2267            ),
2268            None => (None, None, None),
2269        };
2270        Ok(GatewayChatOutcome {
2271            response,
2272            input_tokens,
2273            output_tokens,
2274            cost_usd,
2275        })
2276    }
2277}
2278
2279fn resolve_gateway_chat_agent_alias(
2280    config: &Config,
2281    agent_override: Option<&str>,
2282) -> Option<String> {
2283    agent_override
2284        .map(ToString::to_string)
2285        .or_else(|| config.resolved_runtime_agent_alias().map(str::to_owned))
2286}
2287
2288#[cfg(not(test))]
2289fn require_gateway_chat_agent_alias(
2290    config: &Config,
2291    agent_override: Option<&str>,
2292) -> anyhow::Result<String> {
2293    resolve_gateway_chat_agent_alias(config, agent_override).ok_or_else(|| {
2294        ::zeroclaw_log::record!(
2295            WARN,
2296            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
2297                .with_outcome(::zeroclaw_log::EventOutcome::Failure),
2298            "webhook chat rejected: no configured [agents.<alias>] entry"
2299        );
2300        anyhow::Error::msg("webhook chat requires at least one configured [agents.<alias>] entry")
2301    })
2302}
2303
2304fn optional_channel_routes() -> Router<AppState> {
2305    let router: Router<AppState> = Router::new();
2306    #[cfg(feature = "channel-whatsapp-cloud")]
2307    let router = router
2308        .route("/whatsapp", get(handle_whatsapp_verify))
2309        .route("/whatsapp", post(handle_whatsapp_message));
2310    #[cfg(feature = "channel-linq")]
2311    let router = router.route("/linq/{alias}", post(handle_linq_webhook));
2312    #[cfg(feature = "channel-wati")]
2313    let router = router
2314        .route("/wati", get(handle_wati_verify))
2315        .route("/wati", post(handle_wati_webhook));
2316    #[cfg(feature = "channel-nextcloud")]
2317    let router = router.route("/nextcloud-talk", post(handle_nextcloud_talk_webhook));
2318    #[cfg(feature = "channel-email")]
2319    let router = router.route("/webhook/gmail", post(handle_gmail_push_webhook));
2320    router
2321}
2322
2323/// Webhook request body
2324#[derive(serde::Deserialize)]
2325pub struct WebhookBody {
2326    pub message: String,
2327}
2328
2329/// Webhook query parameters
2330#[derive(Default, serde::Deserialize)]
2331pub struct WebhookQuery {
2332    /// Configured agent alias to dispatch to. Optional — when omitted, the
2333    /// legacy pick applies (migration-synthesized "default" agent, else the
2334    /// first enabled one). Aliases mirror `WsQuery` so `/ws/chat` callers
2335    /// can reuse their query string verbatim.
2336    #[serde(default, alias = "agentAlias", alias = "agent_alias")]
2337    pub agent: Option<String>,
2338}
2339
2340/// POST /webhook — main webhook endpoint
2341async fn handle_webhook(
2342    State(state): State<AppState>,
2343    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
2344    Query(query): Query<WebhookQuery>,
2345    headers: HeaderMap,
2346    body: Result<Json<WebhookBody>, axum::extract::rejection::JsonRejection>,
2347) -> impl IntoResponse {
2348    let rate_key =
2349        client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
2350    if !state.rate_limiter.allow_webhook(&rate_key) {
2351        ::zeroclaw_log::record!(
2352            WARN,
2353            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2354                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2355            "/webhook rate limit exceeded"
2356        );
2357        let err = serde_json::json!({
2358            "error": "Too many webhook requests. Please retry later.",
2359            "retry_after": RATE_LIMIT_WINDOW_SECS,
2360        });
2361        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
2362    }
2363
2364    // ── Bearer token auth (pairing) with auth rate limiting ──
2365    if state.pairing.require_pairing() {
2366        if let Err(e) = state.auth_limiter.check_rate_limit(&rate_key) {
2367            ::zeroclaw_log::record!(
2368                WARN,
2369                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2370                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2371                    .with_attrs(::serde_json::json!({"rate_key": rate_key})),
2372                "webhook: auth rate limit exceeded for"
2373            );
2374            let err = serde_json::json!({
2375                "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
2376                "retry_after": e.retry_after_secs,
2377            });
2378            return (StatusCode::TOO_MANY_REQUESTS, Json(err));
2379        }
2380        let auth = headers
2381            .get(header::AUTHORIZATION)
2382            .and_then(|v| v.to_str().ok())
2383            .unwrap_or("");
2384        let token = auth.strip_prefix("Bearer ").unwrap_or("");
2385        if !state.pairing.is_authenticated(token) {
2386            state.auth_limiter.record_attempt(&rate_key);
2387            ::zeroclaw_log::record!(
2388                WARN,
2389                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2390                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2391                "webhook: rejected — not paired / invalid bearer token"
2392            );
2393            let err = serde_json::json!({
2394                "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer <token>"
2395            });
2396            return (StatusCode::UNAUTHORIZED, Json(err));
2397        }
2398    }
2399
2400    // ── Webhook secret auth (optional, additional layer) ──
2401    if let Some(ref secret_hash) = state.webhook_secret_hash {
2402        let header_hash = headers
2403            .get("X-Webhook-Secret")
2404            .and_then(|v| v.to_str().ok())
2405            .map(str::trim)
2406            .filter(|value| !value.is_empty())
2407            .map(hash_webhook_secret);
2408        match header_hash {
2409            Some(val) if constant_time_eq(&val, secret_hash.as_ref()) => {}
2410            _ => {
2411                ::zeroclaw_log::record!(
2412                    WARN,
2413                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2414                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2415                    "webhook: rejected request — invalid or missing X-Webhook-Secret"
2416                );
2417                let err = serde_json::json!({"error": "Unauthorized — invalid or missing X-Webhook-Secret header"});
2418                return (StatusCode::UNAUTHORIZED, Json(err));
2419            }
2420        }
2421    }
2422
2423    // ── Parse body ──
2424    let Json(webhook_body) = match body {
2425        Ok(b) => b,
2426        Err(e) => {
2427            ::zeroclaw_log::record!(
2428                WARN,
2429                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2430                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2431                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2432                "webhook JSON parse error"
2433            );
2434            let err = serde_json::json!({
2435                "error": "Invalid JSON body. Expected: {\"message\": \"...\"}"
2436            });
2437            return (StatusCode::BAD_REQUEST, Json(err));
2438        }
2439    };
2440
2441    // ── Per-request agent dispatch (optional `?agent=` query param) ──
2442    // Validate before idempotency / autosave so a typo'd alias doesn't
2443    // consume the caller's idempotency key. Mirrors the `/ws/chat`
2444    // unknown-agent rejection.
2445    let agent_override = query
2446        .agent
2447        .as_deref()
2448        .map(str::trim)
2449        .filter(|s| !s.is_empty());
2450    if let Some(alias) = agent_override {
2451        let cfg = state.config.read();
2452        if cfg.agent(alias).is_none() {
2453            ::zeroclaw_log::record!(
2454                WARN,
2455                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
2456                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2457                    .with_attrs(::serde_json::json!({"agent": alias})),
2458                "webhook: rejected — unknown agent alias"
2459            );
2460            let err = serde_json::json!({
2461                "error": format!(
2462                    "Unknown agent `{alias}` — no [agents.{alias}] entry configured."
2463                )
2464            });
2465            return (StatusCode::BAD_REQUEST, Json(err));
2466        }
2467    }
2468
2469    // ── Idempotency (optional) ──
2470    if let Some(idempotency_key) = headers
2471        .get("X-Idempotency-Key")
2472        .and_then(|v| v.to_str().ok())
2473        .map(str::trim)
2474        .filter(|value| !value.is_empty())
2475        && !state.idempotency_store.record_if_new(idempotency_key)
2476    {
2477        ::zeroclaw_log::record!(
2478            INFO,
2479            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2480                .with_attrs(::serde_json::json!({"idempotency_key": idempotency_key})),
2481            "webhook duplicate ignored"
2482        );
2483        let body = serde_json::json!({
2484            "status": "duplicate",
2485            "idempotent": true,
2486            "message": "Request already processed for this idempotency key"
2487        });
2488        return (StatusCode::OK, Json(body));
2489    }
2490
2491    let message = &webhook_body.message;
2492    let session_id = webhook_session_id(&headers);
2493
2494    if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(message) {
2495        let key = webhook_memory_key();
2496        let _ = state
2497            .mem
2498            .store(
2499                &key,
2500                message,
2501                MemoryCategory::Conversation,
2502                session_id.as_deref(),
2503            )
2504            .await;
2505    }
2506
2507    let (provider_label, model_label) = {
2508        let cfg = state.config.read();
2509        let resolved_agent_alias = resolve_gateway_chat_agent_alias(&cfg, agent_override);
2510        let resolved_provider = resolved_agent_alias
2511            .as_deref()
2512            .and_then(|alias| cfg.resolved_model_provider_for_agent(alias));
2513        let provider_label = resolved_provider
2514            .as_ref()
2515            .map(|(ty, alias, _)| format!("{ty}.{alias}"))
2516            .unwrap_or_else(|| "unknown".to_string());
2517        let model_label = resolved_provider
2518            .and_then(|(_, _, entry)| {
2519                entry
2520                    .model
2521                    .as_deref()
2522                    .map(str::trim)
2523                    .filter(|model| !model.is_empty())
2524                    .map(ToString::to_string)
2525            })
2526            .or_else(|| cfg.resolve_default_model())
2527            .unwrap_or_else(|| "<unresolved>".to_string());
2528        (provider_label, model_label)
2529    };
2530    let started_at = Instant::now();
2531
2532    state.observer.record_event(
2533        &zeroclaw_runtime::observability::ObserverEvent::AgentStart {
2534            model_provider: provider_label.clone(),
2535            model: model_label.clone(),
2536        },
2537    );
2538    state.observer.record_event(
2539        &zeroclaw_runtime::observability::ObserverEvent::LlmRequest {
2540            model_provider: provider_label.clone(),
2541            model: model_label.clone(),
2542            messages_count: 1,
2543        },
2544    );
2545
2546    match run_gateway_chat_with_tools(&state, message, session_id.as_deref(), agent_override).await
2547    {
2548        Ok(GatewayChatOutcome {
2549            response,
2550            input_tokens,
2551            output_tokens,
2552            cost_usd,
2553        }) => {
2554            let duration = started_at.elapsed();
2555            // Per-turn token / cost annotation captured from the cost-tracking
2556            // scope inside `run_gateway_chat_with_tools` (None outside of test
2557            // / when no LLM call recorded). Cost is also persisted to
2558            // /api/cost and costs.jsonl via the same scope.
2559            let tokens_used = input_tokens
2560                .zip(output_tokens)
2561                .map(|(i, o)| i + o)
2562                .or(input_tokens)
2563                .or(output_tokens);
2564            state.observer.record_event(
2565                &zeroclaw_runtime::observability::ObserverEvent::LlmResponse {
2566                    model_provider: provider_label.clone(),
2567                    model: model_label.clone(),
2568                    duration,
2569                    success: true,
2570                    error_message: None,
2571                    input_tokens: None,
2572                    output_tokens: None,
2573                },
2574            );
2575            state.observer.record_metric(
2576                &zeroclaw_runtime::observability::traits::ObserverMetric::RequestLatency(duration),
2577            );
2578            state.observer.record_event(
2579                &zeroclaw_runtime::observability::ObserverEvent::AgentEnd {
2580                    model_provider: provider_label,
2581                    model: model_label.clone(),
2582                    duration,
2583                    tokens_used,
2584                    cost_usd,
2585                },
2586            );
2587
2588            let body = serde_json::json!({"response": response, "model": model_label});
2589            (StatusCode::OK, Json(body))
2590        }
2591        Err(e) => {
2592            let duration = started_at.elapsed();
2593            let sanitized = zeroclaw_providers::sanitize_api_error(&e.to_string());
2594
2595            state.observer.record_event(
2596                &zeroclaw_runtime::observability::ObserverEvent::LlmResponse {
2597                    model_provider: provider_label.clone(),
2598                    model: model_label.clone(),
2599                    duration,
2600                    success: false,
2601                    error_message: Some(sanitized.clone()),
2602                    input_tokens: None,
2603                    output_tokens: None,
2604                },
2605            );
2606            state.observer.record_metric(
2607                &zeroclaw_runtime::observability::traits::ObserverMetric::RequestLatency(duration),
2608            );
2609            state
2610                .observer
2611                .record_event(&zeroclaw_runtime::observability::ObserverEvent::Error {
2612                    component: "gateway".to_string(),
2613                    message: sanitized.clone(),
2614                });
2615            state.observer.record_event(
2616                &zeroclaw_runtime::observability::ObserverEvent::AgentEnd {
2617                    model_provider: provider_label,
2618                    model: model_label,
2619                    duration,
2620                    tokens_used: None,
2621                    cost_usd: None,
2622                },
2623            );
2624
2625            if is_needs_quickstart_err(&e) {
2626                ::zeroclaw_log::record!(
2627                    WARN,
2628                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2629                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2630                    "Webhook chat refused: gateway has no model configured; \
2631                     visit /quickstart"
2632                );
2633                let body = serde_json::json!({
2634                    "error": "needs_quickstart",
2635                    "url": "/quickstart"
2636                });
2637                (StatusCode::SERVICE_UNAVAILABLE, Json(body))
2638            } else {
2639                ::zeroclaw_log::record!(
2640                    ERROR,
2641                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2642                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2643                        .with_attrs(::serde_json::json!({"error": sanitized})),
2644                    "webhook model_provider error"
2645                );
2646                let err = serde_json::json!({"error": "LLM request failed"});
2647                (StatusCode::INTERNAL_SERVER_ERROR, Json(err))
2648            }
2649        }
2650    }
2651}
2652
2653/// `WhatsApp` verification query params
2654#[derive(serde::Deserialize)]
2655pub struct WhatsAppVerifyQuery {
2656    #[serde(rename = "hub.mode")]
2657    pub mode: Option<String>,
2658    #[serde(rename = "hub.verify_token")]
2659    pub verify_token: Option<String>,
2660    #[serde(rename = "hub.challenge")]
2661    pub challenge: Option<String>,
2662}
2663
/// GET /whatsapp — Meta webhook verification handshake.
///
/// When `hub.mode` is `subscribe` and `hub.verify_token` matches the
/// configured token, the `hub.challenge` value is echoed back with 200. If
/// that challenge is missing, the response is 400. Any other request gets
/// 403. The response is 404 when WhatsApp is not configured on this gateway.
#[cfg(feature = "channel-whatsapp-cloud")]
async fn handle_whatsapp_verify(
    State(state): State<AppState>,
    Query(params): Query<WhatsAppVerifyQuery>,
) -> impl IntoResponse {
    let Some(ref wa) = state.whatsapp else {
        return (StatusCode::NOT_FOUND, "WhatsApp not configured".to_string());
    };

    // Constant-time comparison so the verify token cannot be probed via timing.
    let token_ok = match params.verify_token.as_deref() {
        Some(candidate) => constant_time_eq(candidate, wa.verify_token()),
        None => false,
    };
    let is_subscribe = params.mode.as_deref() == Some("subscribe");

    if !(is_subscribe && token_ok) {
        ::zeroclaw_log::record!(
            WARN,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
                .with_attrs(::serde_json::json!({"channel": "whatsapp"})),
            "webhook verification failed — token mismatch"
        );
        return (StatusCode::FORBIDDEN, "Forbidden".to_string());
    }

    match params.challenge {
        Some(challenge) => {
            ::zeroclaw_log::record!(
                INFO,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                    .with_attrs(::serde_json::json!({"channel": "whatsapp"})),
                "webhook verified successfully"
            );
            (StatusCode::OK, challenge)
        }
        None => (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string()),
    }
}
2701
2702/// Verify `WhatsApp` webhook signature (`X-Hub-Signature-256`).
2703/// Returns true if the signature is valid, false otherwise.
2704/// See: <https://developers.facebook.com/docs/graph-api/webhooks/getting-started#verification-requests>
2705pub fn verify_whatsapp_signature(app_secret: &str, body: &[u8], signature_header: &str) -> bool {
2706    use hmac::{Hmac, Mac};
2707    use sha2::Sha256;
2708
2709    // Signature format: "sha256=<hex_signature>"
2710    let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
2711        return false;
2712    };
2713
2714    // Decode hex signature
2715    let Ok(expected) = hex::decode(hex_sig) else {
2716        return false;
2717    };
2718
2719    // Compute HMAC-SHA256
2720    let Ok(mut mac) = Hmac::<Sha256>::new_from_slice(app_secret.as_bytes()) else {
2721        return false;
2722    };
2723    mac.update(body);
2724
2725    // Constant-time comparison
2726    mac.verify_slice(&expected).is_ok()
2727}
2728
/// POST /whatsapp — incoming message webhook.
///
/// Processing steps:
/// * When an app secret is configured, verifies `X-Hub-Signature-256` over
///   the raw body (401 on mismatch).
/// * Parses the JSON payload (400 on invalid JSON).
/// * For each parsed message, first tries to route it as an approval reply.
///   Otherwise it best-effort autosaves the message, runs one chat turn, and
///   sends the reply (or a fallback text) back to the sender.
///
/// Returns 200 `{"status": "ok"}` once processed, including payloads with no
/// messages. Returns 404 when WhatsApp is not configured on this gateway.
#[cfg(feature = "channel-whatsapp-cloud")]
async fn handle_whatsapp_message(
    State(state): State<AppState>,
    headers: HeaderMap,
    body: Bytes,
) -> impl IntoResponse {
    let Some(ref wa) = state.whatsapp else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({"error": "WhatsApp not configured"})),
        );
    };

    // ── Security: Verify X-Hub-Signature-256 if app_secret is configured ──
    if let Some(ref app_secret) = state.whatsapp_app_secret {
        let signature = headers
            .get("X-Hub-Signature-256")
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");

        if !verify_whatsapp_signature(app_secret, &body, signature) {
            ::zeroclaw_log::record!(
                WARN,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
                    .with_attrs(::serde_json::json!({"channel": "whatsapp"})),
                &format!(
                    "webhook signature verification failed (signature: {})",
                    if signature.is_empty() {
                        "missing"
                    } else {
                        "invalid"
                    }
                )
            );
            return (
                StatusCode::UNAUTHORIZED,
                Json(serde_json::json!({"error": "Invalid signature"})),
            );
        }
    }

    // Parse JSON body
    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({"error": "Invalid JSON payload"})),
        );
    };

    // Parse messages from the webhook payload
    let messages = wa.parse_webhook_payload(&payload);

    if messages.is_empty() {
        // Acknowledge the webhook even if no messages (could be status updates)
        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
    }

    // Process each message
    for msg in &messages {
        // NOTE(review): this records the sender and the full message content
        // at INFO level; confirm that is acceptable for log retention / PII.
        ::zeroclaw_log::record!(
            INFO,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(
                ::serde_json::json!({
                    "channel": "whatsapp",
                    "sender": msg.sender,
                    "content": msg.content
                })
            ),
            "inbound webhook message"
        );

        // Route approval replies to pending approval requests before dispatching to agent
        if let Some((token, response)) = zeroclaw_channels::util::parse_approval_reply(&msg.content)
        {
            let mut map = wa.pending_approvals().lock().await;
            if let Some(sender) = map.remove(&token) {
                let _ = sender.send(response);
                continue;
            }
        }

        let session_id = sender_session_id("whatsapp", msg);

        // Auto-save to memory (best-effort; a store failure must not block the reply)
        if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(&msg.content) {
            let key = whatsapp_memory_key(msg);
            let _ = state
                .mem
                .store(
                    &key,
                    &msg.content,
                    MemoryCategory::Conversation,
                    Some(&session_id),
                )
                .await;
        }

        match Box::pin(run_gateway_chat_with_tools(
            &state,
            &msg.content,
            Some(&session_id),
            None,
        ))
        .await
        {
            Ok(GatewayChatOutcome { response, .. }) => {
                // Send reply via WhatsApp
                if let Err(e) = wa
                    .send(&SendMessage::new(response, &msg.reply_target))
                    .await
                {
                    ::zeroclaw_log::record!(
                        ERROR,
                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
                        "Failed to send WhatsApp reply"
                    );
                }
            }
            Err(e) => {
                let reply = if is_needs_quickstart_err(&e) {
                    ::zeroclaw_log::record!(
                        WARN,
                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
                        "WhatsApp chat refused: gateway has no model configured; \
                         visit /quickstart"
                    );
                    needs_quickstart_channel_reply()
                } else {
                    ::zeroclaw_log::record!(
                        ERROR,
                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
                            .with_attrs(
                                ::serde_json::json!({"channel": "whatsapp", "error": format!("{}", e)})
                            ),
                        "LLM error"
                    );
                    "Sorry, I couldn't process your message right now.".to_string()
                };
                // Previously a failed fallback send was silently discarded,
                // leaving no trace of why the user got no answer at all.
                if let Err(send_err) = wa
                    .send(&SendMessage::new(reply, &msg.reply_target))
                    .await
                {
                    ::zeroclaw_log::record!(
                        ERROR,
                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
                            .with_attrs(::serde_json::json!({
                                "channel": "whatsapp",
                                "error": format!("{}", send_err)
                            })),
                        "Failed to send WhatsApp fallback reply"
                    );
                }
            }
        }
    }

    // Acknowledge the webhook
    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
}
2871
/// POST /linq/:alias — incoming message webhook (iMessage/RCS/SMS via Linq)
///
/// Handling order:
/// 1. Look up the Linq channel registered under `alias`; unknown aliases get
///    `404` with a JSON `error`.
/// 2. If a signing secret is configured for `alias`, check the
///    `X-Webhook-Timestamp` / `X-Webhook-Signature` headers with
///    `verify_linq_signature`; a failure is logged and answered with `401`.
///    Without a configured secret the request is accepted unauthenticated.
/// 3. Parse the body as JSON (`400` on failure) and extract messages with
///    `parse_webhook_payload`; an empty result is acknowledged with `200`.
/// 4. For each message, one after another: optionally auto-save it to memory
///    (store errors ignored), run the gateway chat, and send the reply back
///    through Linq. An LLM failure produces either the quickstart hint or a
///    generic apology. Send failures are logged on the success path and
///    ignored on the error path; they never reach the webhook caller.
/// 5. Respond `200 {"status": "ok"}`.
///
/// NOTE(review): every message is processed inline before the response is
/// sent, so slow model calls delay the acknowledgement. The Nextcloud Talk
/// handler spawns per-message tasks for exactly this reason; confirm whether
/// Linq's delivery timeout needs the same treatment.
#[cfg(feature = "channel-linq")]
async fn handle_linq_webhook(
    State(state): State<AppState>,
    Path(alias): Path<String>,
    headers: HeaderMap,
    body: Bytes,
) -> impl IntoResponse {
    let Some(linq) = state.linq.get(&alias) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({"error": format!("Linq alias '{alias}' not configured")})),
        );
    };

    // Lossy decode: any invalid UTF-8 is replaced with U+FFFD before the
    // signature check below, so the signature is verified over this decoded
    // text rather than over the raw request bytes.
    let body_str = String::from_utf8_lossy(&body);

    // ── Security: Verify X-Webhook-Signature if signing_secret is configured ──
    if let Some(signing_secret) = state.linq_signing_secrets.get(&alias) {
        // Missing or non-ASCII headers become "" and will fail verification.
        let timestamp = headers
            .get("X-Webhook-Timestamp")
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");

        let signature = headers
            .get("X-Webhook-Signature")
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");

        if !zeroclaw_channels::linq::verify_linq_signature(
            signing_secret,
            &body_str,
            timestamp,
            signature,
        ) {
            ::zeroclaw_log::record!(
                WARN,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
                    .with_attrs(::serde_json::json!({"channel": "linq", "alias": alias})),
                &format!(
                    "Linq webhook signature verification failed for alias '{alias}' (signature: {})",
                    if signature.is_empty() {
                        "missing"
                    } else {
                        "invalid"
                    }
                )
            );
            return (
                StatusCode::UNAUTHORIZED,
                Json(serde_json::json!({"error": "Invalid signature"})),
            );
        }
    }

    // Parse JSON body
    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({"error": "Invalid JSON payload"})),
        );
    };

    // Parse messages from the webhook payload
    let messages = linq.parse_webhook_payload(&payload);

    if messages.is_empty() {
        // Acknowledge the webhook even if no messages (could be status/delivery events)
        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
    }

    // Process each message
    for msg in &messages {
        // NOTE(review): this logs the sender and the full message content at INFO level.
        ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"channel": "linq", "alias": alias, "sender": msg.sender, "content": msg.content})), "inbound webhook message");
        let session_id = sender_session_id("linq", msg);

        // Auto-save to memory
        // Best-effort: a failed store does not block the reply.
        if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(&msg.content) {
            let key = linq_memory_key(msg);
            let _ = state
                .mem
                .store(
                    &key,
                    &msg.content,
                    MemoryCategory::Conversation,
                    Some(&session_id),
                )
                .await;
        }

        // Call the LLM
        match Box::pin(run_gateway_chat_with_tools(
            &state,
            &msg.content,
            Some(&session_id),
            None,
        ))
        .await
        {
            Ok(GatewayChatOutcome { response, .. }) => {
                // Send reply via Linq
                if let Err(e) = linq
                    .send(&SendMessage::new(response, &msg.reply_target))
                    .await
                {
                    ::zeroclaw_log::record!(
                        ERROR,
                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
                        "Failed to send Linq reply"
                    );
                }
            }
            Err(e) => {
                // Choose a user-facing fallback: point unconfigured gateways at
                // /quickstart, otherwise send a generic apology.
                let reply = if is_needs_quickstart_err(&e) {
                    ::zeroclaw_log::record!(
                        WARN,
                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
                        "Linq chat refused: gateway has no model configured; \
                         visit /quickstart"
                    );
                    needs_quickstart_channel_reply()
                } else {
                    ::zeroclaw_log::record!(
                        ERROR,
                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
                            .with_attrs(
                                ::serde_json::json!({"channel": "linq", "error": format!("{}", e)})
                            ),
                        "LLM error"
                    );
                    "Sorry, I couldn't process your message right now.".to_string()
                };
                // Fallback delivery is best-effort; the send result is discarded.
                let _ = linq.send(&SendMessage::new(reply, &msg.reply_target)).await;
            }
        }
    }

    // Acknowledge the webhook
    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
}
3017
/// GET /wati — WATI webhook verification (echoes hub.challenge)
///
/// Responds with:
/// - `404 "WATI not configured"` when no WATI channel is set up;
/// - `200` with the `hub.challenge` value echoed verbatim when it is present;
/// - `400 "Missing hub.challenge"` otherwise.
///
/// NOTE(review): no `hub.verify_token` (or any other shared value) is checked
/// before echoing; confirm this matches WATI's verification contract.
#[cfg(feature = "channel-wati")]
async fn handle_wati_verify(
    State(state): State<AppState>,
    Query(params): Query<WatiVerifyQuery>,
) -> impl IntoResponse {
    let wati_configured = state.wati.is_some();
    if !wati_configured {
        return (StatusCode::NOT_FOUND, "WATI not configured".to_string());
    }

    // WATI may use a Meta-style handshake: echoing the challenge back confirms
    // ownership of the endpoint.
    match params.challenge {
        Some(echo) => {
            ::zeroclaw_log::record!(
                INFO,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                    .with_attrs(::serde_json::json!({"channel": "wati"})),
                "webhook verified successfully"
            );
            (StatusCode::OK, echo)
        }
        None => (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string()),
    }
}
3041
/// Query string accepted by `GET /wati` (Meta-style webhook verification).
#[derive(Debug, serde::Deserialize)]
pub struct WatiVerifyQuery {
    /// Value of the `hub.challenge` query parameter; echoed back verbatim by
    /// `handle_wati_verify` when present.
    #[serde(rename = "hub.challenge")]
    pub challenge: Option<String>,
}
3047
/// POST /wati — incoming WATI WhatsApp message webhook
///
/// Returns `404` when WATI is not configured and `400` for a non-JSON body.
/// A payload whose top-level `type` is `"audio"` or `"voice"` is transcribed
/// via `try_transcribe_audio`; if that yields nothing, the webhook is
/// acknowledged without further processing. All other payloads go through
/// `parse_webhook_payload`. Each resulting message is then processed inline
/// (optional auto-save, gateway chat, reply via WATI) before `200` is returned.
///
/// NOTE(review): unlike the Linq and Nextcloud Talk handlers, this route
/// performs no signature or shared-secret check. Any caller that can reach it
/// can trigger model calls and replies to the (presumably payload-derived)
/// `reply_target`. Confirm whether WATI offers request signing that should be
/// enforced here.
#[cfg(feature = "channel-wati")]
async fn handle_wati_webhook(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
    let Some(ref wati) = state.wati else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({"error": "WATI not configured"})),
        );
    };

    // Parse JSON body
    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({"error": "Invalid JSON payload"})),
        );
    };

    // Detect audio before the synchronous parse
    // A missing or non-string `type` is treated as "" (i.e. not audio).
    let msg_type = payload.get("type").and_then(|v| v.as_str()).unwrap_or("");

    let messages = if matches!(msg_type, "audio" | "voice") {
        // Build a synthetic ChannelMessage from the audio transcript
        if let Some(transcript) = wati.try_transcribe_audio(&payload).await {
            wati.parse_audio_as_message(&payload, transcript)
        } else {
            vec![]
        }
    } else {
        wati.parse_webhook_payload(&payload)
    };

    if messages.is_empty() {
        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
    }

    // Process each message
    for msg in &messages {
        // NOTE(review): this logs the sender and the full message content at INFO level.
        ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"channel": "wati", "sender": msg.sender, "content": msg.content})), "inbound webhook message");
        let session_id = sender_session_id("wati", msg);

        // Auto-save to memory
        // Best-effort: a failed store does not block the reply.
        if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(&msg.content) {
            let key = wati_memory_key(msg);
            let _ = state
                .mem
                .store(
                    &key,
                    &msg.content,
                    MemoryCategory::Conversation,
                    Some(&session_id),
                )
                .await;
        }

        // Call the LLM
        match Box::pin(run_gateway_chat_with_tools(
            &state,
            &msg.content,
            Some(&session_id),
            None,
        ))
        .await
        {
            Ok(GatewayChatOutcome { response, .. }) => {
                // Send reply via WATI
                if let Err(e) = wati
                    .send(&SendMessage::new(response, &msg.reply_target))
                    .await
                {
                    ::zeroclaw_log::record!(
                        ERROR,
                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
                        "Failed to send WATI reply"
                    );
                }
            }
            Err(e) => {
                // Choose a user-facing fallback: point unconfigured gateways at
                // /quickstart, otherwise send a generic apology.
                let reply = if is_needs_quickstart_err(&e) {
                    ::zeroclaw_log::record!(
                        WARN,
                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
                        "WATI chat refused: gateway has no model configured; \
                         visit /quickstart"
                    );
                    needs_quickstart_channel_reply()
                } else {
                    ::zeroclaw_log::record!(
                        ERROR,
                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
                            .with_attrs(
                                ::serde_json::json!({"channel": "wati", "error": format!("{}", e)})
                            ),
                        "LLM error"
                    );
                    "Sorry, I couldn't process your message right now.".to_string()
                };
                // Fallback delivery is best-effort; the send result is discarded.
                let _ = wati.send(&SendMessage::new(reply, &msg.reply_target)).await;
            }
        }
    }

    // Acknowledge the webhook
    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
}
3157
/// POST /nextcloud-talk — incoming message webhook (Nextcloud Talk bot API)
///
/// Returns `404` when Nextcloud Talk is not configured. If a webhook secret is
/// configured, the `X-Nextcloud-Talk-Random` / `X-Nextcloud-Talk-Signature`
/// headers are checked with `verify_nextcloud_talk_signature` (`401` on
/// failure). Without a secret, requests are accepted unauthenticated. The body
/// must be JSON (`400` otherwise).
///
/// Unlike the Linq and WATI handlers, each parsed message is handed to its own
/// spawned task and the handler acknowledges with `200` straight away, so the
/// caller never waits on the model. Failures inside a task (memory store, LLM
/// call, reply send) are logged or ignored and cannot affect the response
/// that was already sent.
#[cfg(feature = "channel-nextcloud")]
async fn handle_nextcloud_talk_webhook(
    State(state): State<AppState>,
    headers: HeaderMap,
    body: Bytes,
) -> impl IntoResponse {
    let Some(ref nextcloud_talk) = state.nextcloud_talk else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({"error": "Nextcloud Talk not configured"})),
        );
    };

    // Lossy decode: any invalid UTF-8 is replaced with U+FFFD before the
    // signature check, so the signature is verified over this decoded text.
    let body_str = String::from_utf8_lossy(&body);

    // ── Security: Verify Nextcloud Talk HMAC signature if secret is configured ──
    if let Some(ref webhook_secret) = state.nextcloud_talk_webhook_secret {
        // Missing or non-ASCII headers become "" and will fail verification.
        let random = headers
            .get("X-Nextcloud-Talk-Random")
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");

        let signature = headers
            .get("X-Nextcloud-Talk-Signature")
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");

        if !zeroclaw_channels::nextcloud_talk::verify_nextcloud_talk_signature(
            webhook_secret,
            random,
            &body_str,
            signature,
        ) {
            ::zeroclaw_log::record!(
                WARN,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
                &format!(
                    "Nextcloud Talk webhook signature verification failed (signature: {})",
                    if signature.is_empty() {
                        "missing"
                    } else {
                        "invalid"
                    }
                )
            );
            return (
                StatusCode::UNAUTHORIZED,
                Json(serde_json::json!({"error": "Invalid signature"})),
            );
        }
    }

    // Parse JSON body
    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({"error": "Invalid JSON payload"})),
        );
    };

    // Parse messages from webhook payload
    let messages = nextcloud_talk.parse_webhook_payload(&payload);
    if messages.is_empty() {
        // Acknowledge webhook even if payload does not contain actionable user messages.
        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
    }

    // Spawn per-message processing so the webhook returns 200 quickly.
    // Nextcloud Talk cancels webhook requests that don't complete within ~5s
    // (see #6156); slow local models routinely exceed that. Each message gets
    // its own task — the LLM call and reply are independent of the ack.
    for msg in messages {
        // Each task owns its own clone of the state and channel handle, since
        // it outlives this request.
        let state = state.clone();
        let nextcloud_talk = Arc::clone(nextcloud_talk);
        zeroclaw_spawn::spawn!(async move {
            // NOTE(review): this logs the sender and the full message content at INFO level.
            ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"channel": "nextcloud_talk", "sender": msg.sender, "content": msg.content})), "inbound webhook message");
            let session_id = sender_session_id("nextcloud_talk", &msg);

            // Best-effort auto-save; a failed store does not block the reply.
            if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(&msg.content) {
                let key = nextcloud_talk_memory_key(&msg);
                let _ = state
                    .mem
                    .store(
                        &key,
                        &msg.content,
                        MemoryCategory::Conversation,
                        Some(&session_id),
                    )
                    .await;
            }

            match Box::pin(run_gateway_chat_with_tools(
                &state,
                &msg.content,
                Some(&session_id),
                None,
            ))
            .await
            {
                Ok(GatewayChatOutcome { response, .. }) => {
                    if let Err(e) = nextcloud_talk
                        .send(&SendMessage::new(response, &msg.reply_target))
                        .await
                    {
                        ::zeroclaw_log::record!(
                            ERROR,
                            ::zeroclaw_log::Event::new(
                                module_path!(),
                                ::zeroclaw_log::Action::Fail
                            )
                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
                            "Failed to send Nextcloud Talk reply"
                        );
                    }
                }
                Err(e) => {
                    // Choose a user-facing fallback: point unconfigured gateways
                    // at /quickstart, otherwise send a generic apology.
                    let reply = if is_needs_quickstart_err(&e) {
                        ::zeroclaw_log::record!(
                            WARN,
                            ::zeroclaw_log::Event::new(
                                module_path!(),
                                ::zeroclaw_log::Action::Note
                            )
                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
                            "Nextcloud Talk chat refused: gateway has no model configured; \
                             visit /quickstart"
                        );
                        needs_quickstart_channel_reply()
                    } else {
                        ::zeroclaw_log::record!(ERROR, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail).with_outcome(::zeroclaw_log::EventOutcome::Failure).with_attrs(::serde_json::json!({"channel": "nextcloud_talk", "error": format!("{}", e)})), "LLM error");
                        "Sorry, I couldn't process your message right now.".to_string()
                    };
                    // Fallback delivery is best-effort; the send result is discarded.
                    let _ = nextcloud_talk
                        .send(&SendMessage::new(reply, &msg.reply_target))
                        .await;
                }
            }
        });
    }

    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
}
3303
/// Maximum request body size for the Gmail webhook endpoint (1 MB).
/// Google Pub/Sub messages are typically under 10 KB.
///
/// Checked against the already-buffered body in `handle_gmail_push_webhook`;
/// larger bodies are rejected with `413 Payload Too Large`.
/// NOTE(review): the module docs mention a global 64KB request body limit,
/// which would reject oversized bodies before this check runs. Confirm
/// whether this route overrides that limit.
#[cfg(feature = "channel-email")]
const GMAIL_WEBHOOK_MAX_BODY: usize = 1024 * 1024;
3308
/// Compare two byte strings without stopping at the first mismatching byte.
///
/// Used for the Gmail webhook shared secret, so that the latency of a rejected
/// request does not reveal how many leading bytes of a guessed secret were
/// correct. Plain `==` / `!=` on strings short-circuits on the first
/// difference. This is best-effort: the length is still observable via the
/// early return, and no formal guarantee is made against compiler
/// optimization.
#[cfg(feature = "channel-email")]
fn gmail_webhook_secret_matches(provided: &[u8], expected: &[u8]) -> bool {
    if provided.len() != expected.len() {
        return false;
    }
    provided
        .iter()
        .zip(expected)
        .fold(0u8, |diff, (a, b)| diff | (a ^ b))
        == 0
}

/// POST /webhook/gmail — incoming Gmail Pub/Sub push notification
///
/// Responses:
/// - `404` when Gmail push is not configured;
/// - `413` when the body exceeds `GMAIL_WEBHOOK_MAX_BODY`;
/// - `401` when a webhook secret is configured and the request's
///   `Authorization: Bearer <secret>` header does not match it (compared
///   without short-circuiting, see `gmail_webhook_secret_matches`);
/// - `400` when the body is not a valid Pub/Sub envelope (including bodies
///   that are not valid UTF-8);
/// - otherwise `200`, with the notification processed in a spawned task.
///   Processing errors are logged, not returned.
///
/// An empty configured secret disables the authentication check.
#[cfg(feature = "channel-email")]
async fn handle_gmail_push_webhook(
    State(state): State<AppState>,
    headers: HeaderMap,
    body: Bytes,
) -> impl IntoResponse {
    let Some(ref gmail_push) = state.gmail_push else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({"error": "Gmail push not configured"})),
        );
    };

    // Enforce body size limit.
    if body.len() > GMAIL_WEBHOOK_MAX_BODY {
        return (
            StatusCode::PAYLOAD_TOO_LARGE,
            Json(serde_json::json!({"error": "Request body too large"})),
        );
    }

    // Authenticate the webhook request using a shared secret. Borrow rather
    // than clone: the secret is only needed for this comparison.
    let secret: &str = &gmail_push.config.webhook_secret;
    if !secret.is_empty() {
        let provided = headers
            .get(axum::http::header::AUTHORIZATION)
            .and_then(|v| v.to_str().ok())
            .and_then(|auth| auth.strip_prefix("Bearer "))
            .unwrap_or("");

        if !gmail_webhook_secret_matches(provided.as_bytes(), secret.as_bytes()) {
            ::zeroclaw_log::record!(
                WARN,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
                    .with_attrs(::serde_json::json!({"channel": "gmail_push"})),
                "webhook: unauthorized request"
            );
            return (
                StatusCode::UNAUTHORIZED,
                Json(serde_json::json!({"error": "Unauthorized"})),
            );
        }
    }

    // Parse straight from the raw bytes (as the other webhook handlers do)
    // instead of a lossy UTF-8 round-trip, so invalid UTF-8 is rejected
    // rather than silently rewritten to U+FFFD.
    let envelope: zeroclaw_channels::gmail_push::PubSubEnvelope =
        match serde_json::from_slice(&body) {
            Ok(e) => e,
            Err(e) => {
                ::zeroclaw_log::record!(
                    WARN,
                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
                        .with_attrs(
                            ::serde_json::json!({"error": format!("{}", e), "channel": "gmail_push"})
                        ),
                    "webhook: invalid payload"
                );
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({"error": "Invalid Pub/Sub envelope"})),
                );
            }
        };

    // Process the notification asynchronously (non-blocking for the webhook response)
    let channel = Arc::clone(gmail_push);
    zeroclaw_spawn::spawn!(async move {
        if let Err(e) = channel.handle_notification(&envelope).await {
            ::zeroclaw_log::record!(
                ERROR,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
                    .with_attrs(
                        ::serde_json::json!({"channel": "gmail_push", "error": format!("{}", e)})
                    ),
                "push notification processing failed"
            );
        }
    });

    // Acknowledge immediately — Google Pub/Sub requires a 2xx within ~10s
    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
}
3395
3396// ══════════════════════════════════════════════════════════════════════════════
3397// ADMIN HANDLERS (for CLI management)
3398// ══════════════════════════════════════════════════════════════════════════════
3399
/// Response for admin endpoints
///
/// Serialized as `{"success": <bool>, "message": <string>}` by the shutdown
/// and reload handlers.
#[derive(serde::Serialize)]
struct AdminResponse {
    /// Whether the requested admin action was initiated.
    success: bool,
    /// Human-readable description of what happened.
    message: String,
}
3406
3407/// Reject requests that do not originate from a loopback address.
3408fn require_localhost(peer: &SocketAddr) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
3409    if peer.ip().is_loopback() {
3410        Ok(())
3411    } else {
3412        Err((
3413            StatusCode::FORBIDDEN,
3414            Json(serde_json::json!({
3415                "error": "Admin endpoints are restricted to localhost"
3416            })),
3417        ))
3418    }
3419}
3420
3421/// POST /admin/shutdown — graceful shutdown from CLI (localhost only)
3422async fn handle_admin_shutdown(
3423    State(state): State<AppState>,
3424    ConnectInfo(peer): ConnectInfo<SocketAddr>,
3425) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
3426    require_localhost(&peer)?;
3427    ::zeroclaw_log::record!(
3428        INFO,
3429        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
3430        "admin shutdown request received; initiating graceful shutdown"
3431    );
3432
3433    let body = AdminResponse {
3434        success: true,
3435        message: "Gateway shutdown initiated".to_string(),
3436    };
3437
3438    let _ = state.shutdown_tx.send(true);
3439
3440    Ok((StatusCode::OK, Json(body)))
3441}
3442
/// Authorization decision for `POST /admin/reload`, derived purely from the
/// caller's loopback status, the `gateway.allow_remote_admin` flag, and
/// whether pairing is enabled.
#[derive(Debug, PartialEq, Eq)]
enum AdminReloadGate {
    /// Local (loopback) caller, i.e. the CLI: permitted with no further checks.
    Allow,
    /// Remote caller with remote admin enabled and pairing on: permitted only
    /// once pairing authentication succeeds.
    RequireAuth,
    /// Remote caller while remote admin is disabled: refused.
    Forbidden,
    /// Remote caller with remote admin enabled but pairing off: refused.
    /// `require_auth` does nothing when pairing is off, so allowing this case
    /// would hand reload to anonymous remote callers.
    ForbiddenNoPairing,
}

/// Pure gate decision for `/admin/reload`. Auth enforcement (for the
/// `RequireAuth` case) is handled separately by the caller.
///
/// Truth table (loopback, allow_remote_admin, require_pairing):
/// - `(true, _, _)`        → `Allow`
/// - `(false, false, _)`   → `Forbidden`
/// - `(false, true, true)` → `RequireAuth`
/// - `(false, true, false)`→ `ForbiddenNoPairing`
///
/// A remote caller therefore needs *both* the opt-in and pairing; the opt-in
/// alone never yields an unauthenticated allow.
fn admin_reload_gate(
    is_loopback: bool,
    allow_remote_admin: bool,
    require_pairing: bool,
) -> AdminReloadGate {
    match (is_loopback, allow_remote_admin, require_pairing) {
        (true, _, _) => AdminReloadGate::Allow,
        (false, false, _) => AdminReloadGate::Forbidden,
        (false, true, true) => AdminReloadGate::RequireAuth,
        (false, true, false) => AdminReloadGate::ForbiddenNoPairing,
    }
}
3483
3484/// POST /admin/reload — reload the daemon in place.
3485///
3486/// Loopback callers (the CLI) are always allowed. Non-loopback callers are
3487/// rejected unless `gateway.allow_remote_admin` is enabled *and* pairing is
3488/// on, in which case the request must also pass pairing authentication
3489/// (`require_auth`). Opting in with pairing disabled is rejected rather than
3490/// allowing an unauthenticated remote reload.
3491///
3492/// Sends `true` on the reload channel the daemon owns. The daemon's main
3493/// wait loop sees the change, returns `DaemonExit::Reload`, and the outer
3494/// loop in `src/main.rs` re-reads config from disk and re-runs
3495/// `daemon::run` — re-instantiating every subsystem (gateway / channels /
3496/// heartbeat / scheduler / mqtt) with the fresh config.
3497///
3498/// Same PID throughout. Brief HTTP downtime while the gateway listener
3499/// rebinds — typically sub-second. Clients should poll `/health` to detect
3500/// when the new instance is ready.
3501///
3502/// Cross-platform — works identically on Linux, macOS, and Windows because
3503/// the channel is in-process tokio, not an OS signal. The gateway-only
3504/// `zeroclaw gateway start` (no daemon supervisor) returns 503 with a
3505/// clear message because there's nothing to signal.
3506async fn handle_admin_reload(
3507    State(state): State<AppState>,
3508    ConnectInfo(peer): ConnectInfo<SocketAddr>,
3509    headers: HeaderMap,
3510) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
3511    // Loopback (the CLI) is always allowed. A non-loopback caller is rejected
3512    // unless the operator opted in via `gateway.allow_remote_admin`, and even
3513    // then must pass pairing auth — which requires pairing to be enabled, so
3514    // opting in without pairing is rejected rather than left unauthenticated.
3515    let allow_remote = state.config.read().gateway.allow_remote_admin;
3516    // Source pairing status from the guard `require_auth` consults, not the
3517    // raw config field, so the gate's `RequireAuth` decision can never
3518    // diverge from what `require_auth` will actually enforce.
3519    let require_pairing = state.pairing.require_pairing();
3520    match admin_reload_gate(peer.ip().is_loopback(), allow_remote, require_pairing) {
3521        AdminReloadGate::Allow => {}
3522        AdminReloadGate::RequireAuth => api::require_auth(&state, &headers)?,
3523        AdminReloadGate::Forbidden => {
3524            return Err((
3525                StatusCode::FORBIDDEN,
3526                Json(serde_json::json!({
3527                    "error": "Remote admin reload is disabled. Call from localhost, \
3528                              or set gateway.allow_remote_admin = true (with pairing \
3529                              enabled, then pair) to allow authenticated remote reloads."
3530                })),
3531            ));
3532        }
3533        AdminReloadGate::ForbiddenNoPairing => {
3534            return Err((
3535                StatusCode::FORBIDDEN,
3536                Json(serde_json::json!({
3537                    "error": "Remote admin reload requires pairing. \
3538                              gateway.allow_remote_admin is enabled but \
3539                              gateway.require_pairing is off, so remote callers \
3540                              cannot be authenticated. Enable require_pairing, or \
3541                              call /admin/reload from localhost."
3542                })),
3543            ));
3544        }
3545    }
3546
3547    let Some(reload_tx) = state.reload_tx.clone() else {
3548        return Err((
3549            StatusCode::SERVICE_UNAVAILABLE,
3550            Json(serde_json::json!({
3551                "error": "no daemon supervisor — running as standalone gateway. \
3552                          Restart the process to pick up config changes."
3553            })),
3554        ));
3555    };
3556
3557    ::zeroclaw_log::record!(
3558        INFO,
3559        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
3560        "admin reload request received"
3561    );
3562    // Clear the pending-reload flag before the daemon supervisor brings up
3563    // the new gateway instance. The fresh instance starts with the flag
3564    // already false, matching its "subsystems just-loaded, no pending
3565    // changes" state.
3566    state
3567        .pending_reload
3568        .store(false, std::sync::atomic::Ordering::Relaxed);
3569    // Trigger graceful shutdown of THIS gateway instance's axum::serve so
3570    // its TcpListener releases the port before the daemon supervisor
3571    // spawns the new instance. Without this, daemon::run aborts the
3572    // gateway tokio task at the next await point — but the OLD listener
3573    // can stay bound briefly, racing the NEW gateway's bind. The new
3574    // bind then fails and spawn_component_supervisor backs off; in the
3575    // meantime the OLD gateway keeps serving requests with stale
3576    // in-memory config, and `/api/config/drift` reports drift against
3577    // disk because in-memory hasn't been replaced yet. Cold restart
3578    // (process exit + start) hits this path differently because the OS
3579    // fully releases the listener — that's why the user observes "shut
3580    // down + bring up = correct" but "/admin/reload = stale".
3581    let shutdown_tx = state.shutdown_tx.clone();
3582    // Brief delay so the HTTP response flushes before tear-down begins.
3583    zeroclaw_spawn::spawn!(async move {
3584        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
3585        // Drain axum first so the listener releases.
3586        let _ = shutdown_tx.send(true);
3587        // Then signal the daemon to re-read disk and re-spawn subsystems.
3588        let _ = reload_tx.send(true);
3589    });
3590
3591    Ok((
3592        StatusCode::OK,
3593        Json(AdminResponse {
3594            success: true,
3595            message: "Daemon reload initiated".to_string(),
3596        }),
3597    ))
3598}
3599
3600/// GET /admin/paircode — fetch current pairing code (localhost only)
3601async fn handle_admin_paircode(
3602    State(state): State<AppState>,
3603    ConnectInfo(peer): ConnectInfo<SocketAddr>,
3604) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
3605    require_localhost(&peer)?;
3606    let code = state.pairing.pairing_code();
3607
3608    let body = if let Some(c) = code {
3609        serde_json::json!({
3610            "success": true,
3611            "pairing_required": state.pairing.require_pairing(),
3612            "pairing_code": c,
3613            "message": "Use this one-time code to pair"
3614        })
3615    } else {
3616        serde_json::json!({
3617            "success": true,
3618            "pairing_required": state.pairing.require_pairing(),
3619            "pairing_code": null,
3620            "message": if state.pairing.require_pairing() {
3621                "Pairing is active but no new code available (already paired or code expired)"
3622            } else {
3623                "Pairing is disabled for this gateway"
3624            }
3625        })
3626    };
3627
3628    Ok((StatusCode::OK, Json(body)))
3629}
3630
3631/// Query parameters for `POST /admin/paircode/new`.
3632///
3633/// `rotate` distinguishes the destructive "rotate after compromise" path from
3634/// the default "add another client" path (#6984):
3635/// - absent / empty → add another client; existing tokens stay valid.
3636/// - `rotate=all` → revoke every paired token and clear the device registry,
3637///   then issue a fresh code. The only safe action when the operator does not
3638///   know which token leaked.
3639/// - `rotate=<device_id>` → revoke just that device's token, then issue a code.
3640#[derive(Debug, serde::Deserialize, Default)]
3641pub struct AdminPaircodeQuery {
3642    #[serde(default)]
3643    pub rotate: Option<String>,
3644}
3645
3646/// POST /admin/paircode/new — generate a new pairing code (localhost only).
3647///
3648/// With `?rotate=all` or `?rotate=<device_id>` this also revokes existing
3649/// bearer tokens before issuing the code, so the CLI/admin surface can
3650/// distinguish "add another client" from "rotate after compromise" (#6984).
3651async fn handle_admin_paircode_new(
3652    State(state): State<AppState>,
3653    ConnectInfo(peer): ConnectInfo<SocketAddr>,
3654    Query(params): Query<AdminPaircodeQuery>,
3655) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
3656    require_localhost(&peer)?;
3657
3658    if !state.pairing.require_pairing() {
3659        let body = serde_json::json!({
3660            "success": false,
3661            "pairing_required": false,
3662            "pairing_code": null,
3663            "message": "Pairing is disabled for this gateway"
3664        });
3665        return Ok((StatusCode::BAD_REQUEST, Json(body)));
3666    }
3667
3668    let rotate = params
3669        .rotate
3670        .as_deref()
3671        .map(str::trim)
3672        .filter(|s| !s.is_empty());
3673
3674    let revocation_message = match rotate {
3675        Some("all") => {
3676            let revoked = state.pairing.revoke_all_tokens();
3677            if let Some(registry) = state.device_registry.as_ref() {
3678                if let Err(e) = registry.clear() {
3679                    let body = serde_json::json!({
3680                        "success": false,
3681                        "pairing_required": true,
3682                        "pairing_code": null,
3683                        "message": format!("Tokens revoked in memory but device registry clear failed: {e}"),
3684                    });
3685                    return Ok((StatusCode::INTERNAL_SERVER_ERROR, Json(body)));
3686                }
3687            }
3688            if let Err(e) = persist_pairing_tokens(state.config.clone(), &state.pairing).await {
3689                let body = serde_json::json!({
3690                    "success": false,
3691                    "pairing_required": true,
3692                    "pairing_code": null,
3693                    "message": format!("Tokens revoked in memory but config persist failed: {e}"),
3694                });
3695                return Ok((StatusCode::INTERNAL_SERVER_ERROR, Json(body)));
3696            }
3697            ::zeroclaw_log::record!(
3698                INFO,
3699                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3700                    .with_attrs(::serde_json::json!({"revoked": revoked})),
3701                "all paired tokens revoked via admin endpoint"
3702            );
3703            Some(format!(
3704                "Revoked all {revoked} paired token(s) and cleared the device registry."
3705            ))
3706        }
3707        Some(device_id) => {
3708            let Some(registry) = state.device_registry.as_ref() else {
3709                let body = serde_json::json!({
3710                    "success": false,
3711                    "pairing_required": true,
3712                    "pairing_code": null,
3713                    "message": "Device registry is disabled; cannot rotate a single device.",
3714                });
3715                return Ok((StatusCode::SERVICE_UNAVAILABLE, Json(body)));
3716            };
3717            let token_hash = match registry.revoke(device_id) {
3718                Ok(Some(hash)) => hash,
3719                Ok(None) => {
3720                    let body = serde_json::json!({
3721                        "success": false,
3722                        "pairing_required": true,
3723                        "pairing_code": null,
3724                        "message": format!("Device '{device_id}' not found; nothing revoked."),
3725                    });
3726                    return Ok((StatusCode::NOT_FOUND, Json(body)));
3727                }
3728                Err(e) => {
3729                    let body = serde_json::json!({
3730                        "success": false,
3731                        "pairing_required": true,
3732                        "pairing_code": null,
3733                        "message": format!("Device registry error: {e}"),
3734                    });
3735                    return Ok((StatusCode::INTERNAL_SERVER_ERROR, Json(body)));
3736                }
3737            };
3738            state.pairing.revoke_token_hash(&token_hash);
3739            if let Err(e) = persist_pairing_tokens(state.config.clone(), &state.pairing).await {
3740                let body = serde_json::json!({
3741                    "success": false,
3742                    "pairing_required": true,
3743                    "pairing_code": null,
3744                    "message": format!("Token revoked in memory but config persist failed: {e}"),
3745                });
3746                return Ok((StatusCode::INTERNAL_SERVER_ERROR, Json(body)));
3747            }
3748            ::zeroclaw_log::record!(
3749                INFO,
3750                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
3751                "single device token revoked via admin endpoint"
3752            );
3753            Some(format!(
3754                "Revoked the bearer token for device '{device_id}'."
3755            ))
3756        }
3757        None => None,
3758    };
3759
3760    let code = state
3761        .pairing
3762        .generate_new_pairing_code()
3763        .expect("require_pairing checked above");
3764    if rotate.is_none() {
3765        ::zeroclaw_log::record!(
3766            INFO,
3767            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
3768            "new pairing code generated via admin endpoint"
3769        );
3770    }
3771
3772    let message = match revocation_message {
3773        Some(revoked) => {
3774            format!("{revoked} Use this one-time code to re-pair.")
3775        }
3776        None => "New pairing code generated — use this one-time code to pair".to_string(),
3777    };
3778
3779    let body = serde_json::json!({
3780        "success": true,
3781        "pairing_required": true,
3782        "pairing_code": code,
3783        "message": message,
3784    });
3785    Ok((StatusCode::OK, Json(body)))
3786}
3787
3788/// GET /pair/code — fetch the initial pairing code (no auth, no localhost restriction).
3789///
3790/// This endpoint is intentionally public so that Docker and remote users can see
3791/// the pairing code on the web dashboard without needing terminal access. It only
3792/// returns a code when the gateway is in its initial un-paired state (no devices
3793/// paired yet and a pairing code exists). Once the first device pairs, this
3794/// endpoint stops returning a code.
3795async fn handle_pair_code(State(state): State<AppState>) -> impl IntoResponse {
3796    let require = state.pairing.require_pairing();
3797    let is_paired = state.pairing.is_paired();
3798
3799    // Only expose the code during initial setup (before first pairing)
3800    let code = if require && !is_paired {
3801        state.pairing.pairing_code()
3802    } else {
3803        None
3804    };
3805
3806    let body = serde_json::json!({
3807        "success": true,
3808        "pairing_required": require,
3809        "pairing_code": code,
3810    });
3811
3812    (StatusCode::OK, Json(body))
3813}
3814
3815#[cfg(test)]
3816mod tests {
3817    use super::*;
3818    use async_trait::async_trait;
3819    use axum::http::HeaderValue;
3820    use axum::response::IntoResponse;
3821    use http_body_util::BodyExt;
3822    use parking_lot::{Mutex, RwLock};
3823    use std::sync::atomic::{AtomicUsize, Ordering};
3824    #[cfg(feature = "channel-whatsapp-cloud")]
3825    use zeroclaw_api::channel::ChannelMessage;
3826    use zeroclaw_memory::{Memory, MemoryCategory, MemoryEntry};
3827    use zeroclaw_providers::ModelProvider;
3828
3829    /// Generate a random hex secret at runtime to avoid hard-coded cryptographic values.
3830    fn generate_test_secret() -> String {
3831        let bytes: [u8; 32] = rand::random();
3832        hex::encode(bytes)
3833    }
3834
3835    #[test]
3836    fn security_body_limit_is_64kb() {
3837        assert_eq!(MAX_BODY_SIZE, 65_536);
3838    }
3839
3840    #[test]
3841    fn security_timeout_default_is_30_seconds() {
3842        assert_eq!(REQUEST_TIMEOUT_SECS, 30);
3843    }
3844
3845    #[test]
3846    fn gateway_timeout_uses_typed_config_default() {
3847        let cfg = zeroclaw_config::schema::GatewayConfig::default();
3848        assert_eq!(gateway_request_timeout_secs(&cfg), 30);
3849    }
3850
3851    #[test]
3852    fn paircode_recovery_command_includes_alternate_port() {
3853        assert_eq!(
3854            format_paircode_recovery_command("127.0.0.1", 42617),
3855            "zeroclaw gateway get-paircode --new --port 42617"
3856        );
3857    }
3858
3859    #[test]
3860    fn paircode_recovery_command_includes_specific_host_when_needed() {
3861        assert_eq!(
3862            format_paircode_recovery_command("192.168.1.20", 42617),
3863            "zeroclaw gateway get-paircode --new --port 42617 --host 192.168.1.20"
3864        );
3865    }
3866
3867    #[test]
3868    fn paircode_recovery_curl_targets_running_instance() {
3869        assert_eq!(
3870            format_paircode_recovery_curl("127.0.0.1", 42617, ""),
3871            "curl -s -X POST http://127.0.0.1:42617/admin/paircode/new"
3872        );
3873    }
3874
3875    #[test]
3876    fn paircode_recovery_curl_preserves_path_prefix() {
3877        assert_eq!(
3878            format_paircode_recovery_curl("127.0.0.1", 42617, "/gw"),
3879            "curl -s -X POST http://127.0.0.1:42617/gw/admin/paircode/new"
3880        );
3881    }
3882
3883    /// Build an AppState wired with a real pairing guard, on-disk config path,
3884    /// and an optional device registry so the admin paircode handler's
3885    /// revoke + persist paths can be exercised end to end.
3886    fn admin_paircode_state(
3887        tmp: &tempfile::TempDir,
3888        require_pairing: bool,
3889        with_registry: bool,
3890    ) -> AppState {
3891        let data_dir = tmp.path().join("workspace");
3892        std::fs::create_dir_all(&data_dir).unwrap();
3893        let config = Config {
3894            data_dir: data_dir.clone(),
3895            config_path: tmp.path().join("config.toml"),
3896            ..Config::default()
3897        };
3898        let registry = with_registry.then(|| Arc::new(api_pairing::DeviceRegistry::new(&data_dir)));
3899        AppState {
3900            config: Arc::new(RwLock::new(config)),
3901            model_provider: Arc::new(MockModelProvider::default()),
3902            model: "test-model".into(),
3903            temperature: None,
3904            mem: Arc::new(MockMemory),
3905            memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
3906                Arc::new(MockMemory),
3907                zeroclaw_config::schema::MemoryConfig::default(),
3908                std::path::PathBuf::new(),
3909            )),
3910            auto_save: false,
3911            webhook_secret_hash: None,
3912            pairing: Arc::new(PairingGuard::new(require_pairing, &[])),
3913            trust_forwarded_headers: false,
3914            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3915            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3916            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3917            #[cfg(feature = "channel-whatsapp-cloud")]
3918            whatsapp: None,
3919            #[cfg(feature = "channel-whatsapp-cloud")]
3920            whatsapp_app_secret: None,
3921            #[cfg(feature = "channel-linq")]
3922            linq: HashMap::new(),
3923            #[cfg(feature = "channel-linq")]
3924            linq_signing_secrets: HashMap::new(),
3925            #[cfg(feature = "channel-nextcloud")]
3926            nextcloud_talk: None,
3927            #[cfg(feature = "channel-nextcloud")]
3928            nextcloud_talk_webhook_secret: None,
3929            #[cfg(feature = "channel-wati")]
3930            wati: None,
3931            #[cfg(feature = "channel-email")]
3932            gmail_push: None,
3933            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
3934            tools_registry: Arc::new(Vec::new()),
3935            cost_tracker: None,
3936            event_tx: tokio::sync::broadcast::channel(16).0,
3937            event_buffer: Arc::new(sse::EventBuffer::new(16)),
3938            shutdown_tx: tokio::sync::watch::channel(false).0,
3939            reload_tx: None,
3940            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3941            path_prefix: String::new(),
3942            web_dist_dir: None,
3943            session_backend: None,
3944            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
3945                8, 30, 600,
3946            )),
3947            device_registry: registry,
3948            pending_pairings: None,
3949            canvas_store: CanvasStore::new(),
3950            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
3951            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
3952            tui_registry: None,
3953            #[cfg(feature = "webauthn")]
3954            webauthn: None,
3955        }
3956    }
3957
3958    /// Pair a device into both the pairing guard and the device registry,
3959    /// returning the plaintext token so the test can assert it is revoked.
3960    async fn pair_device(state: &AppState, device_id: &str) -> String {
3961        let code = state
3962            .pairing
3963            .generate_new_pairing_code()
3964            .expect("pairing enabled");
3965        let token = state
3966            .pairing
3967            .try_pair(&code, device_id)
3968            .await
3969            .unwrap()
3970            .unwrap();
3971        state.device_registry.as_ref().unwrap().register(
3972            PairingGuard::token_hash(&token),
3973            api_pairing::DeviceInfo {
3974                id: device_id.to_string(),
3975                name: None,
3976                device_type: None,
3977                paired_at: chrono::Utc::now(),
3978                last_seen: chrono::Utc::now(),
3979                ip_address: None,
3980                capabilities: None,
3981            },
3982        );
3983        token
3984    }
3985
3986    async fn admin_paircode_response_json(
3987        result: Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)>,
3988    ) -> (StatusCode, serde_json::Value) {
3989        let response = result.into_response();
3990        let status = response.status();
3991        let bytes = response.into_body().collect().await.unwrap().to_bytes();
3992        let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
3993        (status, json)
3994    }
3995
3996    /// Default `?` absent path still just adds a client; existing tokens live.
3997    #[tokio::test]
3998    async fn admin_paircode_new_without_rotate_keeps_existing_tokens() {
3999        let tmp = tempfile::TempDir::new().unwrap();
4000        let state = admin_paircode_state(&tmp, true, true);
4001        let token = pair_device(&state, "dev-a").await;
4002
4003        let (status, json) = admin_paircode_response_json(
4004            handle_admin_paircode_new(
4005                State(state.clone()),
4006                test_connect_info(),
4007                Query(AdminPaircodeQuery::default()),
4008            )
4009            .await,
4010        )
4011        .await;
4012
4013        assert_eq!(status, StatusCode::OK);
4014        assert!(json["pairing_code"].is_string());
4015        assert!(
4016            state.pairing.is_authenticated(&token),
4017            "add-another-client path must not revoke existing tokens"
4018        );
4019    }
4020
4021    /// `?rotate=all` revokes every token, clears the registry, persists, and
4022    /// still issues a fresh code.
4023    #[tokio::test]
4024    async fn admin_paircode_new_rotate_all_revokes_everything() {
4025        let tmp = tempfile::TempDir::new().unwrap();
4026        let state = admin_paircode_state(&tmp, true, true);
4027        let token_a = pair_device(&state, "dev-a").await;
4028        let token_b = pair_device(&state, "dev-b").await;
4029
4030        let (status, json) = admin_paircode_response_json(
4031            handle_admin_paircode_new(
4032                State(state.clone()),
4033                test_connect_info(),
4034                Query(AdminPaircodeQuery {
4035                    rotate: Some("all".into()),
4036                }),
4037            )
4038            .await,
4039        )
4040        .await;
4041
4042        assert_eq!(status, StatusCode::OK);
4043        assert!(json["pairing_code"].is_string());
4044        assert!(!state.pairing.is_authenticated(&token_a));
4045        assert!(!state.pairing.is_authenticated(&token_b));
4046        assert!(
4047            state.config.read().gateway.paired_tokens.is_empty(),
4048            "rotate=all must persist an empty token set"
4049        );
4050        assert!(
4051            state.device_registry.as_ref().unwrap().list().is_empty(),
4052            "rotate=all must clear the device registry"
4053        );
4054    }
4055
4056    /// `?rotate=<id>` revokes only that device and leaves the rest valid.
4057    #[tokio::test]
4058    async fn admin_paircode_new_rotate_device_revokes_one() {
4059        let tmp = tempfile::TempDir::new().unwrap();
4060        let state = admin_paircode_state(&tmp, true, true);
4061        let token_a = pair_device(&state, "dev-a").await;
4062        let token_b = pair_device(&state, "dev-b").await;
4063
4064        let (status, json) = admin_paircode_response_json(
4065            handle_admin_paircode_new(
4066                State(state.clone()),
4067                test_connect_info(),
4068                Query(AdminPaircodeQuery {
4069                    rotate: Some("dev-a".into()),
4070                }),
4071            )
4072            .await,
4073        )
4074        .await;
4075
4076        assert_eq!(status, StatusCode::OK);
4077        assert!(json["pairing_code"].is_string());
4078        assert!(!state.pairing.is_authenticated(&token_a));
4079        assert!(
4080            state.pairing.is_authenticated(&token_b),
4081            "targeted rotate must not touch other devices"
4082        );
4083        let old_hash = PairingGuard::token_hash(&token_a);
4084        assert!(
4085            !state
4086                .config
4087                .read()
4088                .gateway
4089                .paired_tokens
4090                .contains(&old_hash)
4091        );
4092    }
4093
4094    /// Unknown device id returns 404 and revokes nothing.
4095    #[tokio::test]
4096    async fn admin_paircode_new_rotate_unknown_device_is_not_found() {
4097        let tmp = tempfile::TempDir::new().unwrap();
4098        let state = admin_paircode_state(&tmp, true, true);
4099        let token = pair_device(&state, "dev-a").await;
4100
4101        let (status, _json) = admin_paircode_response_json(
4102            handle_admin_paircode_new(
4103                State(state.clone()),
4104                test_connect_info(),
4105                Query(AdminPaircodeQuery {
4106                    rotate: Some("ghost".into()),
4107                }),
4108            )
4109            .await,
4110        )
4111        .await;
4112
4113        assert_eq!(status, StatusCode::NOT_FOUND);
4114        assert!(
4115            state.pairing.is_authenticated(&token),
4116            "a not-found rotate must not revoke any token"
4117        );
4118    }
4119
4120    /// Pairing disabled returns 400 regardless of rotate intent.
4121    #[tokio::test]
4122    async fn admin_paircode_new_pairing_disabled_is_bad_request() {
4123        let tmp = tempfile::TempDir::new().unwrap();
4124        let state = admin_paircode_state(&tmp, false, false);
4125
4126        let (status, json) = admin_paircode_response_json(
4127            handle_admin_paircode_new(
4128                State(state),
4129                test_connect_info(),
4130                Query(AdminPaircodeQuery {
4131                    rotate: Some("all".into()),
4132                }),
4133            )
4134            .await,
4135        )
4136        .await;
4137
4138        assert_eq!(status, StatusCode::BAD_REQUEST);
4139        assert_eq!(json["success"], false);
4140    }
4141
4142    #[test]
4143    fn long_running_request_timeout_default_is_ten_minutes() {
4144        assert_eq!(LONG_RUNNING_REQUEST_TIMEOUT_SECS, 600);
4145    }
4146
4147    #[test]
4148    fn long_running_request_timeout_uses_typed_config_default() {
4149        let cfg = zeroclaw_config::schema::GatewayConfig::default();
4150        assert_eq!(gateway_long_running_request_timeout_secs(&cfg), 600);
4151    }
4152
4153    #[test]
4154    fn webhook_body_requires_message_field() {
4155        let valid = r#"{"message": "hello"}"#;
4156        let parsed: Result<WebhookBody, _> = serde_json::from_str(valid);
4157        assert!(parsed.is_ok());
4158        assert_eq!(parsed.unwrap().message, "hello");
4159
4160        let missing = r#"{"other": "field"}"#;
4161        let parsed: Result<WebhookBody, _> = serde_json::from_str(missing);
4162        assert!(parsed.is_err());
4163    }
4164
4165    #[test]
4166    fn whatsapp_query_fields_are_optional() {
4167        let q = WhatsAppVerifyQuery {
4168            mode: None,
4169            verify_token: None,
4170            challenge: None,
4171        };
4172        assert!(q.mode.is_none());
4173    }
4174
4175    #[test]
4176    fn app_state_is_clone() {
4177        fn assert_clone<T: Clone>() {}
4178        assert_clone::<AppState>();
4179    }
4180
4181    /// Regression: the gateway must boot with zero configured agents so
4182    /// a fresh install can reach `/admin/reload` and `/quickstart` to add
4183    /// one. Earlier the boot path returned
4184    /// `gateway start requires at least one configured [agents.<alias>]
4185    /// entry`, which crashed the daemon supervisor before the reload
4186    /// channel could be exercised.
4187    #[tokio::test]
4188    async fn run_gateway_starts_with_zero_agents() {
4189        // Isolate data_dir so parallel nextest runs don't race on the
4190        // real ~/.zeroclaw/data (see #7054).
4191        let tmp = tempfile::TempDir::new().unwrap();
4192        let config = zeroclaw_config::schema::Config {
4193            data_dir: tmp.path().join("workspace"),
4194            config_path: tmp.path().join("config.toml"),
4195            ..zeroclaw_config::schema::Config::default()
4196        };
4197        std::fs::create_dir_all(&config.data_dir).unwrap();
4198
4199        // Default Config has no [agents.*] entries — the exact shape
4200        // a fresh install presents on first daemon boot.
4201        assert!(
4202            config.agents.is_empty(),
4203            "regression assumes default Config has no agents",
4204        );
4205
4206        // Bind to an ephemeral port on loopback. If the boot path
4207        // erred on the agents-required check, the join would resolve
4208        // immediately with that Err. We race a short delay against
4209        // the spawn: a still-running task at the deadline means boot
4210        // got far enough to start serving.
4211        let handle = zeroclaw_spawn::spawn!(async move {
4212            run_gateway("127.0.0.1", 0, config, None, None, None, None).await
4213        });
4214
4215        match tokio::time::timeout(
4216            std::time::Duration::from_millis(750),
4217            &mut Box::pin(async {
4218                // We cannot await `handle` directly because the gateway
4219                // never returns under normal operation; instead, peek at
4220                // whether it has finished by polling join with a tiny
4221                // budget.
4222                let _ = tokio::time::sleep(std::time::Duration::from_millis(500)).await;
4223            }),
4224        )
4225        .await
4226        {
4227            Ok(()) => {}
4228            Err(_) => panic!("test setup timed out before checking gateway state"),
4229        }
4230
4231        // If the boot path errored, the task is finished and join
4232        // returns the error. If it's still running, abort and accept
4233        // boot reached the serving stage.
4234        if handle.is_finished() {
4235            let result = handle.await.expect("task did not panic");
4236            panic!(
4237                "gateway exited during boot with zero agents — must stay up for reload/quickstart: {:?}",
4238                result
4239            );
4240        }
4241        handle.abort();
4242    }
4243
4244    /// Regression: the gateway must boot even when an enabled agent's
4245    /// `risk_profile` does not name a configured `risk_profiles` entry.
4246    /// Earlier the boot path used `config.risk_profile_for_agent(...).with_context(...)?`
4247    /// which propagated up through the daemon supervisor and crash-looped
4248    /// the gateway component, locking the operator out of `/admin/reload`
4249    /// and `/quickstart` — the exact endpoints they need to fix the broken
4250    /// risk_profile reference. The fix degrades gracefully: warn,
4251    /// fall through to an empty tools registry, keep serving.
4252    #[tokio::test]
4253    async fn run_gateway_starts_with_unresolved_agent_risk_profile() {
4254        use zeroclaw_config::schema::AliasedAgentConfig;
4255
4256        // Isolate data_dir so parallel nextest runs don't race on the
4257        // real ~/.zeroclaw/data (see #7054).
4258        let tmp = tempfile::TempDir::new().unwrap();
4259        let mut config = zeroclaw_config::schema::Config {
4260            data_dir: tmp.path().join("workspace"),
4261            config_path: tmp.path().join("config.toml"),
4262            ..zeroclaw_config::schema::Config::default()
4263        };
4264        std::fs::create_dir_all(&config.data_dir).unwrap();
4265
4266        // Enabled agent whose `risk_profile` does not resolve. No
4267        // matching [risk_profiles.<key>] entry exists.
4268        let agent = AliasedAgentConfig {
4269            enabled: true,
4270            risk_profile: "definitely_not_configured".to_string(),
4271            ..AliasedAgentConfig::default()
4272        };
4273        config.agents.insert("fake123".to_string(), agent);
4274
4275        let handle = zeroclaw_spawn::spawn!(async move {
4276            run_gateway("127.0.0.1", 0, config, None, None, None, None).await
4277        });
4278
4279        match tokio::time::timeout(
4280            std::time::Duration::from_millis(750),
4281            &mut Box::pin(async {
4282                let _ = tokio::time::sleep(std::time::Duration::from_millis(500)).await;
4283            }),
4284        )
4285        .await
4286        {
4287            Ok(()) => {}
4288            Err(_) => panic!("test setup timed out before checking gateway state"),
4289        }
4290
4291        if handle.is_finished() {
4292            let result = handle.await.expect("task did not panic");
4293            panic!(
4294                "gateway exited during boot when agent.risk_profile was unresolved \
4295                 — must stay up so operator can fix via /admin/reload or /quickstart: {:?}",
4296                result
4297            );
4298        }
4299        handle.abort();
4300    }
4301
4302    #[tokio::test]
4303    async fn run_gateway_starts_with_mismatched_provider_api_key() {
4304        let mut config = Config::default();
4305        config.providers.models.anthropic.insert(
4306            "default".to_string(),
4307            zeroclaw_config::schema::AnthropicModelProviderConfig {
4308                base: zeroclaw_config::schema::ModelProviderConfig {
4309                    model: Some("anthropic/claude-sonnet-4-6".to_string()),
4310                    api_key: Some("sk-test-openai-shaped-key".to_string()),
4311                    ..Default::default()
4312                },
4313            },
4314        );
4315
4316        let handle = zeroclaw_spawn::spawn!(async move {
4317            run_gateway("127.0.0.1", 0, config, None, None, None, None).await
4318        });
4319
4320        match tokio::time::timeout(
4321            std::time::Duration::from_millis(750),
4322            &mut Box::pin(async {
4323                let _ = tokio::time::sleep(std::time::Duration::from_millis(500)).await;
4324            }),
4325        )
4326        .await
4327        {
4328            Ok(()) => {}
4329            Err(_) => panic!("test setup timed out before checking gateway state"),
4330        }
4331
4332        if handle.is_finished() {
4333            let result = handle.await.expect("task did not panic");
4334            panic!(
4335                "gateway exited during boot when seed provider API key was \
4336                 mismatched — must stay up so operator can fix via /admin/reload \
4337                 or /quickstart: {:?}",
4338                result
4339            );
4340        }
4341        handle.abort();
4342    }
4343
4344    #[tokio::test]
4345    async fn metrics_endpoint_returns_hint_when_prometheus_is_disabled() {
4346        let state = AppState {
4347            config: Arc::new(RwLock::new(Config::default())),
4348            model_provider: Arc::new(MockModelProvider::default()),
4349            model: "test-model".into(),
4350            temperature: None,
4351            mem: Arc::new(MockMemory),
4352            memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
4353                Arc::new(MockMemory),
4354                zeroclaw_config::schema::MemoryConfig::default(),
4355                std::path::PathBuf::new(),
4356            )),
4357            auto_save: false,
4358            webhook_secret_hash: None,
4359            pairing: Arc::new(PairingGuard::new(false, &[])),
4360            trust_forwarded_headers: false,
4361            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4362            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4363            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4364            #[cfg(feature = "channel-whatsapp-cloud")]
4365            whatsapp: None,
4366            #[cfg(feature = "channel-whatsapp-cloud")]
4367            whatsapp_app_secret: None,
4368            #[cfg(feature = "channel-linq")]
4369            linq: HashMap::new(),
4370            #[cfg(feature = "channel-linq")]
4371            linq_signing_secrets: HashMap::new(),
4372            #[cfg(feature = "channel-nextcloud")]
4373            nextcloud_talk: None,
4374            #[cfg(feature = "channel-nextcloud")]
4375            nextcloud_talk_webhook_secret: None,
4376            #[cfg(feature = "channel-wati")]
4377            wati: None,
4378            #[cfg(feature = "channel-email")]
4379            gmail_push: None,
4380            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4381            tools_registry: Arc::new(Vec::new()),
4382            cost_tracker: None,
4383            event_tx: tokio::sync::broadcast::channel(16).0,
4384            event_buffer: Arc::new(sse::EventBuffer::new(16)),
4385            shutdown_tx: tokio::sync::watch::channel(false).0,
4386            reload_tx: None,
4387            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4388            path_prefix: String::new(),
4389            web_dist_dir: None,
4390            session_backend: None,
4391            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4392                8, 30, 600,
4393            )),
4394            device_registry: None,
4395            pending_pairings: None,
4396            canvas_store: CanvasStore::new(),
4397            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4398            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4399            tui_registry: None,
4400            #[cfg(feature = "webauthn")]
4401            webauthn: None,
4402        };
4403
4404        let response = handle_metrics(State(state)).await.into_response();
4405        assert_eq!(response.status(), StatusCode::OK);
4406        assert_eq!(
4407            response
4408                .headers()
4409                .get(header::CONTENT_TYPE)
4410                .and_then(|value| value.to_str().ok()),
4411            Some(PROMETHEUS_CONTENT_TYPE)
4412        );
4413
4414        let body = response.into_body().collect().await.unwrap().to_bytes();
4415        let text = String::from_utf8(body.to_vec()).unwrap();
4416        assert!(text.contains("Prometheus backend not enabled"));
4417    }
4418
4419    #[cfg(feature = "observability-prometheus")]
4420    #[tokio::test]
4421    async fn metrics_endpoint_renders_prometheus_output() {
4422        let event_tx = tokio::sync::broadcast::channel(16).0;
4423        let prom = zeroclaw_runtime::observability::PrometheusObserver::new();
4424        zeroclaw_runtime::observability::Observer::record_event(
4425            &prom,
4426            &zeroclaw_runtime::observability::ObserverEvent::HeartbeatTick,
4427        );
4428
4429        let observer: Arc<dyn zeroclaw_runtime::observability::Observer> = Arc::new(prom);
4430        let state = AppState {
4431            config: Arc::new(RwLock::new(Config::default())),
4432            model_provider: Arc::new(MockModelProvider::default()),
4433            model: "test-model".into(),
4434            temperature: None,
4435            mem: Arc::new(MockMemory),
4436            memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
4437                Arc::new(MockMemory),
4438                zeroclaw_config::schema::MemoryConfig::default(),
4439                std::path::PathBuf::new(),
4440            )),
4441            auto_save: false,
4442            webhook_secret_hash: None,
4443            pairing: Arc::new(PairingGuard::new(false, &[])),
4444            trust_forwarded_headers: false,
4445            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4446            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4447            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4448            #[cfg(feature = "channel-whatsapp-cloud")]
4449            whatsapp: None,
4450            #[cfg(feature = "channel-whatsapp-cloud")]
4451            whatsapp_app_secret: None,
4452            #[cfg(feature = "channel-linq")]
4453            linq: HashMap::new(),
4454            #[cfg(feature = "channel-linq")]
4455            linq_signing_secrets: HashMap::new(),
4456            #[cfg(feature = "channel-nextcloud")]
4457            nextcloud_talk: None,
4458            #[cfg(feature = "channel-nextcloud")]
4459            nextcloud_talk_webhook_secret: None,
4460            #[cfg(feature = "channel-wati")]
4461            wati: None,
4462            #[cfg(feature = "channel-email")]
4463            gmail_push: None,
4464            observer,
4465            tools_registry: Arc::new(Vec::new()),
4466            cost_tracker: None,
4467            event_tx,
4468            event_buffer: Arc::new(sse::EventBuffer::new(16)),
4469            shutdown_tx: tokio::sync::watch::channel(false).0,
4470            reload_tx: None,
4471            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4472            path_prefix: String::new(),
4473            web_dist_dir: None,
4474            session_backend: None,
4475            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4476                8, 30, 600,
4477            )),
4478            device_registry: None,
4479            pending_pairings: None,
4480            canvas_store: CanvasStore::new(),
4481            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4482            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4483            tui_registry: None,
4484            #[cfg(feature = "webauthn")]
4485            webauthn: None,
4486        };
4487
4488        let response = handle_metrics(State(state)).await.into_response();
4489        assert_eq!(response.status(), StatusCode::OK);
4490
4491        let body = response.into_body().collect().await.unwrap().to_bytes();
4492        let text = String::from_utf8(body.to_vec()).unwrap();
4493        assert!(text.contains("zeroclaw_heartbeat_ticks_total 1"));
4494    }
4495
4496    #[test]
4497    fn gateway_rate_limiter_blocks_after_limit() {
4498        let limiter = GatewayRateLimiter::new(2, 2, 100);
4499        assert!(limiter.allow_pair("127.0.0.1"));
4500        assert!(limiter.allow_pair("127.0.0.1"));
4501        assert!(!limiter.allow_pair("127.0.0.1"));
4502    }
4503
4504    #[test]
4505    fn rate_limiter_sweep_removes_stale_entries() {
4506        let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 100);
4507        // Add entries for multiple IPs
4508        assert!(limiter.allow("ip-1"));
4509        assert!(limiter.allow("ip-2"));
4510        assert!(limiter.allow("ip-3"));
4511
4512        {
4513            let guard = limiter.requests.lock();
4514            assert_eq!(guard.0.len(), 3);
4515        }
4516
4517        // Force a sweep by backdating last_sweep
4518        {
4519            let mut guard = limiter.requests.lock();
4520            guard.1 = Instant::now()
4521                .checked_sub(Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS + 1))
4522                .unwrap();
4523            // Clear timestamps for ip-2 and ip-3 to simulate stale entries
4524            guard.0.get_mut("ip-2").unwrap().clear();
4525            guard.0.get_mut("ip-3").unwrap().clear();
4526        }
4527
4528        // Next allow() call should trigger sweep and remove stale entries
4529        assert!(limiter.allow("ip-1"));
4530
4531        {
4532            let guard = limiter.requests.lock();
4533            assert_eq!(guard.0.len(), 1, "Stale entries should have been swept");
4534            assert!(guard.0.contains_key("ip-1"));
4535        }
4536    }
4537
4538    #[test]
4539    fn rate_limiter_zero_limit_always_allows() {
4540        let limiter = SlidingWindowRateLimiter::new(0, Duration::from_secs(60), 10);
4541        for _ in 0..100 {
4542            assert!(limiter.allow("any-key"));
4543        }
4544    }
4545
4546    #[test]
4547    fn idempotency_store_rejects_duplicate_key() {
4548        let store = IdempotencyStore::new(Duration::from_secs(30), 10);
4549        assert!(store.record_if_new("req-1"));
4550        assert!(!store.record_if_new("req-1"));
4551        assert!(store.record_if_new("req-2"));
4552    }
4553
4554    #[test]
4555    fn rate_limiter_bounded_cardinality_evicts_oldest_key() {
4556        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 2);
4557        assert!(limiter.allow("ip-1"));
4558        assert!(limiter.allow("ip-2"));
4559        assert!(limiter.allow("ip-3"));
4560
4561        let guard = limiter.requests.lock();
4562        assert_eq!(guard.0.len(), 2);
4563        assert!(guard.0.contains_key("ip-2"));
4564        assert!(guard.0.contains_key("ip-3"));
4565    }
4566
4567    #[test]
4568    fn idempotency_store_bounded_cardinality_evicts_oldest_key() {
4569        let store = IdempotencyStore::new(Duration::from_secs(300), 2);
4570        assert!(store.record_if_new("k1"));
4571        std::thread::sleep(Duration::from_millis(2));
4572        assert!(store.record_if_new("k2"));
4573        std::thread::sleep(Duration::from_millis(2));
4574        assert!(store.record_if_new("k3"));
4575
4576        let keys = store.keys.lock();
4577        assert_eq!(keys.len(), 2);
4578        assert!(!keys.contains_key("k1"));
4579        assert!(keys.contains_key("k2"));
4580        assert!(keys.contains_key("k3"));
4581    }
4582
4583    #[test]
4584    fn client_key_defaults_to_peer_addr_when_untrusted_proxy_mode() {
4585        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
4586        let mut headers = HeaderMap::new();
4587        headers.insert(
4588            "X-Forwarded-For",
4589            HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
4590        );
4591
4592        let key = client_key_from_request(Some(peer), &headers, false);
4593        assert_eq!(key, "10.0.0.5");
4594    }
4595
4596    #[test]
4597    fn client_key_uses_forwarded_ip_only_in_trusted_proxy_mode() {
4598        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
4599        let mut headers = HeaderMap::new();
4600        headers.insert(
4601            "X-Forwarded-For",
4602            HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
4603        );
4604
4605        let key = client_key_from_request(Some(peer), &headers, true);
4606        assert_eq!(key, "198.51.100.10");
4607    }
4608
4609    #[test]
4610    fn client_key_falls_back_to_peer_when_forwarded_header_invalid() {
4611        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
4612        let mut headers = HeaderMap::new();
4613        headers.insert("X-Forwarded-For", HeaderValue::from_static("garbage-value"));
4614
4615        let key = client_key_from_request(Some(peer), &headers, true);
4616        assert_eq!(key, "10.0.0.5");
4617    }
4618
4619    #[test]
4620    fn normalize_max_keys_uses_fallback_for_zero() {
4621        assert_eq!(normalize_max_keys(0, 10_000), 10_000);
4622        assert_eq!(normalize_max_keys(0, 0), 1);
4623    }
4624
4625    #[test]
4626    fn normalize_max_keys_preserves_nonzero_values() {
4627        assert_eq!(normalize_max_keys(2_048, 10_000), 2_048);
4628        assert_eq!(normalize_max_keys(1, 10_000), 1);
4629    }
4630
4631    #[tokio::test]
4632    async fn persist_pairing_tokens_writes_config_tokens() {
4633        let temp = tempfile::tempdir().unwrap();
4634        let config_path = temp.path().join("config.toml");
4635        let workspace_path = temp.path().join("workspace");
4636
4637        let config = Config {
4638            config_path: config_path.clone(),
4639            data_dir: workspace_path,
4640            ..Default::default()
4641        };
4642        config.save().await.unwrap();
4643
4644        let guard = PairingGuard::new(true, &[]);
4645        let code = guard.pairing_code().unwrap();
4646        let token = guard.try_pair(&code, "test_client").await.unwrap().unwrap();
4647        assert!(guard.is_authenticated(&token));
4648
4649        let shared_config = Arc::new(RwLock::new(config));
4650        Box::pin(persist_pairing_tokens(shared_config.clone(), &guard))
4651            .await
4652            .unwrap();
4653
4654        // In-memory tokens should remain as plaintext 64-char hex hashes.
4655        let plaintext = {
4656            let in_memory = shared_config.read();
4657            assert_eq!(in_memory.gateway.paired_tokens.len(), 1);
4658            in_memory.gateway.paired_tokens[0].clone()
4659        };
4660        assert_eq!(plaintext.len(), 64);
4661        assert!(plaintext.chars().all(|c: char| c.is_ascii_hexdigit()));
4662
4663        // On disk, the token should be encrypted (secrets.encrypt defaults to true).
4664        let saved = tokio::fs::read_to_string(config_path).await.unwrap();
4665        let raw_parsed: Config = toml::from_str(&saved).unwrap();
4666        assert_eq!(raw_parsed.gateway.paired_tokens.len(), 1);
4667        let on_disk = &raw_parsed.gateway.paired_tokens[0];
4668        assert!(
4669            zeroclaw_runtime::security::SecretStore::is_encrypted(on_disk),
4670            "paired_token should be encrypted on disk"
4671        );
4672    }
4673
4674    #[test]
4675    fn webhook_memory_key_is_unique() {
4676        let key1 = webhook_memory_key();
4677        let key2 = webhook_memory_key();
4678
4679        assert!(key1.starts_with("webhook_msg_"));
4680        assert!(key2.starts_with("webhook_msg_"));
4681        assert_ne!(key1, key2);
4682    }
4683
4684    #[test]
4685    fn webhook_session_id_accepts_valid() {
4686        let mut headers = HeaderMap::new();
4687        headers.insert("X-Session-Id", HeaderValue::from_static("abc-DEF_123.foo"));
4688        assert_eq!(webhook_session_id(&headers), Some("abc-DEF_123.foo".into()));
4689    }
4690
4691    #[test]
4692    fn webhook_session_id_trims_whitespace() {
4693        let mut headers = HeaderMap::new();
4694        headers.insert("X-Session-Id", HeaderValue::from_static("  my-session  "));
4695        assert_eq!(webhook_session_id(&headers), Some("my-session".into()));
4696    }
4697
4698    #[test]
4699    fn webhook_session_id_rejects_empty() {
4700        let mut headers = HeaderMap::new();
4701        headers.insert("X-Session-Id", HeaderValue::from_static(""));
4702        assert_eq!(webhook_session_id(&headers), None);
4703
4704        headers.insert("X-Session-Id", HeaderValue::from_static("   "));
4705        assert_eq!(webhook_session_id(&headers), None);
4706    }
4707
4708    #[test]
4709    fn webhook_session_id_rejects_missing() {
4710        let headers = HeaderMap::new();
4711        assert_eq!(webhook_session_id(&headers), None);
4712    }
4713
4714    #[test]
4715    fn webhook_session_id_rejects_oversized() {
4716        let mut headers = HeaderMap::new();
4717        let long = "a".repeat(129);
4718        headers.insert("X-Session-Id", HeaderValue::from_str(&long).unwrap());
4719        assert_eq!(webhook_session_id(&headers), None);
4720
4721        let at_limit = "b".repeat(128);
4722        headers.insert("X-Session-Id", HeaderValue::from_str(&at_limit).unwrap());
4723        assert!(webhook_session_id(&headers).is_some());
4724    }
4725
4726    #[test]
4727    fn webhook_session_id_rejects_invalid_chars() {
4728        let mut headers = HeaderMap::new();
4729        for bad in &[
4730            "has/slash",
4731            "has:colon",
4732            "has space",
4733            "has@at",
4734            "emoji\u{1f600}",
4735        ] {
4736            if let Ok(val) = HeaderValue::from_str(bad) {
4737                headers.insert("X-Session-Id", val);
4738                assert_eq!(webhook_session_id(&headers), None, "should reject: {bad}");
4739            }
4740        }
4741    }
4742
4743    #[cfg(feature = "channel-whatsapp-cloud")]
4744    #[test]
4745    fn whatsapp_memory_key_includes_sender_and_message_id() {
4746        let msg = ChannelMessage {
4747            id: "wamid-123".into(),
4748            sender: "+1234567890".into(),
4749            reply_target: "+1234567890".into(),
4750            content: "hello".into(),
4751            channel: "whatsapp".into(),
4752            channel_alias: None,
4753            timestamp: 1,
4754            thread_ts: None,
4755            interruption_scope_id: None,
4756            attachments: vec![],
4757            subject: None,
4758        };
4759
4760        let key = whatsapp_memory_key(&msg);
4761        assert_eq!(key, "whatsapp_+1234567890_wamid-123");
4762    }
4763
    /// Stateless `Memory` stub: writes are accepted and discarded, and every
    /// lookup, count or delete reports that nothing is there.
    #[derive(Default)]
    struct MockMemory;
4766
4767    #[async_trait]
4768    impl Memory for MockMemory {
4769        fn name(&self) -> &str {
4770            "mock"
4771        }
4772
4773        async fn store(
4774            &self,
4775            _key: &str,
4776            _content: &str,
4777            _category: MemoryCategory,
4778            _session_id: Option<&str>,
4779        ) -> anyhow::Result<()> {
4780            Ok(())
4781        }
4782
4783        async fn recall(
4784            &self,
4785            _query: &str,
4786            _limit: usize,
4787            _session_id: Option<&str>,
4788            _since: Option<&str>,
4789            _until: Option<&str>,
4790        ) -> anyhow::Result<Vec<MemoryEntry>> {
4791            Ok(Vec::new())
4792        }
4793
4794        async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
4795            Ok(None)
4796        }
4797
4798        async fn list(
4799            &self,
4800            _category: Option<&MemoryCategory>,
4801            _session_id: Option<&str>,
4802        ) -> anyhow::Result<Vec<MemoryEntry>> {
4803            Ok(Vec::new())
4804        }
4805
4806        async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
4807            Ok(false)
4808        }
4809
4810        async fn forget_for_agent(&self, _key: &str, _agent_id: &str) -> anyhow::Result<bool> {
4811            Ok(false)
4812        }
4813
4814        async fn count(&self) -> anyhow::Result<usize> {
4815            Ok(0)
4816        }
4817
4818        async fn health_check(&self) -> bool {
4819            true
4820        }
4821
4822        async fn store_with_agent(
4823            &self,
4824            _key: &str,
4825            _content: &str,
4826            _category: MemoryCategory,
4827            _session_id: Option<&str>,
4828            _namespace: Option<&str>,
4829            _importance: Option<f64>,
4830            _agent_id: Option<&str>,
4831        ) -> anyhow::Result<()> {
4832            Ok(())
4833        }
4834
4835        async fn recall_for_agents(
4836            &self,
4837            _allowed_agent_ids: &[&str],
4838            _query: &str,
4839            _limit: usize,
4840            _session_id: Option<&str>,
4841            _since: Option<&str>,
4842            _until: Option<&str>,
4843        ) -> anyhow::Result<Vec<MemoryEntry>> {
4844            Ok(Vec::new())
4845        }
4846    }
4847    impl ::zeroclaw_api::attribution::Attributable for MockMemory {
4848        fn role(&self) -> ::zeroclaw_api::attribution::Role {
4849            ::zeroclaw_api::attribution::Role::Memory(
4850                ::zeroclaw_api::attribution::MemoryKind::InMemory,
4851            )
4852        }
4853        fn alias(&self) -> &str {
4854            "MockMemory"
4855        }
4856    }
4857
    /// `ModelProvider` stub that answers every chat with `"ok"` and counts how
    /// often it was invoked, so tests can assert whether dispatch happened.
    #[derive(Default)]
    struct MockModelProvider {
        /// Number of `chat_with_system` calls received so far.
        calls: AtomicUsize,
    }
4862
4863    #[async_trait]
4864    impl ModelProvider for MockModelProvider {
4865        async fn chat_with_system(
4866            &self,
4867            _system_prompt: Option<&str>,
4868            _message: &str,
4869            _model: &str,
4870            _temperature: Option<f64>,
4871        ) -> anyhow::Result<String> {
4872            self.calls.fetch_add(1, Ordering::SeqCst);
4873            Ok("ok".into())
4874        }
4875    }
4876    impl ::zeroclaw_api::attribution::Attributable for MockModelProvider {
4877        fn role(&self) -> ::zeroclaw_api::attribution::Role {
4878            ::zeroclaw_api::attribution::Role::Provider(
4879                ::zeroclaw_api::attribution::ProviderKind::Model(
4880                    ::zeroclaw_api::attribution::ModelProviderKind::Custom,
4881                ),
4882            )
4883        }
4884        fn alias(&self) -> &str {
4885            "MockModelProvider"
4886        }
4887    }
4888
    /// Observer that keeps a clone of every event it receives, in arrival order,
    /// for later inspection by a test. Metrics are ignored.
    #[derive(Default)]
    struct CapturingObserver {
        /// Events recorded so far, oldest first.
        events: Mutex<Vec<zeroclaw_runtime::observability::ObserverEvent>>,
    }
4893
4894    impl zeroclaw_runtime::observability::Observer for CapturingObserver {
4895        fn record_event(&self, event: &zeroclaw_runtime::observability::ObserverEvent) {
4896            self.events.lock().push(event.clone());
4897        }
4898
4899        fn record_metric(&self, _metric: &zeroclaw_runtime::observability::traits::ObserverMetric) {
4900        }
4901
4902        fn name(&self) -> &str {
4903            "capturing"
4904        }
4905
4906        fn as_any(&self) -> &dyn std::any::Any {
4907            self
4908        }
4909    }
4910
    /// `Memory` stub that records only the keys passed to `store` (and to
    /// `store_with_agent`, which delegates to it), so tests can check what a
    /// handler tried to persist. `get`/`recall`/`list` always come back empty.
    #[derive(Default)]
    struct TrackingMemory {
        /// Stored keys in call order; duplicates are kept.
        keys: Mutex<Vec<String>>,
    }
4915
4916    #[async_trait]
4917    impl Memory for TrackingMemory {
4918        fn name(&self) -> &str {
4919            "tracking"
4920        }
4921
4922        async fn store(
4923            &self,
4924            key: &str,
4925            _content: &str,
4926            _category: MemoryCategory,
4927            _session_id: Option<&str>,
4928        ) -> anyhow::Result<()> {
4929            self.keys.lock().push(key.to_string());
4930            Ok(())
4931        }
4932
4933        async fn recall(
4934            &self,
4935            _query: &str,
4936            _limit: usize,
4937            _session_id: Option<&str>,
4938            _since: Option<&str>,
4939            _until: Option<&str>,
4940        ) -> anyhow::Result<Vec<MemoryEntry>> {
4941            Ok(Vec::new())
4942        }
4943
4944        async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
4945            Ok(None)
4946        }
4947
4948        async fn list(
4949            &self,
4950            _category: Option<&MemoryCategory>,
4951            _session_id: Option<&str>,
4952        ) -> anyhow::Result<Vec<MemoryEntry>> {
4953            Ok(Vec::new())
4954        }
4955
4956        async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
4957            Ok(false)
4958        }
4959
4960        async fn forget_for_agent(&self, _key: &str, _agent_id: &str) -> anyhow::Result<bool> {
4961            Ok(false)
4962        }
4963
4964        async fn count(&self) -> anyhow::Result<usize> {
4965            let size = self.keys.lock().len();
4966            Ok(size)
4967        }
4968
4969        async fn health_check(&self) -> bool {
4970            true
4971        }
4972
4973        async fn store_with_agent(
4974            &self,
4975            key: &str,
4976            content: &str,
4977            category: MemoryCategory,
4978            session_id: Option<&str>,
4979            _namespace: Option<&str>,
4980            _importance: Option<f64>,
4981            _agent_id: Option<&str>,
4982        ) -> anyhow::Result<()> {
4983            self.store(key, content, category, session_id).await
4984        }
4985
4986        async fn recall_for_agents(
4987            &self,
4988            _allowed_agent_ids: &[&str],
4989            _query: &str,
4990            _limit: usize,
4991            _session_id: Option<&str>,
4992            _since: Option<&str>,
4993            _until: Option<&str>,
4994        ) -> anyhow::Result<Vec<MemoryEntry>> {
4995            Ok(Vec::new())
4996        }
4997    }
4998    impl ::zeroclaw_api::attribution::Attributable for TrackingMemory {
4999        fn role(&self) -> ::zeroclaw_api::attribution::Role {
5000            ::zeroclaw_api::attribution::Role::Memory(
5001                ::zeroclaw_api::attribution::MemoryKind::InMemory,
5002            )
5003        }
5004        fn alias(&self) -> &str {
5005            "TrackingMemory"
5006        }
5007    }
5008
5009    fn test_connect_info() -> ConnectInfo<SocketAddr> {
5010        ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 30_300)))
5011    }
5012
5013    #[tokio::test]
5014    async fn webhook_idempotency_skips_duplicate_provider_calls() {
5015        let provider_impl = Arc::new(MockModelProvider::default());
5016        let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
5017        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5018
5019        let state = AppState {
5020            config: Arc::new(RwLock::new(Config::default())),
5021            model_provider,
5022            model: "test-model".into(),
5023            temperature: None,
5024            mem: memory.clone(),
5025            memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5026                Arc::clone(&memory),
5027                zeroclaw_config::schema::MemoryConfig::default(),
5028                std::path::PathBuf::new(),
5029            )),
5030            auto_save: false,
5031            webhook_secret_hash: None,
5032            pairing: Arc::new(PairingGuard::new(false, &[])),
5033            trust_forwarded_headers: false,
5034            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5035            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5036            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5037            #[cfg(feature = "channel-whatsapp-cloud")]
5038            whatsapp: None,
5039            #[cfg(feature = "channel-whatsapp-cloud")]
5040            whatsapp_app_secret: None,
5041            #[cfg(feature = "channel-linq")]
5042            linq: HashMap::new(),
5043            #[cfg(feature = "channel-linq")]
5044            linq_signing_secrets: HashMap::new(),
5045            #[cfg(feature = "channel-nextcloud")]
5046            nextcloud_talk: None,
5047            #[cfg(feature = "channel-nextcloud")]
5048            nextcloud_talk_webhook_secret: None,
5049            #[cfg(feature = "channel-wati")]
5050            wati: None,
5051            #[cfg(feature = "channel-email")]
5052            gmail_push: None,
5053            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5054            tools_registry: Arc::new(Vec::new()),
5055            cost_tracker: None,
5056            event_tx: tokio::sync::broadcast::channel(16).0,
5057            event_buffer: Arc::new(sse::EventBuffer::new(16)),
5058            shutdown_tx: tokio::sync::watch::channel(false).0,
5059            reload_tx: None,
5060            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5061            path_prefix: String::new(),
5062            web_dist_dir: None,
5063            session_backend: None,
5064            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5065                8, 30, 600,
5066            )),
5067            device_registry: None,
5068            pending_pairings: None,
5069            canvas_store: CanvasStore::new(),
5070            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5071            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5072            tui_registry: None,
5073            #[cfg(feature = "webauthn")]
5074            webauthn: None,
5075        };
5076
5077        let mut headers = HeaderMap::new();
5078        headers.insert("X-Idempotency-Key", HeaderValue::from_static("abc-123"));
5079
5080        let body = Ok(Json(WebhookBody {
5081            message: "hello".into(),
5082        }));
5083        let first = handle_webhook(
5084            State(state.clone()),
5085            test_connect_info(),
5086            Query(WebhookQuery::default()),
5087            headers.clone(),
5088            body,
5089        )
5090        .await
5091        .into_response();
5092        assert_eq!(first.status(), StatusCode::OK);
5093
5094        let body = Ok(Json(WebhookBody {
5095            message: "hello".into(),
5096        }));
5097        let second = handle_webhook(
5098            State(state),
5099            test_connect_info(),
5100            Query(WebhookQuery::default()),
5101            headers,
5102            body,
5103        )
5104        .await
5105        .into_response();
5106        assert_eq!(second.status(), StatusCode::OK);
5107
5108        let payload = second.into_body().collect().await.unwrap().to_bytes();
5109        let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
5110        assert_eq!(parsed["status"], "duplicate");
5111        assert_eq!(parsed["idempotent"], true);
5112        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
5113    }
5114
5115    #[tokio::test]
5116    async fn webhook_unknown_agent_rejected_before_dispatch() {
5117        let provider_impl = Arc::new(MockModelProvider::default());
5118        let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
5119        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5120
5121        let state = AppState {
5122            config: Arc::new(RwLock::new(Config::default())),
5123            model_provider,
5124            model: "test-model".into(),
5125            temperature: None,
5126            mem: memory.clone(),
5127            memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5128                Arc::clone(&memory),
5129                zeroclaw_config::schema::MemoryConfig::default(),
5130                std::path::PathBuf::new(),
5131            )),
5132            auto_save: false,
5133            webhook_secret_hash: None,
5134            pairing: Arc::new(PairingGuard::new(false, &[])),
5135            trust_forwarded_headers: false,
5136            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5137            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5138            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5139            #[cfg(feature = "channel-whatsapp-cloud")]
5140            whatsapp: None,
5141            #[cfg(feature = "channel-whatsapp-cloud")]
5142            whatsapp_app_secret: None,
5143            #[cfg(feature = "channel-linq")]
5144            linq: HashMap::new(),
5145            #[cfg(feature = "channel-linq")]
5146            linq_signing_secrets: HashMap::new(),
5147            #[cfg(feature = "channel-nextcloud")]
5148            nextcloud_talk: None,
5149            #[cfg(feature = "channel-nextcloud")]
5150            nextcloud_talk_webhook_secret: None,
5151            #[cfg(feature = "channel-wati")]
5152            wati: None,
5153            #[cfg(feature = "channel-email")]
5154            gmail_push: None,
5155            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5156            tools_registry: Arc::new(Vec::new()),
5157            cost_tracker: None,
5158            event_tx: tokio::sync::broadcast::channel(16).0,
5159            event_buffer: Arc::new(sse::EventBuffer::new(16)),
5160            shutdown_tx: tokio::sync::watch::channel(false).0,
5161            reload_tx: None,
5162            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5163            path_prefix: String::new(),
5164            web_dist_dir: None,
5165            session_backend: None,
5166            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5167                8, 30, 600,
5168            )),
5169            device_registry: None,
5170            pending_pairings: None,
5171            canvas_store: CanvasStore::new(),
5172            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5173            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5174            tui_registry: None,
5175            #[cfg(feature = "webauthn")]
5176            webauthn: None,
5177        };
5178
5179        // An idempotency key on a rejected request must NOT be consumed.
5180        let mut headers = HeaderMap::new();
5181        headers.insert("X-Idempotency-Key", HeaderValue::from_static("ghost-key"));
5182
5183        let response = handle_webhook(
5184            State(state.clone()),
5185            test_connect_info(),
5186            Query(WebhookQuery {
5187                agent: Some("ghost".into()),
5188            }),
5189            headers,
5190            Ok(Json(WebhookBody {
5191                message: "hello".into(),
5192            })),
5193        )
5194        .await
5195        .into_response();
5196        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
5197        let payload = response.into_body().collect().await.unwrap().to_bytes();
5198        let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
5199        assert!(
5200            parsed["error"]
5201                .as_str()
5202                .unwrap_or_default()
5203                .contains("Unknown agent `ghost`")
5204        );
5205        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
5206        // Key still fresh — a corrected retry with the same key proceeds.
5207        assert!(state.idempotency_store.record_if_new("ghost-key"));
5208    }
5209
5210    #[tokio::test]
5211    async fn webhook_explicit_agent_reports_agent_model() {
5212        let provider_impl = Arc::new(MockModelProvider::default());
5213        let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
5214        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5215        let observer_impl = Arc::new(CapturingObserver::default());
5216        let observer: Arc<dyn zeroclaw_runtime::observability::Observer> = observer_impl.clone();
5217
5218        let mut config = Config::default();
5219        config.providers.models.anthropic.insert(
5220            "default".into(),
5221            zeroclaw_config::schema::AnthropicModelProviderConfig {
5222                base: zeroclaw_config::schema::ModelProviderConfig {
5223                    model: Some("agent-model".into()),
5224                    ..Default::default()
5225                },
5226            },
5227        );
5228        let expected_provider = "anthropic.default".to_string();
5229        config.agents.insert(
5230            "nova".to_string(),
5231            zeroclaw_config::schema::AliasedAgentConfig {
5232                enabled: true,
5233                model_provider: expected_provider.clone().into(),
5234                ..Default::default()
5235            },
5236        );
5237
5238        let state = AppState {
5239            config: Arc::new(RwLock::new(config)),
5240            model_provider,
5241            model: "startup-model".into(),
5242            temperature: None,
5243            mem: memory,
5244            memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5245                Arc::new(MockMemory),
5246                zeroclaw_config::schema::MemoryConfig::default(),
5247                std::path::PathBuf::new(),
5248            )),
5249            auto_save: false,
5250            webhook_secret_hash: None,
5251            pairing: Arc::new(PairingGuard::new(false, &[])),
5252            trust_forwarded_headers: false,
5253            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5254            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5255            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5256            #[cfg(feature = "channel-whatsapp-cloud")]
5257            whatsapp: None,
5258            #[cfg(feature = "channel-whatsapp-cloud")]
5259            whatsapp_app_secret: None,
5260            #[cfg(feature = "channel-linq")]
5261            linq: HashMap::new(),
5262            #[cfg(feature = "channel-linq")]
5263            linq_signing_secrets: HashMap::new(),
5264            #[cfg(feature = "channel-nextcloud")]
5265            nextcloud_talk: None,
5266            #[cfg(feature = "channel-nextcloud")]
5267            nextcloud_talk_webhook_secret: None,
5268            #[cfg(feature = "channel-wati")]
5269            wati: None,
5270            #[cfg(feature = "channel-email")]
5271            gmail_push: None,
5272            observer,
5273            tools_registry: Arc::new(Vec::new()),
5274            cost_tracker: None,
5275            event_tx: tokio::sync::broadcast::channel(16).0,
5276            event_buffer: Arc::new(sse::EventBuffer::new(16)),
5277            shutdown_tx: tokio::sync::watch::channel(false).0,
5278            reload_tx: None,
5279            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5280            path_prefix: String::new(),
5281            web_dist_dir: None,
5282            session_backend: None,
5283            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5284                8, 30, 600,
5285            )),
5286            device_registry: None,
5287            pending_pairings: None,
5288            canvas_store: CanvasStore::new(),
5289            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5290            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5291            tui_registry: None,
5292            #[cfg(feature = "webauthn")]
5293            webauthn: None,
5294        };
5295
5296        let response = handle_webhook(
5297            State(state),
5298            test_connect_info(),
5299            Query(WebhookQuery {
5300                agent: Some("nova".into()),
5301            }),
5302            HeaderMap::new(),
5303            Ok(Json(WebhookBody {
5304                message: "hello".into(),
5305            })),
5306        )
5307        .await
5308        .into_response();
5309        assert_eq!(response.status(), StatusCode::OK);
5310        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
5311        let payload = response.into_body().collect().await.unwrap().to_bytes();
5312        let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
5313        assert_eq!(parsed["model"], "agent-model");
5314        let events = observer_impl.events.lock();
5315        assert!(
5316            events.iter().any(|event| matches!(
5317                event,
5318                zeroclaw_runtime::observability::ObserverEvent::AgentStart {
5319                    model_provider,
5320                    model,
5321                } if model_provider == &expected_provider && model == "agent-model"
5322            )),
5323            "expected AgentStart to use the explicit agent model; events were: {events:?}"
5324        );
5325    }
5326
5327    #[tokio::test]
5328    async fn webhook_autosave_stores_distinct_keys_per_request() {
5329        let provider_impl = Arc::new(MockModelProvider::default());
5330        let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
5331
5332        let tracking_impl = Arc::new(TrackingMemory::default());
5333        let memory: Arc<dyn Memory> = tracking_impl.clone();
5334
5335        let state = AppState {
5336            config: Arc::new(RwLock::new(Config::default())),
5337            model_provider,
5338            model: "test-model".into(),
5339            temperature: None,
5340            mem: memory,
5341            memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5342                Arc::new(MockMemory),
5343                zeroclaw_config::schema::MemoryConfig::default(),
5344                std::path::PathBuf::new(),
5345            )),
5346            auto_save: true,
5347            webhook_secret_hash: None,
5348            pairing: Arc::new(PairingGuard::new(false, &[])),
5349            trust_forwarded_headers: false,
5350            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5351            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5352            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5353            #[cfg(feature = "channel-whatsapp-cloud")]
5354            whatsapp: None,
5355            #[cfg(feature = "channel-whatsapp-cloud")]
5356            whatsapp_app_secret: None,
5357            #[cfg(feature = "channel-linq")]
5358            linq: HashMap::new(),
5359            #[cfg(feature = "channel-linq")]
5360            linq_signing_secrets: HashMap::new(),
5361            #[cfg(feature = "channel-nextcloud")]
5362            nextcloud_talk: None,
5363            #[cfg(feature = "channel-nextcloud")]
5364            nextcloud_talk_webhook_secret: None,
5365            #[cfg(feature = "channel-wati")]
5366            wati: None,
5367            #[cfg(feature = "channel-email")]
5368            gmail_push: None,
5369            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5370            tools_registry: Arc::new(Vec::new()),
5371            cost_tracker: None,
5372            event_tx: tokio::sync::broadcast::channel(16).0,
5373            event_buffer: Arc::new(sse::EventBuffer::new(16)),
5374            shutdown_tx: tokio::sync::watch::channel(false).0,
5375            reload_tx: None,
5376            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5377            path_prefix: String::new(),
5378            web_dist_dir: None,
5379            session_backend: None,
5380            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5381                8, 30, 600,
5382            )),
5383            device_registry: None,
5384            pending_pairings: None,
5385            canvas_store: CanvasStore::new(),
5386            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5387            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5388            tui_registry: None,
5389            #[cfg(feature = "webauthn")]
5390            webauthn: None,
5391        };
5392
5393        let headers = HeaderMap::new();
5394
5395        let body1 = Ok(Json(WebhookBody {
5396            message: "hello one".into(),
5397        }));
5398        let first = handle_webhook(
5399            State(state.clone()),
5400            test_connect_info(),
5401            Query(WebhookQuery::default()),
5402            headers.clone(),
5403            body1,
5404        )
5405        .await
5406        .into_response();
5407        assert_eq!(first.status(), StatusCode::OK);
5408
5409        let body2 = Ok(Json(WebhookBody {
5410            message: "hello two".into(),
5411        }));
5412        let second = handle_webhook(
5413            State(state),
5414            test_connect_info(),
5415            Query(WebhookQuery::default()),
5416            headers,
5417            body2,
5418        )
5419        .await
5420        .into_response();
5421        assert_eq!(second.status(), StatusCode::OK);
5422
5423        let keys = tracking_impl.keys.lock().clone();
5424        assert_eq!(keys.len(), 2);
5425        assert_ne!(keys[0], keys[1]);
5426        assert!(keys[0].starts_with("webhook_msg_"));
5427        assert!(keys[1].starts_with("webhook_msg_"));
5428        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 2);
5429    }
5430
5431    #[test]
5432    fn webhook_secret_hash_is_deterministic_and_nonempty() {
5433        let secret_a = generate_test_secret();
5434        let secret_b = generate_test_secret();
5435        let one = hash_webhook_secret(&secret_a);
5436        let two = hash_webhook_secret(&secret_a);
5437        let other = hash_webhook_secret(&secret_b);
5438
5439        assert_eq!(one, two);
5440        assert_ne!(one, other);
5441        assert_eq!(one.len(), 64);
5442    }
5443
5444    #[tokio::test]
5445    async fn webhook_secret_hash_rejects_missing_header() {
5446        let provider_impl = Arc::new(MockModelProvider::default());
5447        let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
5448        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5449        let secret = generate_test_secret();
5450
5451        let state = AppState {
5452            config: Arc::new(RwLock::new(Config::default())),
5453            model_provider,
5454            model: "test-model".into(),
5455            temperature: None,
5456            mem: memory.clone(),
5457            memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5458                Arc::clone(&memory),
5459                zeroclaw_config::schema::MemoryConfig::default(),
5460                std::path::PathBuf::new(),
5461            )),
5462            auto_save: false,
5463            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
5464            pairing: Arc::new(PairingGuard::new(false, &[])),
5465            trust_forwarded_headers: false,
5466            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5467            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5468            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5469            #[cfg(feature = "channel-whatsapp-cloud")]
5470            whatsapp: None,
5471            #[cfg(feature = "channel-whatsapp-cloud")]
5472            whatsapp_app_secret: None,
5473            #[cfg(feature = "channel-linq")]
5474            linq: HashMap::new(),
5475            #[cfg(feature = "channel-linq")]
5476            linq_signing_secrets: HashMap::new(),
5477            #[cfg(feature = "channel-nextcloud")]
5478            nextcloud_talk: None,
5479            #[cfg(feature = "channel-nextcloud")]
5480            nextcloud_talk_webhook_secret: None,
5481            #[cfg(feature = "channel-wati")]
5482            wati: None,
5483            #[cfg(feature = "channel-email")]
5484            gmail_push: None,
5485            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5486            tools_registry: Arc::new(Vec::new()),
5487            cost_tracker: None,
5488            event_tx: tokio::sync::broadcast::channel(16).0,
5489            event_buffer: Arc::new(sse::EventBuffer::new(16)),
5490            shutdown_tx: tokio::sync::watch::channel(false).0,
5491            reload_tx: None,
5492            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5493            path_prefix: String::new(),
5494            web_dist_dir: None,
5495            session_backend: None,
5496            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5497                8, 30, 600,
5498            )),
5499            device_registry: None,
5500            pending_pairings: None,
5501            canvas_store: CanvasStore::new(),
5502            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5503            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5504            tui_registry: None,
5505            #[cfg(feature = "webauthn")]
5506            webauthn: None,
5507        };
5508
5509        let response = handle_webhook(
5510            State(state),
5511            test_connect_info(),
5512            Query(WebhookQuery::default()),
5513            HeaderMap::new(),
5514            Ok(Json(WebhookBody {
5515                message: "hello".into(),
5516            })),
5517        )
5518        .await
5519        .into_response();
5520
5521        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
5522        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
5523    }
5524
5525    #[tokio::test]
5526    async fn webhook_secret_hash_rejects_invalid_header() {
5527        let provider_impl = Arc::new(MockModelProvider::default());
5528        let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
5529        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5530        let valid_secret = generate_test_secret();
5531        let wrong_secret = generate_test_secret();
5532
5533        let state = AppState {
5534            config: Arc::new(RwLock::new(Config::default())),
5535            model_provider,
5536            model: "test-model".into(),
5537            temperature: None,
5538            mem: memory.clone(),
5539            memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5540                Arc::clone(&memory),
5541                zeroclaw_config::schema::MemoryConfig::default(),
5542                std::path::PathBuf::new(),
5543            )),
5544            auto_save: false,
5545            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&valid_secret))),
5546            pairing: Arc::new(PairingGuard::new(false, &[])),
5547            trust_forwarded_headers: false,
5548            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5549            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5550            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5551            #[cfg(feature = "channel-whatsapp-cloud")]
5552            whatsapp: None,
5553            #[cfg(feature = "channel-whatsapp-cloud")]
5554            whatsapp_app_secret: None,
5555            #[cfg(feature = "channel-linq")]
5556            linq: HashMap::new(),
5557            #[cfg(feature = "channel-linq")]
5558            linq_signing_secrets: HashMap::new(),
5559            #[cfg(feature = "channel-nextcloud")]
5560            nextcloud_talk: None,
5561            #[cfg(feature = "channel-nextcloud")]
5562            nextcloud_talk_webhook_secret: None,
5563            #[cfg(feature = "channel-wati")]
5564            wati: None,
5565            #[cfg(feature = "channel-email")]
5566            gmail_push: None,
5567            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5568            tools_registry: Arc::new(Vec::new()),
5569            cost_tracker: None,
5570            event_tx: tokio::sync::broadcast::channel(16).0,
5571            event_buffer: Arc::new(sse::EventBuffer::new(16)),
5572            shutdown_tx: tokio::sync::watch::channel(false).0,
5573            reload_tx: None,
5574            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5575            path_prefix: String::new(),
5576            web_dist_dir: None,
5577            session_backend: None,
5578            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5579                8, 30, 600,
5580            )),
5581            device_registry: None,
5582            pending_pairings: None,
5583            canvas_store: CanvasStore::new(),
5584            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5585            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5586            tui_registry: None,
5587            #[cfg(feature = "webauthn")]
5588            webauthn: None,
5589        };
5590
5591        let mut headers = HeaderMap::new();
5592        headers.insert(
5593            "X-Webhook-Secret",
5594            HeaderValue::from_str(&wrong_secret).unwrap(),
5595        );
5596
5597        let response = handle_webhook(
5598            State(state),
5599            test_connect_info(),
5600            Query(WebhookQuery::default()),
5601            headers,
5602            Ok(Json(WebhookBody {
5603                message: "hello".into(),
5604            })),
5605        )
5606        .await
5607        .into_response();
5608
5609        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
5610        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
5611    }
5612
5613    #[tokio::test]
5614    async fn webhook_secret_hash_accepts_valid_header() {
5615        let provider_impl = Arc::new(MockModelProvider::default());
5616        let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
5617        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5618        let secret = generate_test_secret();
5619
5620        let state = AppState {
5621            config: Arc::new(RwLock::new(Config::default())),
5622            model_provider,
5623            model: "test-model".into(),
5624            temperature: None,
5625            mem: memory.clone(),
5626            memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5627                Arc::clone(&memory),
5628                zeroclaw_config::schema::MemoryConfig::default(),
5629                std::path::PathBuf::new(),
5630            )),
5631            auto_save: false,
5632            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
5633            pairing: Arc::new(PairingGuard::new(false, &[])),
5634            trust_forwarded_headers: false,
5635            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5636            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5637            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5638            #[cfg(feature = "channel-whatsapp-cloud")]
5639            whatsapp: None,
5640            #[cfg(feature = "channel-whatsapp-cloud")]
5641            whatsapp_app_secret: None,
5642            #[cfg(feature = "channel-linq")]
5643            linq: HashMap::new(),
5644            #[cfg(feature = "channel-linq")]
5645            linq_signing_secrets: HashMap::new(),
5646            #[cfg(feature = "channel-nextcloud")]
5647            nextcloud_talk: None,
5648            #[cfg(feature = "channel-nextcloud")]
5649            nextcloud_talk_webhook_secret: None,
5650            #[cfg(feature = "channel-wati")]
5651            wati: None,
5652            #[cfg(feature = "channel-email")]
5653            gmail_push: None,
5654            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5655            tools_registry: Arc::new(Vec::new()),
5656            cost_tracker: None,
5657            event_tx: tokio::sync::broadcast::channel(16).0,
5658            event_buffer: Arc::new(sse::EventBuffer::new(16)),
5659            shutdown_tx: tokio::sync::watch::channel(false).0,
5660            reload_tx: None,
5661            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5662            path_prefix: String::new(),
5663            web_dist_dir: None,
5664            session_backend: None,
5665            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5666                8, 30, 600,
5667            )),
5668            device_registry: None,
5669            pending_pairings: None,
5670            canvas_store: CanvasStore::new(),
5671            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5672            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5673            tui_registry: None,
5674            #[cfg(feature = "webauthn")]
5675            webauthn: None,
5676        };
5677
5678        let mut headers = HeaderMap::new();
5679        headers.insert("X-Webhook-Secret", HeaderValue::from_str(&secret).unwrap());
5680
5681        let response = handle_webhook(
5682            State(state),
5683            test_connect_info(),
5684            Query(WebhookQuery::default()),
5685            headers,
5686            Ok(Json(WebhookBody {
5687                message: "hello".into(),
5688            })),
5689        )
5690        .await
5691        .into_response();
5692
5693        assert_eq!(response.status(), StatusCode::OK);
5694        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
5695    }
5696
5697    #[cfg(feature = "channel-nextcloud")]
5698    fn compute_nextcloud_signature_hex(secret: &str, random: &str, body: &str) -> String {
5699        use hmac::{Hmac, Mac};
5700        use sha2::Sha256;
5701
5702        let payload = format!("{random}{body}");
5703        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
5704        mac.update(payload.as_bytes());
5705        hex::encode(mac.finalize().into_bytes())
5706    }
5707
5708    #[cfg(feature = "channel-nextcloud")]
5709    #[tokio::test]
5710    async fn nextcloud_talk_webhook_returns_not_found_when_not_configured() {
5711        let model_provider: Arc<dyn ModelProvider> = Arc::new(MockModelProvider::default());
5712        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5713
5714        let state = AppState {
5715            config: Arc::new(RwLock::new(Config::default())),
5716            model_provider,
5717            model: "test-model".into(),
5718            temperature: None,
5719            mem: memory.clone(),
5720            memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5721                Arc::clone(&memory),
5722                zeroclaw_config::schema::MemoryConfig::default(),
5723                std::path::PathBuf::new(),
5724            )),
5725            auto_save: false,
5726            webhook_secret_hash: None,
5727            pairing: Arc::new(PairingGuard::new(false, &[])),
5728            trust_forwarded_headers: false,
5729            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5730            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5731            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5732            #[cfg(feature = "channel-whatsapp-cloud")]
5733            whatsapp: None,
5734            #[cfg(feature = "channel-whatsapp-cloud")]
5735            whatsapp_app_secret: None,
5736            #[cfg(feature = "channel-linq")]
5737            linq: HashMap::new(),
5738            #[cfg(feature = "channel-linq")]
5739            linq_signing_secrets: HashMap::new(),
5740            #[cfg(feature = "channel-nextcloud")]
5741            nextcloud_talk: None,
5742            #[cfg(feature = "channel-nextcloud")]
5743            nextcloud_talk_webhook_secret: None,
5744            #[cfg(feature = "channel-wati")]
5745            wati: None,
5746            #[cfg(feature = "channel-email")]
5747            gmail_push: None,
5748            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5749            tools_registry: Arc::new(Vec::new()),
5750            cost_tracker: None,
5751            event_tx: tokio::sync::broadcast::channel(16).0,
5752            event_buffer: Arc::new(sse::EventBuffer::new(16)),
5753            shutdown_tx: tokio::sync::watch::channel(false).0,
5754            reload_tx: None,
5755            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5756            path_prefix: String::new(),
5757            web_dist_dir: None,
5758            session_backend: None,
5759            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5760                8, 30, 600,
5761            )),
5762            device_registry: None,
5763            pending_pairings: None,
5764            canvas_store: CanvasStore::new(),
5765            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5766            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5767            tui_registry: None,
5768            #[cfg(feature = "webauthn")]
5769            webauthn: None,
5770        };
5771
5772        let response = Box::pin(handle_nextcloud_talk_webhook(
5773            State(state),
5774            HeaderMap::new(),
5775            Bytes::from_static(br#"{"type":"message"}"#),
5776        ))
5777        .await
5778        .into_response();
5779
5780        assert_eq!(response.status(), StatusCode::NOT_FOUND);
5781    }
5782
5783    #[cfg(feature = "channel-nextcloud")]
5784    #[tokio::test]
5785    async fn nextcloud_talk_webhook_rejects_invalid_signature() {
5786        let provider_impl = Arc::new(MockModelProvider::default());
5787        let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
5788        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5789
5790        let alias = "nextcloud_talk_test_alias";
5791        let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = Arc::new(Vec::new);
5792        let channel = Arc::new(NextcloudTalkChannel::new(
5793            "https://cloud.example.com".into(),
5794            "app-token".into(),
5795            String::new(),
5796            alias,
5797            peer_resolver,
5798        ));
5799
5800        let secret = "nextcloud-test-secret";
5801        let random = "seed-value";
5802        let body = r#"{"type":"message","object":{"token":"room-token"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#;
5803        let _valid_signature = compute_nextcloud_signature_hex(secret, random, body);
5804        let invalid_signature = "deadbeef";
5805
5806        let state = AppState {
5807            config: Arc::new(RwLock::new(Config::default())),
5808            model_provider,
5809            model: "test-model".into(),
5810            temperature: None,
5811            mem: memory.clone(),
5812            memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5813                Arc::clone(&memory),
5814                zeroclaw_config::schema::MemoryConfig::default(),
5815                std::path::PathBuf::new(),
5816            )),
5817            auto_save: false,
5818            webhook_secret_hash: None,
5819            pairing: Arc::new(PairingGuard::new(false, &[])),
5820            trust_forwarded_headers: false,
5821            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5822            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5823            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5824            #[cfg(feature = "channel-whatsapp-cloud")]
5825            whatsapp: None,
5826            #[cfg(feature = "channel-whatsapp-cloud")]
5827            whatsapp_app_secret: None,
5828            #[cfg(feature = "channel-linq")]
5829            linq: HashMap::new(),
5830            #[cfg(feature = "channel-linq")]
5831            linq_signing_secrets: HashMap::new(),
5832            nextcloud_talk: Some(channel),
5833            nextcloud_talk_webhook_secret: Some(Arc::from(secret)),
5834            #[cfg(feature = "channel-wati")]
5835            wati: None,
5836            #[cfg(feature = "channel-email")]
5837            gmail_push: None,
5838            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5839            tools_registry: Arc::new(Vec::new()),
5840            cost_tracker: None,
5841            event_tx: tokio::sync::broadcast::channel(16).0,
5842            event_buffer: Arc::new(sse::EventBuffer::new(16)),
5843            shutdown_tx: tokio::sync::watch::channel(false).0,
5844            reload_tx: None,
5845            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5846            path_prefix: String::new(),
5847            web_dist_dir: None,
5848            session_backend: None,
5849            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5850                8, 30, 600,
5851            )),
5852            device_registry: None,
5853            pending_pairings: None,
5854            canvas_store: CanvasStore::new(),
5855            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5856            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5857            tui_registry: None,
5858            #[cfg(feature = "webauthn")]
5859            webauthn: None,
5860        };
5861
5862        let mut headers = HeaderMap::new();
5863        headers.insert(
5864            "X-Nextcloud-Talk-Random",
5865            HeaderValue::from_str(random).unwrap(),
5866        );
5867        headers.insert(
5868            "X-Nextcloud-Talk-Signature",
5869            HeaderValue::from_str(invalid_signature).unwrap(),
5870        );
5871
5872        let response = Box::pin(handle_nextcloud_talk_webhook(
5873            State(state),
5874            headers,
5875            Bytes::from(body),
5876        ))
5877        .await
5878        .into_response();
5879        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
5880        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
5881    }
5882
5883    // Regression for #6156: handler must return 200 OK before the (potentially
5884    // slow) LLM call completes, so Nextcloud Talk doesn't cancel the webhook
5885    // request at its ~5s timeout.
5886    #[cfg(feature = "channel-nextcloud")]
5887    #[derive(Default)]
5888    struct SlowProvider {
5889        calls: AtomicUsize,
5890        started_tx: Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
5891    }
5892
5893    #[cfg(feature = "channel-nextcloud")]
5894    #[async_trait]
5895    impl ModelProvider for SlowProvider {
5896        async fn chat_with_system(
5897            &self,
5898            _system_prompt: Option<&str>,
5899            _message: &str,
5900            _model: &str,
5901            _temperature: Option<f64>,
5902        ) -> anyhow::Result<String> {
5903            self.calls.fetch_add(1, Ordering::SeqCst);
5904            if let Some(tx) = self.started_tx.lock().take() {
5905                let _ = tx.send(());
5906            }
5907            tokio::time::sleep(Duration::from_secs(30)).await;
5908            Ok("slow ok".into())
5909        }
5910    }
5911    #[cfg(feature = "channel-nextcloud")]
5912    impl ::zeroclaw_api::attribution::Attributable for SlowProvider {
5913        fn role(&self) -> ::zeroclaw_api::attribution::Role {
5914            ::zeroclaw_api::attribution::Role::Provider(
5915                ::zeroclaw_api::attribution::ProviderKind::Model(
5916                    ::zeroclaw_api::attribution::ModelProviderKind::Custom,
5917                ),
5918            )
5919        }
5920        fn alias(&self) -> &str {
5921            "SlowProvider"
5922        }
5923    }
5924
5925    #[cfg(feature = "channel-nextcloud")]
5926    #[tokio::test]
5927    async fn nextcloud_talk_webhook_returns_before_llm_call_completes() {
5928        let (started_tx, started_rx) = tokio::sync::oneshot::channel();
5929        let provider_impl = Arc::new(SlowProvider {
5930            calls: AtomicUsize::new(0),
5931            started_tx: Mutex::new(Some(started_tx)),
5932        });
5933        let provider: Arc<dyn ModelProvider> = provider_impl.clone();
5934        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5935
5936        let channel = Arc::new(NextcloudTalkChannel::new(
5937            "https://cloud.example.com".into(),
5938            "app-token".into(),
5939            String::new(),
5940            "default",
5941            Arc::new(|| vec!["*".to_string()]),
5942        ));
5943
5944        let body = r#"{"type":"message","object":{"token":"room-token"},"actor":{"id":"user_a","name":"User A"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#;
5945
5946        let state = AppState {
5947            config: Arc::new(RwLock::new(Config::default())),
5948            model_provider: provider,
5949            model: "test-model".into(),
5950            temperature: None,
5951            mem: memory.clone(),
5952            memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5953                Arc::clone(&memory),
5954                zeroclaw_config::schema::MemoryConfig::default(),
5955                std::path::PathBuf::new(),
5956            )),
5957            auto_save: false,
5958            webhook_secret_hash: None,
5959            pairing: Arc::new(PairingGuard::new(false, &[])),
5960            trust_forwarded_headers: false,
5961            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5962            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5963            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5964            #[cfg(feature = "channel-whatsapp-cloud")]
5965            whatsapp: None,
5966            #[cfg(feature = "channel-whatsapp-cloud")]
5967            whatsapp_app_secret: None,
5968            #[cfg(feature = "channel-linq")]
5969            linq: HashMap::new(),
5970            #[cfg(feature = "channel-linq")]
5971            linq_signing_secrets: HashMap::new(),
5972            nextcloud_talk: Some(channel),
5973            nextcloud_talk_webhook_secret: None,
5974            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5975            tui_registry: None,
5976            #[cfg(feature = "channel-wati")]
5977            wati: None,
5978            #[cfg(feature = "channel-email")]
5979            gmail_push: None,
5980            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5981            tools_registry: Arc::new(Vec::new()),
5982            cost_tracker: None,
5983            event_tx: tokio::sync::broadcast::channel(16).0,
5984            event_buffer: Arc::new(sse::EventBuffer::new(16)),
5985            shutdown_tx: tokio::sync::watch::channel(false).0,
5986            reload_tx: None,
5987            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5988            path_prefix: String::new(),
5989            web_dist_dir: None,
5990            session_backend: None,
5991            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5992                8, 30, 600,
5993            )),
5994            device_registry: None,
5995            pending_pairings: None,
5996            canvas_store: CanvasStore::new(),
5997            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5998            #[cfg(feature = "webauthn")]
5999            webauthn: None,
6000        };
6001
6002        let start = std::time::Instant::now();
6003        let response = tokio::time::timeout(
6004            Duration::from_secs(2),
6005            Box::pin(handle_nextcloud_talk_webhook(
6006                State(state),
6007                HeaderMap::new(),
6008                Bytes::from(body),
6009            )),
6010        )
6011        .await
6012        .expect("webhook must return before 2s deadline (regression #6156)")
6013        .into_response();
6014
6015        let elapsed = start.elapsed();
6016        assert_eq!(response.status(), StatusCode::OK);
6017        assert!(
6018            elapsed < Duration::from_secs(2),
6019            "handler returned after {elapsed:?}; expected fast return for #6156"
6020        );
6021
6022        // Confirm the spawned task actually started the LLM call (i.e., the
6023        // ack didn't just skip processing). The 30s sleep is still in flight.
6024        tokio::time::timeout(Duration::from_secs(2), started_rx)
6025            .await
6026            .expect("spawned LLM call did not start within 2s")
6027            .expect("started_tx sender was dropped");
6028        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
6029    }
6030
6031    // ══════════════════════════════════════════════════════════
6032    // WhatsApp Signature Verification Tests (CWE-345 Prevention)
6033    // ══════════════════════════════════════════════════════════
6034
6035    #[cfg(feature = "channel-whatsapp-cloud")]
6036    fn compute_whatsapp_signature_hex(secret: &str, body: &[u8]) -> String {
6037        use hmac::{Hmac, Mac};
6038        use sha2::Sha256;
6039
6040        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
6041        mac.update(body);
6042        hex::encode(mac.finalize().into_bytes())
6043    }
6044
6045    #[cfg(feature = "channel-whatsapp-cloud")]
6046    fn compute_whatsapp_signature_header(secret: &str, body: &[u8]) -> String {
6047        format!("sha256={}", compute_whatsapp_signature_hex(secret, body))
6048    }
6049
6050    #[cfg(feature = "channel-whatsapp-cloud")]
6051    #[test]
6052    fn whatsapp_signature_valid() {
6053        let app_secret = generate_test_secret();
6054        let body = b"test body content";
6055
6056        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
6057
6058        assert!(verify_whatsapp_signature(
6059            &app_secret,
6060            body,
6061            &signature_header
6062        ));
6063    }
6064
6065    #[cfg(feature = "channel-whatsapp-cloud")]
6066    #[test]
6067    fn whatsapp_signature_invalid_wrong_secret() {
6068        let app_secret = generate_test_secret();
6069        let wrong_secret = generate_test_secret();
6070        let body = b"test body content";
6071
6072        let signature_header = compute_whatsapp_signature_header(&wrong_secret, body);
6073
6074        assert!(!verify_whatsapp_signature(
6075            &app_secret,
6076            body,
6077            &signature_header
6078        ));
6079    }
6080
6081    #[cfg(feature = "channel-whatsapp-cloud")]
6082    #[test]
6083    fn whatsapp_signature_invalid_wrong_body() {
6084        let app_secret = generate_test_secret();
6085        let original_body = b"original body";
6086        let tampered_body = b"tampered body";
6087
6088        let signature_header = compute_whatsapp_signature_header(&app_secret, original_body);
6089
6090        // Verify with tampered body should fail
6091        assert!(!verify_whatsapp_signature(
6092            &app_secret,
6093            tampered_body,
6094            &signature_header
6095        ));
6096    }
6097
6098    #[cfg(feature = "channel-whatsapp-cloud")]
6099    #[test]
6100    fn whatsapp_signature_missing_prefix() {
6101        let app_secret = generate_test_secret();
6102        let body = b"test body";
6103
6104        // Signature without "sha256=" prefix
6105        let signature_header = "abc123def456";
6106
6107        assert!(!verify_whatsapp_signature(
6108            &app_secret,
6109            body,
6110            signature_header
6111        ));
6112    }
6113
6114    #[cfg(feature = "channel-whatsapp-cloud")]
6115    #[test]
6116    fn whatsapp_signature_empty_header() {
6117        let app_secret = generate_test_secret();
6118        let body = b"test body";
6119
6120        assert!(!verify_whatsapp_signature(&app_secret, body, ""));
6121    }
6122
6123    #[cfg(feature = "channel-whatsapp-cloud")]
6124    #[test]
6125    fn whatsapp_signature_invalid_hex() {
6126        let app_secret = generate_test_secret();
6127        let body = b"test body";
6128
6129        // Invalid hex characters
6130        let signature_header = "sha256=not_valid_hex_zzz";
6131
6132        assert!(!verify_whatsapp_signature(
6133            &app_secret,
6134            body,
6135            signature_header
6136        ));
6137    }
6138
6139    #[cfg(feature = "channel-whatsapp-cloud")]
6140    #[test]
6141    fn whatsapp_signature_empty_body() {
6142        let app_secret = generate_test_secret();
6143        let body = b"";
6144
6145        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
6146
6147        assert!(verify_whatsapp_signature(
6148            &app_secret,
6149            body,
6150            &signature_header
6151        ));
6152    }
6153
6154    #[cfg(feature = "channel-whatsapp-cloud")]
6155    #[test]
6156    fn whatsapp_signature_unicode_body() {
6157        let app_secret = generate_test_secret();
6158        let body = "Hello 🦀 World".as_bytes();
6159
6160        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
6161
6162        assert!(verify_whatsapp_signature(
6163            &app_secret,
6164            body,
6165            &signature_header
6166        ));
6167    }
6168
6169    #[cfg(feature = "channel-whatsapp-cloud")]
6170    #[test]
6171    fn whatsapp_signature_json_payload() {
6172        let app_secret = generate_test_secret();
6173        let body = br#"{"entry":[{"changes":[{"value":{"messages":[{"from":"1234567890","text":{"body":"Hello"}}]}}]}]}"#;
6174
6175        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
6176
6177        assert!(verify_whatsapp_signature(
6178            &app_secret,
6179            body,
6180            &signature_header
6181        ));
6182    }
6183
6184    #[cfg(feature = "channel-whatsapp-cloud")]
6185    #[test]
6186    fn whatsapp_signature_case_sensitive_prefix() {
6187        let app_secret = generate_test_secret();
6188        let body = b"test body";
6189
6190        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
6191
6192        // Wrong case prefix should fail
6193        let wrong_prefix = format!("SHA256={hex_sig}");
6194        assert!(!verify_whatsapp_signature(&app_secret, body, &wrong_prefix));
6195
6196        // Correct prefix should pass
6197        let correct_prefix = format!("sha256={hex_sig}");
6198        assert!(verify_whatsapp_signature(
6199            &app_secret,
6200            body,
6201            &correct_prefix
6202        ));
6203    }
6204
6205    #[cfg(feature = "channel-whatsapp-cloud")]
6206    #[test]
6207    fn whatsapp_signature_truncated_hex() {
6208        let app_secret = generate_test_secret();
6209        let body = b"test body";
6210
6211        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
6212        let truncated = &hex_sig[..32]; // Only half the signature
6213        let signature_header = format!("sha256={truncated}");
6214
6215        assert!(!verify_whatsapp_signature(
6216            &app_secret,
6217            body,
6218            &signature_header
6219        ));
6220    }
6221
6222    #[cfg(feature = "channel-whatsapp-cloud")]
6223    #[test]
6224    fn whatsapp_signature_extra_bytes() {
6225        let app_secret = generate_test_secret();
6226        let body = b"test body";
6227
6228        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
6229        let extended = format!("{hex_sig}deadbeef");
6230        let signature_header = format!("sha256={extended}");
6231
6232        assert!(!verify_whatsapp_signature(
6233            &app_secret,
6234            body,
6235            &signature_header
6236        ));
6237    }
6238
6239    // ══════════════════════════════════════════════════════════
6240    // IdempotencyStore Edge-Case Tests
6241    // ══════════════════════════════════════════════════════════
6242
6243    #[test]
6244    fn idempotency_store_allows_different_keys() {
6245        let store = IdempotencyStore::new(Duration::from_secs(60), 100);
6246        assert!(store.record_if_new("key-a"));
6247        assert!(store.record_if_new("key-b"));
6248        assert!(store.record_if_new("key-c"));
6249        assert!(store.record_if_new("key-d"));
6250    }
6251
6252    #[test]
6253    fn idempotency_store_max_keys_clamped_to_one() {
6254        let store = IdempotencyStore::new(Duration::from_secs(60), 0);
6255        assert!(store.record_if_new("only-key"));
6256        assert!(!store.record_if_new("only-key"));
6257    }
6258
6259    #[test]
6260    fn idempotency_store_rapid_duplicate_rejected() {
6261        let store = IdempotencyStore::new(Duration::from_secs(300), 100);
6262        assert!(store.record_if_new("rapid"));
6263        assert!(!store.record_if_new("rapid"));
6264    }
6265
6266    #[test]
6267    fn idempotency_store_accepts_after_ttl_expires() {
6268        let store = IdempotencyStore::new(Duration::from_millis(1), 100);
6269        assert!(store.record_if_new("ttl-key"));
6270        std::thread::sleep(Duration::from_millis(10));
6271        assert!(store.record_if_new("ttl-key"));
6272    }
6273
6274    #[test]
6275    fn idempotency_store_eviction_preserves_newest() {
6276        let store = IdempotencyStore::new(Duration::from_secs(300), 1);
6277        assert!(store.record_if_new("old-key"));
6278        std::thread::sleep(Duration::from_millis(2));
6279        assert!(store.record_if_new("new-key"));
6280
6281        let keys = store.keys.lock();
6282        assert_eq!(keys.len(), 1);
6283        assert!(!keys.contains_key("old-key"));
6284        assert!(keys.contains_key("new-key"));
6285    }
6286
6287    #[test]
6288    fn rate_limiter_allows_after_window_expires() {
6289        let window = Duration::from_millis(50);
6290        let limiter = SlidingWindowRateLimiter::new(2, window, 100);
6291        assert!(limiter.allow("ip-1"));
6292        assert!(limiter.allow("ip-1"));
6293        assert!(!limiter.allow("ip-1")); // blocked
6294
6295        // Wait for window to expire
6296        std::thread::sleep(Duration::from_millis(60));
6297
6298        // Should be allowed again
6299        assert!(limiter.allow("ip-1"));
6300    }
6301
6302    #[test]
6303    fn rate_limiter_independent_keys_tracked_separately() {
6304        let limiter = SlidingWindowRateLimiter::new(2, Duration::from_secs(60), 100);
6305        assert!(limiter.allow("ip-1"));
6306        assert!(limiter.allow("ip-1"));
6307        assert!(!limiter.allow("ip-1")); // ip-1 blocked
6308
6309        // ip-2 should still work
6310        assert!(limiter.allow("ip-2"));
6311        assert!(limiter.allow("ip-2"));
6312        assert!(!limiter.allow("ip-2")); // ip-2 now blocked
6313    }
6314
6315    #[test]
6316    fn rate_limiter_exact_boundary_at_max_keys() {
6317        let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 3);
6318        assert!(limiter.allow("ip-1"));
6319        assert!(limiter.allow("ip-2"));
6320        assert!(limiter.allow("ip-3"));
6321        // At capacity now
6322        assert!(limiter.allow("ip-4")); // should evict ip-1
6323
6324        let guard = limiter.requests.lock();
6325        assert_eq!(guard.0.len(), 3);
6326        assert!(
6327            !guard.0.contains_key("ip-1"),
6328            "ip-1 should have been evicted"
6329        );
6330        assert!(guard.0.contains_key("ip-2"));
6331        assert!(guard.0.contains_key("ip-3"));
6332        assert!(guard.0.contains_key("ip-4"));
6333    }
6334
6335    #[test]
6336    fn gateway_rate_limiter_pair_and_webhook_are_independent() {
6337        let limiter = GatewayRateLimiter::new(2, 3, 100);
6338
6339        // Exhaust pair limit
6340        assert!(limiter.allow_pair("ip-1"));
6341        assert!(limiter.allow_pair("ip-1"));
6342        assert!(!limiter.allow_pair("ip-1")); // pair blocked
6343
6344        // Webhook should still work
6345        assert!(limiter.allow_webhook("ip-1"));
6346        assert!(limiter.allow_webhook("ip-1"));
6347        assert!(limiter.allow_webhook("ip-1"));
6348        assert!(!limiter.allow_webhook("ip-1")); // webhook now blocked
6349    }
6350
6351    #[test]
6352    fn rate_limiter_single_key_max_allows_one_request() {
6353        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 1);
6354        assert!(limiter.allow("ip-1"));
6355        assert!(limiter.allow("ip-2")); // evicts ip-1
6356
6357        let guard = limiter.requests.lock();
6358        assert_eq!(guard.0.len(), 1);
6359        assert!(guard.0.contains_key("ip-2"));
6360        assert!(!guard.0.contains_key("ip-1"));
6361    }
6362
6363    #[test]
6364    fn rate_limiter_concurrent_access_safe() {
6365        use std::sync::Arc;
6366
6367        let limiter = Arc::new(SlidingWindowRateLimiter::new(
6368            1000,
6369            Duration::from_secs(60),
6370            1000,
6371        ));
6372        let mut handles = Vec::new();
6373
6374        for i in 0..10 {
6375            let limiter = limiter.clone();
6376            handles.push(std::thread::spawn(move || {
6377                for j in 0..100 {
6378                    limiter.allow(&format!("thread-{i}-req-{j}"));
6379                }
6380            }));
6381        }
6382
6383        for handle in handles {
6384            handle.join().unwrap();
6385        }
6386
6387        // Should not panic or deadlock
6388        let guard = limiter.requests.lock();
6389        assert!(guard.0.len() <= 1000, "should respect max_keys");
6390    }
6391
6392    #[test]
6393    fn idempotency_store_concurrent_access_safe() {
6394        use std::sync::Arc;
6395
6396        let store = Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000));
6397        let mut handles = Vec::new();
6398
6399        for i in 0..10 {
6400            let store = store.clone();
6401            handles.push(std::thread::spawn(move || {
6402                for j in 0..100 {
6403                    store.record_if_new(&format!("thread-{i}-key-{j}"));
6404                }
6405            }));
6406        }
6407
6408        for handle in handles {
6409            handle.join().unwrap();
6410        }
6411
6412        let keys = store.keys.lock();
6413        assert!(keys.len() <= 1000, "should respect max_keys");
6414    }
6415
6416    #[test]
6417    fn rate_limiter_rapid_burst_then_cooldown() {
6418        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_millis(50), 100);
6419
6420        // Burst: use all 5 requests
6421        for _ in 0..5 {
6422            assert!(limiter.allow("burst-ip"));
6423        }
6424        assert!(!limiter.allow("burst-ip")); // 6th should fail
6425
6426        // Cooldown
6427        std::thread::sleep(Duration::from_millis(60));
6428
6429        // Should be allowed again
6430        assert!(limiter.allow("burst-ip"));
6431    }
6432
6433    #[test]
6434    fn require_localhost_accepts_ipv4_loopback() {
6435        let peer = SocketAddr::from(([127, 0, 0, 1], 12345));
6436        assert!(require_localhost(&peer).is_ok());
6437    }
6438
6439    #[test]
6440    fn require_localhost_accepts_ipv6_loopback() {
6441        let peer = SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, 12345));
6442        assert!(require_localhost(&peer).is_ok());
6443    }
6444
6445    #[test]
6446    fn require_localhost_rejects_non_loopback_ipv4() {
6447        let peer = SocketAddr::from(([192, 168, 1, 100], 12345));
6448        let err = require_localhost(&peer).unwrap_err();
6449        assert_eq!(err.0, StatusCode::FORBIDDEN);
6450    }
6451
6452    #[test]
6453    fn require_localhost_rejects_non_loopback_ipv6() {
6454        let peer = SocketAddr::from((
6455            std::net::Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1),
6456            12345,
6457        ));
6458        let err = require_localhost(&peer).unwrap_err();
6459        assert_eq!(err.0, StatusCode::FORBIDDEN);
6460    }
6461
6462    #[test]
6463    fn admin_reload_gate_loopback_always_allowed() {
6464        // Loopback is allowed regardless of the opt-in or pairing flags.
6465        assert_eq!(
6466            admin_reload_gate(true, false, false),
6467            AdminReloadGate::Allow
6468        );
6469        assert_eq!(admin_reload_gate(true, true, true), AdminReloadGate::Allow);
6470        assert_eq!(admin_reload_gate(true, false, true), AdminReloadGate::Allow);
6471        assert_eq!(admin_reload_gate(true, true, false), AdminReloadGate::Allow);
6472    }
6473
6474    #[test]
6475    fn admin_reload_gate_remote_blocked_by_default() {
6476        // Non-loopback caller with the flag off is rejected outright,
6477        // regardless of pairing.
6478        assert_eq!(
6479            admin_reload_gate(false, false, true),
6480            AdminReloadGate::Forbidden
6481        );
6482        assert_eq!(
6483            admin_reload_gate(false, false, false),
6484            AdminReloadGate::Forbidden
6485        );
6486    }
6487
6488    #[test]
6489    fn admin_reload_gate_remote_opt_in_requires_auth() {
6490        // Non-loopback caller with the flag on and pairing on must authenticate.
6491        assert_eq!(
6492            admin_reload_gate(false, true, true),
6493            AdminReloadGate::RequireAuth
6494        );
6495    }
6496
6497    #[test]
6498    fn admin_reload_gate_remote_opt_in_without_pairing_is_rejected() {
6499        // Opting in with pairing off cannot authenticate the caller, so the
6500        // request is rejected rather than allowed anonymously.
6501        assert_eq!(
6502            admin_reload_gate(false, true, false),
6503            AdminReloadGate::ForbiddenNoPairing
6504        );
6505    }
6506
6507    #[test]
6508    fn allow_remote_admin_defaults_off() {
6509        // Security default: remote admin reload is disabled until opted in.
6510        assert!(!zeroclaw_config::schema::GatewayConfig::default().allow_remote_admin);
6511    }
6512
6513    // ── handle_admin_reload route-level tests ─────────────────────
6514    // Beyond the pure `admin_reload_gate` policy tests, these exercise the
6515    // real handler path (ConnectInfo + HeaderMap + PairingGuard + config),
6516    // proving `allow_remote_admin` cannot expose an unauthenticated remote
6517    // reload and that a valid paired token is required and sufficient.
6518
6519    /// Build an `AppState` for `handle_admin_reload`: controls
6520    /// `gateway.allow_remote_admin`, pairing (and its tokens), and wires a
6521    /// live reload channel so the allowed path reaches `200` rather than the
6522    /// `503` standalone-gateway branch.
6523    fn admin_reload_state(
6524        tmp: &tempfile::TempDir,
6525        allow_remote_admin: bool,
6526        require_pairing: bool,
6527        tokens: &[String],
6528    ) -> AppState {
6529        let mut state = admin_paircode_state(tmp, require_pairing, false);
6530        state.config.write().gateway.allow_remote_admin = allow_remote_admin;
6531        state.pairing = Arc::new(PairingGuard::new(require_pairing, tokens));
6532        state.reload_tx = Some(tokio::sync::watch::channel(false).0);
6533        state
6534    }
6535
6536    fn loopback_peer() -> SocketAddr {
6537        SocketAddr::from(([127, 0, 0, 1], 40000))
6538    }
6539
6540    fn remote_peer() -> SocketAddr {
6541        // RFC 5737 TEST-NET-3 documentation address — a stable non-loopback
6542        // peer that is never a real host on anyone's network.
6543        SocketAddr::from(([203, 0, 113, 50], 40000))
6544    }
6545
6546    fn bearer_headers(token: &str) -> HeaderMap {
6547        let mut headers = HeaderMap::new();
6548        headers.insert(
6549            header::AUTHORIZATION,
6550            HeaderValue::from_str(&format!("Bearer {token}")).unwrap(),
6551        );
6552        headers
6553    }
6554
6555    #[tokio::test]
6556    async fn admin_reload_loopback_no_token_reloads() {
6557        let tmp = tempfile::tempdir().unwrap();
6558        let state = admin_reload_state(&tmp, false, true, &[]);
6559        let resp =
6560            handle_admin_reload(State(state), ConnectInfo(loopback_peer()), HeaderMap::new())
6561                .await
6562                .unwrap()
6563                .into_response();
6564        assert_eq!(resp.status(), StatusCode::OK);
6565    }
6566
6567    #[tokio::test]
6568    async fn admin_reload_remote_default_off_is_forbidden() {
6569        let tmp = tempfile::tempdir().unwrap();
6570        let state = admin_reload_state(&tmp, false, true, &[]);
6571        let err = handle_admin_reload(State(state), ConnectInfo(remote_peer()), HeaderMap::new())
6572            .await
6573            .err()
6574            .unwrap();
6575        assert_eq!(err.0, StatusCode::FORBIDDEN);
6576    }
6577
6578    #[tokio::test]
6579    async fn admin_reload_remote_opt_in_without_pairing_does_not_reload() {
6580        // The fixed hole: allow_remote_admin = true + require_pairing = false
6581        // must NOT permit an anonymous remote reload.
6582        let tmp = tempfile::tempdir().unwrap();
6583        let state = admin_reload_state(&tmp, true, false, &[]);
6584        let err = handle_admin_reload(State(state), ConnectInfo(remote_peer()), HeaderMap::new())
6585            .await
6586            .err()
6587            .unwrap();
6588        assert_eq!(err.0, StatusCode::FORBIDDEN);
6589    }
6590
6591    #[tokio::test]
6592    async fn admin_reload_remote_opt_in_missing_token_is_rejected() {
6593        let tmp = tempfile::tempdir().unwrap();
6594        let state = admin_reload_state(&tmp, true, true, &["zc_test_token".to_string()]);
6595        let err = handle_admin_reload(State(state), ConnectInfo(remote_peer()), HeaderMap::new())
6596            .await
6597            .err()
6598            .unwrap();
6599        assert_eq!(err.0, StatusCode::UNAUTHORIZED);
6600    }
6601
6602    #[tokio::test]
6603    async fn admin_reload_remote_opt_in_invalid_token_is_rejected() {
6604        let tmp = tempfile::tempdir().unwrap();
6605        let state = admin_reload_state(&tmp, true, true, &["zc_test_token".to_string()]);
6606        let err = handle_admin_reload(
6607            State(state),
6608            ConnectInfo(remote_peer()),
6609            bearer_headers("not-the-token"),
6610        )
6611        .await
6612        .err()
6613        .unwrap();
6614        assert_eq!(err.0, StatusCode::UNAUTHORIZED);
6615    }
6616
6617    #[tokio::test]
6618    async fn admin_reload_remote_opt_in_valid_token_reloads() {
6619        let tmp = tempfile::tempdir().unwrap();
6620        let state = admin_reload_state(&tmp, true, true, &["zc_test_token".to_string()]);
6621        let resp = handle_admin_reload(
6622            State(state),
6623            ConnectInfo(remote_peer()),
6624            bearer_headers("zc_test_token"),
6625        )
6626        .await
6627        .unwrap()
6628        .into_response();
6629        assert_eq!(resp.status(), StatusCode::OK);
6630    }
6631
6632    #[test]
6633    fn needs_quickstart_for_flags_empty_model() {
6634        let err =
6635            needs_quickstart_for("").expect("empty model must produce a needs_quickstart error");
6636        let msg = err.to_string();
6637        assert!(
6638            msg.contains("needs_quickstart"),
6639            "error must carry the needs_quickstart marker for callers to map to 503; got: {msg}"
6640        );
6641        assert!(
6642            msg.contains("/quickstart"),
6643            "error must point the user at /quickstart; got: {msg}"
6644        );
6645    }
6646
6647    #[test]
6648    fn needs_quickstart_for_flags_whitespace_only_model() {
6649        assert!(
6650            needs_quickstart_for("   ").is_some(),
6651            "whitespace-only model must be treated as empty"
6652        );
6653        assert!(
6654            needs_quickstart_for("\n\t ").is_some(),
6655            "tabs and newlines count as empty too"
6656        );
6657    }
6658
6659    #[test]
6660    fn needs_quickstart_for_passes_real_model() {
6661        assert!(
6662            needs_quickstart_for("anthropic/claude-sonnet-4").is_none(),
6663            "a real model id must not be flagged"
6664        );
6665        assert!(
6666            needs_quickstart_for("  gpt-4  ").is_none(),
6667            "leading/trailing whitespace around a real model id must not be flagged"
6668        );
6669    }
6670
6671    #[test]
6672    fn is_needs_quickstart_err_detects_marker_from_helper() {
6673        let err = needs_quickstart_for("").expect("empty model produces marker");
6674        assert!(
6675            is_needs_quickstart_err(&err),
6676            "the marker emitted by needs_quickstart_for must be detected"
6677        );
6678    }
6679
6680    #[test]
6681    fn is_needs_quickstart_err_ignores_unrelated_errors() {
6682        let err = anyhow::Error::msg("upstream timeout: provider returned 504");
6683        assert!(
6684            !is_needs_quickstart_err(&err),
6685            "unrelated errors must not be misclassified as needs_quickstart"
6686        );
6687        let err = anyhow::Error::msg("invalid api key");
6688        assert!(!is_needs_quickstart_err(&err));
6689    }
6690
6691    #[test]
6692    fn is_needs_quickstart_err_detects_via_substring() {
6693        // Defends the contract that the substring marker is the
6694        // detection key — not the exact string. Wrappers (e.g.
6695        // anyhow::Error::context) must not break the check.
6696        let err =
6697            anyhow::Error::msg("provider call failed").context("needs_quickstart: empty model");
6698        assert!(is_needs_quickstart_err(&err));
6699    }
6700
6701    #[test]
6702    fn needs_quickstart_channel_reply_resolves_via_fluent() {
6703        // The Fluent key channel-needs-quickstart-reply must resolve
6704        // to real text from the embedded en/cli.ftl, not the missing-
6705        // key fallback `{channel-needs-quickstart-reply}` that
6706        // `missing_cli_string` produces. Guarding this in a test
6707        // keeps the i18n contract from quietly drifting if the key
6708        // gets renamed in lib.rs without a matching ftl edit.
6709        let reply = needs_quickstart_channel_reply();
6710        assert!(
6711            !reply.starts_with('{') && !reply.ends_with('}'),
6712            "fluent missing-key fallback leaked into channel reply: {reply:?}"
6713        );
6714        assert!(
6715            reply.to_lowercase().contains("quickstart"),
6716            "channel reply must mention Quickstart so users know what's missing: {reply:?}"
6717        );
6718    }
6719
6720    // ══════════════════════════════════════════════════════════
6721    // Linq Multi-Tenant Webhook Routing Tests
6722    // ══════════════════════════════════════════════════════════
6723
6724    /// Helper: compute a valid Linq HMAC-SHA256 signature for the given
6725    /// secret, timestamp, and body.  Mirrors the verification logic in
6726    /// `zeroclaw_channels::linq::verify_linq_signature`.
6727    #[cfg(feature = "channel-linq")]
6728    fn compute_linq_signature_hex(secret: &str, timestamp: &str, body: &str) -> String {
6729        use hmac::{Hmac, Mac};
6730        use sha2::Sha256;
6731
6732        let message = format!("{timestamp}.{body}");
6733        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
6734        mac.update(message.as_bytes());
6735        hex::encode(mac.finalize().into_bytes())
6736    }
6737
6738    /// Helper: build a minimal Linq webhook payload that `parse_webhook_payload`
6739    /// recognises as a `message.received` event with one text part.
6740    #[cfg(feature = "channel-linq")]
6741    fn linq_webhook_body(sender: &str, text: &str) -> String {
6742        serde_json::json!({
6743            "event_type": "message.received",
6744            "data": {
6745                "sender": { "phone": sender },
6746                "message": {
6747                    "parts": [{ "type": "text", "value": text }]
6748                }
6749            }
6750        })
6751        .to_string()
6752    }
6753
6754    /// Helper: build an `AppState` with one Linq channel registered under the
6755    /// given alias, with an allow-any peer resolver and an optional signing
6756    /// secret.
6757    #[cfg(feature = "channel-linq")]
6758    fn linq_test_state(alias: &str, signing_secret: Option<&str>) -> AppState {
6759        let model_provider: Arc<dyn ModelProvider> = Arc::new(MockModelProvider::default());
6760        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
6761
6762        let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> =
6763            Arc::new(|| vec!["*".to_string()]);
6764        let channel = Arc::new(LinqChannel::new(
6765            "test-token".into(),
6766            "+15550000000".into(),
6767            alias,
6768            peer_resolver,
6769        ));
6770        let mut linq = HashMap::new();
6771        linq.insert(alias.to_string(), channel);
6772
6773        let mut linq_signing_secrets: HashMap<String, Arc<str>> = HashMap::new();
6774        if let Some(secret) = signing_secret {
6775            linq_signing_secrets.insert(alias.to_string(), Arc::from(secret));
6776        }
6777
6778        AppState {
6779            config: Arc::new(RwLock::new(Config::default())),
6780            model_provider,
6781            model: "test-model".into(),
6782            temperature: None,
6783            mem: memory,
6784            memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
6785                Arc::new(MockMemory),
6786                zeroclaw_config::schema::MemoryConfig::default(),
6787                std::path::PathBuf::new(),
6788            )),
6789            auto_save: false,
6790            webhook_secret_hash: None,
6791            pairing: Arc::new(PairingGuard::new(false, &[])),
6792            trust_forwarded_headers: false,
6793            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
6794            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
6795            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
6796            #[cfg(feature = "channel-whatsapp-cloud")]
6797            whatsapp: None,
6798            #[cfg(feature = "channel-whatsapp-cloud")]
6799            whatsapp_app_secret: None,
6800            #[cfg(feature = "channel-linq")]
6801            linq,
6802            #[cfg(feature = "channel-linq")]
6803            linq_signing_secrets,
6804            #[cfg(feature = "channel-nextcloud")]
6805            nextcloud_talk: None,
6806            #[cfg(feature = "channel-nextcloud")]
6807            nextcloud_talk_webhook_secret: None,
6808            #[cfg(feature = "channel-wati")]
6809            wati: None,
6810            #[cfg(feature = "channel-email")]
6811            gmail_push: None,
6812            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
6813            tools_registry: Arc::new(Vec::new()),
6814            cost_tracker: None,
6815            event_tx: tokio::sync::broadcast::channel(16).0,
6816            event_buffer: Arc::new(sse::EventBuffer::new(16)),
6817            shutdown_tx: tokio::sync::watch::channel(false).0,
6818            reload_tx: None,
6819            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
6820            path_prefix: String::new(),
6821            web_dist_dir: None,
6822            session_backend: None,
6823            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
6824                8, 30, 600,
6825            )),
6826            device_registry: None,
6827            pending_pairings: None,
6828            canvas_store: CanvasStore::new(),
6829            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
6830            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
6831            tui_registry: None,
6832            #[cfg(feature = "webauthn")]
6833            webauthn: None,
6834        }
6835    }
6836
6837    #[cfg(feature = "channel-linq")]
6838    #[tokio::test]
6839    async fn linq_webhook_returns_not_found_for_unknown_alias() {
6840        // No Linq channels configured at all.
6841        let state = linq_test_state("production", None);
6842
6843        let response = Box::pin(handle_linq_webhook(
6844            State(state),
6845            Path("staging".to_string()),
6846            HeaderMap::new(),
6847            Bytes::from_static(br#"{"event_type":"message.received"}"#),
6848        ))
6849        .await
6850        .into_response();
6851
6852        assert_eq!(response.status(), StatusCode::NOT_FOUND);
6853    }
6854
6855    #[cfg(feature = "channel-linq")]
6856    #[tokio::test]
6857    async fn linq_webhook_returns_not_found_when_no_channels_configured() {
6858        let model_provider: Arc<dyn ModelProvider> = Arc::new(MockModelProvider::default());
6859        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
6860
6861        let state = AppState {
6862            config: Arc::new(RwLock::new(Config::default())),
6863            model_provider,
6864            model: "test-model".into(),
6865            temperature: None,
6866            mem: memory,
6867            memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
6868                Arc::new(MockMemory),
6869                zeroclaw_config::schema::MemoryConfig::default(),
6870                std::path::PathBuf::new(),
6871            )),
6872            auto_save: false,
6873            webhook_secret_hash: None,
6874            pairing: Arc::new(PairingGuard::new(false, &[])),
6875            trust_forwarded_headers: false,
6876            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
6877            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
6878            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
6879            #[cfg(feature = "channel-whatsapp-cloud")]
6880            whatsapp: None,
6881            #[cfg(feature = "channel-whatsapp-cloud")]
6882            whatsapp_app_secret: None,
6883            #[cfg(feature = "channel-linq")]
6884            linq: HashMap::new(),
6885            #[cfg(feature = "channel-linq")]
6886            linq_signing_secrets: HashMap::new(),
6887            #[cfg(feature = "channel-nextcloud")]
6888            nextcloud_talk: None,
6889            #[cfg(feature = "channel-nextcloud")]
6890            nextcloud_talk_webhook_secret: None,
6891            #[cfg(feature = "channel-wati")]
6892            wati: None,
6893            #[cfg(feature = "channel-email")]
6894            gmail_push: None,
6895            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
6896            tools_registry: Arc::new(Vec::new()),
6897            cost_tracker: None,
6898            event_tx: tokio::sync::broadcast::channel(16).0,
6899            event_buffer: Arc::new(sse::EventBuffer::new(16)),
6900            shutdown_tx: tokio::sync::watch::channel(false).0,
6901            reload_tx: None,
6902            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
6903            path_prefix: String::new(),
6904            web_dist_dir: None,
6905            session_backend: None,
6906            session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
6907                8, 30, 600,
6908            )),
6909            device_registry: None,
6910            pending_pairings: None,
6911            canvas_store: CanvasStore::new(),
6912            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
6913            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
6914            tui_registry: None,
6915            #[cfg(feature = "webauthn")]
6916            webauthn: None,
6917        };
6918
6919        let response = Box::pin(handle_linq_webhook(
6920            State(state),
6921            Path("default".to_string()),
6922            HeaderMap::new(),
6923            Bytes::from_static(br#"{"event_type":"message.received"}"#),
6924        ))
6925        .await
6926        .into_response();
6927
6928        assert_eq!(response.status(), StatusCode::NOT_FOUND);
6929    }
6930
6931    #[cfg(feature = "channel-linq")]
6932    #[tokio::test]
6933    async fn linq_webhook_accepts_valid_message_for_known_alias() {
6934        let state = linq_test_state("default", None);
6935        let body = linq_webhook_body("+15551234567", "hello from test");
6936
6937        let response = Box::pin(handle_linq_webhook(
6938            State(state),
6939            Path("default".to_string()),
6940            HeaderMap::new(),
6941            Bytes::from(body),
6942        ))
6943        .await
6944        .into_response();
6945
6946        assert_eq!(response.status(), StatusCode::OK);
6947    }
6948
6949    #[cfg(feature = "channel-linq")]
6950    #[tokio::test]
6951    async fn linq_webhook_rejects_invalid_signature_for_alias() {
6952        let secret = generate_test_secret();
6953        let state = linq_test_state("secure-alias", Some(&secret));
6954
6955        let body = linq_webhook_body("+15551234567", "hello from test");
6956        let mut headers = HeaderMap::new();
6957        headers.insert(
6958            "X-Webhook-Signature",
6959            HeaderValue::from_static("sha256=deadbeef"),
6960        );
6961        headers.insert(
6962            "X-Webhook-Timestamp",
6963            HeaderValue::from_static("9999999999"),
6964        );
6965
6966        let response = Box::pin(handle_linq_webhook(
6967            State(state),
6968            Path("secure-alias".to_string()),
6969            headers,
6970            Bytes::from(body),
6971        ))
6972        .await
6973        .into_response();
6974
6975        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
6976    }
6977
6978    #[cfg(feature = "channel-linq")]
6979    #[tokio::test]
6980    async fn linq_webhook_accepts_valid_signature_for_alias() {
6981        let secret = generate_test_secret();
6982        let state = linq_test_state("secure-alias", Some(&secret));
6983
6984        let body = linq_webhook_body("+15551234567", "hello from test");
6985        let timestamp = chrono::Utc::now().timestamp().to_string();
6986        let sig = compute_linq_signature_hex(&secret, &timestamp, &body);
6987
6988        let mut headers = HeaderMap::new();
6989        headers.insert(
6990            "X-Webhook-Signature",
6991            HeaderValue::from_str(&format!("sha256={sig}")).unwrap(),
6992        );
6993        headers.insert(
6994            "X-Webhook-Timestamp",
6995            HeaderValue::from_str(&timestamp).unwrap(),
6996        );
6997
6998        let response = Box::pin(handle_linq_webhook(
6999            State(state),
7000            Path("secure-alias".to_string()),
7001            headers,
7002            Bytes::from(body),
7003        ))
7004        .await
7005        .into_response();
7006
7007        assert_eq!(response.status(), StatusCode::OK);
7008    }
7009}
7010
#[cfg(test)]
mod accept_error_tests {
    //! Classification of `accept()` failures by `is_recoverable_accept_error`.

    use super::is_recoverable_accept_error;
    use std::io::{Error, ErrorKind};

    #[cfg(unix)]
    #[test]
    fn fd_exhaustion_accept_errors_are_recoverable() {
        // #7042: EMFILE/ENFILE must not terminate the daemon.
        const EMFILE: i32 = 24; // per-process descriptor limit reached
        const ENFILE: i32 = 23; // system-wide file table full

        for errno in [EMFILE, ENFILE] {
            let err = Error::from_raw_os_error(errno);
            assert!(
                is_recoverable_accept_error(&err),
                "errno {errno} should be recoverable"
            );
        }
    }

    #[test]
    fn transient_kinds_recover_but_fatal_propagates() {
        for kind in [ErrorKind::ConnectionAborted, ErrorKind::Interrupted] {
            assert!(
                is_recoverable_accept_error(&Error::from(kind)),
                "{kind:?} should be treated as transient"
            );
        }

        // A non-transient error is not swallowed (loop will propagate it).
        let fatal = Error::from(ErrorKind::InvalidInput);
        assert!(!is_recoverable_accept_error(&fatal));
    }
}