1#![allow(
2 clippy::to_string_in_format_args,
3 clippy::useless_format,
4 clippy::collapsible_if
5)]
6pub mod acp;
16pub mod api;
17pub mod api_browse;
18pub mod api_config;
19pub mod api_logs;
20pub mod api_pairing;
21pub mod api_personality;
22#[cfg(feature = "plugins-wasm")]
23pub mod api_plugins;
24pub mod api_quickstart;
25pub mod api_sections;
26pub mod api_skills;
27#[cfg(feature = "webauthn")]
28pub mod api_webauthn;
29pub mod auth_rate_limit;
30pub mod canvas;
31pub mod hardware_context;
32pub mod node_tool;
33pub mod nodes;
34pub mod openapi;
35pub mod session_queue;
36pub mod sse;
37pub mod static_files;
38pub mod tls;
39#[cfg(feature = "gateway-voice-duplex")]
40pub mod voice_duplex;
41pub mod ws;
42pub mod ws_approval;
43
44use anyhow::{Context, Result};
45#[cfg(any(
46 feature = "channel-email",
47 feature = "channel-linq",
48 feature = "channel-nextcloud",
49 feature = "channel-wati",
50 feature = "channel-whatsapp-cloud"
51))]
52use axum::body::Bytes;
53#[cfg(feature = "channel-linq")]
54use axum::extract::Path;
55use axum::{
56 Router,
57 extract::{ConnectInfo, Query, State},
58 http::{HeaderMap, StatusCode, header},
59 response::{IntoResponse, Json},
60 routing::{delete, get, post},
61};
62use parking_lot::{Mutex, RwLock};
63use std::collections::HashMap;
64use std::net::{IpAddr, SocketAddr};
65use std::sync::Arc;
66use std::time::{Duration, Instant};
67
68const ACCEPT_ERROR_BACKOFF_MS: u64 = 50;
71
72#[cfg(unix)]
75const EMFILE: i32 = 24; #[cfg(unix)]
77const ENFILE: i32 = 23; fn is_recoverable_accept_error(e: &std::io::Error) -> bool {
86 use std::io::ErrorKind;
87 if matches!(
88 e.kind(),
89 ErrorKind::ConnectionAborted | ErrorKind::Interrupted | ErrorKind::WouldBlock
90 ) {
91 return true;
92 }
93 #[cfg(unix)]
94 if matches!(e.raw_os_error(), Some(EMFILE) | Some(ENFILE)) {
95 return true;
96 }
97 false
98}
99use tower_http::limit::RequestBodyLimitLayer;
100use tower_http::timeout::TimeoutLayer;
101use uuid::Uuid;
102#[cfg(any(
103 feature = "channel-linq",
104 feature = "channel-nextcloud",
105 feature = "channel-wati",
106 feature = "channel-whatsapp-cloud"
107))]
108use zeroclaw_api::channel::{Channel, SendMessage};
109use zeroclaw_api::memory_traits::MemoryStrategy;
110use zeroclaw_api::tool::ToolSpec;
111#[cfg(feature = "channel-email")]
112use zeroclaw_channels::gmail_push::GmailPushChannel;
113#[cfg(feature = "channel-linq")]
114use zeroclaw_channels::linq::LinqChannel;
115#[cfg(feature = "channel-nextcloud")]
116use zeroclaw_channels::nextcloud_talk::NextcloudTalkChannel;
117#[cfg(feature = "channel-wati")]
118use zeroclaw_channels::wati::WatiChannel;
119#[cfg(feature = "channel-whatsapp-cloud")]
120use zeroclaw_channels::whatsapp::WhatsAppChannel;
121use zeroclaw_config::policy::SecurityPolicy;
122use zeroclaw_config::schema::Config;
123use zeroclaw_infra::session_backend::SessionBackend;
124use zeroclaw_memory::{self, Memory, MemoryCategory};
125use zeroclaw_providers::{self, ModelProvider};
126use zeroclaw_runtime::agent::memory_strategy::DefaultMemoryStrategy;
127use zeroclaw_runtime::cost::CostTracker;
128use zeroclaw_runtime::i18n;
129use zeroclaw_runtime::platform;
130use zeroclaw_runtime::security::pairing::{PairingGuard, constant_time_eq, is_public_bind};
131use zeroclaw_runtime::tools;
132use zeroclaw_runtime::tools::CanvasStore;
133
134pub const MAX_BODY_SIZE: usize = 65_536;
136pub const REQUEST_TIMEOUT_SECS: u64 = 30;
138
139pub const LONG_RUNNING_REQUEST_TIMEOUT_SECS: u64 = 600;
147
148pub fn gateway_request_timeout_secs(cfg: &zeroclaw_config::schema::GatewayConfig) -> u64 {
151 cfg.request_timeout_secs
152}
153
154pub fn gateway_long_running_request_timeout_secs(
158 cfg: &zeroclaw_config::schema::GatewayConfig,
159) -> u64 {
160 cfg.long_running_request_timeout_secs
161}
162pub const RATE_LIMIT_WINDOW_SECS: u64 = 60;
164pub const RATE_LIMIT_MAX_KEYS_DEFAULT: usize = 10_000;
166pub const IDEMPOTENCY_MAX_KEYS_DEFAULT: usize = 10_000;
168
169fn webhook_memory_key() -> String {
170 format!("webhook_msg_{}", Uuid::new_v4())
171}
172
173#[cfg(feature = "channel-whatsapp-cloud")]
174fn whatsapp_memory_key(msg: &zeroclaw_api::channel::ChannelMessage) -> String {
175 format!("whatsapp_{}_{}", msg.sender, msg.id)
176}
177
178#[cfg(feature = "channel-linq")]
179fn linq_memory_key(msg: &zeroclaw_api::channel::ChannelMessage) -> String {
180 format!("linq_{}_{}", msg.sender, msg.id)
181}
182
183#[cfg(feature = "channel-wati")]
184fn wati_memory_key(msg: &zeroclaw_api::channel::ChannelMessage) -> String {
185 format!("wati_{}_{}", msg.sender, msg.id)
186}
187
188#[cfg(feature = "channel-nextcloud")]
189fn nextcloud_talk_memory_key(msg: &zeroclaw_api::channel::ChannelMessage) -> String {
190 format!("nextcloud_talk_{}_{}", msg.sender, msg.id)
191}
192
193#[cfg(any(
194 feature = "channel-linq",
195 feature = "channel-nextcloud",
196 feature = "channel-wati",
197 feature = "channel-whatsapp-cloud"
198))]
199fn sender_session_id(channel: &str, msg: &zeroclaw_api::channel::ChannelMessage) -> String {
200 match &msg.thread_ts {
201 Some(thread_id) => format!("{channel}_{thread_id}_{}", msg.sender),
202 None => format!("{channel}_{}", msg.sender),
203 }
204}
205
206fn webhook_session_id(headers: &HeaderMap) -> Option<String> {
207 const MAX_SESSION_ID_LEN: usize = 128;
208 headers
209 .get("X-Session-Id")
210 .and_then(|v| v.to_str().ok())
211 .map(str::trim)
212 .filter(|value| !value.is_empty())
213 .filter(|value| value.len() <= MAX_SESSION_ID_LEN)
214 .filter(|value| {
215 value
216 .bytes()
217 .all(|b| b.is_ascii_alphanumeric() || b == b'-' || b == b'_' || b == b'.')
218 })
219 .map(str::to_owned)
220}
221
222fn hash_webhook_secret(value: &str) -> String {
223 use sha2::{Digest, Sha256};
224
225 let digest = Sha256::digest(value.as_bytes());
226 hex::encode(digest)
227}
228
229const RATE_LIMITER_SWEEP_INTERVAL_SECS: u64 = 300; #[derive(Debug)]
233struct SlidingWindowRateLimiter {
234 limit_per_window: u32,
235 window: Duration,
236 max_keys: usize,
237 requests: Mutex<(HashMap<String, Vec<Instant>>, Instant)>,
238}
239
240impl SlidingWindowRateLimiter {
241 fn new(limit_per_window: u32, window: Duration, max_keys: usize) -> Self {
242 Self {
243 limit_per_window,
244 window,
245 max_keys: max_keys.max(1),
246 requests: Mutex::new((HashMap::new(), Instant::now())),
247 }
248 }
249
250 fn prune_stale(requests: &mut HashMap<String, Vec<Instant>>, cutoff: Instant) {
251 requests.retain(|_, timestamps| {
252 timestamps.retain(|t| *t > cutoff);
253 !timestamps.is_empty()
254 });
255 }
256
257 fn allow(&self, key: &str) -> bool {
258 if self.limit_per_window == 0 {
259 return true;
260 }
261
262 let now = Instant::now();
263 let cutoff = now.checked_sub(self.window).unwrap_or_else(Instant::now);
264
265 let mut guard = self.requests.lock();
266 let (requests, last_sweep) = &mut *guard;
267
268 if last_sweep.elapsed() >= Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS) {
270 Self::prune_stale(requests, cutoff);
271 *last_sweep = now;
272 }
273
274 if !requests.contains_key(key) && requests.len() >= self.max_keys {
275 Self::prune_stale(requests, cutoff);
277 *last_sweep = now;
278
279 if requests.len() >= self.max_keys {
280 let evict_key = requests
281 .iter()
282 .min_by_key(|(_, timestamps)| timestamps.last().copied().unwrap_or(cutoff))
283 .map(|(k, _)| k.clone());
284 if let Some(evict_key) = evict_key {
285 requests.remove(&evict_key);
286 }
287 }
288 }
289
290 let entry = requests.entry(key.to_owned()).or_default();
291 entry.retain(|instant| *instant > cutoff);
292
293 if entry.len() >= self.limit_per_window as usize {
294 return false;
295 }
296
297 entry.push(now);
298 true
299 }
300}
301
302#[derive(Debug)]
303pub struct GatewayRateLimiter {
304 pair: SlidingWindowRateLimiter,
305 webhook: SlidingWindowRateLimiter,
306}
307
308impl GatewayRateLimiter {
309 pub fn new(pair_per_minute: u32, webhook_per_minute: u32, max_keys: usize) -> Self {
310 let window = Duration::from_secs(RATE_LIMIT_WINDOW_SECS);
311 Self {
312 pair: SlidingWindowRateLimiter::new(pair_per_minute, window, max_keys),
313 webhook: SlidingWindowRateLimiter::new(webhook_per_minute, window, max_keys),
314 }
315 }
316
317 fn allow_pair(&self, key: &str) -> bool {
318 self.pair.allow(key)
319 }
320
321 fn allow_webhook(&self, key: &str) -> bool {
322 self.webhook.allow(key)
323 }
324}
325
326#[derive(Debug)]
327pub struct IdempotencyStore {
328 ttl: Duration,
329 max_keys: usize,
330 keys: Mutex<HashMap<String, Instant>>,
331}
332
333impl IdempotencyStore {
334 pub fn new(ttl: Duration, max_keys: usize) -> Self {
335 Self {
336 ttl,
337 max_keys: max_keys.max(1),
338 keys: Mutex::new(HashMap::new()),
339 }
340 }
341
342 fn record_if_new(&self, key: &str) -> bool {
344 let now = Instant::now();
345 let mut keys = self.keys.lock();
346
347 keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl);
348
349 if keys.contains_key(key) {
350 return false;
351 }
352
353 if keys.len() >= self.max_keys {
354 let evict_key = keys
355 .iter()
356 .min_by_key(|(_, seen_at)| *seen_at)
357 .map(|(k, _)| k.clone());
358 if let Some(evict_key) = evict_key {
359 keys.remove(&evict_key);
360 }
361 }
362
363 keys.insert(key.to_owned(), now);
364 true
365 }
366}
367
368fn parse_client_ip(value: &str) -> Option<IpAddr> {
369 let value = value.trim().trim_matches('"').trim();
370 if value.is_empty() {
371 return None;
372 }
373
374 if let Ok(ip) = value.parse::<IpAddr>() {
375 return Some(ip);
376 }
377
378 if let Ok(addr) = value.parse::<SocketAddr>() {
379 return Some(addr.ip());
380 }
381
382 let value = value.trim_matches(['[', ']']);
383 value.parse::<IpAddr>().ok()
384}
385
386fn dirs_data_local() -> Option<std::path::PathBuf> {
387 directories::BaseDirs::new().map(|d| d.data_local_dir().to_path_buf())
388}
389
390fn forwarded_client_ip(headers: &HeaderMap) -> Option<IpAddr> {
391 if let Some(xff) = headers.get("X-Forwarded-For").and_then(|v| v.to_str().ok()) {
392 for candidate in xff.split(',') {
393 if let Some(ip) = parse_client_ip(candidate) {
394 return Some(ip);
395 }
396 }
397 }
398
399 headers
400 .get("X-Real-IP")
401 .and_then(|v| v.to_str().ok())
402 .and_then(parse_client_ip)
403}
404
405fn client_key_from_request(
406 peer_addr: Option<SocketAddr>,
407 headers: &HeaderMap,
408 trust_forwarded_headers: bool,
409) -> String {
410 if trust_forwarded_headers && let Some(ip) = forwarded_client_ip(headers) {
411 return ip.to_string();
412 }
413
414 peer_addr
415 .map(|addr| addr.ip().to_string())
416 .unwrap_or_else(|| "unknown".to_string())
417}
418
419fn normalize_max_keys(configured: usize, fallback: usize) -> usize {
420 if configured == 0 {
421 fallback.max(1)
422 } else {
423 configured
424 }
425}
426
427#[derive(Clone)]
429pub struct AppState {
430 pub config: Arc<RwLock<Config>>,
431 pub model_provider: Arc<dyn ModelProvider>,
432 pub model: String,
433 pub temperature: Option<f64>,
437 pub mem: Arc<dyn Memory>,
438 pub memory_strategy: Arc<dyn MemoryStrategy>,
439 pub auto_save: bool,
440 pub webhook_secret_hash: Option<Arc<str>>,
442 pub pairing: Arc<PairingGuard>,
443 pub trust_forwarded_headers: bool,
444 pub rate_limiter: Arc<GatewayRateLimiter>,
445 pub auth_limiter: Arc<auth_rate_limit::AuthRateLimiter>,
446 pub idempotency_store: Arc<IdempotencyStore>,
447 #[cfg(feature = "channel-whatsapp-cloud")]
448 pub whatsapp: Option<Arc<WhatsAppChannel>>,
449 #[cfg(feature = "channel-whatsapp-cloud")]
451 pub whatsapp_app_secret: Option<Arc<str>>,
452 #[cfg(feature = "channel-linq")]
453 pub linq: HashMap<String, Arc<LinqChannel>>,
454 #[cfg(feature = "channel-linq")]
456 pub linq_signing_secrets: HashMap<String, Arc<str>>,
457 #[cfg(feature = "channel-nextcloud")]
458 pub nextcloud_talk: Option<Arc<NextcloudTalkChannel>>,
459 #[cfg(feature = "channel-nextcloud")]
461 pub nextcloud_talk_webhook_secret: Option<Arc<str>>,
462 #[cfg(feature = "channel-wati")]
463 pub wati: Option<Arc<WatiChannel>>,
464 #[cfg(feature = "channel-email")]
466 pub gmail_push: Option<Arc<GmailPushChannel>>,
467 pub observer: Arc<dyn zeroclaw_runtime::observability::Observer>,
469 pub tools_registry: Arc<Vec<ToolSpec>>,
471 pub cost_tracker: Option<Arc<CostTracker>>,
473 pub event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
475 pub event_buffer: Arc<sse::EventBuffer>,
477 pub shutdown_tx: tokio::sync::watch::Sender<bool>,
479 pub reload_tx: Option<tokio::sync::watch::Sender<bool>>,
484 pub node_registry: Arc<nodes::NodeRegistry>,
486 pub path_prefix: String,
488 pub web_dist_dir: Option<std::path::PathBuf>,
490 pub session_backend: Option<Arc<dyn SessionBackend>>,
492 pub session_queue: Arc<session_queue::SessionActorQueue>,
494 pub device_registry: Option<Arc<api_pairing::DeviceRegistry>>,
496 pub pending_pairings: Option<Arc<api_pairing::PairingStore>>,
498 pub canvas_store: CanvasStore,
500 #[cfg(feature = "webauthn")]
502 pub webauthn: Option<Arc<api_webauthn::WebAuthnState>>,
503 pub cancel_tokens: Arc<
508 std::sync::Mutex<std::collections::HashMap<String, tokio_util::sync::CancellationToken>>,
509 >,
510 pub pending_reload: Arc<std::sync::atomic::AtomicBool>,
517 pub tui_registry: Option<Arc<zeroclaw_runtime::rpc::tui_identity::TuiRegistry>>,
520}
521
522#[allow(clippy::too_many_lines)]
524pub async fn run_gateway(
525 host: &str,
526 port: u16,
527 config: Config,
528 external_event_tx: Option<tokio::sync::broadcast::Sender<serde_json::Value>>,
529 reload_tx: Option<tokio::sync::watch::Sender<bool>>,
533 tui_registry: Option<Arc<zeroclaw_runtime::rpc::tui_identity::TuiRegistry>>,
535 canvas_store: Option<CanvasStore>,
536) -> Result<()> {
537 if is_public_bind(host)
539 && config.tunnel.tunnel_provider == "none"
540 && !config.gateway.allow_public_bind
541 {
542 ::zeroclaw_log::record!(
543 WARN,
544 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
545 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
546 "⚠️ Binding to {host} — gateway will be exposed to all network interfaces.\n\
547 Suggestion: use --host 127.0.0.1 (default), configure a tunnel, or set\n\
548 [gateway] allow_public_bind = true in config.toml to silence this warning.\n\n\
549 Docker/VM: if you are running inside a container or VM, this is expected."
550 );
551 }
552 let config_state = Arc::new(RwLock::new(config.clone()));
553
554 let hooks: Option<std::sync::Arc<zeroclaw_runtime::hooks::HookRunner>> = if config.hooks.enabled
556 {
557 Some(std::sync::Arc::new(
558 zeroclaw_runtime::hooks::HookRunner::new(),
559 ))
560 } else {
561 None
562 };
563
564 let addr: SocketAddr = match format!("{host}:{port}").parse() {
565 Ok(a) => a,
566 Err(e) => {
567 ::zeroclaw_log::record!(
568 WARN,
569 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
570 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
571 .with_attrs(::serde_json::json!({
572 "host": host,
573 "port": port,
574 "error": format!("{e}"),
575 })),
576 "Gateway: host:port did not parse as a SocketAddr; falling back to \
577 127.0.0.1 so the gateway can still boot. Fix [gateway] host and \
578 POST /admin/reload."
579 );
580 SocketAddr::from(([127, 0, 0, 1], port))
581 }
582 };
583 let listener = tokio::net::TcpListener::bind(addr).await?;
584 let actual_port = listener.local_addr()?.port();
585 let display_addr = format!("{host}:{actual_port}");
586
587 let (boot_family, boot_alias, boot_entry) = config
588 .providers
589 .models
590 .iter_entries()
591 .next()
592 .map(|(f, a, e)| (f.to_string(), a.to_string(), Some(e)))
593 .unwrap_or_else(|| ("openrouter".to_string(), "default".to_string(), None));
594 let fallback = boot_entry;
595 let model_provider_name = boot_family.as_str();
596 let (model_provider, boot_provider_failed): (Arc<dyn ModelProvider>, bool) =
597 match zeroclaw_providers::create_resilient_model_provider_from_ref(
598 &config,
599 model_provider_name,
600 fallback.and_then(|e| e.api_key.as_deref()),
601 fallback.and_then(|e| e.uri.as_deref()),
602 &config.reliability,
603 &zeroclaw_providers::provider_runtime_options_for_alias(
604 &config,
605 &boot_family,
606 &boot_alias,
607 ),
608 ) {
609 Ok(p) => (Arc::from(p), false),
610 Err(e) => {
611 ::zeroclaw_log::record!(
612 WARN,
613 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note,)
614 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
615 .with_attrs(::serde_json::json!({
616 "model_provider": model_provider_name,
617 "alias": boot_alias,
618 "error": format!("{e}"),
619 })),
620 "Gateway: seed model_provider failed to construct; booting in \
621 needs_quickstart mode so /quickstart and /admin/reload stay \
622 reachable. Fix the [providers.models.<type>.<alias>] entry \
623 and POST /admin/reload."
624 );
625 (
626 Arc::new(UnconfiguredModelProvider) as Arc<dyn ModelProvider>,
627 true,
628 )
629 }
630 };
631 let model = if boot_provider_failed {
645 String::new()
646 } else {
647 match fallback
648 .and_then(|e| e.model.as_deref())
649 .map(str::trim)
650 .filter(|m| !m.is_empty())
651 {
652 Some(m) => m.to_string(),
653 None => match config.resolve_default_model() {
654 Some(m) => {
655 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"model_provider": model_provider_name, "model": m})), "first model_provider has no `model` set; using first configured \
656 providers.models entry as default. Set \
657 [providers.models.<type>.<alias>] model = \"...\" to silence \
658 this warning.");
659 m
660 }
661 None => {
662 ::zeroclaw_log::record!(
663 WARN,
664 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
665 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
666 .with_attrs(::serde_json::json!({"display_addr": display_addr})),
667 &format!(
668 "Gateway booting without a configured model. Visit http://{display_addr}/quickstart to complete browser quickstart. Chat endpoints will return 503 needs_quickstart until at least one [providers.models.<type>.<alias>] model = \"...\" is set."
669 )
670 );
671 String::new()
672 }
673 },
674 }
675 };
676 let temperature: Option<f64> = fallback.and_then(|e| e.temperature);
680 let mem: Arc<dyn Memory> = if config.agents.is_empty() {
689 Arc::new(zeroclaw_memory::NoneMemory::new("none"))
690 } else {
691 match zeroclaw_memory::create_memory_with_storage_and_routes(
692 &config.memory,
693 &config.embedding_routes,
694 config.resolve_active_storage(),
695 &config.data_dir,
696 fallback.and_then(|e| e.api_key.as_deref()),
697 ) {
698 Ok(m) => Arc::from(m),
699 Err(e) => {
700 ::zeroclaw_log::record!(
701 WARN,
702 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note,)
703 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
704 .with_attrs(::serde_json::json!({"error": format!("{e}")})),
705 "Gateway: memory backend failed to construct; falling back to \
706 NoneMemory so the gateway can still boot. Fix [memory] and \
707 POST /admin/reload."
708 );
709 Arc::new(zeroclaw_memory::NoneMemory::new("none"))
710 }
711 }
712 };
713 let runtime: Arc<dyn platform::RuntimeAdapter> = match platform::create_runtime(&config.runtime)
714 {
715 Ok(r) => Arc::from(r),
716 Err(e) => {
717 ::zeroclaw_log::record!(
718 WARN,
719 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note,)
720 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
721 .with_attrs(::serde_json::json!({
722 "runtime_kind": config.runtime.kind,
723 "error": format!("{e}"),
724 })),
725 "Gateway: runtime adapter failed to construct; falling back to \
726 NativeRuntime so the gateway can still boot. Fix [runtime] and \
727 POST /admin/reload."
728 );
729 Arc::new(platform::NativeRuntime::new())
730 }
731 };
732 let memory_strategy: Arc<dyn MemoryStrategy> = Arc::new(DefaultMemoryStrategy::with_config(
733 mem.clone(),
734 config.memory.clone(),
735 config.data_dir.clone(),
736 ));
737 let canvas_store = canvas_store.unwrap_or_default();
754 let agent_alias_opt = config
755 .agents
756 .iter()
757 .find(|(_, a)| a.enabled)
758 .map(|(alias, _)| alias.clone());
759
760 let (composio_key, composio_entity_id) = if config.composio.enabled {
761 (
762 config.composio.api_key.as_deref(),
763 Some(config.composio.entity_id.as_str()),
764 )
765 } else {
766 (None, None)
767 };
768
769 let agent_setup: Option<(
778 zeroclaw_config::schema::RiskProfileConfig,
779 Arc<SecurityPolicy>,
780 )> = agent_alias_opt.as_ref().and_then(|agent_alias| {
781 let Some(risk_profile) = config.risk_profile_for_agent(agent_alias) else {
782 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"agent": agent_alias, "agent_alias": agent_alias})), "Gateway: agents..risk_profile does not name a configured risk_profiles entry; booting with empty tools registry. Fix via /admin/reload or /quickstart.");
783 return None;
784 };
785 let risk_profile = risk_profile.clone();
786 let security = match SecurityPolicy::for_agent(&config, agent_alias) {
787 Ok(s) => Arc::new(s),
788 Err(e) => {
789 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"agent": agent_alias, "error": format!("{}", e), "agent_alias": agent_alias})), "Gateway: agent SecurityPolicy failed to build; booting with empty tools registry. Fix [agents.] via /admin/reload or /quickstart.");
790 return None;
791 }
792 };
793 Some((risk_profile, security))
794 });
795
796 let (mut tools_registry_raw, delegate_handle_gw) = match (&agent_alias_opt, agent_setup) {
797 (Some(agent_alias), Some((risk_profile, security))) => {
798 let all_tools_result = tools::all_tools_with_runtime(
799 Arc::new(config.clone()),
800 &security,
801 &risk_profile,
802 agent_alias,
803 runtime,
804 Arc::clone(&mem),
805 composio_key,
806 composio_entity_id,
807 &config.browser,
808 &config.http_request,
809 &config.web_fetch,
810 &config.data_dir,
811 &config.agents,
812 config
813 .model_provider_for_agent(agent_alias)
814 .and_then(|e| e.api_key.as_deref()),
815 &config,
816 Some(canvas_store.clone()),
817 false,
818 None,
819 );
820 let reaction_handle_gw_opt = Some(all_tools_result.reaction_handle.clone());
826 let channel_names = zeroclaw_channels::orchestrator::register_channels_for_tools(
827 &config,
828 &all_tools_result.ask_user_handle,
829 &reaction_handle_gw_opt,
830 &all_tools_result.poll_handle,
831 &all_tools_result.escalate_handle,
832 );
833 if !channel_names.is_empty() {
834 ::zeroclaw_log::record!(
835 INFO,
836 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
837 .with_attrs(::serde_json::json!({"count": channel_names.len()})),
838 &format!(
839 "Registered {} channel(s) for dashboard agent",
840 channel_names.len()
841 ),
842 );
843 }
844 (all_tools_result.tools, all_tools_result.delegate_handle)
845 }
846 (Some(_), None) => {
847 (Vec::new(), None)
850 }
851 (None, _) => {
852 ::zeroclaw_log::record!(
853 INFO,
854 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
855 .with_attrs(::serde_json::json!({"display_addr": display_addr})),
856 &format!(
857 "Gateway: no [agents.<alias>] configured — booting with empty tools registry. Visit http://{display_addr}/quickstart to add an agent."
858 )
859 );
860 (Vec::new(), None)
861 }
862 };
863
864 if config.mcp.enabled && !config.mcp.servers.is_empty() {
867 ::zeroclaw_log::record!(
868 INFO,
869 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
870 &format!(
871 "Gateway: initializing MCP client — {} server(s) configured",
872 config.mcp.servers.len()
873 )
874 );
875 match tools::McpRegistry::connect_all(&config.mcp.servers).await {
876 Ok(registry) => {
877 let registry = std::sync::Arc::new(registry);
878 if config.mcp.deferred_loading {
879 let deferred_set =
880 tools::DeferredMcpToolSet::from_registry(std::sync::Arc::clone(®istry))
881 .await;
882 ::zeroclaw_log::record!(
883 INFO,
884 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
885 &format!(
886 "Gateway MCP deferred: {} tool stub(s) from {} server(s)",
887 deferred_set.len(),
888 registry.server_count()
889 )
890 );
891 let activated =
892 std::sync::Arc::new(std::sync::Mutex::new(tools::ActivatedToolSet::new()));
893 tools_registry_raw.push(Box::new(tools::ToolSearchTool::new(
894 deferred_set,
895 activated,
896 )));
897 } else {
898 let names = registry.tool_names();
899 let mut registered = 0usize;
900 for name in names {
901 if let Some(def) = registry.get_tool_def(&name).await {
902 let wrapper: std::sync::Arc<dyn tools::Tool> =
903 std::sync::Arc::new(tools::McpToolWrapper::new(
904 name,
905 def,
906 std::sync::Arc::clone(®istry),
907 ));
908 if let Some(ref handle) = delegate_handle_gw {
909 handle.write().push(std::sync::Arc::clone(&wrapper));
910 }
911 tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
912 registered += 1;
913 }
914 }
915 ::zeroclaw_log::record!(
916 INFO,
917 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
918 &format!(
919 "Gateway MCP: {} tool(s) registered from {} server(s)",
920 registered,
921 registry.server_count()
922 )
923 );
924 }
925 }
926 Err(e) => {
927 ::zeroclaw_log::record!(
928 ERROR,
929 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
930 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
931 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
932 "MCP registry failed to initialize"
933 );
934 }
935 }
936 }
937
938 let tools_registry: Arc<Vec<ToolSpec>> =
939 Arc::new(tools_registry_raw.iter().map(|t| t.spec()).collect());
940
941 let cost_tracker = CostTracker::get_or_init_global(config.cost.clone(), &config.data_dir);
943
944 let event_tx = external_event_tx.unwrap_or_else(|| {
948 let (tx, _rx) = tokio::sync::broadcast::channel::<serde_json::Value>(256);
949 tx
950 });
951 let event_buffer = Arc::new(sse::EventBuffer::new(500));
952 let webhook_secret_hash: Option<Arc<str>> =
954 config.channels.webhook.values().next().and_then(|webhook| {
955 webhook.secret.as_ref().and_then(|raw_secret| {
956 let trimmed_secret = raw_secret.trim();
957 (!trimmed_secret.is_empty())
958 .then(|| Arc::<str>::from(hash_webhook_secret(trimmed_secret)))
959 })
960 });
961
962 #[cfg(feature = "channel-whatsapp-cloud")]
964 let whatsapp_channel: Option<Arc<WhatsAppChannel>> = config
965 .channels
966 .whatsapp
967 .get("default")
968 .filter(|wa| wa.is_cloud_config())
969 .map(|wa| {
970 let alias = "default".to_string();
971 let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
972 let cfg_arc = config_state.clone();
973 let alias = alias.clone();
974 Arc::new(move || cfg_arc.read().channel_external_peers("whatsapp", &alias))
975 };
976 Arc::new(WhatsAppChannel::new(
977 wa.access_token.clone().unwrap_or_default(),
978 wa.phone_number_id.clone().unwrap_or_default(),
979 wa.verify_token.clone().unwrap_or_default(),
980 alias,
981 peer_resolver,
982 ))
983 });
984
985 #[cfg(feature = "channel-whatsapp-cloud")]
987 let whatsapp_app_secret: Option<Arc<str>> = config
988 .channels
989 .whatsapp
990 .values()
991 .next()
992 .and_then(|wa| {
993 wa.app_secret
994 .as_deref()
995 .map(str::trim)
996 .filter(|secret| !secret.is_empty())
997 .map(ToOwned::to_owned)
998 })
999 .map(Arc::from);
1000
1001 #[cfg(feature = "channel-linq")]
1003 let linq_channels: HashMap<String, Arc<LinqChannel>> = config
1004 .channels
1005 .linq
1006 .iter()
1007 .filter(|(_, lq)| lq.enabled)
1008 .map(|(alias, lq)| {
1009 let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
1010 let cfg_arc = config_state.clone();
1011 let alias = alias.clone();
1012 Arc::new(move || cfg_arc.read().channel_external_peers("linq", &alias))
1013 };
1014 (
1015 alias.clone(),
1016 Arc::new(LinqChannel::new(
1017 lq.api_token.clone(),
1018 lq.from_phone.clone(),
1019 alias.clone(),
1020 peer_resolver,
1021 )),
1022 )
1023 })
1024 .collect();
1025
1026 #[cfg(feature = "channel-linq")]
1028 let linq_signing_secrets: HashMap<String, Arc<str>> = config
1029 .channels
1030 .linq
1031 .iter()
1032 .filter_map(|(alias, lq)| {
1033 let secret = lq
1034 .signing_secret
1035 .as_deref()
1036 .map(str::trim)
1037 .filter(|s| !s.is_empty())
1038 .map(ToOwned::to_owned)?;
1039 Some((alias.clone(), Arc::from(secret)))
1040 })
1041 .collect();
1042
1043 #[cfg(feature = "channel-wati")]
1045 let wati_channel: Option<Arc<WatiChannel>> =
1046 config.channels.wati.values().next().map(|wati_cfg| {
1047 let alias = "default".to_string();
1048 let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
1049 let cfg_arc = config_state.clone();
1050 let alias = alias.clone();
1051 Arc::new(move || cfg_arc.read().channel_external_peers("wati", &alias))
1052 };
1053 Arc::new(
1054 WatiChannel::new(
1055 wati_cfg.api_token.clone(),
1056 wati_cfg.api_url.clone(),
1057 wati_cfg.tenant_id.clone(),
1058 alias,
1059 peer_resolver,
1060 )
1061 .with_transcription(config.transcription.clone()),
1062 )
1063 });
1064
1065 #[cfg(feature = "channel-nextcloud")]
1067 let nextcloud_talk_channel: Option<Arc<NextcloudTalkChannel>> =
1068 config.channels.nextcloud_talk.values().next().map(|nc| {
1069 let alias = "default".to_string();
1070 let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
1071 let cfg_arc = config_state.clone();
1072 let alias = alias.clone();
1073 Arc::new(move || {
1074 cfg_arc
1075 .read()
1076 .channel_external_peers("nextcloud_talk", &alias)
1077 })
1078 };
1079 Arc::new(NextcloudTalkChannel::new(
1080 nc.base_url.clone(),
1081 nc.app_token.clone(),
1082 nc.bot_name.clone().unwrap_or_default(),
1083 alias,
1084 peer_resolver,
1085 ))
1086 });
1087
1088 #[cfg(feature = "channel-nextcloud")]
1090 let nextcloud_talk_webhook_secret: Option<Arc<str>> = config
1091 .channels
1092 .nextcloud_talk
1093 .get("default")
1094 .and_then(|nc| {
1095 nc.webhook_secret
1096 .as_deref()
1097 .map(str::trim)
1098 .filter(|secret| !secret.is_empty())
1099 .map(ToOwned::to_owned)
1100 })
1101 .map(Arc::from);
1102
1103 #[cfg(feature = "channel-email")]
1105 let gmail_push_channel: Option<Arc<GmailPushChannel>> = {
1106 let active: std::collections::HashSet<String> = config
1107 .agents
1108 .values()
1109 .filter(|a| a.enabled)
1110 .flat_map(|a| a.channels.iter().map(|c| c.as_str().to_string()))
1111 .collect();
1112 config
1113 .channels
1114 .gmail_push
1115 .iter()
1116 .find(|(alias, _)| active.contains(&format!("gmail_push.{alias}")))
1117 .map(|(alias, gp)| {
1118 let alias = alias.clone();
1119 let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
1120 let cfg_arc = config_state.clone();
1121 let alias = alias.clone();
1122 Arc::new(move || cfg_arc.read().channel_external_peers("gmail_push", &alias))
1123 };
1124 Arc::new(GmailPushChannel::new(gp.clone(), alias, peer_resolver))
1125 })
1126 };
1127
1128 let session_backend: Option<Arc<dyn SessionBackend>> = if config.gateway.session_persistence {
1135 match zeroclaw_infra::make_session_backend(
1136 &config.data_dir,
1137 &config.channels.session_backend,
1138 ) {
1139 Ok(backend) => {
1140 ::zeroclaw_log::record!(
1141 INFO,
1142 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1143 &format!(
1144 "Gateway session persistence enabled (backend={})",
1145 config.channels.session_backend
1146 )
1147 );
1148 if config.gateway.session_ttl_hours > 0
1149 && let Ok(cleaned) = backend.cleanup_stale(config.gateway.session_ttl_hours)
1150 && cleaned > 0
1151 {
1152 ::zeroclaw_log::record!(
1153 INFO,
1154 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1155 .with_attrs(::serde_json::json!({"cleaned": cleaned})),
1156 "Cleaned up stale gateway sessions"
1157 );
1158 }
1159 Some(backend)
1160 }
1161 Err(e) => {
1162 ::zeroclaw_log::record!(
1163 WARN,
1164 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1165 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1166 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1167 "Session persistence disabled"
1168 );
1169 None
1170 }
1171 }
1172 } else {
1173 None
1174 };
1175
1176 let pairing = Arc::new(PairingGuard::new(
1178 config.gateway.require_pairing,
1179 &config.gateway.paired_tokens,
1180 ));
1181 let rate_limit_max_keys = normalize_max_keys(
1182 config.gateway.rate_limit_max_keys,
1183 RATE_LIMIT_MAX_KEYS_DEFAULT,
1184 );
1185 let rate_limiter = Arc::new(GatewayRateLimiter::new(
1186 config.gateway.pair_rate_limit_per_minute,
1187 config.gateway.webhook_rate_limit_per_minute,
1188 rate_limit_max_keys,
1189 ));
1190 let idempotency_max_keys = normalize_max_keys(
1191 config.gateway.idempotency_max_keys,
1192 IDEMPOTENCY_MAX_KEYS_DEFAULT,
1193 );
1194 let idempotency_store = Arc::new(IdempotencyStore::new(
1195 Duration::from_secs(config.gateway.idempotency_ttl_secs.max(1)),
1196 idempotency_max_keys,
1197 ));
1198
1199 let path_prefix: Option<&str> = config
1201 .gateway
1202 .path_prefix
1203 .as_deref()
1204 .filter(|p| !p.is_empty());
1205
1206 let tunnel = match zeroclaw_runtime::tunnel::create_tunnel(&config.tunnel) {
1208 Ok(t) => t,
1209 Err(e) => {
1210 ::zeroclaw_log::record!(
1211 WARN,
1212 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1213 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1214 .with_attrs(::serde_json::json!({
1215 "tunnel_provider": config.tunnel.tunnel_provider,
1216 "error": format!("{e}"),
1217 })),
1218 "Gateway: tunnel adapter failed to construct; booting without a \
1219 tunnel. Fix [tunnel] and POST /admin/reload."
1220 );
1221 None
1222 }
1223 };
1224 let mut tunnel_url: Option<String> = None;
1225
1226 if let Some(ref tun) = tunnel {
1227 println!("🔗 Starting {} tunnel...", tun.name());
1228 match tun.start(host, actual_port).await {
1229 Ok(url) => {
1230 println!("🌐 Tunnel active: {url}");
1231 tunnel_url = Some(url);
1232 }
1233 Err(e) => {
1234 println!("⚠️ Tunnel failed to start: {e}");
1235 println!(" Falling back to local-only mode.");
1236 }
1237 }
1238 }
1239
1240 let auto_detect_web_dist = || -> Option<std::path::PathBuf> {
1247 let mut candidates = vec![
1248 std::path::PathBuf::from("web/dist"),
1250 std::env::current_exe()
1252 .ok()
1253 .and_then(|p| p.parent().map(|d| d.join("web/dist")))
1254 .unwrap_or_default(),
1255 std::path::PathBuf::from("/zeroclaw-data/web/dist"),
1257 std::path::PathBuf::from("/usr/share/zeroclawlabs/web/dist"),
1259 ];
1260 if let Some(data_dir) = dirs_data_local() {
1262 candidates.push(data_dir.join("zeroclaw/web/dist"));
1263 }
1264 candidates
1265 .into_iter()
1266 .find(|p| !p.as_os_str().is_empty() && p.join("index.html").is_file())
1267 };
1268
1269 let web_dist_dir: Option<std::path::PathBuf> = match config
1270 .gateway
1271 .web_dist_dir
1272 .as_ref()
1273 .map(std::path::PathBuf::from)
1274 {
1275 Some(explicit) if explicit.join("index.html").is_file() => Some(explicit),
1276 Some(stale) => {
1277 ::zeroclaw_log::record!(
1278 WARN,
1279 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1280 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1281 .with_attrs(::serde_json::json!({"configured": stale.display().to_string()})),
1282 "gateway.web_dist_dir points at a path that doesn't contain index.html on \
1283 this machine; falling back to auto-detect. Update or remove the setting in \
1284 config.toml to silence this warning."
1285 );
1286 auto_detect_web_dist()
1287 }
1288 None => auto_detect_web_dist(),
1289 };
1290
1291 if let Some(ref dir) = web_dist_dir {
1292 ::zeroclaw_log::record!(
1293 INFO,
1294 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1295 &format!("Web dashboard: serving from {}", dir.display().to_string())
1296 );
1297 } else if config.gateway.web_dist_dir.is_some() {
1298 ::zeroclaw_log::record!(
1299 INFO,
1300 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1301 "Web dashboard: not available — configured gateway.web_dist_dir is missing on \
1302 this machine and no fallback location was found. Build with `cargo web build` \
1303 and point gateway.web_dist_dir at the resulting web/dist directory."
1304 );
1305 } else {
1306 ::zeroclaw_log::record!(
1307 INFO,
1308 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1309 "Web dashboard: not available — no web/dist found. Build with `cargo web build` \
1310 and point gateway.web_dist_dir at the resulting web/dist directory."
1311 );
1312 }
1313
1314 let pfx = path_prefix.unwrap_or("");
1315 println!("🦀 ZeroClaw Gateway listening on http://{display_addr}{pfx}");
1316 if let Some(ref url) = tunnel_url {
1317 println!(" 🌐 Public URL: {url}");
1318 }
1319 println!(" 🌐 Web Dashboard: http://{display_addr}{pfx}/");
1320 if let Some(code) = pairing.pairing_code() {
1321 println!();
1322 println!(" 🔐 PAIRING REQUIRED — use this one-time code:");
1323 println!(" ┌──────────────┐");
1324 println!(" │ {code} │");
1325 println!(" └──────────────┘");
1326 println!(" Send: POST {pfx}/pair with header X-Pairing-Code: {code}");
1327 } else if pairing.require_pairing() {
1328 println!(" 🔒 Pairing: ACTIVE (bearer token required)");
1329 println!(
1330 " To pair a new device: {}",
1331 format_paircode_recovery_command(host, actual_port)
1332 );
1333 println!(
1334 " Fallback: {}",
1335 format_paircode_recovery_curl(host, actual_port, pfx)
1336 );
1337 println!();
1338 } else {
1339 println!(" ⚠️ Pairing: DISABLED (all requests accepted)");
1340 println!();
1341 }
1342 println!(" POST {pfx}/pair — pair a new client (X-Pairing-Code header)");
1343 println!(" POST {pfx}/webhook — {{\"message\": \"your prompt\"}}");
1344 #[cfg(feature = "channel-whatsapp-cloud")]
1345 if whatsapp_channel.is_some() {
1346 println!(" GET {pfx}/whatsapp — Meta webhook verification");
1347 println!(" POST {pfx}/whatsapp — WhatsApp message webhook");
1348 }
1349 #[cfg(feature = "channel-linq")]
1350 if !linq_channels.is_empty() {
1351 for alias in linq_channels.keys() {
1352 println!(" POST {pfx}/linq/{alias} — Linq SMS webhook ({alias})");
1353 }
1354 }
1355 #[cfg(feature = "channel-wati")]
1356 if wati_channel.is_some() {
1357 println!(" GET {pfx}/wati — WATI webhook verification");
1358 println!(" POST {pfx}/wati — WATI message webhook");
1359 }
1360 #[cfg(feature = "channel-nextcloud")]
1361 if nextcloud_talk_channel.is_some() {
1362 println!(" POST {pfx}/nextcloud-talk — Nextcloud Talk bot webhook");
1363 }
1364 println!(" GET {pfx}/api/* — REST API (bearer token required)");
1365 println!(" GET {pfx}/ws/chat — WebSocket agent chat");
1366 if config.nodes.enabled {
1367 println!(" GET {pfx}/ws/nodes — WebSocket node discovery");
1368 }
1369 println!(" GET {pfx}/health — health check");
1370 println!(" GET {pfx}/metrics — Prometheus metrics");
1371 println!(" Press Ctrl+C to stop.\n");
1372
1373 zeroclaw_runtime::health::mark_component_ok("gateway");
1374
1375 if let Some(ref hooks) = hooks {
1377 hooks.fire_gateway_start(host, actual_port).await;
1378 }
1379
1380 let broadcast_layer: Arc<dyn zeroclaw_runtime::observability::Observer> = Arc::new(
1386 sse::BroadcastObserver::new(event_tx.clone(), event_buffer.clone()),
1387 );
1388 let broadcast_hook_guard =
1389 zeroclaw_runtime::observability::set_scoped_broadcast_hook(broadcast_layer);
1390
1391 zeroclaw_log::set_broadcast_hook(event_tx.clone());
1397
1398 let state_observer: Arc<dyn zeroclaw_runtime::observability::Observer> = Arc::from(
1403 zeroclaw_runtime::observability::create_observer(&config.observability),
1404 );
1405
1406 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
1407
1408 let node_registry = Arc::new(nodes::NodeRegistry::new(config.nodes.max_nodes));
1410
1411 let device_registry = if config.gateway.require_pairing {
1413 Some(Arc::new(api_pairing::DeviceRegistry::new(&config.data_dir)))
1414 } else {
1415 None
1416 };
1417 let pending_pairings = if config.gateway.require_pairing {
1418 Some(Arc::new(api_pairing::PairingStore::new()))
1419 } else {
1420 None
1421 };
1422
1423 let state = AppState {
1424 config: config_state,
1425 model_provider,
1426 model,
1427 temperature,
1428 mem,
1429 memory_strategy,
1430 auto_save: config.memory.auto_save,
1431 webhook_secret_hash,
1432 pairing,
1433 trust_forwarded_headers: config.gateway.trust_forwarded_headers,
1434 rate_limiter,
1435 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
1436 idempotency_store,
1437 #[cfg(feature = "channel-whatsapp-cloud")]
1438 whatsapp: whatsapp_channel,
1439 #[cfg(feature = "channel-whatsapp-cloud")]
1440 whatsapp_app_secret,
1441 #[cfg(feature = "channel-linq")]
1442 linq: linq_channels,
1443 #[cfg(feature = "channel-linq")]
1444 linq_signing_secrets,
1445 #[cfg(feature = "channel-nextcloud")]
1446 nextcloud_talk: nextcloud_talk_channel,
1447 #[cfg(feature = "channel-nextcloud")]
1448 nextcloud_talk_webhook_secret,
1449 #[cfg(feature = "channel-wati")]
1450 wati: wati_channel,
1451 #[cfg(feature = "channel-email")]
1452 gmail_push: gmail_push_channel,
1453 observer: state_observer,
1454 tools_registry,
1455 cost_tracker,
1456 event_tx,
1457 event_buffer,
1458 shutdown_tx,
1459 reload_tx,
1460 node_registry,
1461 session_backend,
1462 session_queue: Arc::new(session_queue::SessionActorQueue::new(8, 30, 600)),
1463 device_registry,
1464 pending_pairings,
1465 path_prefix: path_prefix.unwrap_or("").to_string(),
1466 web_dist_dir,
1467 canvas_store,
1468 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
1469 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
1470 tui_registry,
1471 #[cfg(feature = "webauthn")]
1472 webauthn: if config.security.webauthn.enabled {
1473 let secret_store = Arc::new(zeroclaw_runtime::security::SecretStore::new(
1474 &config.data_dir,
1475 true,
1476 ));
1477 let wa_config = zeroclaw_runtime::security::webauthn::WebAuthnConfig {
1478 enabled: true,
1479 rp_id: config.security.webauthn.rp_id.clone(),
1480 rp_origin: config.security.webauthn.rp_origin.clone(),
1481 rp_name: config.security.webauthn.rp_name.clone(),
1482 };
1483 Some(Arc::new(api_webauthn::WebAuthnState {
1484 manager: zeroclaw_runtime::security::webauthn::WebAuthnManager::new(
1485 wa_config,
1486 secret_store,
1487 &config.data_dir,
1488 ),
1489 pending_registrations: parking_lot::Mutex::new(std::collections::HashMap::new()),
1490 pending_authentications: parking_lot::Mutex::new(std::collections::HashMap::new()),
1491 }))
1492 } else {
1493 None
1494 },
1495 };
1496
1497 let inner = Router::new()
1499 .route("/admin/shutdown", post(handle_admin_shutdown))
1501 .route("/admin/reload", post(handle_admin_reload))
1502 .route("/admin/paircode", get(handle_admin_paircode))
1503 .route("/admin/paircode/new", post(handle_admin_paircode_new))
1504 .route("/health", get(handle_health))
1506 .route("/metrics", get(handle_metrics))
1507 .route("/pair", post(handle_pair))
1508 .route("/pair/code", get(handle_pair_code))
1509 .route("/webhook", post(handle_webhook))
1510 .merge(optional_channel_routes())
1511 .route("/hooks/claude-code", post(api::handle_claude_code_hook))
1513 .route("/api/status", get(api::handle_api_status))
1515 .route("/api/logs", get(api_logs::handle_api_logs))
1516 .route(
1517 "/api/config",
1518 get(api_config::handle_config_get)
1519 .patch(api_config::handle_patch)
1520 .options(api_config::handle_options_config),
1521 )
1522 .route(
1523 "/api/config/prop",
1524 get(api_config::handle_prop_get)
1525 .put(api_config::handle_prop_put)
1526 .delete(api_config::handle_prop_delete)
1527 .options(api_config::handle_options_prop),
1528 )
1529 .route("/api/config/list", get(api_config::handle_list))
1530 .route("/api/config/drift", get(api_config::handle_drift))
1531 .route(
1532 "/api/config/reload-status",
1533 get(api_config::handle_reload_status),
1534 )
1535 .route("/api/config/templates", get(api_config::handle_templates))
1536 .route("/api/config/map-keys", get(api_config::handle_get_map_keys))
1537 .route(
1538 "/api/config/map-key",
1539 post(api_config::handle_map_key).delete(api_config::handle_delete_map_key),
1540 )
1541 .route("/api/config/rename-map-key", post(api_config::handle_rename_map_key))
1542 .route("/api/config/catalog", get(api_sections::handle_catalog))
1543 .route(
1544 "/api/config/catalog/models",
1545 get(api_sections::handle_catalog_models),
1546 )
1547 .route("/api/config/status", get(api_sections::handle_section_status))
1548 .route(
1549 "/api/config/agent-options",
1550 get(api_sections::handle_agent_options),
1551 )
1552 .route("/api/config/sections", get(api_sections::handle_sections))
1553 .route(
1554 "/api/config/sections/{section}",
1555 get(api_sections::handle_section_picker),
1556 )
1557 .route(
1558 "/api/config/sections/{section}/items/{key}",
1559 post(api_sections::handle_section_select),
1560 )
1561 .route("/api/personality", get(api_personality::handle_index))
1562 .route(
1563 "/api/quickstart/state",
1564 get(api_quickstart::handle_state),
1565 )
1566 .route(
1567 "/api/quickstart/fields",
1568 post(api_quickstart::handle_fields),
1569 )
1570 .route(
1571 "/api/quickstart/validate",
1572 post(api_quickstart::handle_validate),
1573 )
1574 .route(
1575 "/api/quickstart/apply",
1576 post(api_quickstart::handle_apply),
1577 )
1578 .route(
1579 "/api/quickstart/dismiss",
1580 post(api_quickstart::handle_dismiss),
1581 )
1582 .route(
1583 "/api/personality/templates",
1584 get(api_personality::handle_templates),
1585 )
1586 .route(
1587 "/api/personality/{filename}",
1588 get(api_personality::handle_get).put(api_personality::handle_put),
1589 )
1590 .route("/api/browse", get(api_browse::handle_browse))
1591 .route("/api/browse/mkdir", post(api_browse::handle_browse_mkdir))
1592 .route("/api/browse/rmdir", delete(api_browse::handle_browse_rmdir))
1593 .route(
1594 "/api/agents/{alias}/workspace/list",
1595 get(api_browse::handle_agent_workspace_list),
1596 )
1597 .route(
1598 "/api/agents/{alias}/workspace/read",
1599 get(api_browse::handle_agent_workspace_read),
1600 )
1601 .route(
1602 "/api/agents/{alias}/workspace/path",
1603 delete(api_browse::handle_agent_workspace_delete),
1604 )
1605 .route(
1606 "/api/agents/{alias}/workspace/move",
1607 post(api_browse::handle_agent_workspace_move),
1608 )
1609 .route(
1610 "/api/agents/{alias}/workspace/mkdir",
1611 post(api_browse::handle_agent_workspace_mkdir),
1612 )
1613 .route("/api/skills/bundles", get(api_skills::handle_list_bundles))
1614 .route(
1615 "/api/skills/bundles/{alias}/skills",
1616 get(api_skills::handle_list_skills).post(api_skills::handle_create_skill),
1617 )
1618 .route(
1619 "/api/skills/bundles/{alias}/skills/{name}",
1620 get(api_skills::handle_read_skill)
1621 .put(api_skills::handle_write_skill)
1622 .delete(api_skills::handle_delete_skill),
1623 )
1624 .route("/api/config/init", post(api_config::handle_init))
1625 .route("/api/config/migrate", post(api_config::handle_migrate))
1626 .route("/api/openapi.json", get(openapi::handle_openapi_json))
1627 .route("/api/docs", get(openapi::handle_docs))
1628 .route("/api/tools", get(api::handle_api_tools))
1629 .route("/api/cron", get(api::handle_api_cron_list))
1630 .route("/api/cron", post(api::handle_api_cron_add))
1631 .route(
1632 "/api/cron/settings",
1633 get(api::handle_api_cron_settings_get).patch(api::handle_api_cron_settings_patch),
1634 )
1635 .route(
1636 "/api/cron/{id}",
1637 delete(api::handle_api_cron_delete).patch(api::handle_api_cron_patch),
1638 )
1639 .route("/api/cron/{id}/runs", get(api::handle_api_cron_runs))
1640 .route("/api/integrations", get(api::handle_api_integrations))
1644 .route(
1645 "/api/integrations/settings",
1646 get(api::handle_api_integrations_settings),
1647 )
1648 .route(
1649 "/api/doctor",
1650 get(api::handle_api_doctor).post(api::handle_api_doctor),
1651 )
1652 .route("/api/memory", get(api::handle_api_memory_list))
1653 .route("/api/memory", post(api::handle_api_memory_store))
1654 .route("/api/memory/{key}", delete(api::handle_api_memory_delete))
1655 .route("/api/cost", get(api::handle_api_cost))
1656 .route("/api/cli-tools", get(api::handle_api_cli_tools))
1657 .route("/api/channels", get(api::handle_api_channels))
1658 .route("/api/health", get(api::handle_api_health))
1659 .route("/api/tuis", get(api::handle_api_tuis))
1660 .route("/api/sessions", get(api::handle_api_sessions_list))
1661 .route("/api/sessions/running", get(api::handle_api_sessions_running))
1662 .route(
1663 "/api/sessions/{id}/messages",
1664 get(api::handle_api_session_messages).post(api::handle_api_session_message_post),
1665 )
1666 .route("/api/sessions/{id}", delete(api::handle_api_session_delete).put(api::handle_api_session_rename))
1667 .route("/api/sessions/{id}/state", get(api::handle_api_session_state))
1668 .route("/api/sessions/{id}/abort", post(api::handle_api_session_abort))
1669 .route("/api/pairing/initiate", post(api_pairing::initiate_pairing))
1671 .route("/api/pair", post(api_pairing::submit_pairing_enhanced))
1672 .route("/api/devices", get(api_pairing::list_devices))
1673 .route(
1674 "/api/devices/me/capabilities",
1675 post(api_pairing::update_my_capabilities),
1676 )
1677 .route("/api/devices/{id}", delete(api_pairing::revoke_device))
1678 .route(
1679 "/api/devices/{id}/token/rotate",
1680 post(api_pairing::rotate_token),
1681 )
1682 .route("/api/canvas", get(canvas::handle_canvas_list))
1684 .route(
1685 "/api/canvas/{id}",
1686 get(canvas::handle_canvas_get)
1687 .post(canvas::handle_canvas_post)
1688 .delete(canvas::handle_canvas_clear),
1689 )
1690 .route(
1691 "/api/canvas/{id}/history",
1692 get(canvas::handle_canvas_history),
1693 );
1694
1695 #[cfg(feature = "webauthn")]
1697 let inner = inner
1698 .route(
1699 "/api/webauthn/register/start",
1700 post(api_webauthn::handle_register_start),
1701 )
1702 .route(
1703 "/api/webauthn/register/finish",
1704 post(api_webauthn::handle_register_finish),
1705 )
1706 .route(
1707 "/api/webauthn/auth/start",
1708 post(api_webauthn::handle_auth_start),
1709 )
1710 .route(
1711 "/api/webauthn/auth/finish",
1712 post(api_webauthn::handle_auth_finish),
1713 )
1714 .route(
1715 "/api/webauthn/credentials",
1716 get(api_webauthn::handle_list_credentials),
1717 )
1718 .route(
1719 "/api/webauthn/credentials/{id}",
1720 delete(api_webauthn::handle_delete_credential),
1721 );
1722
1723 #[cfg(feature = "plugins-wasm")]
1725 let inner = inner.route(
1726 "/api/plugins",
1727 get(api_plugins::plugin_routes::list_plugins),
1728 );
1729
1730 let inner = inner
1731 .route("/api/events", get(sse::handle_sse_events))
1733 .route("/api/events/history", get(sse::handle_events_history))
1734 .route("/acp", get(acp::handle_ws_acp))
1736 .route("/ws/chat", get(ws::handle_ws_chat))
1738 .route("/ws/canvas/{id}", get(canvas::handle_ws_canvas))
1740 .route("/ws/nodes", get(nodes::handle_ws_nodes))
1742 .route("/_app/{*path}", get(static_files::handle_static))
1744 .fallback(get(static_files::handle_spa_fallback))
1746 .with_state(state.clone())
1747 .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1748 .layer(TimeoutLayer::with_status_code(
1749 StatusCode::REQUEST_TIMEOUT,
1750 Duration::from_secs(gateway_request_timeout_secs(&config.gateway)),
1751 ));
1752
1753 let cron_run_router: Router = Router::new()
1758 .route("/api/cron/{id}/run", post(api::handle_api_cron_run))
1759 .with_state(state)
1760 .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1761 .layer(TimeoutLayer::with_status_code(
1762 StatusCode::REQUEST_TIMEOUT,
1763 Duration::from_secs(gateway_long_running_request_timeout_secs(&config.gateway)),
1764 ));
1765
1766 let inner = inner.merge(cron_run_router);
1767
1768 let app = if let Some(prefix) = path_prefix {
1772 let redirect_target = prefix.to_string();
1773 Router::new().nest(prefix, inner).route(
1774 &format!("{prefix}/"),
1775 get(|| async move { axum::response::Redirect::permanent(&redirect_target) }),
1776 )
1777 } else {
1778 inner
1779 };
1780
1781 let tls_acceptor = match &config.gateway.tls {
1783 Some(tls_cfg) if tls_cfg.enabled => {
1784 let has_mtls = tls_cfg.client_auth.as_ref().is_some_and(|ca| ca.enabled);
1785 if has_mtls {
1786 ::zeroclaw_log::record!(
1787 INFO,
1788 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1789 "TLS enabled with mutual TLS (mTLS) client verification"
1790 );
1791 } else {
1792 ::zeroclaw_log::record!(
1793 INFO,
1794 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1795 "TLS enabled (no client certificate requirement)"
1796 );
1797 }
1798 Some(tls::build_tls_acceptor(tls_cfg)?)
1799 }
1800 _ => None,
1801 };
1802
1803 if let Some(tls_acceptor) = tls_acceptor {
1804 let app = app.into_make_service_with_connect_info::<SocketAddr>();
1806 let mut app = app;
1807
1808 let mut shutdown_signal = shutdown_rx;
1809 loop {
1810 tokio::select! {
1811 conn = listener.accept() => {
1812 let (tcp_stream, remote_addr) = match conn {
1813 Ok(pair) => pair,
1814 Err(e) => {
1815 if is_recoverable_accept_error(&e) {
1816 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"error": format!("{}", e)})), "gateway accept() failed with a transient error; backing off and continuing");
1821 tokio::time::sleep(Duration::from_millis(ACCEPT_ERROR_BACKOFF_MS)).await;
1822 continue;
1823 }
1824 return Err(e.into());
1825 }
1826 };
1827 let tls_acceptor = tls_acceptor.clone();
1828 let svc = tower::MakeService::<
1829 SocketAddr,
1830 hyper::Request<hyper::body::Incoming>,
1831 >::make_service(&mut app, remote_addr)
1832 .await
1833 .expect("infallible make_service");
1834
1835 zeroclaw_spawn::spawn!(async move {
1836 let tls_stream = match tls_acceptor.accept(tcp_stream).await {
1837 Ok(s) => s,
1838 Err(e) => {
1839 ::zeroclaw_log::record!(DEBUG, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"error": format!("{}", e), "remote_addr": remote_addr})), "TLS handshake failed from");
1840 return;
1841 }
1842 };
1843 let io = hyper_util::rt::TokioIo::new(tls_stream);
1844 let hyper_svc = hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
1845 let mut svc = svc.clone();
1846 async move {
1847 tower::Service::call(&mut svc, req).await
1848 }
1849 });
1850 if let Err(e) = hyper_util::server::conn::auto::Builder::new(
1851 hyper_util::rt::TokioExecutor::new(),
1852 )
1853 .serve_connection(io, hyper_svc)
1854 .await
1855 {
1856 ::zeroclaw_log::record!(DEBUG, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"error": format!("{}", e), "remote_addr": remote_addr})), "connection error from");
1857 }
1858 });
1859 }
1860 _ = shutdown_signal.changed() => {
1861 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "ZeroClaw Gateway shutting down");
1862 break;
1863 }
1864 }
1865 }
1866 } else {
1867 axum::serve(
1869 listener,
1870 app.into_make_service_with_connect_info::<SocketAddr>(),
1871 )
1872 .with_graceful_shutdown(async move {
1873 let _ = shutdown_rx.changed().await;
1874 ::zeroclaw_log::record!(
1875 INFO,
1876 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1877 "ZeroClaw Gateway shutting down"
1878 );
1879 })
1880 .await?;
1881 }
1882
1883 drop(broadcast_hook_guard);
1884
1885 Ok(())
1886}
1887
1888fn format_paircode_recovery_command(host: &str, port: u16) -> String {
1889 let mut cmd = format!("zeroclaw gateway get-paircode --new --port {port}");
1890 if let Some(host_arg) = paircode_recovery_host_arg(host) {
1891 cmd.push_str(" --host ");
1892 cmd.push_str(host_arg);
1893 }
1894 cmd
1895}
1896
1897fn paircode_recovery_host_arg(host: &str) -> Option<&str> {
1898 match host {
1899 "127.0.0.1" | "localhost" | "::1" | "0.0.0.0" | "::" => None,
1900 _ => Some(host),
1901 }
1902}
1903
1904fn format_paircode_recovery_curl(host: &str, port: u16, path_prefix: &str) -> String {
1905 format!("curl -s -X POST http://{host}:{port}{path_prefix}/admin/paircode/new")
1906}
1907
1908async fn handle_health(State(state): State<AppState>) -> impl IntoResponse {
1914 let body = serde_json::json!({
1915 "status": "ok",
1916 "paired": state.pairing.is_paired(),
1917 "require_pairing": state.pairing.require_pairing(),
1918 "runtime": zeroclaw_runtime::health::snapshot_json(),
1919 });
1920 Json(body)
1921}
1922
1923const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
1925
1926fn prometheus_disabled_hint() -> String {
1927 String::from(
1928 "# Prometheus backend not enabled. Set [observability] backend = \"prometheus\" in config.\n",
1929 )
1930}
1931
1932#[cfg(feature = "observability-prometheus")]
1933fn prometheus_observer_from_state(
1934 observer: &dyn zeroclaw_runtime::observability::Observer,
1935) -> Option<&zeroclaw_runtime::observability::PrometheusObserver> {
1936 observer
1940 .as_any()
1941 .downcast_ref::<zeroclaw_runtime::observability::PrometheusObserver>()
1942}
1943
1944async fn handle_metrics(State(state): State<AppState>) -> impl IntoResponse {
1946 let body = {
1947 #[cfg(feature = "observability-prometheus")]
1948 {
1949 if let Some(prom) = prometheus_observer_from_state(state.observer.as_ref()) {
1950 prom.encode()
1951 } else {
1952 prometheus_disabled_hint()
1953 }
1954 }
1955 #[cfg(not(feature = "observability-prometheus"))]
1956 {
1957 let _ = &state;
1958 prometheus_disabled_hint()
1959 }
1960 };
1961
1962 (
1963 StatusCode::OK,
1964 [(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)],
1965 body,
1966 )
1967}
1968
1969#[axum::debug_handler]
1971async fn handle_pair(
1972 State(state): State<AppState>,
1973 ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
1974 headers: HeaderMap,
1975) -> impl IntoResponse {
1976 let rate_key =
1977 client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
1978 if !state.rate_limiter.allow_pair(&rate_key) {
1979 ::zeroclaw_log::record!(
1980 WARN,
1981 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1982 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1983 "/pair rate limit exceeded"
1984 );
1985 let err = serde_json::json!({
1986 "error": "Too many pairing requests. Please retry later.",
1987 "retry_after": RATE_LIMIT_WINDOW_SECS,
1988 });
1989 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1990 }
1991
1992 if let Err(e) = state.auth_limiter.check_rate_limit(&rate_key) {
1994 ::zeroclaw_log::record!(
1995 WARN,
1996 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1997 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1998 .with_attrs(::serde_json::json!({"rate_key": rate_key})),
1999 "pairing auth rate limit exceeded"
2000 );
2001 let err = serde_json::json!({
2002 "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
2003 "retry_after": e.retry_after_secs,
2004 });
2005 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
2006 }
2007
2008 let code = headers
2009 .get("X-Pairing-Code")
2010 .and_then(|v| v.to_str().ok())
2011 .unwrap_or("");
2012
2013 match state.pairing.try_pair(code, &rate_key).await {
2014 Ok(Some(token)) => {
2015 ::zeroclaw_log::record!(
2016 INFO,
2017 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
2018 "new client paired successfully"
2019 );
2020 if let Err(err) =
2021 Box::pin(persist_pairing_tokens(state.config.clone(), &state.pairing)).await
2022 {
2023 ::zeroclaw_log::record!(
2024 ERROR,
2025 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2026 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2027 .with_attrs(::serde_json::json!({"error": format!("{}", err)})),
2028 "pairing succeeded but token persistence failed"
2029 );
2030 let body = serde_json::json!({
2031 "paired": true,
2032 "persisted": false,
2033 "token": token,
2034 "message": "Paired for this process, but failed to persist token to config.toml. Check config path and write permissions.",
2035 });
2036 return (StatusCode::OK, Json(body));
2037 }
2038
2039 let body = serde_json::json!({
2040 "paired": true,
2041 "persisted": true,
2042 "token": token,
2043 "message": "Save this token — use it as Authorization: Bearer <token>"
2044 });
2045 (StatusCode::OK, Json(body))
2046 }
2047 Ok(None) => {
2048 state.auth_limiter.record_attempt(&rate_key);
2049 ::zeroclaw_log::record!(
2050 WARN,
2051 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2052 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2053 "pairing attempt with invalid code"
2054 );
2055 let err = serde_json::json!({"error": "Invalid pairing code"});
2056 (StatusCode::FORBIDDEN, Json(err))
2057 }
2058 Err(lockout_secs) => {
2059 ::zeroclaw_log::record!(
2060 WARN,
2061 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2062 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2063 .with_attrs(::serde_json::json!({"lockout_secs": lockout_secs})),
2064 "pairing locked out; too many failed attempts"
2065 );
2066 let err = serde_json::json!({
2067 "error": format!("Too many failed attempts. Try again in {lockout_secs}s."),
2068 "retry_after": lockout_secs
2069 });
2070 (StatusCode::TOO_MANY_REQUESTS, Json(err))
2071 }
2072 }
2073}
2074
2075pub(crate) async fn persist_pairing_tokens(
2076 config: Arc<RwLock<Config>>,
2077 pairing: &PairingGuard,
2078) -> Result<()> {
2079 let paired_tokens = pairing.tokens();
2080 let mut updated_cfg = { config.read().clone() };
2083 updated_cfg.gateway.paired_tokens = paired_tokens;
2084 updated_cfg.mark_dirty("gateway.paired_tokens");
2090 updated_cfg
2091 .save_dirty()
2092 .await
2093 .context("Failed to persist paired tokens to config.toml")?;
2094
2095 *config.write() = updated_cfg;
2097 Ok(())
2098}
2099
2100struct GatewayChatOutcome {
2105 response: String,
2106 input_tokens: Option<u64>,
2107 output_tokens: Option<u64>,
2108 cost_usd: Option<f64>,
2109}
2110
2111struct UnconfiguredModelProvider;
2112
2113#[async_trait::async_trait]
2114impl ModelProvider for UnconfiguredModelProvider {
2115 async fn chat_with_system(
2116 &self,
2117 _system_prompt: Option<&str>,
2118 _message: &str,
2119 _model: &str,
2120 _temperature: Option<f64>,
2121 ) -> anyhow::Result<String> {
2122 anyhow::bail!(
2123 "needs_quickstart: gateway booted without a working model_provider. \
2124 Complete browser quickstart at /quickstart, or fix \
2125 [providers.models.<type>.<alias>] and POST /admin/reload."
2126 )
2127 }
2128}
2129
2130impl ::zeroclaw_api::attribution::Attributable for UnconfiguredModelProvider {
2131 fn role(&self) -> ::zeroclaw_api::attribution::Role {
2132 ::zeroclaw_api::attribution::Role::Provider(
2133 ::zeroclaw_api::attribution::ProviderKind::Model(
2134 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
2135 ),
2136 )
2137 }
2138 fn alias(&self) -> &str {
2139 "unconfigured"
2140 }
2141}
2142
2143fn needs_quickstart_for(model: &str) -> Option<anyhow::Error> {
2150 if model.trim().is_empty() {
2151 ::zeroclaw_log::record!(
2152 WARN,
2153 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
2154 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
2155 "gateway dispatch refused: no model configured (browser quickstart incomplete)"
2156 );
2157 Some(anyhow::Error::msg(
2158 "needs_quickstart: gateway has no model configured. Complete \
2159 browser quickstart at /quickstart, or set [providers.models.<type>.<alias>] \
2160 model = \"...\" before sending messages.",
2161 ))
2162 } else {
2163 None
2164 }
2165}
2166
2167fn is_needs_quickstart_err(e: &anyhow::Error) -> bool {
2172 e.to_string().contains("needs_quickstart")
2173}
2174
2175fn needs_quickstart_channel_reply() -> String {
2181 i18n::get_required_cli_string("channel-needs-quickstart-reply")
2182}
2183
2184async fn run_gateway_chat_with_tools(
2190 state: &AppState,
2191 message: &str,
2192 session_id: Option<&str>,
2193 agent_override: Option<&str>,
2194) -> anyhow::Result<GatewayChatOutcome> {
2195 if let Some(err) = needs_quickstart_for(&state.model) {
2196 return Err(err);
2197 }
2198
2199 #[cfg(test)]
2204 {
2205 let _ = (session_id, agent_override);
2206 let response = state
2207 .model_provider
2208 .chat_with_system(None, message, &state.model, state.temperature)
2209 .await?;
2210 Ok(GatewayChatOutcome {
2211 response,
2212 input_tokens: None,
2213 output_tokens: None,
2214 cost_usd: None,
2215 })
2216 }
2217
2218 #[cfg(not(test))]
2219 {
2220 let config = state.config.read().clone();
2221 let agent_alias = require_gateway_chat_agent_alias(&config, agent_override)?;
2222
2223 let cost_tracking_context = state.cost_tracker.as_ref().map(|tracker| {
2234 let pricing: zeroclaw_runtime::agent::cost::ModelProviderPricing = config
2235 .providers
2236 .models
2237 .iter_entries()
2238 .filter(|(_, _, base)| !base.pricing.is_empty())
2239 .map(|(type_k, alias_k, base)| {
2240 (format!("{type_k}.{alias_k}"), base.pricing.clone())
2241 })
2242 .collect();
2243 zeroclaw_runtime::agent::cost::ToolLoopCostTrackingContext::new(
2244 tracker.clone(),
2245 std::sync::Arc::new(pricing),
2246 )
2247 .with_agent_alias(&agent_alias)
2248 });
2249 let captured_usage = cost_tracking_context
2250 .as_ref()
2251 .map(|ctx| ctx.turn_usage.clone());
2252 let response = Box::pin(
2253 zeroclaw_runtime::agent::cost::TOOL_LOOP_COST_TRACKING_CONTEXT.scope(
2254 cost_tracking_context,
2255 zeroclaw_runtime::agent::process_message(config, &agent_alias, message, session_id),
2256 ),
2257 )
2258 .await?;
2259 let usage = captured_usage
2260 .map(|cell| *cell.lock())
2261 .filter(|u| u.input_tokens > 0 || u.output_tokens > 0);
2262 let (input_tokens, output_tokens, cost_usd) = match usage {
2263 Some(u) => (
2264 Some(u.input_tokens),
2265 Some(u.output_tokens),
2266 Some(u.cost_usd),
2267 ),
2268 None => (None, None, None),
2269 };
2270 Ok(GatewayChatOutcome {
2271 response,
2272 input_tokens,
2273 output_tokens,
2274 cost_usd,
2275 })
2276 }
2277}
2278
2279fn resolve_gateway_chat_agent_alias(
2280 config: &Config,
2281 agent_override: Option<&str>,
2282) -> Option<String> {
2283 agent_override
2284 .map(ToString::to_string)
2285 .or_else(|| config.resolved_runtime_agent_alias().map(str::to_owned))
2286}
2287
2288#[cfg(not(test))]
2289fn require_gateway_chat_agent_alias(
2290 config: &Config,
2291 agent_override: Option<&str>,
2292) -> anyhow::Result<String> {
2293 resolve_gateway_chat_agent_alias(config, agent_override).ok_or_else(|| {
2294 ::zeroclaw_log::record!(
2295 WARN,
2296 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
2297 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
2298 "webhook chat rejected: no configured [agents.<alias>] entry"
2299 );
2300 anyhow::Error::msg("webhook chat requires at least one configured [agents.<alias>] entry")
2301 })
2302}
2303
2304fn optional_channel_routes() -> Router<AppState> {
2305 let router: Router<AppState> = Router::new();
2306 #[cfg(feature = "channel-whatsapp-cloud")]
2307 let router = router
2308 .route("/whatsapp", get(handle_whatsapp_verify))
2309 .route("/whatsapp", post(handle_whatsapp_message));
2310 #[cfg(feature = "channel-linq")]
2311 let router = router.route("/linq/{alias}", post(handle_linq_webhook));
2312 #[cfg(feature = "channel-wati")]
2313 let router = router
2314 .route("/wati", get(handle_wati_verify))
2315 .route("/wati", post(handle_wati_webhook));
2316 #[cfg(feature = "channel-nextcloud")]
2317 let router = router.route("/nextcloud-talk", post(handle_nextcloud_talk_webhook));
2318 #[cfg(feature = "channel-email")]
2319 let router = router.route("/webhook/gmail", post(handle_gmail_push_webhook));
2320 router
2321}
2322
2323#[derive(serde::Deserialize)]
2325pub struct WebhookBody {
2326 pub message: String,
2327}
2328
2329#[derive(Default, serde::Deserialize)]
2331pub struct WebhookQuery {
2332 #[serde(default, alias = "agentAlias", alias = "agent_alias")]
2337 pub agent: Option<String>,
2338}
2339
2340async fn handle_webhook(
2342 State(state): State<AppState>,
2343 ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
2344 Query(query): Query<WebhookQuery>,
2345 headers: HeaderMap,
2346 body: Result<Json<WebhookBody>, axum::extract::rejection::JsonRejection>,
2347) -> impl IntoResponse {
2348 let rate_key =
2349 client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
2350 if !state.rate_limiter.allow_webhook(&rate_key) {
2351 ::zeroclaw_log::record!(
2352 WARN,
2353 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2354 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2355 "/webhook rate limit exceeded"
2356 );
2357 let err = serde_json::json!({
2358 "error": "Too many webhook requests. Please retry later.",
2359 "retry_after": RATE_LIMIT_WINDOW_SECS,
2360 });
2361 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
2362 }
2363
2364 if state.pairing.require_pairing() {
2366 if let Err(e) = state.auth_limiter.check_rate_limit(&rate_key) {
2367 ::zeroclaw_log::record!(
2368 WARN,
2369 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2370 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2371 .with_attrs(::serde_json::json!({"rate_key": rate_key})),
2372 "webhook: auth rate limit exceeded for"
2373 );
2374 let err = serde_json::json!({
2375 "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
2376 "retry_after": e.retry_after_secs,
2377 });
2378 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
2379 }
2380 let auth = headers
2381 .get(header::AUTHORIZATION)
2382 .and_then(|v| v.to_str().ok())
2383 .unwrap_or("");
2384 let token = auth.strip_prefix("Bearer ").unwrap_or("");
2385 if !state.pairing.is_authenticated(token) {
2386 state.auth_limiter.record_attempt(&rate_key);
2387 ::zeroclaw_log::record!(
2388 WARN,
2389 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2390 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2391 "webhook: rejected — not paired / invalid bearer token"
2392 );
2393 let err = serde_json::json!({
2394 "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer <token>"
2395 });
2396 return (StatusCode::UNAUTHORIZED, Json(err));
2397 }
2398 }
2399
2400 if let Some(ref secret_hash) = state.webhook_secret_hash {
2402 let header_hash = headers
2403 .get("X-Webhook-Secret")
2404 .and_then(|v| v.to_str().ok())
2405 .map(str::trim)
2406 .filter(|value| !value.is_empty())
2407 .map(hash_webhook_secret);
2408 match header_hash {
2409 Some(val) if constant_time_eq(&val, secret_hash.as_ref()) => {}
2410 _ => {
2411 ::zeroclaw_log::record!(
2412 WARN,
2413 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2414 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2415 "webhook: rejected request — invalid or missing X-Webhook-Secret"
2416 );
2417 let err = serde_json::json!({"error": "Unauthorized — invalid or missing X-Webhook-Secret header"});
2418 return (StatusCode::UNAUTHORIZED, Json(err));
2419 }
2420 }
2421 }
2422
2423 let Json(webhook_body) = match body {
2425 Ok(b) => b,
2426 Err(e) => {
2427 ::zeroclaw_log::record!(
2428 WARN,
2429 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2430 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2431 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2432 "webhook JSON parse error"
2433 );
2434 let err = serde_json::json!({
2435 "error": "Invalid JSON body. Expected: {\"message\": \"...\"}"
2436 });
2437 return (StatusCode::BAD_REQUEST, Json(err));
2438 }
2439 };
2440
2441 let agent_override = query
2446 .agent
2447 .as_deref()
2448 .map(str::trim)
2449 .filter(|s| !s.is_empty());
2450 if let Some(alias) = agent_override {
2451 let cfg = state.config.read();
2452 if cfg.agent(alias).is_none() {
2453 ::zeroclaw_log::record!(
2454 WARN,
2455 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
2456 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2457 .with_attrs(::serde_json::json!({"agent": alias})),
2458 "webhook: rejected — unknown agent alias"
2459 );
2460 let err = serde_json::json!({
2461 "error": format!(
2462 "Unknown agent `{alias}` — no [agents.{alias}] entry configured."
2463 )
2464 });
2465 return (StatusCode::BAD_REQUEST, Json(err));
2466 }
2467 }
2468
2469 if let Some(idempotency_key) = headers
2471 .get("X-Idempotency-Key")
2472 .and_then(|v| v.to_str().ok())
2473 .map(str::trim)
2474 .filter(|value| !value.is_empty())
2475 && !state.idempotency_store.record_if_new(idempotency_key)
2476 {
2477 ::zeroclaw_log::record!(
2478 INFO,
2479 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2480 .with_attrs(::serde_json::json!({"idempotency_key": idempotency_key})),
2481 "webhook duplicate ignored"
2482 );
2483 let body = serde_json::json!({
2484 "status": "duplicate",
2485 "idempotent": true,
2486 "message": "Request already processed for this idempotency key"
2487 });
2488 return (StatusCode::OK, Json(body));
2489 }
2490
2491 let message = &webhook_body.message;
2492 let session_id = webhook_session_id(&headers);
2493
2494 if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(message) {
2495 let key = webhook_memory_key();
2496 let _ = state
2497 .mem
2498 .store(
2499 &key,
2500 message,
2501 MemoryCategory::Conversation,
2502 session_id.as_deref(),
2503 )
2504 .await;
2505 }
2506
2507 let (provider_label, model_label) = {
2508 let cfg = state.config.read();
2509 let resolved_agent_alias = resolve_gateway_chat_agent_alias(&cfg, agent_override);
2510 let resolved_provider = resolved_agent_alias
2511 .as_deref()
2512 .and_then(|alias| cfg.resolved_model_provider_for_agent(alias));
2513 let provider_label = resolved_provider
2514 .as_ref()
2515 .map(|(ty, alias, _)| format!("{ty}.{alias}"))
2516 .unwrap_or_else(|| "unknown".to_string());
2517 let model_label = resolved_provider
2518 .and_then(|(_, _, entry)| {
2519 entry
2520 .model
2521 .as_deref()
2522 .map(str::trim)
2523 .filter(|model| !model.is_empty())
2524 .map(ToString::to_string)
2525 })
2526 .or_else(|| cfg.resolve_default_model())
2527 .unwrap_or_else(|| "<unresolved>".to_string());
2528 (provider_label, model_label)
2529 };
2530 let started_at = Instant::now();
2531
2532 state.observer.record_event(
2533 &zeroclaw_runtime::observability::ObserverEvent::AgentStart {
2534 model_provider: provider_label.clone(),
2535 model: model_label.clone(),
2536 },
2537 );
2538 state.observer.record_event(
2539 &zeroclaw_runtime::observability::ObserverEvent::LlmRequest {
2540 model_provider: provider_label.clone(),
2541 model: model_label.clone(),
2542 messages_count: 1,
2543 },
2544 );
2545
2546 match run_gateway_chat_with_tools(&state, message, session_id.as_deref(), agent_override).await
2547 {
2548 Ok(GatewayChatOutcome {
2549 response,
2550 input_tokens,
2551 output_tokens,
2552 cost_usd,
2553 }) => {
2554 let duration = started_at.elapsed();
2555 let tokens_used = input_tokens
2560 .zip(output_tokens)
2561 .map(|(i, o)| i + o)
2562 .or(input_tokens)
2563 .or(output_tokens);
2564 state.observer.record_event(
2565 &zeroclaw_runtime::observability::ObserverEvent::LlmResponse {
2566 model_provider: provider_label.clone(),
2567 model: model_label.clone(),
2568 duration,
2569 success: true,
2570 error_message: None,
2571 input_tokens: None,
2572 output_tokens: None,
2573 },
2574 );
2575 state.observer.record_metric(
2576 &zeroclaw_runtime::observability::traits::ObserverMetric::RequestLatency(duration),
2577 );
2578 state.observer.record_event(
2579 &zeroclaw_runtime::observability::ObserverEvent::AgentEnd {
2580 model_provider: provider_label,
2581 model: model_label.clone(),
2582 duration,
2583 tokens_used,
2584 cost_usd,
2585 },
2586 );
2587
2588 let body = serde_json::json!({"response": response, "model": model_label});
2589 (StatusCode::OK, Json(body))
2590 }
2591 Err(e) => {
2592 let duration = started_at.elapsed();
2593 let sanitized = zeroclaw_providers::sanitize_api_error(&e.to_string());
2594
2595 state.observer.record_event(
2596 &zeroclaw_runtime::observability::ObserverEvent::LlmResponse {
2597 model_provider: provider_label.clone(),
2598 model: model_label.clone(),
2599 duration,
2600 success: false,
2601 error_message: Some(sanitized.clone()),
2602 input_tokens: None,
2603 output_tokens: None,
2604 },
2605 );
2606 state.observer.record_metric(
2607 &zeroclaw_runtime::observability::traits::ObserverMetric::RequestLatency(duration),
2608 );
2609 state
2610 .observer
2611 .record_event(&zeroclaw_runtime::observability::ObserverEvent::Error {
2612 component: "gateway".to_string(),
2613 message: sanitized.clone(),
2614 });
2615 state.observer.record_event(
2616 &zeroclaw_runtime::observability::ObserverEvent::AgentEnd {
2617 model_provider: provider_label,
2618 model: model_label,
2619 duration,
2620 tokens_used: None,
2621 cost_usd: None,
2622 },
2623 );
2624
2625 if is_needs_quickstart_err(&e) {
2626 ::zeroclaw_log::record!(
2627 WARN,
2628 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2629 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2630 "Webhook chat refused: gateway has no model configured; \
2631 visit /quickstart"
2632 );
2633 let body = serde_json::json!({
2634 "error": "needs_quickstart",
2635 "url": "/quickstart"
2636 });
2637 (StatusCode::SERVICE_UNAVAILABLE, Json(body))
2638 } else {
2639 ::zeroclaw_log::record!(
2640 ERROR,
2641 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2642 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2643 .with_attrs(::serde_json::json!({"error": sanitized})),
2644 "webhook model_provider error"
2645 );
2646 let err = serde_json::json!({"error": "LLM request failed"});
2647 (StatusCode::INTERNAL_SERVER_ERROR, Json(err))
2648 }
2649 }
2650 }
2651}
2652
2653#[derive(serde::Deserialize)]
2655pub struct WhatsAppVerifyQuery {
2656 #[serde(rename = "hub.mode")]
2657 pub mode: Option<String>,
2658 #[serde(rename = "hub.verify_token")]
2659 pub verify_token: Option<String>,
2660 #[serde(rename = "hub.challenge")]
2661 pub challenge: Option<String>,
2662}
2663
2664#[cfg(feature = "channel-whatsapp-cloud")]
2666async fn handle_whatsapp_verify(
2667 State(state): State<AppState>,
2668 Query(params): Query<WhatsAppVerifyQuery>,
2669) -> impl IntoResponse {
2670 let Some(ref wa) = state.whatsapp else {
2671 return (StatusCode::NOT_FOUND, "WhatsApp not configured".to_string());
2672 };
2673
2674 let token_matches = params
2676 .verify_token
2677 .as_deref()
2678 .is_some_and(|t| constant_time_eq(t, wa.verify_token()));
2679 if params.mode.as_deref() == Some("subscribe") && token_matches {
2680 if let Some(ch) = params.challenge {
2681 ::zeroclaw_log::record!(
2682 INFO,
2683 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2684 .with_attrs(::serde_json::json!({"channel": "whatsapp"})),
2685 "webhook verified successfully"
2686 );
2687 return (StatusCode::OK, ch);
2688 }
2689 return (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string());
2690 }
2691
2692 ::zeroclaw_log::record!(
2693 WARN,
2694 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2695 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2696 .with_attrs(::serde_json::json!({"channel": "whatsapp"})),
2697 "webhook verification failed — token mismatch"
2698 );
2699 (StatusCode::FORBIDDEN, "Forbidden".to_string())
2700}
2701
2702pub fn verify_whatsapp_signature(app_secret: &str, body: &[u8], signature_header: &str) -> bool {
2706 use hmac::{Hmac, Mac};
2707 use sha2::Sha256;
2708
2709 let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
2711 return false;
2712 };
2713
2714 let Ok(expected) = hex::decode(hex_sig) else {
2716 return false;
2717 };
2718
2719 let Ok(mut mac) = Hmac::<Sha256>::new_from_slice(app_secret.as_bytes()) else {
2721 return false;
2722 };
2723 mac.update(body);
2724
2725 mac.verify_slice(&expected).is_ok()
2727}
2728
2729#[cfg(feature = "channel-whatsapp-cloud")]
2731async fn handle_whatsapp_message(
2732 State(state): State<AppState>,
2733 headers: HeaderMap,
2734 body: Bytes,
2735) -> impl IntoResponse {
2736 let Some(ref wa) = state.whatsapp else {
2737 return (
2738 StatusCode::NOT_FOUND,
2739 Json(serde_json::json!({"error": "WhatsApp not configured"})),
2740 );
2741 };
2742
2743 if let Some(ref app_secret) = state.whatsapp_app_secret {
2745 let signature = headers
2746 .get("X-Hub-Signature-256")
2747 .and_then(|v| v.to_str().ok())
2748 .unwrap_or("");
2749
2750 if !verify_whatsapp_signature(app_secret, &body, signature) {
2751 ::zeroclaw_log::record!(
2752 WARN,
2753 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2754 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2755 .with_attrs(::serde_json::json!({"channel": "whatsapp"})),
2756 &format!(
2757 "webhook signature verification failed (signature: {})",
2758 if signature.is_empty() {
2759 "missing"
2760 } else {
2761 "invalid"
2762 }
2763 )
2764 );
2765 return (
2766 StatusCode::UNAUTHORIZED,
2767 Json(serde_json::json!({"error": "Invalid signature"})),
2768 );
2769 }
2770 }
2771
2772 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2774 return (
2775 StatusCode::BAD_REQUEST,
2776 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2777 );
2778 };
2779
2780 let messages = wa.parse_webhook_payload(&payload);
2782
2783 if messages.is_empty() {
2784 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2786 }
2787
2788 for msg in &messages {
2790 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"channel": "whatsapp", "sender": msg.sender, "content": msg.content})), "inbound webhook message");
2791
2792 if let Some((token, response)) = zeroclaw_channels::util::parse_approval_reply(&msg.content)
2794 {
2795 let mut map = wa.pending_approvals().lock().await;
2796 if let Some(sender) = map.remove(&token) {
2797 let _ = sender.send(response);
2798 continue;
2799 }
2800 }
2801
2802 let session_id = sender_session_id("whatsapp", msg);
2803
2804 if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(&msg.content) {
2806 let key = whatsapp_memory_key(msg);
2807 let _ = state
2808 .mem
2809 .store(
2810 &key,
2811 &msg.content,
2812 MemoryCategory::Conversation,
2813 Some(&session_id),
2814 )
2815 .await;
2816 }
2817
2818 match Box::pin(run_gateway_chat_with_tools(
2819 &state,
2820 &msg.content,
2821 Some(&session_id),
2822 None,
2823 ))
2824 .await
2825 {
2826 Ok(GatewayChatOutcome { response, .. }) => {
2827 if let Err(e) = wa
2829 .send(&SendMessage::new(response, &msg.reply_target))
2830 .await
2831 {
2832 ::zeroclaw_log::record!(
2833 ERROR,
2834 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2835 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2836 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2837 "Failed to send WhatsApp reply"
2838 );
2839 }
2840 }
2841 Err(e) => {
2842 let reply = if is_needs_quickstart_err(&e) {
2843 ::zeroclaw_log::record!(
2844 WARN,
2845 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2846 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2847 "WhatsApp chat refused: gateway has no model configured; \
2848 visit /quickstart"
2849 );
2850 needs_quickstart_channel_reply()
2851 } else {
2852 ::zeroclaw_log::record!(
2853 ERROR,
2854 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2855 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2856 .with_attrs(
2857 ::serde_json::json!({"channel": "whatsapp", "error": format!("{}", e)})
2858 ),
2859 "LLM error"
2860 );
2861 "Sorry, I couldn't process your message right now.".to_string()
2862 };
2863 let _ = wa.send(&SendMessage::new(reply, &msg.reply_target)).await;
2864 }
2865 }
2866 }
2867
2868 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2870}
2871
2872#[cfg(feature = "channel-linq")]
2874async fn handle_linq_webhook(
2875 State(state): State<AppState>,
2876 Path(alias): Path<String>,
2877 headers: HeaderMap,
2878 body: Bytes,
2879) -> impl IntoResponse {
2880 let Some(linq) = state.linq.get(&alias) else {
2881 return (
2882 StatusCode::NOT_FOUND,
2883 Json(serde_json::json!({"error": format!("Linq alias '{alias}' not configured")})),
2884 );
2885 };
2886
2887 let body_str = String::from_utf8_lossy(&body);
2888
2889 if let Some(signing_secret) = state.linq_signing_secrets.get(&alias) {
2891 let timestamp = headers
2892 .get("X-Webhook-Timestamp")
2893 .and_then(|v| v.to_str().ok())
2894 .unwrap_or("");
2895
2896 let signature = headers
2897 .get("X-Webhook-Signature")
2898 .and_then(|v| v.to_str().ok())
2899 .unwrap_or("");
2900
2901 if !zeroclaw_channels::linq::verify_linq_signature(
2902 signing_secret,
2903 &body_str,
2904 timestamp,
2905 signature,
2906 ) {
2907 ::zeroclaw_log::record!(
2908 WARN,
2909 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2910 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2911 .with_attrs(::serde_json::json!({"channel": "linq", "alias": alias})),
2912 &format!(
2913 "Linq webhook signature verification failed for alias '{alias}' (signature: {})",
2914 if signature.is_empty() {
2915 "missing"
2916 } else {
2917 "invalid"
2918 }
2919 )
2920 );
2921 return (
2922 StatusCode::UNAUTHORIZED,
2923 Json(serde_json::json!({"error": "Invalid signature"})),
2924 );
2925 }
2926 }
2927
2928 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2930 return (
2931 StatusCode::BAD_REQUEST,
2932 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2933 );
2934 };
2935
2936 let messages = linq.parse_webhook_payload(&payload);
2938
2939 if messages.is_empty() {
2940 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2942 }
2943
2944 for msg in &messages {
2946 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"channel": "linq", "alias": alias, "sender": msg.sender, "content": msg.content})), "inbound webhook message");
2947 let session_id = sender_session_id("linq", msg);
2948
2949 if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(&msg.content) {
2951 let key = linq_memory_key(msg);
2952 let _ = state
2953 .mem
2954 .store(
2955 &key,
2956 &msg.content,
2957 MemoryCategory::Conversation,
2958 Some(&session_id),
2959 )
2960 .await;
2961 }
2962
2963 match Box::pin(run_gateway_chat_with_tools(
2965 &state,
2966 &msg.content,
2967 Some(&session_id),
2968 None,
2969 ))
2970 .await
2971 {
2972 Ok(GatewayChatOutcome { response, .. }) => {
2973 if let Err(e) = linq
2975 .send(&SendMessage::new(response, &msg.reply_target))
2976 .await
2977 {
2978 ::zeroclaw_log::record!(
2979 ERROR,
2980 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2981 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2982 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2983 "Failed to send Linq reply"
2984 );
2985 }
2986 }
2987 Err(e) => {
2988 let reply = if is_needs_quickstart_err(&e) {
2989 ::zeroclaw_log::record!(
2990 WARN,
2991 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2992 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2993 "Linq chat refused: gateway has no model configured; \
2994 visit /quickstart"
2995 );
2996 needs_quickstart_channel_reply()
2997 } else {
2998 ::zeroclaw_log::record!(
2999 ERROR,
3000 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
3001 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
3002 .with_attrs(
3003 ::serde_json::json!({"channel": "linq", "error": format!("{}", e)})
3004 ),
3005 "LLM error"
3006 );
3007 "Sorry, I couldn't process your message right now.".to_string()
3008 };
3009 let _ = linq.send(&SendMessage::new(reply, &msg.reply_target)).await;
3010 }
3011 }
3012 }
3013
3014 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
3016}
3017
3018#[cfg(feature = "channel-wati")]
3020async fn handle_wati_verify(
3021 State(state): State<AppState>,
3022 Query(params): Query<WatiVerifyQuery>,
3023) -> impl IntoResponse {
3024 if state.wati.is_none() {
3025 return (StatusCode::NOT_FOUND, "WATI not configured".to_string());
3026 }
3027
3028 if let Some(challenge) = params.challenge {
3030 ::zeroclaw_log::record!(
3031 INFO,
3032 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3033 .with_attrs(::serde_json::json!({"channel": "wati"})),
3034 "webhook verified successfully"
3035 );
3036 return (StatusCode::OK, challenge);
3037 }
3038
3039 (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string())
3040}
3041
3042#[derive(Debug, serde::Deserialize)]
3043pub struct WatiVerifyQuery {
3044 #[serde(rename = "hub.challenge")]
3045 pub challenge: Option<String>,
3046}
3047
3048#[cfg(feature = "channel-wati")]
3050async fn handle_wati_webhook(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
3051 let Some(ref wati) = state.wati else {
3052 return (
3053 StatusCode::NOT_FOUND,
3054 Json(serde_json::json!({"error": "WATI not configured"})),
3055 );
3056 };
3057
3058 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
3060 return (
3061 StatusCode::BAD_REQUEST,
3062 Json(serde_json::json!({"error": "Invalid JSON payload"})),
3063 );
3064 };
3065
3066 let msg_type = payload.get("type").and_then(|v| v.as_str()).unwrap_or("");
3068
3069 let messages = if matches!(msg_type, "audio" | "voice") {
3070 if let Some(transcript) = wati.try_transcribe_audio(&payload).await {
3072 wati.parse_audio_as_message(&payload, transcript)
3073 } else {
3074 vec![]
3075 }
3076 } else {
3077 wati.parse_webhook_payload(&payload)
3078 };
3079
3080 if messages.is_empty() {
3081 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
3082 }
3083
3084 for msg in &messages {
3086 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"channel": "wati", "sender": msg.sender, "content": msg.content})), "inbound webhook message");
3087 let session_id = sender_session_id("wati", msg);
3088
3089 if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(&msg.content) {
3091 let key = wati_memory_key(msg);
3092 let _ = state
3093 .mem
3094 .store(
3095 &key,
3096 &msg.content,
3097 MemoryCategory::Conversation,
3098 Some(&session_id),
3099 )
3100 .await;
3101 }
3102
3103 match Box::pin(run_gateway_chat_with_tools(
3105 &state,
3106 &msg.content,
3107 Some(&session_id),
3108 None,
3109 ))
3110 .await
3111 {
3112 Ok(GatewayChatOutcome { response, .. }) => {
3113 if let Err(e) = wati
3115 .send(&SendMessage::new(response, &msg.reply_target))
3116 .await
3117 {
3118 ::zeroclaw_log::record!(
3119 ERROR,
3120 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
3121 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
3122 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
3123 "Failed to send WATI reply"
3124 );
3125 }
3126 }
3127 Err(e) => {
3128 let reply = if is_needs_quickstart_err(&e) {
3129 ::zeroclaw_log::record!(
3130 WARN,
3131 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3132 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
3133 "WATI chat refused: gateway has no model configured; \
3134 visit /quickstart"
3135 );
3136 needs_quickstart_channel_reply()
3137 } else {
3138 ::zeroclaw_log::record!(
3139 ERROR,
3140 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
3141 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
3142 .with_attrs(
3143 ::serde_json::json!({"channel": "wati", "error": format!("{}", e)})
3144 ),
3145 "LLM error"
3146 );
3147 "Sorry, I couldn't process your message right now.".to_string()
3148 };
3149 let _ = wati.send(&SendMessage::new(reply, &msg.reply_target)).await;
3150 }
3151 }
3152 }
3153
3154 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
3156}
3157
3158#[cfg(feature = "channel-nextcloud")]
3160async fn handle_nextcloud_talk_webhook(
3161 State(state): State<AppState>,
3162 headers: HeaderMap,
3163 body: Bytes,
3164) -> impl IntoResponse {
3165 let Some(ref nextcloud_talk) = state.nextcloud_talk else {
3166 return (
3167 StatusCode::NOT_FOUND,
3168 Json(serde_json::json!({"error": "Nextcloud Talk not configured"})),
3169 );
3170 };
3171
3172 let body_str = String::from_utf8_lossy(&body);
3173
3174 if let Some(ref webhook_secret) = state.nextcloud_talk_webhook_secret {
3176 let random = headers
3177 .get("X-Nextcloud-Talk-Random")
3178 .and_then(|v| v.to_str().ok())
3179 .unwrap_or("");
3180
3181 let signature = headers
3182 .get("X-Nextcloud-Talk-Signature")
3183 .and_then(|v| v.to_str().ok())
3184 .unwrap_or("");
3185
3186 if !zeroclaw_channels::nextcloud_talk::verify_nextcloud_talk_signature(
3187 webhook_secret,
3188 random,
3189 &body_str,
3190 signature,
3191 ) {
3192 ::zeroclaw_log::record!(
3193 WARN,
3194 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3195 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
3196 &format!(
3197 "Nextcloud Talk webhook signature verification failed (signature: {})",
3198 if signature.is_empty() {
3199 "missing"
3200 } else {
3201 "invalid"
3202 }
3203 )
3204 );
3205 return (
3206 StatusCode::UNAUTHORIZED,
3207 Json(serde_json::json!({"error": "Invalid signature"})),
3208 );
3209 }
3210 }
3211
3212 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
3214 return (
3215 StatusCode::BAD_REQUEST,
3216 Json(serde_json::json!({"error": "Invalid JSON payload"})),
3217 );
3218 };
3219
3220 let messages = nextcloud_talk.parse_webhook_payload(&payload);
3222 if messages.is_empty() {
3223 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
3225 }
3226
3227 for msg in messages {
3232 let state = state.clone();
3233 let nextcloud_talk = Arc::clone(nextcloud_talk);
3234 zeroclaw_spawn::spawn!(async move {
3235 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"channel": "nextcloud_talk", "sender": msg.sender, "content": msg.content})), "inbound webhook message");
3236 let session_id = sender_session_id("nextcloud_talk", &msg);
3237
3238 if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(&msg.content) {
3239 let key = nextcloud_talk_memory_key(&msg);
3240 let _ = state
3241 .mem
3242 .store(
3243 &key,
3244 &msg.content,
3245 MemoryCategory::Conversation,
3246 Some(&session_id),
3247 )
3248 .await;
3249 }
3250
3251 match Box::pin(run_gateway_chat_with_tools(
3252 &state,
3253 &msg.content,
3254 Some(&session_id),
3255 None,
3256 ))
3257 .await
3258 {
3259 Ok(GatewayChatOutcome { response, .. }) => {
3260 if let Err(e) = nextcloud_talk
3261 .send(&SendMessage::new(response, &msg.reply_target))
3262 .await
3263 {
3264 ::zeroclaw_log::record!(
3265 ERROR,
3266 ::zeroclaw_log::Event::new(
3267 module_path!(),
3268 ::zeroclaw_log::Action::Fail
3269 )
3270 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
3271 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
3272 "Failed to send Nextcloud Talk reply"
3273 );
3274 }
3275 }
3276 Err(e) => {
3277 let reply = if is_needs_quickstart_err(&e) {
3278 ::zeroclaw_log::record!(
3279 WARN,
3280 ::zeroclaw_log::Event::new(
3281 module_path!(),
3282 ::zeroclaw_log::Action::Note
3283 )
3284 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
3285 "Nextcloud Talk chat refused: gateway has no model configured; \
3286 visit /quickstart"
3287 );
3288 needs_quickstart_channel_reply()
3289 } else {
3290 ::zeroclaw_log::record!(ERROR, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail).with_outcome(::zeroclaw_log::EventOutcome::Failure).with_attrs(::serde_json::json!({"channel": "nextcloud_talk", "error": format!("{}", e)})), "LLM error");
3291 "Sorry, I couldn't process your message right now.".to_string()
3292 };
3293 let _ = nextcloud_talk
3294 .send(&SendMessage::new(reply, &msg.reply_target))
3295 .await;
3296 }
3297 }
3298 });
3299 }
3300
3301 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
3302}
3303
3304#[cfg(feature = "channel-email")]
3307const GMAIL_WEBHOOK_MAX_BODY: usize = 1024 * 1024;
3308
3309#[cfg(feature = "channel-email")]
3311async fn handle_gmail_push_webhook(
3312 State(state): State<AppState>,
3313 headers: HeaderMap,
3314 body: Bytes,
3315) -> impl IntoResponse {
3316 let Some(ref gmail_push) = state.gmail_push else {
3317 return (
3318 StatusCode::NOT_FOUND,
3319 Json(serde_json::json!({"error": "Gmail push not configured"})),
3320 );
3321 };
3322
3323 if body.len() > GMAIL_WEBHOOK_MAX_BODY {
3325 return (
3326 StatusCode::PAYLOAD_TOO_LARGE,
3327 Json(serde_json::json!({"error": "Request body too large"})),
3328 );
3329 }
3330
3331 let secret = gmail_push.config.webhook_secret.clone();
3333 if !secret.is_empty() {
3334 let provided = headers
3335 .get(axum::http::header::AUTHORIZATION)
3336 .and_then(|v| v.to_str().ok())
3337 .and_then(|auth| auth.strip_prefix("Bearer "))
3338 .unwrap_or("");
3339
3340 if provided != secret {
3341 ::zeroclaw_log::record!(
3342 WARN,
3343 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3344 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
3345 .with_attrs(::serde_json::json!({"channel": "gmail_push"})),
3346 "webhook: unauthorized request"
3347 );
3348 return (
3349 StatusCode::UNAUTHORIZED,
3350 Json(serde_json::json!({"error": "Unauthorized"})),
3351 );
3352 }
3353 }
3354
3355 let body_str = String::from_utf8_lossy(&body);
3356 let envelope: zeroclaw_channels::gmail_push::PubSubEnvelope =
3357 match serde_json::from_str(&body_str) {
3358 Ok(e) => e,
3359 Err(e) => {
3360 ::zeroclaw_log::record!(
3361 WARN,
3362 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3363 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
3364 .with_attrs(
3365 ::serde_json::json!({"error": format!("{}", e), "channel": "gmail_push"})
3366 ),
3367 "webhook: invalid payload"
3368 );
3369 return (
3370 StatusCode::BAD_REQUEST,
3371 Json(serde_json::json!({"error": "Invalid Pub/Sub envelope"})),
3372 );
3373 }
3374 };
3375
3376 let channel = Arc::clone(gmail_push);
3378 zeroclaw_spawn::spawn!(async move {
3379 if let Err(e) = channel.handle_notification(&envelope).await {
3380 ::zeroclaw_log::record!(
3381 ERROR,
3382 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
3383 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
3384 .with_attrs(
3385 ::serde_json::json!({"channel": "gmail_push", "error": format!("{}", e)})
3386 ),
3387 "push notification processing failed"
3388 );
3389 }
3390 });
3391
3392 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
3394}
3395
3396#[derive(serde::Serialize)]
3402struct AdminResponse {
3403 success: bool,
3404 message: String,
3405}
3406
3407fn require_localhost(peer: &SocketAddr) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
3409 if peer.ip().is_loopback() {
3410 Ok(())
3411 } else {
3412 Err((
3413 StatusCode::FORBIDDEN,
3414 Json(serde_json::json!({
3415 "error": "Admin endpoints are restricted to localhost"
3416 })),
3417 ))
3418 }
3419}
3420
3421async fn handle_admin_shutdown(
3423 State(state): State<AppState>,
3424 ConnectInfo(peer): ConnectInfo<SocketAddr>,
3425) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
3426 require_localhost(&peer)?;
3427 ::zeroclaw_log::record!(
3428 INFO,
3429 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
3430 "admin shutdown request received; initiating graceful shutdown"
3431 );
3432
3433 let body = AdminResponse {
3434 success: true,
3435 message: "Gateway shutdown initiated".to_string(),
3436 };
3437
3438 let _ = state.shutdown_tx.send(true);
3439
3440 Ok((StatusCode::OK, Json(body)))
3441}
3442
3443#[derive(Debug, PartialEq, Eq)]
3447enum AdminReloadGate {
3448 Allow,
3450 RequireAuth,
3453 Forbidden,
3455 ForbiddenNoPairing,
3460}
3461
3462fn admin_reload_gate(
3469 is_loopback: bool,
3470 allow_remote_admin: bool,
3471 require_pairing: bool,
3472) -> AdminReloadGate {
3473 if is_loopback {
3474 AdminReloadGate::Allow
3475 } else if !allow_remote_admin {
3476 AdminReloadGate::Forbidden
3477 } else if require_pairing {
3478 AdminReloadGate::RequireAuth
3479 } else {
3480 AdminReloadGate::ForbiddenNoPairing
3481 }
3482}
3483
3484async fn handle_admin_reload(
3507 State(state): State<AppState>,
3508 ConnectInfo(peer): ConnectInfo<SocketAddr>,
3509 headers: HeaderMap,
3510) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
3511 let allow_remote = state.config.read().gateway.allow_remote_admin;
3516 let require_pairing = state.pairing.require_pairing();
3520 match admin_reload_gate(peer.ip().is_loopback(), allow_remote, require_pairing) {
3521 AdminReloadGate::Allow => {}
3522 AdminReloadGate::RequireAuth => api::require_auth(&state, &headers)?,
3523 AdminReloadGate::Forbidden => {
3524 return Err((
3525 StatusCode::FORBIDDEN,
3526 Json(serde_json::json!({
3527 "error": "Remote admin reload is disabled. Call from localhost, \
3528 or set gateway.allow_remote_admin = true (with pairing \
3529 enabled, then pair) to allow authenticated remote reloads."
3530 })),
3531 ));
3532 }
3533 AdminReloadGate::ForbiddenNoPairing => {
3534 return Err((
3535 StatusCode::FORBIDDEN,
3536 Json(serde_json::json!({
3537 "error": "Remote admin reload requires pairing. \
3538 gateway.allow_remote_admin is enabled but \
3539 gateway.require_pairing is off, so remote callers \
3540 cannot be authenticated. Enable require_pairing, or \
3541 call /admin/reload from localhost."
3542 })),
3543 ));
3544 }
3545 }
3546
3547 let Some(reload_tx) = state.reload_tx.clone() else {
3548 return Err((
3549 StatusCode::SERVICE_UNAVAILABLE,
3550 Json(serde_json::json!({
3551 "error": "no daemon supervisor — running as standalone gateway. \
3552 Restart the process to pick up config changes."
3553 })),
3554 ));
3555 };
3556
3557 ::zeroclaw_log::record!(
3558 INFO,
3559 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
3560 "admin reload request received"
3561 );
3562 state
3567 .pending_reload
3568 .store(false, std::sync::atomic::Ordering::Relaxed);
3569 let shutdown_tx = state.shutdown_tx.clone();
3582 zeroclaw_spawn::spawn!(async move {
3584 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
3585 let _ = shutdown_tx.send(true);
3587 let _ = reload_tx.send(true);
3589 });
3590
3591 Ok((
3592 StatusCode::OK,
3593 Json(AdminResponse {
3594 success: true,
3595 message: "Daemon reload initiated".to_string(),
3596 }),
3597 ))
3598}
3599
3600async fn handle_admin_paircode(
3602 State(state): State<AppState>,
3603 ConnectInfo(peer): ConnectInfo<SocketAddr>,
3604) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
3605 require_localhost(&peer)?;
3606 let code = state.pairing.pairing_code();
3607
3608 let body = if let Some(c) = code {
3609 serde_json::json!({
3610 "success": true,
3611 "pairing_required": state.pairing.require_pairing(),
3612 "pairing_code": c,
3613 "message": "Use this one-time code to pair"
3614 })
3615 } else {
3616 serde_json::json!({
3617 "success": true,
3618 "pairing_required": state.pairing.require_pairing(),
3619 "pairing_code": null,
3620 "message": if state.pairing.require_pairing() {
3621 "Pairing is active but no new code available (already paired or code expired)"
3622 } else {
3623 "Pairing is disabled for this gateway"
3624 }
3625 })
3626 };
3627
3628 Ok((StatusCode::OK, Json(body)))
3629}
3630
3631#[derive(Debug, serde::Deserialize, Default)]
3641pub struct AdminPaircodeQuery {
3642 #[serde(default)]
3643 pub rotate: Option<String>,
3644}
3645
3646async fn handle_admin_paircode_new(
3652 State(state): State<AppState>,
3653 ConnectInfo(peer): ConnectInfo<SocketAddr>,
3654 Query(params): Query<AdminPaircodeQuery>,
3655) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
3656 require_localhost(&peer)?;
3657
3658 if !state.pairing.require_pairing() {
3659 let body = serde_json::json!({
3660 "success": false,
3661 "pairing_required": false,
3662 "pairing_code": null,
3663 "message": "Pairing is disabled for this gateway"
3664 });
3665 return Ok((StatusCode::BAD_REQUEST, Json(body)));
3666 }
3667
3668 let rotate = params
3669 .rotate
3670 .as_deref()
3671 .map(str::trim)
3672 .filter(|s| !s.is_empty());
3673
3674 let revocation_message = match rotate {
3675 Some("all") => {
3676 let revoked = state.pairing.revoke_all_tokens();
3677 if let Some(registry) = state.device_registry.as_ref() {
3678 if let Err(e) = registry.clear() {
3679 let body = serde_json::json!({
3680 "success": false,
3681 "pairing_required": true,
3682 "pairing_code": null,
3683 "message": format!("Tokens revoked in memory but device registry clear failed: {e}"),
3684 });
3685 return Ok((StatusCode::INTERNAL_SERVER_ERROR, Json(body)));
3686 }
3687 }
3688 if let Err(e) = persist_pairing_tokens(state.config.clone(), &state.pairing).await {
3689 let body = serde_json::json!({
3690 "success": false,
3691 "pairing_required": true,
3692 "pairing_code": null,
3693 "message": format!("Tokens revoked in memory but config persist failed: {e}"),
3694 });
3695 return Ok((StatusCode::INTERNAL_SERVER_ERROR, Json(body)));
3696 }
3697 ::zeroclaw_log::record!(
3698 INFO,
3699 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
3700 .with_attrs(::serde_json::json!({"revoked": revoked})),
3701 "all paired tokens revoked via admin endpoint"
3702 );
3703 Some(format!(
3704 "Revoked all {revoked} paired token(s) and cleared the device registry."
3705 ))
3706 }
3707 Some(device_id) => {
3708 let Some(registry) = state.device_registry.as_ref() else {
3709 let body = serde_json::json!({
3710 "success": false,
3711 "pairing_required": true,
3712 "pairing_code": null,
3713 "message": "Device registry is disabled; cannot rotate a single device.",
3714 });
3715 return Ok((StatusCode::SERVICE_UNAVAILABLE, Json(body)));
3716 };
3717 let token_hash = match registry.revoke(device_id) {
3718 Ok(Some(hash)) => hash,
3719 Ok(None) => {
3720 let body = serde_json::json!({
3721 "success": false,
3722 "pairing_required": true,
3723 "pairing_code": null,
3724 "message": format!("Device '{device_id}' not found; nothing revoked."),
3725 });
3726 return Ok((StatusCode::NOT_FOUND, Json(body)));
3727 }
3728 Err(e) => {
3729 let body = serde_json::json!({
3730 "success": false,
3731 "pairing_required": true,
3732 "pairing_code": null,
3733 "message": format!("Device registry error: {e}"),
3734 });
3735 return Ok((StatusCode::INTERNAL_SERVER_ERROR, Json(body)));
3736 }
3737 };
3738 state.pairing.revoke_token_hash(&token_hash);
3739 if let Err(e) = persist_pairing_tokens(state.config.clone(), &state.pairing).await {
3740 let body = serde_json::json!({
3741 "success": false,
3742 "pairing_required": true,
3743 "pairing_code": null,
3744 "message": format!("Token revoked in memory but config persist failed: {e}"),
3745 });
3746 return Ok((StatusCode::INTERNAL_SERVER_ERROR, Json(body)));
3747 }
3748 ::zeroclaw_log::record!(
3749 INFO,
3750 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
3751 "single device token revoked via admin endpoint"
3752 );
3753 Some(format!(
3754 "Revoked the bearer token for device '{device_id}'."
3755 ))
3756 }
3757 None => None,
3758 };
3759
3760 let code = state
3761 .pairing
3762 .generate_new_pairing_code()
3763 .expect("require_pairing checked above");
3764 if rotate.is_none() {
3765 ::zeroclaw_log::record!(
3766 INFO,
3767 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
3768 "new pairing code generated via admin endpoint"
3769 );
3770 }
3771
3772 let message = match revocation_message {
3773 Some(revoked) => {
3774 format!("{revoked} Use this one-time code to re-pair.")
3775 }
3776 None => "New pairing code generated — use this one-time code to pair".to_string(),
3777 };
3778
3779 let body = serde_json::json!({
3780 "success": true,
3781 "pairing_required": true,
3782 "pairing_code": code,
3783 "message": message,
3784 });
3785 Ok((StatusCode::OK, Json(body)))
3786}
3787
3788async fn handle_pair_code(State(state): State<AppState>) -> impl IntoResponse {
3796 let require = state.pairing.require_pairing();
3797 let is_paired = state.pairing.is_paired();
3798
3799 let code = if require && !is_paired {
3801 state.pairing.pairing_code()
3802 } else {
3803 None
3804 };
3805
3806 let body = serde_json::json!({
3807 "success": true,
3808 "pairing_required": require,
3809 "pairing_code": code,
3810 });
3811
3812 (StatusCode::OK, Json(body))
3813}
3814
3815#[cfg(test)]
3816mod tests {
3817 use super::*;
3818 use async_trait::async_trait;
3819 use axum::http::HeaderValue;
3820 use axum::response::IntoResponse;
3821 use http_body_util::BodyExt;
3822 use parking_lot::{Mutex, RwLock};
3823 use std::sync::atomic::{AtomicUsize, Ordering};
3824 #[cfg(feature = "channel-whatsapp-cloud")]
3825 use zeroclaw_api::channel::ChannelMessage;
3826 use zeroclaw_memory::{Memory, MemoryCategory, MemoryEntry};
3827 use zeroclaw_providers::ModelProvider;
3828
3829 fn generate_test_secret() -> String {
3831 let bytes: [u8; 32] = rand::random();
3832 hex::encode(bytes)
3833 }
3834
3835 #[test]
3836 fn security_body_limit_is_64kb() {
3837 assert_eq!(MAX_BODY_SIZE, 65_536);
3838 }
3839
3840 #[test]
3841 fn security_timeout_default_is_30_seconds() {
3842 assert_eq!(REQUEST_TIMEOUT_SECS, 30);
3843 }
3844
3845 #[test]
3846 fn gateway_timeout_uses_typed_config_default() {
3847 let cfg = zeroclaw_config::schema::GatewayConfig::default();
3848 assert_eq!(gateway_request_timeout_secs(&cfg), 30);
3849 }
3850
3851 #[test]
3852 fn paircode_recovery_command_includes_alternate_port() {
3853 assert_eq!(
3854 format_paircode_recovery_command("127.0.0.1", 42617),
3855 "zeroclaw gateway get-paircode --new --port 42617"
3856 );
3857 }
3858
3859 #[test]
3860 fn paircode_recovery_command_includes_specific_host_when_needed() {
3861 assert_eq!(
3862 format_paircode_recovery_command("192.168.1.20", 42617),
3863 "zeroclaw gateway get-paircode --new --port 42617 --host 192.168.1.20"
3864 );
3865 }
3866
3867 #[test]
3868 fn paircode_recovery_curl_targets_running_instance() {
3869 assert_eq!(
3870 format_paircode_recovery_curl("127.0.0.1", 42617, ""),
3871 "curl -s -X POST http://127.0.0.1:42617/admin/paircode/new"
3872 );
3873 }
3874
3875 #[test]
3876 fn paircode_recovery_curl_preserves_path_prefix() {
3877 assert_eq!(
3878 format_paircode_recovery_curl("127.0.0.1", 42617, "/gw"),
3879 "curl -s -X POST http://127.0.0.1:42617/gw/admin/paircode/new"
3880 );
3881 }
3882
3883 fn admin_paircode_state(
3887 tmp: &tempfile::TempDir,
3888 require_pairing: bool,
3889 with_registry: bool,
3890 ) -> AppState {
3891 let data_dir = tmp.path().join("workspace");
3892 std::fs::create_dir_all(&data_dir).unwrap();
3893 let config = Config {
3894 data_dir: data_dir.clone(),
3895 config_path: tmp.path().join("config.toml"),
3896 ..Config::default()
3897 };
3898 let registry = with_registry.then(|| Arc::new(api_pairing::DeviceRegistry::new(&data_dir)));
3899 AppState {
3900 config: Arc::new(RwLock::new(config)),
3901 model_provider: Arc::new(MockModelProvider::default()),
3902 model: "test-model".into(),
3903 temperature: None,
3904 mem: Arc::new(MockMemory),
3905 memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
3906 Arc::new(MockMemory),
3907 zeroclaw_config::schema::MemoryConfig::default(),
3908 std::path::PathBuf::new(),
3909 )),
3910 auto_save: false,
3911 webhook_secret_hash: None,
3912 pairing: Arc::new(PairingGuard::new(require_pairing, &[])),
3913 trust_forwarded_headers: false,
3914 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3915 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3916 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3917 #[cfg(feature = "channel-whatsapp-cloud")]
3918 whatsapp: None,
3919 #[cfg(feature = "channel-whatsapp-cloud")]
3920 whatsapp_app_secret: None,
3921 #[cfg(feature = "channel-linq")]
3922 linq: HashMap::new(),
3923 #[cfg(feature = "channel-linq")]
3924 linq_signing_secrets: HashMap::new(),
3925 #[cfg(feature = "channel-nextcloud")]
3926 nextcloud_talk: None,
3927 #[cfg(feature = "channel-nextcloud")]
3928 nextcloud_talk_webhook_secret: None,
3929 #[cfg(feature = "channel-wati")]
3930 wati: None,
3931 #[cfg(feature = "channel-email")]
3932 gmail_push: None,
3933 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
3934 tools_registry: Arc::new(Vec::new()),
3935 cost_tracker: None,
3936 event_tx: tokio::sync::broadcast::channel(16).0,
3937 event_buffer: Arc::new(sse::EventBuffer::new(16)),
3938 shutdown_tx: tokio::sync::watch::channel(false).0,
3939 reload_tx: None,
3940 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3941 path_prefix: String::new(),
3942 web_dist_dir: None,
3943 session_backend: None,
3944 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
3945 8, 30, 600,
3946 )),
3947 device_registry: registry,
3948 pending_pairings: None,
3949 canvas_store: CanvasStore::new(),
3950 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
3951 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
3952 tui_registry: None,
3953 #[cfg(feature = "webauthn")]
3954 webauthn: None,
3955 }
3956 }
3957
3958 async fn pair_device(state: &AppState, device_id: &str) -> String {
3961 let code = state
3962 .pairing
3963 .generate_new_pairing_code()
3964 .expect("pairing enabled");
3965 let token = state
3966 .pairing
3967 .try_pair(&code, device_id)
3968 .await
3969 .unwrap()
3970 .unwrap();
3971 state.device_registry.as_ref().unwrap().register(
3972 PairingGuard::token_hash(&token),
3973 api_pairing::DeviceInfo {
3974 id: device_id.to_string(),
3975 name: None,
3976 device_type: None,
3977 paired_at: chrono::Utc::now(),
3978 last_seen: chrono::Utc::now(),
3979 ip_address: None,
3980 capabilities: None,
3981 },
3982 );
3983 token
3984 }
3985
3986 async fn admin_paircode_response_json(
3987 result: Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)>,
3988 ) -> (StatusCode, serde_json::Value) {
3989 let response = result.into_response();
3990 let status = response.status();
3991 let bytes = response.into_body().collect().await.unwrap().to_bytes();
3992 let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
3993 (status, json)
3994 }
3995
3996 #[tokio::test]
3998 async fn admin_paircode_new_without_rotate_keeps_existing_tokens() {
3999 let tmp = tempfile::TempDir::new().unwrap();
4000 let state = admin_paircode_state(&tmp, true, true);
4001 let token = pair_device(&state, "dev-a").await;
4002
4003 let (status, json) = admin_paircode_response_json(
4004 handle_admin_paircode_new(
4005 State(state.clone()),
4006 test_connect_info(),
4007 Query(AdminPaircodeQuery::default()),
4008 )
4009 .await,
4010 )
4011 .await;
4012
4013 assert_eq!(status, StatusCode::OK);
4014 assert!(json["pairing_code"].is_string());
4015 assert!(
4016 state.pairing.is_authenticated(&token),
4017 "add-another-client path must not revoke existing tokens"
4018 );
4019 }
4020
4021 #[tokio::test]
4024 async fn admin_paircode_new_rotate_all_revokes_everything() {
4025 let tmp = tempfile::TempDir::new().unwrap();
4026 let state = admin_paircode_state(&tmp, true, true);
4027 let token_a = pair_device(&state, "dev-a").await;
4028 let token_b = pair_device(&state, "dev-b").await;
4029
4030 let (status, json) = admin_paircode_response_json(
4031 handle_admin_paircode_new(
4032 State(state.clone()),
4033 test_connect_info(),
4034 Query(AdminPaircodeQuery {
4035 rotate: Some("all".into()),
4036 }),
4037 )
4038 .await,
4039 )
4040 .await;
4041
4042 assert_eq!(status, StatusCode::OK);
4043 assert!(json["pairing_code"].is_string());
4044 assert!(!state.pairing.is_authenticated(&token_a));
4045 assert!(!state.pairing.is_authenticated(&token_b));
4046 assert!(
4047 state.config.read().gateway.paired_tokens.is_empty(),
4048 "rotate=all must persist an empty token set"
4049 );
4050 assert!(
4051 state.device_registry.as_ref().unwrap().list().is_empty(),
4052 "rotate=all must clear the device registry"
4053 );
4054 }
4055
4056 #[tokio::test]
4058 async fn admin_paircode_new_rotate_device_revokes_one() {
4059 let tmp = tempfile::TempDir::new().unwrap();
4060 let state = admin_paircode_state(&tmp, true, true);
4061 let token_a = pair_device(&state, "dev-a").await;
4062 let token_b = pair_device(&state, "dev-b").await;
4063
4064 let (status, json) = admin_paircode_response_json(
4065 handle_admin_paircode_new(
4066 State(state.clone()),
4067 test_connect_info(),
4068 Query(AdminPaircodeQuery {
4069 rotate: Some("dev-a".into()),
4070 }),
4071 )
4072 .await,
4073 )
4074 .await;
4075
4076 assert_eq!(status, StatusCode::OK);
4077 assert!(json["pairing_code"].is_string());
4078 assert!(!state.pairing.is_authenticated(&token_a));
4079 assert!(
4080 state.pairing.is_authenticated(&token_b),
4081 "targeted rotate must not touch other devices"
4082 );
4083 let old_hash = PairingGuard::token_hash(&token_a);
4084 assert!(
4085 !state
4086 .config
4087 .read()
4088 .gateway
4089 .paired_tokens
4090 .contains(&old_hash)
4091 );
4092 }
4093
4094 #[tokio::test]
4096 async fn admin_paircode_new_rotate_unknown_device_is_not_found() {
4097 let tmp = tempfile::TempDir::new().unwrap();
4098 let state = admin_paircode_state(&tmp, true, true);
4099 let token = pair_device(&state, "dev-a").await;
4100
4101 let (status, _json) = admin_paircode_response_json(
4102 handle_admin_paircode_new(
4103 State(state.clone()),
4104 test_connect_info(),
4105 Query(AdminPaircodeQuery {
4106 rotate: Some("ghost".into()),
4107 }),
4108 )
4109 .await,
4110 )
4111 .await;
4112
4113 assert_eq!(status, StatusCode::NOT_FOUND);
4114 assert!(
4115 state.pairing.is_authenticated(&token),
4116 "a not-found rotate must not revoke any token"
4117 );
4118 }
4119
4120 #[tokio::test]
4122 async fn admin_paircode_new_pairing_disabled_is_bad_request() {
4123 let tmp = tempfile::TempDir::new().unwrap();
4124 let state = admin_paircode_state(&tmp, false, false);
4125
4126 let (status, json) = admin_paircode_response_json(
4127 handle_admin_paircode_new(
4128 State(state),
4129 test_connect_info(),
4130 Query(AdminPaircodeQuery {
4131 rotate: Some("all".into()),
4132 }),
4133 )
4134 .await,
4135 )
4136 .await;
4137
4138 assert_eq!(status, StatusCode::BAD_REQUEST);
4139 assert_eq!(json["success"], false);
4140 }
4141
4142 #[test]
4143 fn long_running_request_timeout_default_is_ten_minutes() {
4144 assert_eq!(LONG_RUNNING_REQUEST_TIMEOUT_SECS, 600);
4145 }
4146
4147 #[test]
4148 fn long_running_request_timeout_uses_typed_config_default() {
4149 let cfg = zeroclaw_config::schema::GatewayConfig::default();
4150 assert_eq!(gateway_long_running_request_timeout_secs(&cfg), 600);
4151 }
4152
4153 #[test]
4154 fn webhook_body_requires_message_field() {
4155 let valid = r#"{"message": "hello"}"#;
4156 let parsed: Result<WebhookBody, _> = serde_json::from_str(valid);
4157 assert!(parsed.is_ok());
4158 assert_eq!(parsed.unwrap().message, "hello");
4159
4160 let missing = r#"{"other": "field"}"#;
4161 let parsed: Result<WebhookBody, _> = serde_json::from_str(missing);
4162 assert!(parsed.is_err());
4163 }
4164
4165 #[test]
4166 fn whatsapp_query_fields_are_optional() {
4167 let q = WhatsAppVerifyQuery {
4168 mode: None,
4169 verify_token: None,
4170 challenge: None,
4171 };
4172 assert!(q.mode.is_none());
4173 }
4174
4175 #[test]
4176 fn app_state_is_clone() {
4177 fn assert_clone<T: Clone>() {}
4178 assert_clone::<AppState>();
4179 }
4180
4181 #[tokio::test]
4188 async fn run_gateway_starts_with_zero_agents() {
4189 let tmp = tempfile::TempDir::new().unwrap();
4192 let config = zeroclaw_config::schema::Config {
4193 data_dir: tmp.path().join("workspace"),
4194 config_path: tmp.path().join("config.toml"),
4195 ..zeroclaw_config::schema::Config::default()
4196 };
4197 std::fs::create_dir_all(&config.data_dir).unwrap();
4198
4199 assert!(
4202 config.agents.is_empty(),
4203 "regression assumes default Config has no agents",
4204 );
4205
4206 let handle = zeroclaw_spawn::spawn!(async move {
4212 run_gateway("127.0.0.1", 0, config, None, None, None, None).await
4213 });
4214
4215 match tokio::time::timeout(
4216 std::time::Duration::from_millis(750),
4217 &mut Box::pin(async {
4218 let _ = tokio::time::sleep(std::time::Duration::from_millis(500)).await;
4223 }),
4224 )
4225 .await
4226 {
4227 Ok(()) => {}
4228 Err(_) => panic!("test setup timed out before checking gateway state"),
4229 }
4230
4231 if handle.is_finished() {
4235 let result = handle.await.expect("task did not panic");
4236 panic!(
4237 "gateway exited during boot with zero agents — must stay up for reload/quickstart: {:?}",
4238 result
4239 );
4240 }
4241 handle.abort();
4242 }
4243
4244 #[tokio::test]
4253 async fn run_gateway_starts_with_unresolved_agent_risk_profile() {
4254 use zeroclaw_config::schema::AliasedAgentConfig;
4255
4256 let tmp = tempfile::TempDir::new().unwrap();
4259 let mut config = zeroclaw_config::schema::Config {
4260 data_dir: tmp.path().join("workspace"),
4261 config_path: tmp.path().join("config.toml"),
4262 ..zeroclaw_config::schema::Config::default()
4263 };
4264 std::fs::create_dir_all(&config.data_dir).unwrap();
4265
4266 let agent = AliasedAgentConfig {
4269 enabled: true,
4270 risk_profile: "definitely_not_configured".to_string(),
4271 ..AliasedAgentConfig::default()
4272 };
4273 config.agents.insert("fake123".to_string(), agent);
4274
4275 let handle = zeroclaw_spawn::spawn!(async move {
4276 run_gateway("127.0.0.1", 0, config, None, None, None, None).await
4277 });
4278
4279 match tokio::time::timeout(
4280 std::time::Duration::from_millis(750),
4281 &mut Box::pin(async {
4282 let _ = tokio::time::sleep(std::time::Duration::from_millis(500)).await;
4283 }),
4284 )
4285 .await
4286 {
4287 Ok(()) => {}
4288 Err(_) => panic!("test setup timed out before checking gateway state"),
4289 }
4290
4291 if handle.is_finished() {
4292 let result = handle.await.expect("task did not panic");
4293 panic!(
4294 "gateway exited during boot when agent.risk_profile was unresolved \
4295 — must stay up so operator can fix via /admin/reload or /quickstart: {:?}",
4296 result
4297 );
4298 }
4299 handle.abort();
4300 }
4301
4302 #[tokio::test]
4303 async fn run_gateway_starts_with_mismatched_provider_api_key() {
4304 let mut config = Config::default();
4305 config.providers.models.anthropic.insert(
4306 "default".to_string(),
4307 zeroclaw_config::schema::AnthropicModelProviderConfig {
4308 base: zeroclaw_config::schema::ModelProviderConfig {
4309 model: Some("anthropic/claude-sonnet-4-6".to_string()),
4310 api_key: Some("sk-test-openai-shaped-key".to_string()),
4311 ..Default::default()
4312 },
4313 },
4314 );
4315
4316 let handle = zeroclaw_spawn::spawn!(async move {
4317 run_gateway("127.0.0.1", 0, config, None, None, None, None).await
4318 });
4319
4320 match tokio::time::timeout(
4321 std::time::Duration::from_millis(750),
4322 &mut Box::pin(async {
4323 let _ = tokio::time::sleep(std::time::Duration::from_millis(500)).await;
4324 }),
4325 )
4326 .await
4327 {
4328 Ok(()) => {}
4329 Err(_) => panic!("test setup timed out before checking gateway state"),
4330 }
4331
4332 if handle.is_finished() {
4333 let result = handle.await.expect("task did not panic");
4334 panic!(
4335 "gateway exited during boot when seed provider API key was \
4336 mismatched — must stay up so operator can fix via /admin/reload \
4337 or /quickstart: {:?}",
4338 result
4339 );
4340 }
4341 handle.abort();
4342 }
4343
4344 #[tokio::test]
4345 async fn metrics_endpoint_returns_hint_when_prometheus_is_disabled() {
4346 let state = AppState {
4347 config: Arc::new(RwLock::new(Config::default())),
4348 model_provider: Arc::new(MockModelProvider::default()),
4349 model: "test-model".into(),
4350 temperature: None,
4351 mem: Arc::new(MockMemory),
4352 memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
4353 Arc::new(MockMemory),
4354 zeroclaw_config::schema::MemoryConfig::default(),
4355 std::path::PathBuf::new(),
4356 )),
4357 auto_save: false,
4358 webhook_secret_hash: None,
4359 pairing: Arc::new(PairingGuard::new(false, &[])),
4360 trust_forwarded_headers: false,
4361 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4362 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4363 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4364 #[cfg(feature = "channel-whatsapp-cloud")]
4365 whatsapp: None,
4366 #[cfg(feature = "channel-whatsapp-cloud")]
4367 whatsapp_app_secret: None,
4368 #[cfg(feature = "channel-linq")]
4369 linq: HashMap::new(),
4370 #[cfg(feature = "channel-linq")]
4371 linq_signing_secrets: HashMap::new(),
4372 #[cfg(feature = "channel-nextcloud")]
4373 nextcloud_talk: None,
4374 #[cfg(feature = "channel-nextcloud")]
4375 nextcloud_talk_webhook_secret: None,
4376 #[cfg(feature = "channel-wati")]
4377 wati: None,
4378 #[cfg(feature = "channel-email")]
4379 gmail_push: None,
4380 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4381 tools_registry: Arc::new(Vec::new()),
4382 cost_tracker: None,
4383 event_tx: tokio::sync::broadcast::channel(16).0,
4384 event_buffer: Arc::new(sse::EventBuffer::new(16)),
4385 shutdown_tx: tokio::sync::watch::channel(false).0,
4386 reload_tx: None,
4387 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4388 path_prefix: String::new(),
4389 web_dist_dir: None,
4390 session_backend: None,
4391 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4392 8, 30, 600,
4393 )),
4394 device_registry: None,
4395 pending_pairings: None,
4396 canvas_store: CanvasStore::new(),
4397 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4398 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4399 tui_registry: None,
4400 #[cfg(feature = "webauthn")]
4401 webauthn: None,
4402 };
4403
4404 let response = handle_metrics(State(state)).await.into_response();
4405 assert_eq!(response.status(), StatusCode::OK);
4406 assert_eq!(
4407 response
4408 .headers()
4409 .get(header::CONTENT_TYPE)
4410 .and_then(|value| value.to_str().ok()),
4411 Some(PROMETHEUS_CONTENT_TYPE)
4412 );
4413
4414 let body = response.into_body().collect().await.unwrap().to_bytes();
4415 let text = String::from_utf8(body.to_vec()).unwrap();
4416 assert!(text.contains("Prometheus backend not enabled"));
4417 }
4418
4419 #[cfg(feature = "observability-prometheus")]
4420 #[tokio::test]
4421 async fn metrics_endpoint_renders_prometheus_output() {
4422 let event_tx = tokio::sync::broadcast::channel(16).0;
4423 let prom = zeroclaw_runtime::observability::PrometheusObserver::new();
4424 zeroclaw_runtime::observability::Observer::record_event(
4425 &prom,
4426 &zeroclaw_runtime::observability::ObserverEvent::HeartbeatTick,
4427 );
4428
4429 let observer: Arc<dyn zeroclaw_runtime::observability::Observer> = Arc::new(prom);
4430 let state = AppState {
4431 config: Arc::new(RwLock::new(Config::default())),
4432 model_provider: Arc::new(MockModelProvider::default()),
4433 model: "test-model".into(),
4434 temperature: None,
4435 mem: Arc::new(MockMemory),
4436 memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
4437 Arc::new(MockMemory),
4438 zeroclaw_config::schema::MemoryConfig::default(),
4439 std::path::PathBuf::new(),
4440 )),
4441 auto_save: false,
4442 webhook_secret_hash: None,
4443 pairing: Arc::new(PairingGuard::new(false, &[])),
4444 trust_forwarded_headers: false,
4445 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4446 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4447 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4448 #[cfg(feature = "channel-whatsapp-cloud")]
4449 whatsapp: None,
4450 #[cfg(feature = "channel-whatsapp-cloud")]
4451 whatsapp_app_secret: None,
4452 #[cfg(feature = "channel-linq")]
4453 linq: HashMap::new(),
4454 #[cfg(feature = "channel-linq")]
4455 linq_signing_secrets: HashMap::new(),
4456 #[cfg(feature = "channel-nextcloud")]
4457 nextcloud_talk: None,
4458 #[cfg(feature = "channel-nextcloud")]
4459 nextcloud_talk_webhook_secret: None,
4460 #[cfg(feature = "channel-wati")]
4461 wati: None,
4462 #[cfg(feature = "channel-email")]
4463 gmail_push: None,
4464 observer,
4465 tools_registry: Arc::new(Vec::new()),
4466 cost_tracker: None,
4467 event_tx,
4468 event_buffer: Arc::new(sse::EventBuffer::new(16)),
4469 shutdown_tx: tokio::sync::watch::channel(false).0,
4470 reload_tx: None,
4471 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4472 path_prefix: String::new(),
4473 web_dist_dir: None,
4474 session_backend: None,
4475 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4476 8, 30, 600,
4477 )),
4478 device_registry: None,
4479 pending_pairings: None,
4480 canvas_store: CanvasStore::new(),
4481 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4482 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4483 tui_registry: None,
4484 #[cfg(feature = "webauthn")]
4485 webauthn: None,
4486 };
4487
4488 let response = handle_metrics(State(state)).await.into_response();
4489 assert_eq!(response.status(), StatusCode::OK);
4490
4491 let body = response.into_body().collect().await.unwrap().to_bytes();
4492 let text = String::from_utf8(body.to_vec()).unwrap();
4493 assert!(text.contains("zeroclaw_heartbeat_ticks_total 1"));
4494 }
4495
4496 #[test]
4497 fn gateway_rate_limiter_blocks_after_limit() {
4498 let limiter = GatewayRateLimiter::new(2, 2, 100);
4499 assert!(limiter.allow_pair("127.0.0.1"));
4500 assert!(limiter.allow_pair("127.0.0.1"));
4501 assert!(!limiter.allow_pair("127.0.0.1"));
4502 }
4503
4504 #[test]
4505 fn rate_limiter_sweep_removes_stale_entries() {
4506 let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 100);
4507 assert!(limiter.allow("ip-1"));
4509 assert!(limiter.allow("ip-2"));
4510 assert!(limiter.allow("ip-3"));
4511
4512 {
4513 let guard = limiter.requests.lock();
4514 assert_eq!(guard.0.len(), 3);
4515 }
4516
4517 {
4519 let mut guard = limiter.requests.lock();
4520 guard.1 = Instant::now()
4521 .checked_sub(Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS + 1))
4522 .unwrap();
4523 guard.0.get_mut("ip-2").unwrap().clear();
4525 guard.0.get_mut("ip-3").unwrap().clear();
4526 }
4527
4528 assert!(limiter.allow("ip-1"));
4530
4531 {
4532 let guard = limiter.requests.lock();
4533 assert_eq!(guard.0.len(), 1, "Stale entries should have been swept");
4534 assert!(guard.0.contains_key("ip-1"));
4535 }
4536 }
4537
4538 #[test]
4539 fn rate_limiter_zero_limit_always_allows() {
4540 let limiter = SlidingWindowRateLimiter::new(0, Duration::from_secs(60), 10);
4541 for _ in 0..100 {
4542 assert!(limiter.allow("any-key"));
4543 }
4544 }
4545
4546 #[test]
4547 fn idempotency_store_rejects_duplicate_key() {
4548 let store = IdempotencyStore::new(Duration::from_secs(30), 10);
4549 assert!(store.record_if_new("req-1"));
4550 assert!(!store.record_if_new("req-1"));
4551 assert!(store.record_if_new("req-2"));
4552 }
4553
4554 #[test]
4555 fn rate_limiter_bounded_cardinality_evicts_oldest_key() {
4556 let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 2);
4557 assert!(limiter.allow("ip-1"));
4558 assert!(limiter.allow("ip-2"));
4559 assert!(limiter.allow("ip-3"));
4560
4561 let guard = limiter.requests.lock();
4562 assert_eq!(guard.0.len(), 2);
4563 assert!(guard.0.contains_key("ip-2"));
4564 assert!(guard.0.contains_key("ip-3"));
4565 }
4566
4567 #[test]
4568 fn idempotency_store_bounded_cardinality_evicts_oldest_key() {
4569 let store = IdempotencyStore::new(Duration::from_secs(300), 2);
4570 assert!(store.record_if_new("k1"));
4571 std::thread::sleep(Duration::from_millis(2));
4572 assert!(store.record_if_new("k2"));
4573 std::thread::sleep(Duration::from_millis(2));
4574 assert!(store.record_if_new("k3"));
4575
4576 let keys = store.keys.lock();
4577 assert_eq!(keys.len(), 2);
4578 assert!(!keys.contains_key("k1"));
4579 assert!(keys.contains_key("k2"));
4580 assert!(keys.contains_key("k3"));
4581 }
4582
4583 #[test]
4584 fn client_key_defaults_to_peer_addr_when_untrusted_proxy_mode() {
4585 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
4586 let mut headers = HeaderMap::new();
4587 headers.insert(
4588 "X-Forwarded-For",
4589 HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
4590 );
4591
4592 let key = client_key_from_request(Some(peer), &headers, false);
4593 assert_eq!(key, "10.0.0.5");
4594 }
4595
4596 #[test]
4597 fn client_key_uses_forwarded_ip_only_in_trusted_proxy_mode() {
4598 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
4599 let mut headers = HeaderMap::new();
4600 headers.insert(
4601 "X-Forwarded-For",
4602 HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
4603 );
4604
4605 let key = client_key_from_request(Some(peer), &headers, true);
4606 assert_eq!(key, "198.51.100.10");
4607 }
4608
4609 #[test]
4610 fn client_key_falls_back_to_peer_when_forwarded_header_invalid() {
4611 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
4612 let mut headers = HeaderMap::new();
4613 headers.insert("X-Forwarded-For", HeaderValue::from_static("garbage-value"));
4614
4615 let key = client_key_from_request(Some(peer), &headers, true);
4616 assert_eq!(key, "10.0.0.5");
4617 }
4618
4619 #[test]
4620 fn normalize_max_keys_uses_fallback_for_zero() {
4621 assert_eq!(normalize_max_keys(0, 10_000), 10_000);
4622 assert_eq!(normalize_max_keys(0, 0), 1);
4623 }
4624
4625 #[test]
4626 fn normalize_max_keys_preserves_nonzero_values() {
4627 assert_eq!(normalize_max_keys(2_048, 10_000), 2_048);
4628 assert_eq!(normalize_max_keys(1, 10_000), 1);
4629 }
4630
4631 #[tokio::test]
4632 async fn persist_pairing_tokens_writes_config_tokens() {
4633 let temp = tempfile::tempdir().unwrap();
4634 let config_path = temp.path().join("config.toml");
4635 let workspace_path = temp.path().join("workspace");
4636
4637 let config = Config {
4638 config_path: config_path.clone(),
4639 data_dir: workspace_path,
4640 ..Default::default()
4641 };
4642 config.save().await.unwrap();
4643
4644 let guard = PairingGuard::new(true, &[]);
4645 let code = guard.pairing_code().unwrap();
4646 let token = guard.try_pair(&code, "test_client").await.unwrap().unwrap();
4647 assert!(guard.is_authenticated(&token));
4648
4649 let shared_config = Arc::new(RwLock::new(config));
4650 Box::pin(persist_pairing_tokens(shared_config.clone(), &guard))
4651 .await
4652 .unwrap();
4653
4654 let plaintext = {
4656 let in_memory = shared_config.read();
4657 assert_eq!(in_memory.gateway.paired_tokens.len(), 1);
4658 in_memory.gateway.paired_tokens[0].clone()
4659 };
4660 assert_eq!(plaintext.len(), 64);
4661 assert!(plaintext.chars().all(|c: char| c.is_ascii_hexdigit()));
4662
4663 let saved = tokio::fs::read_to_string(config_path).await.unwrap();
4665 let raw_parsed: Config = toml::from_str(&saved).unwrap();
4666 assert_eq!(raw_parsed.gateway.paired_tokens.len(), 1);
4667 let on_disk = &raw_parsed.gateway.paired_tokens[0];
4668 assert!(
4669 zeroclaw_runtime::security::SecretStore::is_encrypted(on_disk),
4670 "paired_token should be encrypted on disk"
4671 );
4672 }
4673
4674 #[test]
4675 fn webhook_memory_key_is_unique() {
4676 let key1 = webhook_memory_key();
4677 let key2 = webhook_memory_key();
4678
4679 assert!(key1.starts_with("webhook_msg_"));
4680 assert!(key2.starts_with("webhook_msg_"));
4681 assert_ne!(key1, key2);
4682 }
4683
4684 #[test]
4685 fn webhook_session_id_accepts_valid() {
4686 let mut headers = HeaderMap::new();
4687 headers.insert("X-Session-Id", HeaderValue::from_static("abc-DEF_123.foo"));
4688 assert_eq!(webhook_session_id(&headers), Some("abc-DEF_123.foo".into()));
4689 }
4690
4691 #[test]
4692 fn webhook_session_id_trims_whitespace() {
4693 let mut headers = HeaderMap::new();
4694 headers.insert("X-Session-Id", HeaderValue::from_static(" my-session "));
4695 assert_eq!(webhook_session_id(&headers), Some("my-session".into()));
4696 }
4697
4698 #[test]
4699 fn webhook_session_id_rejects_empty() {
4700 let mut headers = HeaderMap::new();
4701 headers.insert("X-Session-Id", HeaderValue::from_static(""));
4702 assert_eq!(webhook_session_id(&headers), None);
4703
4704 headers.insert("X-Session-Id", HeaderValue::from_static(" "));
4705 assert_eq!(webhook_session_id(&headers), None);
4706 }
4707
4708 #[test]
4709 fn webhook_session_id_rejects_missing() {
4710 let headers = HeaderMap::new();
4711 assert_eq!(webhook_session_id(&headers), None);
4712 }
4713
4714 #[test]
4715 fn webhook_session_id_rejects_oversized() {
4716 let mut headers = HeaderMap::new();
4717 let long = "a".repeat(129);
4718 headers.insert("X-Session-Id", HeaderValue::from_str(&long).unwrap());
4719 assert_eq!(webhook_session_id(&headers), None);
4720
4721 let at_limit = "b".repeat(128);
4722 headers.insert("X-Session-Id", HeaderValue::from_str(&at_limit).unwrap());
4723 assert!(webhook_session_id(&headers).is_some());
4724 }
4725
4726 #[test]
4727 fn webhook_session_id_rejects_invalid_chars() {
4728 let mut headers = HeaderMap::new();
4729 for bad in &[
4730 "has/slash",
4731 "has:colon",
4732 "has space",
4733 "has@at",
4734 "emoji\u{1f600}",
4735 ] {
4736 if let Ok(val) = HeaderValue::from_str(bad) {
4737 headers.insert("X-Session-Id", val);
4738 assert_eq!(webhook_session_id(&headers), None, "should reject: {bad}");
4739 }
4740 }
4741 }
4742
4743 #[cfg(feature = "channel-whatsapp-cloud")]
4744 #[test]
4745 fn whatsapp_memory_key_includes_sender_and_message_id() {
4746 let msg = ChannelMessage {
4747 id: "wamid-123".into(),
4748 sender: "+1234567890".into(),
4749 reply_target: "+1234567890".into(),
4750 content: "hello".into(),
4751 channel: "whatsapp".into(),
4752 channel_alias: None,
4753 timestamp: 1,
4754 thread_ts: None,
4755 interruption_scope_id: None,
4756 attachments: vec![],
4757 subject: None,
4758 };
4759
4760 let key = whatsapp_memory_key(&msg);
4761 assert_eq!(key, "whatsapp_+1234567890_wamid-123");
4762 }
4763
4764 #[derive(Default)]
4765 struct MockMemory;
4766
4767 #[async_trait]
4768 impl Memory for MockMemory {
4769 fn name(&self) -> &str {
4770 "mock"
4771 }
4772
4773 async fn store(
4774 &self,
4775 _key: &str,
4776 _content: &str,
4777 _category: MemoryCategory,
4778 _session_id: Option<&str>,
4779 ) -> anyhow::Result<()> {
4780 Ok(())
4781 }
4782
4783 async fn recall(
4784 &self,
4785 _query: &str,
4786 _limit: usize,
4787 _session_id: Option<&str>,
4788 _since: Option<&str>,
4789 _until: Option<&str>,
4790 ) -> anyhow::Result<Vec<MemoryEntry>> {
4791 Ok(Vec::new())
4792 }
4793
4794 async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
4795 Ok(None)
4796 }
4797
4798 async fn list(
4799 &self,
4800 _category: Option<&MemoryCategory>,
4801 _session_id: Option<&str>,
4802 ) -> anyhow::Result<Vec<MemoryEntry>> {
4803 Ok(Vec::new())
4804 }
4805
4806 async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
4807 Ok(false)
4808 }
4809
4810 async fn forget_for_agent(&self, _key: &str, _agent_id: &str) -> anyhow::Result<bool> {
4811 Ok(false)
4812 }
4813
4814 async fn count(&self) -> anyhow::Result<usize> {
4815 Ok(0)
4816 }
4817
4818 async fn health_check(&self) -> bool {
4819 true
4820 }
4821
4822 async fn store_with_agent(
4823 &self,
4824 _key: &str,
4825 _content: &str,
4826 _category: MemoryCategory,
4827 _session_id: Option<&str>,
4828 _namespace: Option<&str>,
4829 _importance: Option<f64>,
4830 _agent_id: Option<&str>,
4831 ) -> anyhow::Result<()> {
4832 Ok(())
4833 }
4834
4835 async fn recall_for_agents(
4836 &self,
4837 _allowed_agent_ids: &[&str],
4838 _query: &str,
4839 _limit: usize,
4840 _session_id: Option<&str>,
4841 _since: Option<&str>,
4842 _until: Option<&str>,
4843 ) -> anyhow::Result<Vec<MemoryEntry>> {
4844 Ok(Vec::new())
4845 }
4846 }
4847 impl ::zeroclaw_api::attribution::Attributable for MockMemory {
4848 fn role(&self) -> ::zeroclaw_api::attribution::Role {
4849 ::zeroclaw_api::attribution::Role::Memory(
4850 ::zeroclaw_api::attribution::MemoryKind::InMemory,
4851 )
4852 }
4853 fn alias(&self) -> &str {
4854 "MockMemory"
4855 }
4856 }
4857
4858 #[derive(Default)]
4859 struct MockModelProvider {
4860 calls: AtomicUsize,
4861 }
4862
4863 #[async_trait]
4864 impl ModelProvider for MockModelProvider {
4865 async fn chat_with_system(
4866 &self,
4867 _system_prompt: Option<&str>,
4868 _message: &str,
4869 _model: &str,
4870 _temperature: Option<f64>,
4871 ) -> anyhow::Result<String> {
4872 self.calls.fetch_add(1, Ordering::SeqCst);
4873 Ok("ok".into())
4874 }
4875 }
4876 impl ::zeroclaw_api::attribution::Attributable for MockModelProvider {
4877 fn role(&self) -> ::zeroclaw_api::attribution::Role {
4878 ::zeroclaw_api::attribution::Role::Provider(
4879 ::zeroclaw_api::attribution::ProviderKind::Model(
4880 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
4881 ),
4882 )
4883 }
4884 fn alias(&self) -> &str {
4885 "MockModelProvider"
4886 }
4887 }
4888
4889 #[derive(Default)]
4890 struct CapturingObserver {
4891 events: Mutex<Vec<zeroclaw_runtime::observability::ObserverEvent>>,
4892 }
4893
4894 impl zeroclaw_runtime::observability::Observer for CapturingObserver {
4895 fn record_event(&self, event: &zeroclaw_runtime::observability::ObserverEvent) {
4896 self.events.lock().push(event.clone());
4897 }
4898
4899 fn record_metric(&self, _metric: &zeroclaw_runtime::observability::traits::ObserverMetric) {
4900 }
4901
4902 fn name(&self) -> &str {
4903 "capturing"
4904 }
4905
4906 fn as_any(&self) -> &dyn std::any::Any {
4907 self
4908 }
4909 }
4910
4911 #[derive(Default)]
4912 struct TrackingMemory {
4913 keys: Mutex<Vec<String>>,
4914 }
4915
4916 #[async_trait]
4917 impl Memory for TrackingMemory {
4918 fn name(&self) -> &str {
4919 "tracking"
4920 }
4921
4922 async fn store(
4923 &self,
4924 key: &str,
4925 _content: &str,
4926 _category: MemoryCategory,
4927 _session_id: Option<&str>,
4928 ) -> anyhow::Result<()> {
4929 self.keys.lock().push(key.to_string());
4930 Ok(())
4931 }
4932
4933 async fn recall(
4934 &self,
4935 _query: &str,
4936 _limit: usize,
4937 _session_id: Option<&str>,
4938 _since: Option<&str>,
4939 _until: Option<&str>,
4940 ) -> anyhow::Result<Vec<MemoryEntry>> {
4941 Ok(Vec::new())
4942 }
4943
4944 async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
4945 Ok(None)
4946 }
4947
4948 async fn list(
4949 &self,
4950 _category: Option<&MemoryCategory>,
4951 _session_id: Option<&str>,
4952 ) -> anyhow::Result<Vec<MemoryEntry>> {
4953 Ok(Vec::new())
4954 }
4955
4956 async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
4957 Ok(false)
4958 }
4959
4960 async fn forget_for_agent(&self, _key: &str, _agent_id: &str) -> anyhow::Result<bool> {
4961 Ok(false)
4962 }
4963
4964 async fn count(&self) -> anyhow::Result<usize> {
4965 let size = self.keys.lock().len();
4966 Ok(size)
4967 }
4968
4969 async fn health_check(&self) -> bool {
4970 true
4971 }
4972
4973 async fn store_with_agent(
4974 &self,
4975 key: &str,
4976 content: &str,
4977 category: MemoryCategory,
4978 session_id: Option<&str>,
4979 _namespace: Option<&str>,
4980 _importance: Option<f64>,
4981 _agent_id: Option<&str>,
4982 ) -> anyhow::Result<()> {
4983 self.store(key, content, category, session_id).await
4984 }
4985
4986 async fn recall_for_agents(
4987 &self,
4988 _allowed_agent_ids: &[&str],
4989 _query: &str,
4990 _limit: usize,
4991 _session_id: Option<&str>,
4992 _since: Option<&str>,
4993 _until: Option<&str>,
4994 ) -> anyhow::Result<Vec<MemoryEntry>> {
4995 Ok(Vec::new())
4996 }
4997 }
4998 impl ::zeroclaw_api::attribution::Attributable for TrackingMemory {
4999 fn role(&self) -> ::zeroclaw_api::attribution::Role {
5000 ::zeroclaw_api::attribution::Role::Memory(
5001 ::zeroclaw_api::attribution::MemoryKind::InMemory,
5002 )
5003 }
5004 fn alias(&self) -> &str {
5005 "TrackingMemory"
5006 }
5007 }
5008
5009 fn test_connect_info() -> ConnectInfo<SocketAddr> {
5010 ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 30_300)))
5011 }
5012
5013 #[tokio::test]
5014 async fn webhook_idempotency_skips_duplicate_provider_calls() {
5015 let provider_impl = Arc::new(MockModelProvider::default());
5016 let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
5017 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5018
5019 let state = AppState {
5020 config: Arc::new(RwLock::new(Config::default())),
5021 model_provider,
5022 model: "test-model".into(),
5023 temperature: None,
5024 mem: memory.clone(),
5025 memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5026 Arc::clone(&memory),
5027 zeroclaw_config::schema::MemoryConfig::default(),
5028 std::path::PathBuf::new(),
5029 )),
5030 auto_save: false,
5031 webhook_secret_hash: None,
5032 pairing: Arc::new(PairingGuard::new(false, &[])),
5033 trust_forwarded_headers: false,
5034 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5035 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5036 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5037 #[cfg(feature = "channel-whatsapp-cloud")]
5038 whatsapp: None,
5039 #[cfg(feature = "channel-whatsapp-cloud")]
5040 whatsapp_app_secret: None,
5041 #[cfg(feature = "channel-linq")]
5042 linq: HashMap::new(),
5043 #[cfg(feature = "channel-linq")]
5044 linq_signing_secrets: HashMap::new(),
5045 #[cfg(feature = "channel-nextcloud")]
5046 nextcloud_talk: None,
5047 #[cfg(feature = "channel-nextcloud")]
5048 nextcloud_talk_webhook_secret: None,
5049 #[cfg(feature = "channel-wati")]
5050 wati: None,
5051 #[cfg(feature = "channel-email")]
5052 gmail_push: None,
5053 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5054 tools_registry: Arc::new(Vec::new()),
5055 cost_tracker: None,
5056 event_tx: tokio::sync::broadcast::channel(16).0,
5057 event_buffer: Arc::new(sse::EventBuffer::new(16)),
5058 shutdown_tx: tokio::sync::watch::channel(false).0,
5059 reload_tx: None,
5060 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5061 path_prefix: String::new(),
5062 web_dist_dir: None,
5063 session_backend: None,
5064 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5065 8, 30, 600,
5066 )),
5067 device_registry: None,
5068 pending_pairings: None,
5069 canvas_store: CanvasStore::new(),
5070 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5071 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5072 tui_registry: None,
5073 #[cfg(feature = "webauthn")]
5074 webauthn: None,
5075 };
5076
5077 let mut headers = HeaderMap::new();
5078 headers.insert("X-Idempotency-Key", HeaderValue::from_static("abc-123"));
5079
5080 let body = Ok(Json(WebhookBody {
5081 message: "hello".into(),
5082 }));
5083 let first = handle_webhook(
5084 State(state.clone()),
5085 test_connect_info(),
5086 Query(WebhookQuery::default()),
5087 headers.clone(),
5088 body,
5089 )
5090 .await
5091 .into_response();
5092 assert_eq!(first.status(), StatusCode::OK);
5093
5094 let body = Ok(Json(WebhookBody {
5095 message: "hello".into(),
5096 }));
5097 let second = handle_webhook(
5098 State(state),
5099 test_connect_info(),
5100 Query(WebhookQuery::default()),
5101 headers,
5102 body,
5103 )
5104 .await
5105 .into_response();
5106 assert_eq!(second.status(), StatusCode::OK);
5107
5108 let payload = second.into_body().collect().await.unwrap().to_bytes();
5109 let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
5110 assert_eq!(parsed["status"], "duplicate");
5111 assert_eq!(parsed["idempotent"], true);
5112 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
5113 }
5114
5115 #[tokio::test]
5116 async fn webhook_unknown_agent_rejected_before_dispatch() {
5117 let provider_impl = Arc::new(MockModelProvider::default());
5118 let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
5119 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5120
5121 let state = AppState {
5122 config: Arc::new(RwLock::new(Config::default())),
5123 model_provider,
5124 model: "test-model".into(),
5125 temperature: None,
5126 mem: memory.clone(),
5127 memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5128 Arc::clone(&memory),
5129 zeroclaw_config::schema::MemoryConfig::default(),
5130 std::path::PathBuf::new(),
5131 )),
5132 auto_save: false,
5133 webhook_secret_hash: None,
5134 pairing: Arc::new(PairingGuard::new(false, &[])),
5135 trust_forwarded_headers: false,
5136 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5137 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5138 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5139 #[cfg(feature = "channel-whatsapp-cloud")]
5140 whatsapp: None,
5141 #[cfg(feature = "channel-whatsapp-cloud")]
5142 whatsapp_app_secret: None,
5143 #[cfg(feature = "channel-linq")]
5144 linq: HashMap::new(),
5145 #[cfg(feature = "channel-linq")]
5146 linq_signing_secrets: HashMap::new(),
5147 #[cfg(feature = "channel-nextcloud")]
5148 nextcloud_talk: None,
5149 #[cfg(feature = "channel-nextcloud")]
5150 nextcloud_talk_webhook_secret: None,
5151 #[cfg(feature = "channel-wati")]
5152 wati: None,
5153 #[cfg(feature = "channel-email")]
5154 gmail_push: None,
5155 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5156 tools_registry: Arc::new(Vec::new()),
5157 cost_tracker: None,
5158 event_tx: tokio::sync::broadcast::channel(16).0,
5159 event_buffer: Arc::new(sse::EventBuffer::new(16)),
5160 shutdown_tx: tokio::sync::watch::channel(false).0,
5161 reload_tx: None,
5162 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5163 path_prefix: String::new(),
5164 web_dist_dir: None,
5165 session_backend: None,
5166 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5167 8, 30, 600,
5168 )),
5169 device_registry: None,
5170 pending_pairings: None,
5171 canvas_store: CanvasStore::new(),
5172 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5173 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5174 tui_registry: None,
5175 #[cfg(feature = "webauthn")]
5176 webauthn: None,
5177 };
5178
5179 let mut headers = HeaderMap::new();
5181 headers.insert("X-Idempotency-Key", HeaderValue::from_static("ghost-key"));
5182
5183 let response = handle_webhook(
5184 State(state.clone()),
5185 test_connect_info(),
5186 Query(WebhookQuery {
5187 agent: Some("ghost".into()),
5188 }),
5189 headers,
5190 Ok(Json(WebhookBody {
5191 message: "hello".into(),
5192 })),
5193 )
5194 .await
5195 .into_response();
5196 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
5197 let payload = response.into_body().collect().await.unwrap().to_bytes();
5198 let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
5199 assert!(
5200 parsed["error"]
5201 .as_str()
5202 .unwrap_or_default()
5203 .contains("Unknown agent `ghost`")
5204 );
5205 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
5206 assert!(state.idempotency_store.record_if_new("ghost-key"));
5208 }
5209
5210 #[tokio::test]
5211 async fn webhook_explicit_agent_reports_agent_model() {
5212 let provider_impl = Arc::new(MockModelProvider::default());
5213 let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
5214 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5215 let observer_impl = Arc::new(CapturingObserver::default());
5216 let observer: Arc<dyn zeroclaw_runtime::observability::Observer> = observer_impl.clone();
5217
5218 let mut config = Config::default();
5219 config.providers.models.anthropic.insert(
5220 "default".into(),
5221 zeroclaw_config::schema::AnthropicModelProviderConfig {
5222 base: zeroclaw_config::schema::ModelProviderConfig {
5223 model: Some("agent-model".into()),
5224 ..Default::default()
5225 },
5226 },
5227 );
5228 let expected_provider = "anthropic.default".to_string();
5229 config.agents.insert(
5230 "nova".to_string(),
5231 zeroclaw_config::schema::AliasedAgentConfig {
5232 enabled: true,
5233 model_provider: expected_provider.clone().into(),
5234 ..Default::default()
5235 },
5236 );
5237
5238 let state = AppState {
5239 config: Arc::new(RwLock::new(config)),
5240 model_provider,
5241 model: "startup-model".into(),
5242 temperature: None,
5243 mem: memory,
5244 memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5245 Arc::new(MockMemory),
5246 zeroclaw_config::schema::MemoryConfig::default(),
5247 std::path::PathBuf::new(),
5248 )),
5249 auto_save: false,
5250 webhook_secret_hash: None,
5251 pairing: Arc::new(PairingGuard::new(false, &[])),
5252 trust_forwarded_headers: false,
5253 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5254 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5255 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5256 #[cfg(feature = "channel-whatsapp-cloud")]
5257 whatsapp: None,
5258 #[cfg(feature = "channel-whatsapp-cloud")]
5259 whatsapp_app_secret: None,
5260 #[cfg(feature = "channel-linq")]
5261 linq: HashMap::new(),
5262 #[cfg(feature = "channel-linq")]
5263 linq_signing_secrets: HashMap::new(),
5264 #[cfg(feature = "channel-nextcloud")]
5265 nextcloud_talk: None,
5266 #[cfg(feature = "channel-nextcloud")]
5267 nextcloud_talk_webhook_secret: None,
5268 #[cfg(feature = "channel-wati")]
5269 wati: None,
5270 #[cfg(feature = "channel-email")]
5271 gmail_push: None,
5272 observer,
5273 tools_registry: Arc::new(Vec::new()),
5274 cost_tracker: None,
5275 event_tx: tokio::sync::broadcast::channel(16).0,
5276 event_buffer: Arc::new(sse::EventBuffer::new(16)),
5277 shutdown_tx: tokio::sync::watch::channel(false).0,
5278 reload_tx: None,
5279 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5280 path_prefix: String::new(),
5281 web_dist_dir: None,
5282 session_backend: None,
5283 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5284 8, 30, 600,
5285 )),
5286 device_registry: None,
5287 pending_pairings: None,
5288 canvas_store: CanvasStore::new(),
5289 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5290 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5291 tui_registry: None,
5292 #[cfg(feature = "webauthn")]
5293 webauthn: None,
5294 };
5295
5296 let response = handle_webhook(
5297 State(state),
5298 test_connect_info(),
5299 Query(WebhookQuery {
5300 agent: Some("nova".into()),
5301 }),
5302 HeaderMap::new(),
5303 Ok(Json(WebhookBody {
5304 message: "hello".into(),
5305 })),
5306 )
5307 .await
5308 .into_response();
5309 assert_eq!(response.status(), StatusCode::OK);
5310 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
5311 let payload = response.into_body().collect().await.unwrap().to_bytes();
5312 let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
5313 assert_eq!(parsed["model"], "agent-model");
5314 let events = observer_impl.events.lock();
5315 assert!(
5316 events.iter().any(|event| matches!(
5317 event,
5318 zeroclaw_runtime::observability::ObserverEvent::AgentStart {
5319 model_provider,
5320 model,
5321 } if model_provider == &expected_provider && model == "agent-model"
5322 )),
5323 "expected AgentStart to use the explicit agent model; events were: {events:?}"
5324 );
5325 }
5326
5327 #[tokio::test]
5328 async fn webhook_autosave_stores_distinct_keys_per_request() {
5329 let provider_impl = Arc::new(MockModelProvider::default());
5330 let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
5331
5332 let tracking_impl = Arc::new(TrackingMemory::default());
5333 let memory: Arc<dyn Memory> = tracking_impl.clone();
5334
5335 let state = AppState {
5336 config: Arc::new(RwLock::new(Config::default())),
5337 model_provider,
5338 model: "test-model".into(),
5339 temperature: None,
5340 mem: memory,
5341 memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5342 Arc::new(MockMemory),
5343 zeroclaw_config::schema::MemoryConfig::default(),
5344 std::path::PathBuf::new(),
5345 )),
5346 auto_save: true,
5347 webhook_secret_hash: None,
5348 pairing: Arc::new(PairingGuard::new(false, &[])),
5349 trust_forwarded_headers: false,
5350 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5351 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5352 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5353 #[cfg(feature = "channel-whatsapp-cloud")]
5354 whatsapp: None,
5355 #[cfg(feature = "channel-whatsapp-cloud")]
5356 whatsapp_app_secret: None,
5357 #[cfg(feature = "channel-linq")]
5358 linq: HashMap::new(),
5359 #[cfg(feature = "channel-linq")]
5360 linq_signing_secrets: HashMap::new(),
5361 #[cfg(feature = "channel-nextcloud")]
5362 nextcloud_talk: None,
5363 #[cfg(feature = "channel-nextcloud")]
5364 nextcloud_talk_webhook_secret: None,
5365 #[cfg(feature = "channel-wati")]
5366 wati: None,
5367 #[cfg(feature = "channel-email")]
5368 gmail_push: None,
5369 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5370 tools_registry: Arc::new(Vec::new()),
5371 cost_tracker: None,
5372 event_tx: tokio::sync::broadcast::channel(16).0,
5373 event_buffer: Arc::new(sse::EventBuffer::new(16)),
5374 shutdown_tx: tokio::sync::watch::channel(false).0,
5375 reload_tx: None,
5376 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5377 path_prefix: String::new(),
5378 web_dist_dir: None,
5379 session_backend: None,
5380 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5381 8, 30, 600,
5382 )),
5383 device_registry: None,
5384 pending_pairings: None,
5385 canvas_store: CanvasStore::new(),
5386 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5387 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5388 tui_registry: None,
5389 #[cfg(feature = "webauthn")]
5390 webauthn: None,
5391 };
5392
5393 let headers = HeaderMap::new();
5394
5395 let body1 = Ok(Json(WebhookBody {
5396 message: "hello one".into(),
5397 }));
5398 let first = handle_webhook(
5399 State(state.clone()),
5400 test_connect_info(),
5401 Query(WebhookQuery::default()),
5402 headers.clone(),
5403 body1,
5404 )
5405 .await
5406 .into_response();
5407 assert_eq!(first.status(), StatusCode::OK);
5408
5409 let body2 = Ok(Json(WebhookBody {
5410 message: "hello two".into(),
5411 }));
5412 let second = handle_webhook(
5413 State(state),
5414 test_connect_info(),
5415 Query(WebhookQuery::default()),
5416 headers,
5417 body2,
5418 )
5419 .await
5420 .into_response();
5421 assert_eq!(second.status(), StatusCode::OK);
5422
5423 let keys = tracking_impl.keys.lock().clone();
5424 assert_eq!(keys.len(), 2);
5425 assert_ne!(keys[0], keys[1]);
5426 assert!(keys[0].starts_with("webhook_msg_"));
5427 assert!(keys[1].starts_with("webhook_msg_"));
5428 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 2);
5429 }
5430
5431 #[test]
5432 fn webhook_secret_hash_is_deterministic_and_nonempty() {
5433 let secret_a = generate_test_secret();
5434 let secret_b = generate_test_secret();
5435 let one = hash_webhook_secret(&secret_a);
5436 let two = hash_webhook_secret(&secret_a);
5437 let other = hash_webhook_secret(&secret_b);
5438
5439 assert_eq!(one, two);
5440 assert_ne!(one, other);
5441 assert_eq!(one.len(), 64);
5442 }
5443
5444 #[tokio::test]
5445 async fn webhook_secret_hash_rejects_missing_header() {
5446 let provider_impl = Arc::new(MockModelProvider::default());
5447 let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
5448 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5449 let secret = generate_test_secret();
5450
5451 let state = AppState {
5452 config: Arc::new(RwLock::new(Config::default())),
5453 model_provider,
5454 model: "test-model".into(),
5455 temperature: None,
5456 mem: memory.clone(),
5457 memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5458 Arc::clone(&memory),
5459 zeroclaw_config::schema::MemoryConfig::default(),
5460 std::path::PathBuf::new(),
5461 )),
5462 auto_save: false,
5463 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
5464 pairing: Arc::new(PairingGuard::new(false, &[])),
5465 trust_forwarded_headers: false,
5466 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5467 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5468 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5469 #[cfg(feature = "channel-whatsapp-cloud")]
5470 whatsapp: None,
5471 #[cfg(feature = "channel-whatsapp-cloud")]
5472 whatsapp_app_secret: None,
5473 #[cfg(feature = "channel-linq")]
5474 linq: HashMap::new(),
5475 #[cfg(feature = "channel-linq")]
5476 linq_signing_secrets: HashMap::new(),
5477 #[cfg(feature = "channel-nextcloud")]
5478 nextcloud_talk: None,
5479 #[cfg(feature = "channel-nextcloud")]
5480 nextcloud_talk_webhook_secret: None,
5481 #[cfg(feature = "channel-wati")]
5482 wati: None,
5483 #[cfg(feature = "channel-email")]
5484 gmail_push: None,
5485 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5486 tools_registry: Arc::new(Vec::new()),
5487 cost_tracker: None,
5488 event_tx: tokio::sync::broadcast::channel(16).0,
5489 event_buffer: Arc::new(sse::EventBuffer::new(16)),
5490 shutdown_tx: tokio::sync::watch::channel(false).0,
5491 reload_tx: None,
5492 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5493 path_prefix: String::new(),
5494 web_dist_dir: None,
5495 session_backend: None,
5496 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5497 8, 30, 600,
5498 )),
5499 device_registry: None,
5500 pending_pairings: None,
5501 canvas_store: CanvasStore::new(),
5502 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5503 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5504 tui_registry: None,
5505 #[cfg(feature = "webauthn")]
5506 webauthn: None,
5507 };
5508
5509 let response = handle_webhook(
5510 State(state),
5511 test_connect_info(),
5512 Query(WebhookQuery::default()),
5513 HeaderMap::new(),
5514 Ok(Json(WebhookBody {
5515 message: "hello".into(),
5516 })),
5517 )
5518 .await
5519 .into_response();
5520
5521 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
5522 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
5523 }
5524
5525 #[tokio::test]
5526 async fn webhook_secret_hash_rejects_invalid_header() {
5527 let provider_impl = Arc::new(MockModelProvider::default());
5528 let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
5529 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5530 let valid_secret = generate_test_secret();
5531 let wrong_secret = generate_test_secret();
5532
5533 let state = AppState {
5534 config: Arc::new(RwLock::new(Config::default())),
5535 model_provider,
5536 model: "test-model".into(),
5537 temperature: None,
5538 mem: memory.clone(),
5539 memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5540 Arc::clone(&memory),
5541 zeroclaw_config::schema::MemoryConfig::default(),
5542 std::path::PathBuf::new(),
5543 )),
5544 auto_save: false,
5545 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&valid_secret))),
5546 pairing: Arc::new(PairingGuard::new(false, &[])),
5547 trust_forwarded_headers: false,
5548 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5549 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5550 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5551 #[cfg(feature = "channel-whatsapp-cloud")]
5552 whatsapp: None,
5553 #[cfg(feature = "channel-whatsapp-cloud")]
5554 whatsapp_app_secret: None,
5555 #[cfg(feature = "channel-linq")]
5556 linq: HashMap::new(),
5557 #[cfg(feature = "channel-linq")]
5558 linq_signing_secrets: HashMap::new(),
5559 #[cfg(feature = "channel-nextcloud")]
5560 nextcloud_talk: None,
5561 #[cfg(feature = "channel-nextcloud")]
5562 nextcloud_talk_webhook_secret: None,
5563 #[cfg(feature = "channel-wati")]
5564 wati: None,
5565 #[cfg(feature = "channel-email")]
5566 gmail_push: None,
5567 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5568 tools_registry: Arc::new(Vec::new()),
5569 cost_tracker: None,
5570 event_tx: tokio::sync::broadcast::channel(16).0,
5571 event_buffer: Arc::new(sse::EventBuffer::new(16)),
5572 shutdown_tx: tokio::sync::watch::channel(false).0,
5573 reload_tx: None,
5574 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5575 path_prefix: String::new(),
5576 web_dist_dir: None,
5577 session_backend: None,
5578 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5579 8, 30, 600,
5580 )),
5581 device_registry: None,
5582 pending_pairings: None,
5583 canvas_store: CanvasStore::new(),
5584 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5585 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5586 tui_registry: None,
5587 #[cfg(feature = "webauthn")]
5588 webauthn: None,
5589 };
5590
5591 let mut headers = HeaderMap::new();
5592 headers.insert(
5593 "X-Webhook-Secret",
5594 HeaderValue::from_str(&wrong_secret).unwrap(),
5595 );
5596
5597 let response = handle_webhook(
5598 State(state),
5599 test_connect_info(),
5600 Query(WebhookQuery::default()),
5601 headers,
5602 Ok(Json(WebhookBody {
5603 message: "hello".into(),
5604 })),
5605 )
5606 .await
5607 .into_response();
5608
5609 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
5610 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
5611 }
5612
5613 #[tokio::test]
5614 async fn webhook_secret_hash_accepts_valid_header() {
5615 let provider_impl = Arc::new(MockModelProvider::default());
5616 let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
5617 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5618 let secret = generate_test_secret();
5619
5620 let state = AppState {
5621 config: Arc::new(RwLock::new(Config::default())),
5622 model_provider,
5623 model: "test-model".into(),
5624 temperature: None,
5625 mem: memory.clone(),
5626 memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5627 Arc::clone(&memory),
5628 zeroclaw_config::schema::MemoryConfig::default(),
5629 std::path::PathBuf::new(),
5630 )),
5631 auto_save: false,
5632 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
5633 pairing: Arc::new(PairingGuard::new(false, &[])),
5634 trust_forwarded_headers: false,
5635 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5636 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5637 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5638 #[cfg(feature = "channel-whatsapp-cloud")]
5639 whatsapp: None,
5640 #[cfg(feature = "channel-whatsapp-cloud")]
5641 whatsapp_app_secret: None,
5642 #[cfg(feature = "channel-linq")]
5643 linq: HashMap::new(),
5644 #[cfg(feature = "channel-linq")]
5645 linq_signing_secrets: HashMap::new(),
5646 #[cfg(feature = "channel-nextcloud")]
5647 nextcloud_talk: None,
5648 #[cfg(feature = "channel-nextcloud")]
5649 nextcloud_talk_webhook_secret: None,
5650 #[cfg(feature = "channel-wati")]
5651 wati: None,
5652 #[cfg(feature = "channel-email")]
5653 gmail_push: None,
5654 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5655 tools_registry: Arc::new(Vec::new()),
5656 cost_tracker: None,
5657 event_tx: tokio::sync::broadcast::channel(16).0,
5658 event_buffer: Arc::new(sse::EventBuffer::new(16)),
5659 shutdown_tx: tokio::sync::watch::channel(false).0,
5660 reload_tx: None,
5661 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5662 path_prefix: String::new(),
5663 web_dist_dir: None,
5664 session_backend: None,
5665 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5666 8, 30, 600,
5667 )),
5668 device_registry: None,
5669 pending_pairings: None,
5670 canvas_store: CanvasStore::new(),
5671 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5672 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5673 tui_registry: None,
5674 #[cfg(feature = "webauthn")]
5675 webauthn: None,
5676 };
5677
5678 let mut headers = HeaderMap::new();
5679 headers.insert("X-Webhook-Secret", HeaderValue::from_str(&secret).unwrap());
5680
5681 let response = handle_webhook(
5682 State(state),
5683 test_connect_info(),
5684 Query(WebhookQuery::default()),
5685 headers,
5686 Ok(Json(WebhookBody {
5687 message: "hello".into(),
5688 })),
5689 )
5690 .await
5691 .into_response();
5692
5693 assert_eq!(response.status(), StatusCode::OK);
5694 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
5695 }
5696
5697 #[cfg(feature = "channel-nextcloud")]
5698 fn compute_nextcloud_signature_hex(secret: &str, random: &str, body: &str) -> String {
5699 use hmac::{Hmac, Mac};
5700 use sha2::Sha256;
5701
5702 let payload = format!("{random}{body}");
5703 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
5704 mac.update(payload.as_bytes());
5705 hex::encode(mac.finalize().into_bytes())
5706 }
5707
5708 #[cfg(feature = "channel-nextcloud")]
5709 #[tokio::test]
5710 async fn nextcloud_talk_webhook_returns_not_found_when_not_configured() {
5711 let model_provider: Arc<dyn ModelProvider> = Arc::new(MockModelProvider::default());
5712 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5713
5714 let state = AppState {
5715 config: Arc::new(RwLock::new(Config::default())),
5716 model_provider,
5717 model: "test-model".into(),
5718 temperature: None,
5719 mem: memory.clone(),
5720 memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5721 Arc::clone(&memory),
5722 zeroclaw_config::schema::MemoryConfig::default(),
5723 std::path::PathBuf::new(),
5724 )),
5725 auto_save: false,
5726 webhook_secret_hash: None,
5727 pairing: Arc::new(PairingGuard::new(false, &[])),
5728 trust_forwarded_headers: false,
5729 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5730 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5731 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5732 #[cfg(feature = "channel-whatsapp-cloud")]
5733 whatsapp: None,
5734 #[cfg(feature = "channel-whatsapp-cloud")]
5735 whatsapp_app_secret: None,
5736 #[cfg(feature = "channel-linq")]
5737 linq: HashMap::new(),
5738 #[cfg(feature = "channel-linq")]
5739 linq_signing_secrets: HashMap::new(),
5740 #[cfg(feature = "channel-nextcloud")]
5741 nextcloud_talk: None,
5742 #[cfg(feature = "channel-nextcloud")]
5743 nextcloud_talk_webhook_secret: None,
5744 #[cfg(feature = "channel-wati")]
5745 wati: None,
5746 #[cfg(feature = "channel-email")]
5747 gmail_push: None,
5748 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5749 tools_registry: Arc::new(Vec::new()),
5750 cost_tracker: None,
5751 event_tx: tokio::sync::broadcast::channel(16).0,
5752 event_buffer: Arc::new(sse::EventBuffer::new(16)),
5753 shutdown_tx: tokio::sync::watch::channel(false).0,
5754 reload_tx: None,
5755 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5756 path_prefix: String::new(),
5757 web_dist_dir: None,
5758 session_backend: None,
5759 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5760 8, 30, 600,
5761 )),
5762 device_registry: None,
5763 pending_pairings: None,
5764 canvas_store: CanvasStore::new(),
5765 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5766 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5767 tui_registry: None,
5768 #[cfg(feature = "webauthn")]
5769 webauthn: None,
5770 };
5771
5772 let response = Box::pin(handle_nextcloud_talk_webhook(
5773 State(state),
5774 HeaderMap::new(),
5775 Bytes::from_static(br#"{"type":"message"}"#),
5776 ))
5777 .await
5778 .into_response();
5779
5780 assert_eq!(response.status(), StatusCode::NOT_FOUND);
5781 }
5782
5783 #[cfg(feature = "channel-nextcloud")]
5784 #[tokio::test]
5785 async fn nextcloud_talk_webhook_rejects_invalid_signature() {
5786 let provider_impl = Arc::new(MockModelProvider::default());
5787 let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
5788 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5789
5790 let alias = "nextcloud_talk_test_alias";
5791 let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = Arc::new(Vec::new);
5792 let channel = Arc::new(NextcloudTalkChannel::new(
5793 "https://cloud.example.com".into(),
5794 "app-token".into(),
5795 String::new(),
5796 alias,
5797 peer_resolver,
5798 ));
5799
5800 let secret = "nextcloud-test-secret";
5801 let random = "seed-value";
5802 let body = r#"{"type":"message","object":{"token":"room-token"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#;
5803 let _valid_signature = compute_nextcloud_signature_hex(secret, random, body);
5804 let invalid_signature = "deadbeef";
5805
5806 let state = AppState {
5807 config: Arc::new(RwLock::new(Config::default())),
5808 model_provider,
5809 model: "test-model".into(),
5810 temperature: None,
5811 mem: memory.clone(),
5812 memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5813 Arc::clone(&memory),
5814 zeroclaw_config::schema::MemoryConfig::default(),
5815 std::path::PathBuf::new(),
5816 )),
5817 auto_save: false,
5818 webhook_secret_hash: None,
5819 pairing: Arc::new(PairingGuard::new(false, &[])),
5820 trust_forwarded_headers: false,
5821 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5822 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5823 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5824 #[cfg(feature = "channel-whatsapp-cloud")]
5825 whatsapp: None,
5826 #[cfg(feature = "channel-whatsapp-cloud")]
5827 whatsapp_app_secret: None,
5828 #[cfg(feature = "channel-linq")]
5829 linq: HashMap::new(),
5830 #[cfg(feature = "channel-linq")]
5831 linq_signing_secrets: HashMap::new(),
5832 nextcloud_talk: Some(channel),
5833 nextcloud_talk_webhook_secret: Some(Arc::from(secret)),
5834 #[cfg(feature = "channel-wati")]
5835 wati: None,
5836 #[cfg(feature = "channel-email")]
5837 gmail_push: None,
5838 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5839 tools_registry: Arc::new(Vec::new()),
5840 cost_tracker: None,
5841 event_tx: tokio::sync::broadcast::channel(16).0,
5842 event_buffer: Arc::new(sse::EventBuffer::new(16)),
5843 shutdown_tx: tokio::sync::watch::channel(false).0,
5844 reload_tx: None,
5845 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5846 path_prefix: String::new(),
5847 web_dist_dir: None,
5848 session_backend: None,
5849 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5850 8, 30, 600,
5851 )),
5852 device_registry: None,
5853 pending_pairings: None,
5854 canvas_store: CanvasStore::new(),
5855 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5856 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5857 tui_registry: None,
5858 #[cfg(feature = "webauthn")]
5859 webauthn: None,
5860 };
5861
5862 let mut headers = HeaderMap::new();
5863 headers.insert(
5864 "X-Nextcloud-Talk-Random",
5865 HeaderValue::from_str(random).unwrap(),
5866 );
5867 headers.insert(
5868 "X-Nextcloud-Talk-Signature",
5869 HeaderValue::from_str(invalid_signature).unwrap(),
5870 );
5871
5872 let response = Box::pin(handle_nextcloud_talk_webhook(
5873 State(state),
5874 headers,
5875 Bytes::from(body),
5876 ))
5877 .await
5878 .into_response();
5879 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
5880 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
5881 }
5882
5883 #[cfg(feature = "channel-nextcloud")]
5887 #[derive(Default)]
5888 struct SlowProvider {
5889 calls: AtomicUsize,
5890 started_tx: Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
5891 }
5892
5893 #[cfg(feature = "channel-nextcloud")]
5894 #[async_trait]
5895 impl ModelProvider for SlowProvider {
5896 async fn chat_with_system(
5897 &self,
5898 _system_prompt: Option<&str>,
5899 _message: &str,
5900 _model: &str,
5901 _temperature: Option<f64>,
5902 ) -> anyhow::Result<String> {
5903 self.calls.fetch_add(1, Ordering::SeqCst);
5904 if let Some(tx) = self.started_tx.lock().take() {
5905 let _ = tx.send(());
5906 }
5907 tokio::time::sleep(Duration::from_secs(30)).await;
5908 Ok("slow ok".into())
5909 }
5910 }
5911 #[cfg(feature = "channel-nextcloud")]
5912 impl ::zeroclaw_api::attribution::Attributable for SlowProvider {
5913 fn role(&self) -> ::zeroclaw_api::attribution::Role {
5914 ::zeroclaw_api::attribution::Role::Provider(
5915 ::zeroclaw_api::attribution::ProviderKind::Model(
5916 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
5917 ),
5918 )
5919 }
5920 fn alias(&self) -> &str {
5921 "SlowProvider"
5922 }
5923 }
5924
5925 #[cfg(feature = "channel-nextcloud")]
5926 #[tokio::test]
5927 async fn nextcloud_talk_webhook_returns_before_llm_call_completes() {
5928 let (started_tx, started_rx) = tokio::sync::oneshot::channel();
5929 let provider_impl = Arc::new(SlowProvider {
5930 calls: AtomicUsize::new(0),
5931 started_tx: Mutex::new(Some(started_tx)),
5932 });
5933 let provider: Arc<dyn ModelProvider> = provider_impl.clone();
5934 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
5935
5936 let channel = Arc::new(NextcloudTalkChannel::new(
5937 "https://cloud.example.com".into(),
5938 "app-token".into(),
5939 String::new(),
5940 "default",
5941 Arc::new(|| vec!["*".to_string()]),
5942 ));
5943
5944 let body = r#"{"type":"message","object":{"token":"room-token"},"actor":{"id":"user_a","name":"User A"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#;
5945
5946 let state = AppState {
5947 config: Arc::new(RwLock::new(Config::default())),
5948 model_provider: provider,
5949 model: "test-model".into(),
5950 temperature: None,
5951 mem: memory.clone(),
5952 memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
5953 Arc::clone(&memory),
5954 zeroclaw_config::schema::MemoryConfig::default(),
5955 std::path::PathBuf::new(),
5956 )),
5957 auto_save: false,
5958 webhook_secret_hash: None,
5959 pairing: Arc::new(PairingGuard::new(false, &[])),
5960 trust_forwarded_headers: false,
5961 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
5962 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
5963 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
5964 #[cfg(feature = "channel-whatsapp-cloud")]
5965 whatsapp: None,
5966 #[cfg(feature = "channel-whatsapp-cloud")]
5967 whatsapp_app_secret: None,
5968 #[cfg(feature = "channel-linq")]
5969 linq: HashMap::new(),
5970 #[cfg(feature = "channel-linq")]
5971 linq_signing_secrets: HashMap::new(),
5972 nextcloud_talk: Some(channel),
5973 nextcloud_talk_webhook_secret: None,
5974 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
5975 tui_registry: None,
5976 #[cfg(feature = "channel-wati")]
5977 wati: None,
5978 #[cfg(feature = "channel-email")]
5979 gmail_push: None,
5980 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
5981 tools_registry: Arc::new(Vec::new()),
5982 cost_tracker: None,
5983 event_tx: tokio::sync::broadcast::channel(16).0,
5984 event_buffer: Arc::new(sse::EventBuffer::new(16)),
5985 shutdown_tx: tokio::sync::watch::channel(false).0,
5986 reload_tx: None,
5987 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
5988 path_prefix: String::new(),
5989 web_dist_dir: None,
5990 session_backend: None,
5991 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
5992 8, 30, 600,
5993 )),
5994 device_registry: None,
5995 pending_pairings: None,
5996 canvas_store: CanvasStore::new(),
5997 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
5998 #[cfg(feature = "webauthn")]
5999 webauthn: None,
6000 };
6001
6002 let start = std::time::Instant::now();
6003 let response = tokio::time::timeout(
6004 Duration::from_secs(2),
6005 Box::pin(handle_nextcloud_talk_webhook(
6006 State(state),
6007 HeaderMap::new(),
6008 Bytes::from(body),
6009 )),
6010 )
6011 .await
6012 .expect("webhook must return before 2s deadline (regression #6156)")
6013 .into_response();
6014
6015 let elapsed = start.elapsed();
6016 assert_eq!(response.status(), StatusCode::OK);
6017 assert!(
6018 elapsed < Duration::from_secs(2),
6019 "handler returned after {elapsed:?}; expected fast return for #6156"
6020 );
6021
6022 tokio::time::timeout(Duration::from_secs(2), started_rx)
6025 .await
6026 .expect("spawned LLM call did not start within 2s")
6027 .expect("started_tx sender was dropped");
6028 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
6029 }
6030
6031 #[cfg(feature = "channel-whatsapp-cloud")]
6036 fn compute_whatsapp_signature_hex(secret: &str, body: &[u8]) -> String {
6037 use hmac::{Hmac, Mac};
6038 use sha2::Sha256;
6039
6040 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
6041 mac.update(body);
6042 hex::encode(mac.finalize().into_bytes())
6043 }
6044
6045 #[cfg(feature = "channel-whatsapp-cloud")]
6046 fn compute_whatsapp_signature_header(secret: &str, body: &[u8]) -> String {
6047 format!("sha256={}", compute_whatsapp_signature_hex(secret, body))
6048 }
6049
6050 #[cfg(feature = "channel-whatsapp-cloud")]
6051 #[test]
6052 fn whatsapp_signature_valid() {
6053 let app_secret = generate_test_secret();
6054 let body = b"test body content";
6055
6056 let signature_header = compute_whatsapp_signature_header(&app_secret, body);
6057
6058 assert!(verify_whatsapp_signature(
6059 &app_secret,
6060 body,
6061 &signature_header
6062 ));
6063 }
6064
6065 #[cfg(feature = "channel-whatsapp-cloud")]
6066 #[test]
6067 fn whatsapp_signature_invalid_wrong_secret() {
6068 let app_secret = generate_test_secret();
6069 let wrong_secret = generate_test_secret();
6070 let body = b"test body content";
6071
6072 let signature_header = compute_whatsapp_signature_header(&wrong_secret, body);
6073
6074 assert!(!verify_whatsapp_signature(
6075 &app_secret,
6076 body,
6077 &signature_header
6078 ));
6079 }
6080
6081 #[cfg(feature = "channel-whatsapp-cloud")]
6082 #[test]
6083 fn whatsapp_signature_invalid_wrong_body() {
6084 let app_secret = generate_test_secret();
6085 let original_body = b"original body";
6086 let tampered_body = b"tampered body";
6087
6088 let signature_header = compute_whatsapp_signature_header(&app_secret, original_body);
6089
6090 assert!(!verify_whatsapp_signature(
6092 &app_secret,
6093 tampered_body,
6094 &signature_header
6095 ));
6096 }
6097
6098 #[cfg(feature = "channel-whatsapp-cloud")]
6099 #[test]
6100 fn whatsapp_signature_missing_prefix() {
6101 let app_secret = generate_test_secret();
6102 let body = b"test body";
6103
6104 let signature_header = "abc123def456";
6106
6107 assert!(!verify_whatsapp_signature(
6108 &app_secret,
6109 body,
6110 signature_header
6111 ));
6112 }
6113
6114 #[cfg(feature = "channel-whatsapp-cloud")]
6115 #[test]
6116 fn whatsapp_signature_empty_header() {
6117 let app_secret = generate_test_secret();
6118 let body = b"test body";
6119
6120 assert!(!verify_whatsapp_signature(&app_secret, body, ""));
6121 }
6122
6123 #[cfg(feature = "channel-whatsapp-cloud")]
6124 #[test]
6125 fn whatsapp_signature_invalid_hex() {
6126 let app_secret = generate_test_secret();
6127 let body = b"test body";
6128
6129 let signature_header = "sha256=not_valid_hex_zzz";
6131
6132 assert!(!verify_whatsapp_signature(
6133 &app_secret,
6134 body,
6135 signature_header
6136 ));
6137 }
6138
6139 #[cfg(feature = "channel-whatsapp-cloud")]
6140 #[test]
6141 fn whatsapp_signature_empty_body() {
6142 let app_secret = generate_test_secret();
6143 let body = b"";
6144
6145 let signature_header = compute_whatsapp_signature_header(&app_secret, body);
6146
6147 assert!(verify_whatsapp_signature(
6148 &app_secret,
6149 body,
6150 &signature_header
6151 ));
6152 }
6153
6154 #[cfg(feature = "channel-whatsapp-cloud")]
6155 #[test]
6156 fn whatsapp_signature_unicode_body() {
6157 let app_secret = generate_test_secret();
6158 let body = "Hello 🦀 World".as_bytes();
6159
6160 let signature_header = compute_whatsapp_signature_header(&app_secret, body);
6161
6162 assert!(verify_whatsapp_signature(
6163 &app_secret,
6164 body,
6165 &signature_header
6166 ));
6167 }
6168
6169 #[cfg(feature = "channel-whatsapp-cloud")]
6170 #[test]
6171 fn whatsapp_signature_json_payload() {
6172 let app_secret = generate_test_secret();
6173 let body = br#"{"entry":[{"changes":[{"value":{"messages":[{"from":"1234567890","text":{"body":"Hello"}}]}}]}]}"#;
6174
6175 let signature_header = compute_whatsapp_signature_header(&app_secret, body);
6176
6177 assert!(verify_whatsapp_signature(
6178 &app_secret,
6179 body,
6180 &signature_header
6181 ));
6182 }
6183
6184 #[cfg(feature = "channel-whatsapp-cloud")]
6185 #[test]
6186 fn whatsapp_signature_case_sensitive_prefix() {
6187 let app_secret = generate_test_secret();
6188 let body = b"test body";
6189
6190 let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
6191
6192 let wrong_prefix = format!("SHA256={hex_sig}");
6194 assert!(!verify_whatsapp_signature(&app_secret, body, &wrong_prefix));
6195
6196 let correct_prefix = format!("sha256={hex_sig}");
6198 assert!(verify_whatsapp_signature(
6199 &app_secret,
6200 body,
6201 &correct_prefix
6202 ));
6203 }
6204
6205 #[cfg(feature = "channel-whatsapp-cloud")]
6206 #[test]
6207 fn whatsapp_signature_truncated_hex() {
6208 let app_secret = generate_test_secret();
6209 let body = b"test body";
6210
6211 let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
6212 let truncated = &hex_sig[..32]; let signature_header = format!("sha256={truncated}");
6214
6215 assert!(!verify_whatsapp_signature(
6216 &app_secret,
6217 body,
6218 &signature_header
6219 ));
6220 }
6221
6222 #[cfg(feature = "channel-whatsapp-cloud")]
6223 #[test]
6224 fn whatsapp_signature_extra_bytes() {
6225 let app_secret = generate_test_secret();
6226 let body = b"test body";
6227
6228 let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
6229 let extended = format!("{hex_sig}deadbeef");
6230 let signature_header = format!("sha256={extended}");
6231
6232 assert!(!verify_whatsapp_signature(
6233 &app_secret,
6234 body,
6235 &signature_header
6236 ));
6237 }
6238
6239 #[test]
6244 fn idempotency_store_allows_different_keys() {
6245 let store = IdempotencyStore::new(Duration::from_secs(60), 100);
6246 assert!(store.record_if_new("key-a"));
6247 assert!(store.record_if_new("key-b"));
6248 assert!(store.record_if_new("key-c"));
6249 assert!(store.record_if_new("key-d"));
6250 }
6251
6252 #[test]
6253 fn idempotency_store_max_keys_clamped_to_one() {
6254 let store = IdempotencyStore::new(Duration::from_secs(60), 0);
6255 assert!(store.record_if_new("only-key"));
6256 assert!(!store.record_if_new("only-key"));
6257 }
6258
6259 #[test]
6260 fn idempotency_store_rapid_duplicate_rejected() {
6261 let store = IdempotencyStore::new(Duration::from_secs(300), 100);
6262 assert!(store.record_if_new("rapid"));
6263 assert!(!store.record_if_new("rapid"));
6264 }
6265
6266 #[test]
6267 fn idempotency_store_accepts_after_ttl_expires() {
6268 let store = IdempotencyStore::new(Duration::from_millis(1), 100);
6269 assert!(store.record_if_new("ttl-key"));
6270 std::thread::sleep(Duration::from_millis(10));
6271 assert!(store.record_if_new("ttl-key"));
6272 }
6273
6274 #[test]
6275 fn idempotency_store_eviction_preserves_newest() {
6276 let store = IdempotencyStore::new(Duration::from_secs(300), 1);
6277 assert!(store.record_if_new("old-key"));
6278 std::thread::sleep(Duration::from_millis(2));
6279 assert!(store.record_if_new("new-key"));
6280
6281 let keys = store.keys.lock();
6282 assert_eq!(keys.len(), 1);
6283 assert!(!keys.contains_key("old-key"));
6284 assert!(keys.contains_key("new-key"));
6285 }
6286
6287 #[test]
6288 fn rate_limiter_allows_after_window_expires() {
6289 let window = Duration::from_millis(50);
6290 let limiter = SlidingWindowRateLimiter::new(2, window, 100);
6291 assert!(limiter.allow("ip-1"));
6292 assert!(limiter.allow("ip-1"));
6293 assert!(!limiter.allow("ip-1")); std::thread::sleep(Duration::from_millis(60));
6297
6298 assert!(limiter.allow("ip-1"));
6300 }
6301
6302 #[test]
6303 fn rate_limiter_independent_keys_tracked_separately() {
6304 let limiter = SlidingWindowRateLimiter::new(2, Duration::from_secs(60), 100);
6305 assert!(limiter.allow("ip-1"));
6306 assert!(limiter.allow("ip-1"));
6307 assert!(!limiter.allow("ip-1")); assert!(limiter.allow("ip-2"));
6311 assert!(limiter.allow("ip-2"));
6312 assert!(!limiter.allow("ip-2")); }
6314
6315 #[test]
6316 fn rate_limiter_exact_boundary_at_max_keys() {
6317 let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 3);
6318 assert!(limiter.allow("ip-1"));
6319 assert!(limiter.allow("ip-2"));
6320 assert!(limiter.allow("ip-3"));
6321 assert!(limiter.allow("ip-4")); let guard = limiter.requests.lock();
6325 assert_eq!(guard.0.len(), 3);
6326 assert!(
6327 !guard.0.contains_key("ip-1"),
6328 "ip-1 should have been evicted"
6329 );
6330 assert!(guard.0.contains_key("ip-2"));
6331 assert!(guard.0.contains_key("ip-3"));
6332 assert!(guard.0.contains_key("ip-4"));
6333 }
6334
6335 #[test]
6336 fn gateway_rate_limiter_pair_and_webhook_are_independent() {
6337 let limiter = GatewayRateLimiter::new(2, 3, 100);
6338
6339 assert!(limiter.allow_pair("ip-1"));
6341 assert!(limiter.allow_pair("ip-1"));
6342 assert!(!limiter.allow_pair("ip-1")); assert!(limiter.allow_webhook("ip-1"));
6346 assert!(limiter.allow_webhook("ip-1"));
6347 assert!(limiter.allow_webhook("ip-1"));
6348 assert!(!limiter.allow_webhook("ip-1")); }
6350
6351 #[test]
6352 fn rate_limiter_single_key_max_allows_one_request() {
6353 let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 1);
6354 assert!(limiter.allow("ip-1"));
6355 assert!(limiter.allow("ip-2")); let guard = limiter.requests.lock();
6358 assert_eq!(guard.0.len(), 1);
6359 assert!(guard.0.contains_key("ip-2"));
6360 assert!(!guard.0.contains_key("ip-1"));
6361 }
6362
6363 #[test]
6364 fn rate_limiter_concurrent_access_safe() {
6365 use std::sync::Arc;
6366
6367 let limiter = Arc::new(SlidingWindowRateLimiter::new(
6368 1000,
6369 Duration::from_secs(60),
6370 1000,
6371 ));
6372 let mut handles = Vec::new();
6373
6374 for i in 0..10 {
6375 let limiter = limiter.clone();
6376 handles.push(std::thread::spawn(move || {
6377 for j in 0..100 {
6378 limiter.allow(&format!("thread-{i}-req-{j}"));
6379 }
6380 }));
6381 }
6382
6383 for handle in handles {
6384 handle.join().unwrap();
6385 }
6386
6387 let guard = limiter.requests.lock();
6389 assert!(guard.0.len() <= 1000, "should respect max_keys");
6390 }
6391
6392 #[test]
6393 fn idempotency_store_concurrent_access_safe() {
6394 use std::sync::Arc;
6395
6396 let store = Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000));
6397 let mut handles = Vec::new();
6398
6399 for i in 0..10 {
6400 let store = store.clone();
6401 handles.push(std::thread::spawn(move || {
6402 for j in 0..100 {
6403 store.record_if_new(&format!("thread-{i}-key-{j}"));
6404 }
6405 }));
6406 }
6407
6408 for handle in handles {
6409 handle.join().unwrap();
6410 }
6411
6412 let keys = store.keys.lock();
6413 assert!(keys.len() <= 1000, "should respect max_keys");
6414 }
6415
6416 #[test]
6417 fn rate_limiter_rapid_burst_then_cooldown() {
6418 let limiter = SlidingWindowRateLimiter::new(5, Duration::from_millis(50), 100);
6419
6420 for _ in 0..5 {
6422 assert!(limiter.allow("burst-ip"));
6423 }
6424 assert!(!limiter.allow("burst-ip")); std::thread::sleep(Duration::from_millis(60));
6428
6429 assert!(limiter.allow("burst-ip"));
6431 }
6432
6433 #[test]
6434 fn require_localhost_accepts_ipv4_loopback() {
6435 let peer = SocketAddr::from(([127, 0, 0, 1], 12345));
6436 assert!(require_localhost(&peer).is_ok());
6437 }
6438
6439 #[test]
6440 fn require_localhost_accepts_ipv6_loopback() {
6441 let peer = SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, 12345));
6442 assert!(require_localhost(&peer).is_ok());
6443 }
6444
6445 #[test]
6446 fn require_localhost_rejects_non_loopback_ipv4() {
6447 let peer = SocketAddr::from(([192, 168, 1, 100], 12345));
6448 let err = require_localhost(&peer).unwrap_err();
6449 assert_eq!(err.0, StatusCode::FORBIDDEN);
6450 }
6451
6452 #[test]
6453 fn require_localhost_rejects_non_loopback_ipv6() {
6454 let peer = SocketAddr::from((
6455 std::net::Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1),
6456 12345,
6457 ));
6458 let err = require_localhost(&peer).unwrap_err();
6459 assert_eq!(err.0, StatusCode::FORBIDDEN);
6460 }
6461
6462 #[test]
6463 fn admin_reload_gate_loopback_always_allowed() {
6464 assert_eq!(
6466 admin_reload_gate(true, false, false),
6467 AdminReloadGate::Allow
6468 );
6469 assert_eq!(admin_reload_gate(true, true, true), AdminReloadGate::Allow);
6470 assert_eq!(admin_reload_gate(true, false, true), AdminReloadGate::Allow);
6471 assert_eq!(admin_reload_gate(true, true, false), AdminReloadGate::Allow);
6472 }
6473
6474 #[test]
6475 fn admin_reload_gate_remote_blocked_by_default() {
6476 assert_eq!(
6479 admin_reload_gate(false, false, true),
6480 AdminReloadGate::Forbidden
6481 );
6482 assert_eq!(
6483 admin_reload_gate(false, false, false),
6484 AdminReloadGate::Forbidden
6485 );
6486 }
6487
6488 #[test]
6489 fn admin_reload_gate_remote_opt_in_requires_auth() {
6490 assert_eq!(
6492 admin_reload_gate(false, true, true),
6493 AdminReloadGate::RequireAuth
6494 );
6495 }
6496
6497 #[test]
6498 fn admin_reload_gate_remote_opt_in_without_pairing_is_rejected() {
6499 assert_eq!(
6502 admin_reload_gate(false, true, false),
6503 AdminReloadGate::ForbiddenNoPairing
6504 );
6505 }
6506
6507 #[test]
6508 fn allow_remote_admin_defaults_off() {
6509 assert!(!zeroclaw_config::schema::GatewayConfig::default().allow_remote_admin);
6511 }
6512
6513 fn admin_reload_state(
6524 tmp: &tempfile::TempDir,
6525 allow_remote_admin: bool,
6526 require_pairing: bool,
6527 tokens: &[String],
6528 ) -> AppState {
6529 let mut state = admin_paircode_state(tmp, require_pairing, false);
6530 state.config.write().gateway.allow_remote_admin = allow_remote_admin;
6531 state.pairing = Arc::new(PairingGuard::new(require_pairing, tokens));
6532 state.reload_tx = Some(tokio::sync::watch::channel(false).0);
6533 state
6534 }
6535
6536 fn loopback_peer() -> SocketAddr {
6537 SocketAddr::from(([127, 0, 0, 1], 40000))
6538 }
6539
6540 fn remote_peer() -> SocketAddr {
6541 SocketAddr::from(([203, 0, 113, 50], 40000))
6544 }
6545
6546 fn bearer_headers(token: &str) -> HeaderMap {
6547 let mut headers = HeaderMap::new();
6548 headers.insert(
6549 header::AUTHORIZATION,
6550 HeaderValue::from_str(&format!("Bearer {token}")).unwrap(),
6551 );
6552 headers
6553 }
6554
6555 #[tokio::test]
6556 async fn admin_reload_loopback_no_token_reloads() {
6557 let tmp = tempfile::tempdir().unwrap();
6558 let state = admin_reload_state(&tmp, false, true, &[]);
6559 let resp =
6560 handle_admin_reload(State(state), ConnectInfo(loopback_peer()), HeaderMap::new())
6561 .await
6562 .unwrap()
6563 .into_response();
6564 assert_eq!(resp.status(), StatusCode::OK);
6565 }
6566
6567 #[tokio::test]
6568 async fn admin_reload_remote_default_off_is_forbidden() {
6569 let tmp = tempfile::tempdir().unwrap();
6570 let state = admin_reload_state(&tmp, false, true, &[]);
6571 let err = handle_admin_reload(State(state), ConnectInfo(remote_peer()), HeaderMap::new())
6572 .await
6573 .err()
6574 .unwrap();
6575 assert_eq!(err.0, StatusCode::FORBIDDEN);
6576 }
6577
6578 #[tokio::test]
6579 async fn admin_reload_remote_opt_in_without_pairing_does_not_reload() {
6580 let tmp = tempfile::tempdir().unwrap();
6583 let state = admin_reload_state(&tmp, true, false, &[]);
6584 let err = handle_admin_reload(State(state), ConnectInfo(remote_peer()), HeaderMap::new())
6585 .await
6586 .err()
6587 .unwrap();
6588 assert_eq!(err.0, StatusCode::FORBIDDEN);
6589 }
6590
6591 #[tokio::test]
6592 async fn admin_reload_remote_opt_in_missing_token_is_rejected() {
6593 let tmp = tempfile::tempdir().unwrap();
6594 let state = admin_reload_state(&tmp, true, true, &["zc_test_token".to_string()]);
6595 let err = handle_admin_reload(State(state), ConnectInfo(remote_peer()), HeaderMap::new())
6596 .await
6597 .err()
6598 .unwrap();
6599 assert_eq!(err.0, StatusCode::UNAUTHORIZED);
6600 }
6601
6602 #[tokio::test]
6603 async fn admin_reload_remote_opt_in_invalid_token_is_rejected() {
6604 let tmp = tempfile::tempdir().unwrap();
6605 let state = admin_reload_state(&tmp, true, true, &["zc_test_token".to_string()]);
6606 let err = handle_admin_reload(
6607 State(state),
6608 ConnectInfo(remote_peer()),
6609 bearer_headers("not-the-token"),
6610 )
6611 .await
6612 .err()
6613 .unwrap();
6614 assert_eq!(err.0, StatusCode::UNAUTHORIZED);
6615 }
6616
6617 #[tokio::test]
6618 async fn admin_reload_remote_opt_in_valid_token_reloads() {
6619 let tmp = tempfile::tempdir().unwrap();
6620 let state = admin_reload_state(&tmp, true, true, &["zc_test_token".to_string()]);
6621 let resp = handle_admin_reload(
6622 State(state),
6623 ConnectInfo(remote_peer()),
6624 bearer_headers("zc_test_token"),
6625 )
6626 .await
6627 .unwrap()
6628 .into_response();
6629 assert_eq!(resp.status(), StatusCode::OK);
6630 }
6631
6632 #[test]
6633 fn needs_quickstart_for_flags_empty_model() {
6634 let err =
6635 needs_quickstart_for("").expect("empty model must produce a needs_quickstart error");
6636 let msg = err.to_string();
6637 assert!(
6638 msg.contains("needs_quickstart"),
6639 "error must carry the needs_quickstart marker for callers to map to 503; got: {msg}"
6640 );
6641 assert!(
6642 msg.contains("/quickstart"),
6643 "error must point the user at /quickstart; got: {msg}"
6644 );
6645 }
6646
6647 #[test]
6648 fn needs_quickstart_for_flags_whitespace_only_model() {
6649 assert!(
6650 needs_quickstart_for(" ").is_some(),
6651 "whitespace-only model must be treated as empty"
6652 );
6653 assert!(
6654 needs_quickstart_for("\n\t ").is_some(),
6655 "tabs and newlines count as empty too"
6656 );
6657 }
6658
6659 #[test]
6660 fn needs_quickstart_for_passes_real_model() {
6661 assert!(
6662 needs_quickstart_for("anthropic/claude-sonnet-4").is_none(),
6663 "a real model id must not be flagged"
6664 );
6665 assert!(
6666 needs_quickstart_for(" gpt-4 ").is_none(),
6667 "leading/trailing whitespace around a real model id must not be flagged"
6668 );
6669 }
6670
6671 #[test]
6672 fn is_needs_quickstart_err_detects_marker_from_helper() {
6673 let err = needs_quickstart_for("").expect("empty model produces marker");
6674 assert!(
6675 is_needs_quickstart_err(&err),
6676 "the marker emitted by needs_quickstart_for must be detected"
6677 );
6678 }
6679
6680 #[test]
6681 fn is_needs_quickstart_err_ignores_unrelated_errors() {
6682 let err = anyhow::Error::msg("upstream timeout: provider returned 504");
6683 assert!(
6684 !is_needs_quickstart_err(&err),
6685 "unrelated errors must not be misclassified as needs_quickstart"
6686 );
6687 let err = anyhow::Error::msg("invalid api key");
6688 assert!(!is_needs_quickstart_err(&err));
6689 }
6690
6691 #[test]
6692 fn is_needs_quickstart_err_detects_via_substring() {
6693 let err =
6697 anyhow::Error::msg("provider call failed").context("needs_quickstart: empty model");
6698 assert!(is_needs_quickstart_err(&err));
6699 }
6700
6701 #[test]
6702 fn needs_quickstart_channel_reply_resolves_via_fluent() {
6703 let reply = needs_quickstart_channel_reply();
6710 assert!(
6711 !reply.starts_with('{') && !reply.ends_with('}'),
6712 "fluent missing-key fallback leaked into channel reply: {reply:?}"
6713 );
6714 assert!(
6715 reply.to_lowercase().contains("quickstart"),
6716 "channel reply must mention Quickstart so users know what's missing: {reply:?}"
6717 );
6718 }
6719
6720 #[cfg(feature = "channel-linq")]
6728 fn compute_linq_signature_hex(secret: &str, timestamp: &str, body: &str) -> String {
6729 use hmac::{Hmac, Mac};
6730 use sha2::Sha256;
6731
6732 let message = format!("{timestamp}.{body}");
6733 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
6734 mac.update(message.as_bytes());
6735 hex::encode(mac.finalize().into_bytes())
6736 }
6737
6738 #[cfg(feature = "channel-linq")]
6741 fn linq_webhook_body(sender: &str, text: &str) -> String {
6742 serde_json::json!({
6743 "event_type": "message.received",
6744 "data": {
6745 "sender": { "phone": sender },
6746 "message": {
6747 "parts": [{ "type": "text", "value": text }]
6748 }
6749 }
6750 })
6751 .to_string()
6752 }
6753
6754 #[cfg(feature = "channel-linq")]
6758 fn linq_test_state(alias: &str, signing_secret: Option<&str>) -> AppState {
6759 let model_provider: Arc<dyn ModelProvider> = Arc::new(MockModelProvider::default());
6760 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
6761
6762 let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> =
6763 Arc::new(|| vec!["*".to_string()]);
6764 let channel = Arc::new(LinqChannel::new(
6765 "test-token".into(),
6766 "+15550000000".into(),
6767 alias,
6768 peer_resolver,
6769 ));
6770 let mut linq = HashMap::new();
6771 linq.insert(alias.to_string(), channel);
6772
6773 let mut linq_signing_secrets: HashMap<String, Arc<str>> = HashMap::new();
6774 if let Some(secret) = signing_secret {
6775 linq_signing_secrets.insert(alias.to_string(), Arc::from(secret));
6776 }
6777
6778 AppState {
6779 config: Arc::new(RwLock::new(Config::default())),
6780 model_provider,
6781 model: "test-model".into(),
6782 temperature: None,
6783 mem: memory,
6784 memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
6785 Arc::new(MockMemory),
6786 zeroclaw_config::schema::MemoryConfig::default(),
6787 std::path::PathBuf::new(),
6788 )),
6789 auto_save: false,
6790 webhook_secret_hash: None,
6791 pairing: Arc::new(PairingGuard::new(false, &[])),
6792 trust_forwarded_headers: false,
6793 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
6794 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
6795 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
6796 #[cfg(feature = "channel-whatsapp-cloud")]
6797 whatsapp: None,
6798 #[cfg(feature = "channel-whatsapp-cloud")]
6799 whatsapp_app_secret: None,
6800 #[cfg(feature = "channel-linq")]
6801 linq,
6802 #[cfg(feature = "channel-linq")]
6803 linq_signing_secrets,
6804 #[cfg(feature = "channel-nextcloud")]
6805 nextcloud_talk: None,
6806 #[cfg(feature = "channel-nextcloud")]
6807 nextcloud_talk_webhook_secret: None,
6808 #[cfg(feature = "channel-wati")]
6809 wati: None,
6810 #[cfg(feature = "channel-email")]
6811 gmail_push: None,
6812 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
6813 tools_registry: Arc::new(Vec::new()),
6814 cost_tracker: None,
6815 event_tx: tokio::sync::broadcast::channel(16).0,
6816 event_buffer: Arc::new(sse::EventBuffer::new(16)),
6817 shutdown_tx: tokio::sync::watch::channel(false).0,
6818 reload_tx: None,
6819 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
6820 path_prefix: String::new(),
6821 web_dist_dir: None,
6822 session_backend: None,
6823 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
6824 8, 30, 600,
6825 )),
6826 device_registry: None,
6827 pending_pairings: None,
6828 canvas_store: CanvasStore::new(),
6829 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
6830 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
6831 tui_registry: None,
6832 #[cfg(feature = "webauthn")]
6833 webauthn: None,
6834 }
6835 }
6836
6837 #[cfg(feature = "channel-linq")]
6838 #[tokio::test]
6839 async fn linq_webhook_returns_not_found_for_unknown_alias() {
6840 let state = linq_test_state("production", None);
6842
6843 let response = Box::pin(handle_linq_webhook(
6844 State(state),
6845 Path("staging".to_string()),
6846 HeaderMap::new(),
6847 Bytes::from_static(br#"{"event_type":"message.received"}"#),
6848 ))
6849 .await
6850 .into_response();
6851
6852 assert_eq!(response.status(), StatusCode::NOT_FOUND);
6853 }
6854
6855 #[cfg(feature = "channel-linq")]
6856 #[tokio::test]
6857 async fn linq_webhook_returns_not_found_when_no_channels_configured() {
6858 let model_provider: Arc<dyn ModelProvider> = Arc::new(MockModelProvider::default());
6859 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
6860
6861 let state = AppState {
6862 config: Arc::new(RwLock::new(Config::default())),
6863 model_provider,
6864 model: "test-model".into(),
6865 temperature: None,
6866 mem: memory,
6867 memory_strategy: Arc::new(DefaultMemoryStrategy::with_config(
6868 Arc::new(MockMemory),
6869 zeroclaw_config::schema::MemoryConfig::default(),
6870 std::path::PathBuf::new(),
6871 )),
6872 auto_save: false,
6873 webhook_secret_hash: None,
6874 pairing: Arc::new(PairingGuard::new(false, &[])),
6875 trust_forwarded_headers: false,
6876 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
6877 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
6878 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
6879 #[cfg(feature = "channel-whatsapp-cloud")]
6880 whatsapp: None,
6881 #[cfg(feature = "channel-whatsapp-cloud")]
6882 whatsapp_app_secret: None,
6883 #[cfg(feature = "channel-linq")]
6884 linq: HashMap::new(),
6885 #[cfg(feature = "channel-linq")]
6886 linq_signing_secrets: HashMap::new(),
6887 #[cfg(feature = "channel-nextcloud")]
6888 nextcloud_talk: None,
6889 #[cfg(feature = "channel-nextcloud")]
6890 nextcloud_talk_webhook_secret: None,
6891 #[cfg(feature = "channel-wati")]
6892 wati: None,
6893 #[cfg(feature = "channel-email")]
6894 gmail_push: None,
6895 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
6896 tools_registry: Arc::new(Vec::new()),
6897 cost_tracker: None,
6898 event_tx: tokio::sync::broadcast::channel(16).0,
6899 event_buffer: Arc::new(sse::EventBuffer::new(16)),
6900 shutdown_tx: tokio::sync::watch::channel(false).0,
6901 reload_tx: None,
6902 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
6903 path_prefix: String::new(),
6904 web_dist_dir: None,
6905 session_backend: None,
6906 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
6907 8, 30, 600,
6908 )),
6909 device_registry: None,
6910 pending_pairings: None,
6911 canvas_store: CanvasStore::new(),
6912 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
6913 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
6914 tui_registry: None,
6915 #[cfg(feature = "webauthn")]
6916 webauthn: None,
6917 };
6918
6919 let response = Box::pin(handle_linq_webhook(
6920 State(state),
6921 Path("default".to_string()),
6922 HeaderMap::new(),
6923 Bytes::from_static(br#"{"event_type":"message.received"}"#),
6924 ))
6925 .await
6926 .into_response();
6927
6928 assert_eq!(response.status(), StatusCode::NOT_FOUND);
6929 }
6930
6931 #[cfg(feature = "channel-linq")]
6932 #[tokio::test]
6933 async fn linq_webhook_accepts_valid_message_for_known_alias() {
6934 let state = linq_test_state("default", None);
6935 let body = linq_webhook_body("+15551234567", "hello from test");
6936
6937 let response = Box::pin(handle_linq_webhook(
6938 State(state),
6939 Path("default".to_string()),
6940 HeaderMap::new(),
6941 Bytes::from(body),
6942 ))
6943 .await
6944 .into_response();
6945
6946 assert_eq!(response.status(), StatusCode::OK);
6947 }
6948
6949 #[cfg(feature = "channel-linq")]
6950 #[tokio::test]
6951 async fn linq_webhook_rejects_invalid_signature_for_alias() {
6952 let secret = generate_test_secret();
6953 let state = linq_test_state("secure-alias", Some(&secret));
6954
6955 let body = linq_webhook_body("+15551234567", "hello from test");
6956 let mut headers = HeaderMap::new();
6957 headers.insert(
6958 "X-Webhook-Signature",
6959 HeaderValue::from_static("sha256=deadbeef"),
6960 );
6961 headers.insert(
6962 "X-Webhook-Timestamp",
6963 HeaderValue::from_static("9999999999"),
6964 );
6965
6966 let response = Box::pin(handle_linq_webhook(
6967 State(state),
6968 Path("secure-alias".to_string()),
6969 headers,
6970 Bytes::from(body),
6971 ))
6972 .await
6973 .into_response();
6974
6975 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
6976 }
6977
6978 #[cfg(feature = "channel-linq")]
6979 #[tokio::test]
6980 async fn linq_webhook_accepts_valid_signature_for_alias() {
6981 let secret = generate_test_secret();
6982 let state = linq_test_state("secure-alias", Some(&secret));
6983
6984 let body = linq_webhook_body("+15551234567", "hello from test");
6985 let timestamp = chrono::Utc::now().timestamp().to_string();
6986 let sig = compute_linq_signature_hex(&secret, ×tamp, &body);
6987
6988 let mut headers = HeaderMap::new();
6989 headers.insert(
6990 "X-Webhook-Signature",
6991 HeaderValue::from_str(&format!("sha256={sig}")).unwrap(),
6992 );
6993 headers.insert(
6994 "X-Webhook-Timestamp",
6995 HeaderValue::from_str(×tamp).unwrap(),
6996 );
6997
6998 let response = Box::pin(handle_linq_webhook(
6999 State(state),
7000 Path("secure-alias".to_string()),
7001 headers,
7002 Bytes::from(body),
7003 ))
7004 .await
7005 .into_response();
7006
7007 assert_eq!(response.status(), StatusCode::OK);
7008 }
7009}
7010
7011#[cfg(test)]
7012mod accept_error_tests {
7013 use super::is_recoverable_accept_error;
7014 use std::io::{Error, ErrorKind};
7015
7016 #[cfg(unix)]
7017 #[test]
7018 fn fd_exhaustion_accept_errors_are_recoverable() {
7019 assert!(is_recoverable_accept_error(&Error::from_raw_os_error(24))); assert!(is_recoverable_accept_error(&Error::from_raw_os_error(23))); }
7023
7024 #[test]
7025 fn transient_kinds_recover_but_fatal_propagates() {
7026 assert!(is_recoverable_accept_error(&Error::from(
7027 ErrorKind::ConnectionAborted
7028 )));
7029 assert!(is_recoverable_accept_error(&Error::from(
7030 ErrorKind::Interrupted
7031 )));
7032 assert!(!is_recoverable_accept_error(&Error::from(
7034 ErrorKind::InvalidInput
7035 )));
7036 }
7037}