1#![allow(
2 clippy::to_string_in_format_args,
3 clippy::useless_format,
4 clippy::collapsible_if
5)]
6pub mod acp;
16pub mod api;
17pub mod api_browse;
18pub mod api_config;
19pub mod api_logs;
20pub mod api_onboard;
21pub mod api_pairing;
22pub mod api_personality;
23#[cfg(feature = "plugins-wasm")]
24pub mod api_plugins;
25pub mod api_skills;
26#[cfg(feature = "webauthn")]
27pub mod api_webauthn;
28pub mod auth_rate_limit;
29pub mod canvas;
30pub mod hardware_context;
31pub mod node_tool;
32pub mod nodes;
33pub mod openapi;
34pub mod session_queue;
35pub mod sse;
36pub mod static_files;
37pub mod tls;
38#[cfg(feature = "gateway-voice-duplex")]
39pub mod voice_duplex;
40pub mod ws;
41pub mod ws_approval;
42
43use anyhow::{Context, Result};
44use axum::{
45 Router,
46 body::Bytes,
47 extract::{ConnectInfo, Query, State},
48 http::{HeaderMap, StatusCode, header},
49 response::{IntoResponse, Json},
50 routing::{delete, get, post},
51};
52use parking_lot::{Mutex, RwLock};
53use std::collections::HashMap;
54use std::net::{IpAddr, SocketAddr};
55use std::sync::Arc;
56use std::time::{Duration, Instant};
57use tower_http::limit::RequestBodyLimitLayer;
58use tower_http::timeout::TimeoutLayer;
59use uuid::Uuid;
60use zeroclaw_api::channel::{Channel, SendMessage};
61use zeroclaw_api::tool::ToolSpec;
62use zeroclaw_channels::{
63 gmail_push::GmailPushChannel, linq::LinqChannel, nextcloud_talk::NextcloudTalkChannel,
64 wati::WatiChannel, whatsapp::WhatsAppChannel,
65};
66use zeroclaw_config::policy::SecurityPolicy;
67use zeroclaw_config::schema::Config;
68use zeroclaw_infra::session_backend::SessionBackend;
69use zeroclaw_memory::{self, Memory, MemoryCategory};
70use zeroclaw_providers::{self, ModelProvider};
71use zeroclaw_runtime::cost::CostTracker;
72use zeroclaw_runtime::i18n;
73use zeroclaw_runtime::platform;
74use zeroclaw_runtime::security::pairing::{PairingGuard, constant_time_eq, is_public_bind};
75use zeroclaw_runtime::tools;
76use zeroclaw_runtime::tools::CanvasStore;
77
78pub const MAX_BODY_SIZE: usize = 65_536;
80pub const REQUEST_TIMEOUT_SECS: u64 = 30;
82
83pub const LONG_RUNNING_REQUEST_TIMEOUT_SECS: u64 = 600;
91
92pub fn gateway_request_timeout_secs(cfg: &zeroclaw_config::schema::GatewayConfig) -> u64 {
95 cfg.request_timeout_secs
96}
97
98pub fn gateway_long_running_request_timeout_secs(
102 cfg: &zeroclaw_config::schema::GatewayConfig,
103) -> u64 {
104 cfg.long_running_request_timeout_secs
105}
106pub const RATE_LIMIT_WINDOW_SECS: u64 = 60;
108pub const RATE_LIMIT_MAX_KEYS_DEFAULT: usize = 10_000;
110pub const IDEMPOTENCY_MAX_KEYS_DEFAULT: usize = 10_000;
112
113fn webhook_memory_key() -> String {
114 format!("webhook_msg_{}", Uuid::new_v4())
115}
116
117fn whatsapp_memory_key(msg: &zeroclaw_api::channel::ChannelMessage) -> String {
118 format!("whatsapp_{}_{}", msg.sender, msg.id)
119}
120
121fn linq_memory_key(msg: &zeroclaw_api::channel::ChannelMessage) -> String {
122 format!("linq_{}_{}", msg.sender, msg.id)
123}
124
125fn wati_memory_key(msg: &zeroclaw_api::channel::ChannelMessage) -> String {
126 format!("wati_{}_{}", msg.sender, msg.id)
127}
128
129fn nextcloud_talk_memory_key(msg: &zeroclaw_api::channel::ChannelMessage) -> String {
130 format!("nextcloud_talk_{}_{}", msg.sender, msg.id)
131}
132
133fn sender_session_id(channel: &str, msg: &zeroclaw_api::channel::ChannelMessage) -> String {
134 match &msg.thread_ts {
135 Some(thread_id) => format!("{channel}_{thread_id}_{}", msg.sender),
136 None => format!("{channel}_{}", msg.sender),
137 }
138}
139
140fn webhook_session_id(headers: &HeaderMap) -> Option<String> {
141 const MAX_SESSION_ID_LEN: usize = 128;
142 headers
143 .get("X-Session-Id")
144 .and_then(|v| v.to_str().ok())
145 .map(str::trim)
146 .filter(|value| !value.is_empty())
147 .filter(|value| value.len() <= MAX_SESSION_ID_LEN)
148 .filter(|value| {
149 value
150 .bytes()
151 .all(|b| b.is_ascii_alphanumeric() || b == b'-' || b == b'_' || b == b'.')
152 })
153 .map(str::to_owned)
154}
155
156fn hash_webhook_secret(value: &str) -> String {
157 use sha2::{Digest, Sha256};
158
159 let digest = Sha256::digest(value.as_bytes());
160 hex::encode(digest)
161}
162
163const RATE_LIMITER_SWEEP_INTERVAL_SECS: u64 = 300; #[derive(Debug)]
167struct SlidingWindowRateLimiter {
168 limit_per_window: u32,
169 window: Duration,
170 max_keys: usize,
171 requests: Mutex<(HashMap<String, Vec<Instant>>, Instant)>,
172}
173
174impl SlidingWindowRateLimiter {
175 fn new(limit_per_window: u32, window: Duration, max_keys: usize) -> Self {
176 Self {
177 limit_per_window,
178 window,
179 max_keys: max_keys.max(1),
180 requests: Mutex::new((HashMap::new(), Instant::now())),
181 }
182 }
183
184 fn prune_stale(requests: &mut HashMap<String, Vec<Instant>>, cutoff: Instant) {
185 requests.retain(|_, timestamps| {
186 timestamps.retain(|t| *t > cutoff);
187 !timestamps.is_empty()
188 });
189 }
190
191 fn allow(&self, key: &str) -> bool {
192 if self.limit_per_window == 0 {
193 return true;
194 }
195
196 let now = Instant::now();
197 let cutoff = now.checked_sub(self.window).unwrap_or_else(Instant::now);
198
199 let mut guard = self.requests.lock();
200 let (requests, last_sweep) = &mut *guard;
201
202 if last_sweep.elapsed() >= Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS) {
204 Self::prune_stale(requests, cutoff);
205 *last_sweep = now;
206 }
207
208 if !requests.contains_key(key) && requests.len() >= self.max_keys {
209 Self::prune_stale(requests, cutoff);
211 *last_sweep = now;
212
213 if requests.len() >= self.max_keys {
214 let evict_key = requests
215 .iter()
216 .min_by_key(|(_, timestamps)| timestamps.last().copied().unwrap_or(cutoff))
217 .map(|(k, _)| k.clone());
218 if let Some(evict_key) = evict_key {
219 requests.remove(&evict_key);
220 }
221 }
222 }
223
224 let entry = requests.entry(key.to_owned()).or_default();
225 entry.retain(|instant| *instant > cutoff);
226
227 if entry.len() >= self.limit_per_window as usize {
228 return false;
229 }
230
231 entry.push(now);
232 true
233 }
234}
235
236#[derive(Debug)]
237pub struct GatewayRateLimiter {
238 pair: SlidingWindowRateLimiter,
239 webhook: SlidingWindowRateLimiter,
240}
241
242impl GatewayRateLimiter {
243 pub fn new(pair_per_minute: u32, webhook_per_minute: u32, max_keys: usize) -> Self {
244 let window = Duration::from_secs(RATE_LIMIT_WINDOW_SECS);
245 Self {
246 pair: SlidingWindowRateLimiter::new(pair_per_minute, window, max_keys),
247 webhook: SlidingWindowRateLimiter::new(webhook_per_minute, window, max_keys),
248 }
249 }
250
251 fn allow_pair(&self, key: &str) -> bool {
252 self.pair.allow(key)
253 }
254
255 fn allow_webhook(&self, key: &str) -> bool {
256 self.webhook.allow(key)
257 }
258}
259
260#[derive(Debug)]
261pub struct IdempotencyStore {
262 ttl: Duration,
263 max_keys: usize,
264 keys: Mutex<HashMap<String, Instant>>,
265}
266
267impl IdempotencyStore {
268 pub fn new(ttl: Duration, max_keys: usize) -> Self {
269 Self {
270 ttl,
271 max_keys: max_keys.max(1),
272 keys: Mutex::new(HashMap::new()),
273 }
274 }
275
276 fn record_if_new(&self, key: &str) -> bool {
278 let now = Instant::now();
279 let mut keys = self.keys.lock();
280
281 keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl);
282
283 if keys.contains_key(key) {
284 return false;
285 }
286
287 if keys.len() >= self.max_keys {
288 let evict_key = keys
289 .iter()
290 .min_by_key(|(_, seen_at)| *seen_at)
291 .map(|(k, _)| k.clone());
292 if let Some(evict_key) = evict_key {
293 keys.remove(&evict_key);
294 }
295 }
296
297 keys.insert(key.to_owned(), now);
298 true
299 }
300}
301
302fn parse_client_ip(value: &str) -> Option<IpAddr> {
303 let value = value.trim().trim_matches('"').trim();
304 if value.is_empty() {
305 return None;
306 }
307
308 if let Ok(ip) = value.parse::<IpAddr>() {
309 return Some(ip);
310 }
311
312 if let Ok(addr) = value.parse::<SocketAddr>() {
313 return Some(addr.ip());
314 }
315
316 let value = value.trim_matches(['[', ']']);
317 value.parse::<IpAddr>().ok()
318}
319
320fn dirs_data_local() -> Option<std::path::PathBuf> {
321 directories::BaseDirs::new().map(|d| d.data_local_dir().to_path_buf())
322}
323
324fn forwarded_client_ip(headers: &HeaderMap) -> Option<IpAddr> {
325 if let Some(xff) = headers.get("X-Forwarded-For").and_then(|v| v.to_str().ok()) {
326 for candidate in xff.split(',') {
327 if let Some(ip) = parse_client_ip(candidate) {
328 return Some(ip);
329 }
330 }
331 }
332
333 headers
334 .get("X-Real-IP")
335 .and_then(|v| v.to_str().ok())
336 .and_then(parse_client_ip)
337}
338
339fn client_key_from_request(
340 peer_addr: Option<SocketAddr>,
341 headers: &HeaderMap,
342 trust_forwarded_headers: bool,
343) -> String {
344 if trust_forwarded_headers && let Some(ip) = forwarded_client_ip(headers) {
345 return ip.to_string();
346 }
347
348 peer_addr
349 .map(|addr| addr.ip().to_string())
350 .unwrap_or_else(|| "unknown".to_string())
351}
352
353fn normalize_max_keys(configured: usize, fallback: usize) -> usize {
354 if configured == 0 {
355 fallback.max(1)
356 } else {
357 configured
358 }
359}
360
361#[derive(Clone)]
363pub struct AppState {
364 pub config: Arc<RwLock<Config>>,
365 pub model_provider: Arc<dyn ModelProvider>,
366 pub model: String,
367 pub temperature: Option<f64>,
371 pub mem: Arc<dyn Memory>,
372 pub auto_save: bool,
373 pub webhook_secret_hash: Option<Arc<str>>,
375 pub pairing: Arc<PairingGuard>,
376 pub trust_forwarded_headers: bool,
377 pub rate_limiter: Arc<GatewayRateLimiter>,
378 pub auth_limiter: Arc<auth_rate_limit::AuthRateLimiter>,
379 pub idempotency_store: Arc<IdempotencyStore>,
380 pub whatsapp: Option<Arc<WhatsAppChannel>>,
381 pub whatsapp_app_secret: Option<Arc<str>>,
383 pub linq: Option<Arc<LinqChannel>>,
384 pub linq_signing_secret: Option<Arc<str>>,
386 pub nextcloud_talk: Option<Arc<NextcloudTalkChannel>>,
387 pub nextcloud_talk_webhook_secret: Option<Arc<str>>,
389 pub wati: Option<Arc<WatiChannel>>,
390 pub gmail_push: Option<Arc<GmailPushChannel>>,
392 pub observer: Arc<dyn zeroclaw_runtime::observability::Observer>,
394 pub tools_registry: Arc<Vec<ToolSpec>>,
396 pub cost_tracker: Option<Arc<CostTracker>>,
398 pub event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
400 pub event_buffer: Arc<sse::EventBuffer>,
402 pub shutdown_tx: tokio::sync::watch::Sender<bool>,
404 pub reload_tx: Option<tokio::sync::watch::Sender<bool>>,
409 pub node_registry: Arc<nodes::NodeRegistry>,
411 pub path_prefix: String,
413 pub web_dist_dir: Option<std::path::PathBuf>,
415 pub session_backend: Option<Arc<dyn SessionBackend>>,
417 pub session_queue: Arc<session_queue::SessionActorQueue>,
419 pub device_registry: Option<Arc<api_pairing::DeviceRegistry>>,
421 pub pending_pairings: Option<Arc<api_pairing::PairingStore>>,
423 pub canvas_store: CanvasStore,
425 #[cfg(feature = "webauthn")]
427 pub webauthn: Option<Arc<api_webauthn::WebAuthnState>>,
428 pub cancel_tokens: Arc<
433 std::sync::Mutex<std::collections::HashMap<String, tokio_util::sync::CancellationToken>>,
434 >,
435 pub pending_reload: Arc<std::sync::atomic::AtomicBool>,
442}
443
444#[allow(clippy::too_many_lines)]
446pub async fn run_gateway(
447 host: &str,
448 port: u16,
449 config: Config,
450 external_event_tx: Option<tokio::sync::broadcast::Sender<serde_json::Value>>,
451 reload_tx: Option<tokio::sync::watch::Sender<bool>>,
455 canvas_store: Option<CanvasStore>,
456) -> Result<()> {
457 if is_public_bind(host)
459 && config.tunnel.tunnel_provider == "none"
460 && !config.gateway.allow_public_bind
461 {
462 ::zeroclaw_log::record!(
463 WARN,
464 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
465 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
466 "⚠️ Binding to {host} — gateway will be exposed to all network interfaces.\n\
467 Suggestion: use --host 127.0.0.1 (default), configure a tunnel, or set\n\
468 [gateway] allow_public_bind = true in config.toml to silence this warning.\n\n\
469 Docker/VM: if you are running inside a container or VM, this is expected."
470 );
471 }
472 let config_state = Arc::new(RwLock::new(config.clone()));
473
474 let hooks: Option<std::sync::Arc<zeroclaw_runtime::hooks::HookRunner>> = if config.hooks.enabled
476 {
477 Some(std::sync::Arc::new(
478 zeroclaw_runtime::hooks::HookRunner::new(),
479 ))
480 } else {
481 None
482 };
483
484 let addr: SocketAddr = format!("{host}:{port}").parse()?;
485 let listener = tokio::net::TcpListener::bind(addr).await?;
486 let actual_port = listener.local_addr()?.port();
487 let display_addr = format!("{host}:{actual_port}");
488
489 let fallback = config.first_model_provider();
490 let model_provider_name = config
491 .first_model_provider_alias()
492 .unwrap_or_else(|| "openrouter".to_string());
493 let provider_runtime_options_base =
494 zeroclaw_providers::provider_runtime_options_from_config(&config);
495 let provider_runtime_options = zeroclaw_providers::options_for_provider_ref(
496 &config,
497 &model_provider_name,
498 &provider_runtime_options_base,
499 );
500 let model_provider: Arc<dyn ModelProvider> = Arc::from(
501 zeroclaw_providers::create_resilient_model_provider_from_ref(
502 &config,
503 &model_provider_name,
504 fallback.and_then(|e| e.api_key.as_deref()),
505 fallback.and_then(|e| e.uri.as_deref()),
506 &config.reliability,
507 &provider_runtime_options,
508 )?,
509 );
510 let model = match fallback
524 .and_then(|e| e.model.as_deref())
525 .map(str::trim)
526 .filter(|m| !m.is_empty())
527 {
528 Some(m) => m.to_string(),
529 None => {
530 match config.resolve_default_model() {
531 Some(m) => {
532 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"model_provider": model_provider_name, "model": m})), "first model_provider has no `model` set; using first configured \
533 providers.models entry as default. Set \
534 [providers.models.<type>.<alias>] model = \"...\" to silence \
535 this warning.");
536 m
537 }
538 None => {
539 ::zeroclaw_log::record!(
540 WARN,
541 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
542 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
543 .with_attrs(::serde_json::json!({"display_addr": display_addr})),
544 "Gateway booting without a configured model. Visit http:///onboard to complete browser onboarding. Chat endpoints will return 503 needs_onboarding until at least one [providers.models.<type>.<alias>] model = \"...\" is set."
545 );
546 String::new()
547 }
548 }
549 }
550 };
551 let temperature: Option<f64> = fallback.and_then(|e| e.temperature);
555 let mem: Arc<dyn Memory> = if config.agents.is_empty() {
564 Arc::new(zeroclaw_memory::NoneMemory::new("none"))
565 } else {
566 Arc::from(zeroclaw_memory::create_memory_with_storage_and_routes(
567 &config.memory,
568 &config.embedding_routes,
569 config.resolve_active_storage(),
570 &config.data_dir,
571 fallback.and_then(|e| e.api_key.as_deref()),
572 )?)
573 };
574 let runtime: Arc<dyn platform::RuntimeAdapter> =
575 Arc::from(platform::create_runtime(&config.runtime)?);
576 let canvas_store = canvas_store.unwrap_or_default();
591 let agent_alias_opt = config
592 .agents
593 .iter()
594 .find(|(_, a)| a.enabled)
595 .map(|(alias, _)| alias.clone());
596
597 let (composio_key, composio_entity_id) = if config.composio.enabled {
598 (
599 config.composio.api_key.as_deref(),
600 Some(config.composio.entity_id.as_str()),
601 )
602 } else {
603 (None, None)
604 };
605
606 let agent_setup: Option<(
615 zeroclaw_config::schema::RiskProfileConfig,
616 Arc<SecurityPolicy>,
617 )> = agent_alias_opt.as_ref().and_then(|agent_alias| {
618 let Some(risk_profile) = config.risk_profile_for_agent(agent_alias) else {
619 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"agent": agent_alias, "agent_alias": agent_alias})), "Gateway: agents..risk_profile does not name a configured risk_profiles entry; booting with empty tools registry. Fix via /admin/reload or /onboard.");
620 return None;
621 };
622 let risk_profile = risk_profile.clone();
623 let security = match SecurityPolicy::for_agent(&config, agent_alias) {
624 Ok(s) => Arc::new(s),
625 Err(e) => {
626 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"agent": agent_alias, "error": format!("{}", e), "agent_alias": agent_alias})), "Gateway: agent SecurityPolicy failed to build; booting with empty tools registry. Fix [agents.] via /admin/reload or /onboard.");
627 return None;
628 }
629 };
630 Some((risk_profile, security))
631 });
632
633 let (mut tools_registry_raw, delegate_handle_gw) = match (&agent_alias_opt, agent_setup) {
634 (Some(agent_alias), Some((risk_profile, security))) => {
635 let all_tools_result = tools::all_tools_with_runtime(
636 Arc::new(config.clone()),
637 &security,
638 &risk_profile,
639 agent_alias,
640 runtime,
641 Arc::clone(&mem),
642 composio_key,
643 composio_entity_id,
644 &config.browser,
645 &config.http_request,
646 &config.web_fetch,
647 &config.data_dir,
648 &config.agents,
649 config
650 .first_model_provider()
651 .and_then(|e| e.api_key.as_deref()),
652 &config,
653 Some(canvas_store.clone()),
654 false,
655 );
656 let reaction_handle_gw_opt = Some(all_tools_result.reaction_handle.clone());
662 let channel_names = zeroclaw_channels::orchestrator::register_channels_for_tools(
663 &config,
664 &all_tools_result.ask_user_handle,
665 &reaction_handle_gw_opt,
666 &all_tools_result.poll_handle,
667 &all_tools_result.escalate_handle,
668 &all_tools_result.channel_send_handle,
669 );
670 if !channel_names.is_empty() {
671 ::zeroclaw_log::record!(
672 INFO,
673 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
674 .with_attrs(::serde_json::json!({"count": channel_names.len()})),
675 &format!(
676 "Registered {} channel(s) for dashboard agent",
677 channel_names.len()
678 ),
679 );
680 }
681 (all_tools_result.tools, all_tools_result.delegate_handle)
682 }
683 (Some(_), None) => {
684 (Vec::new(), None)
687 }
688 (None, _) => {
689 ::zeroclaw_log::record!(
690 INFO,
691 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
692 .with_attrs(::serde_json::json!({"display_addr": display_addr})),
693 "Gateway: no [agents.<alias>] configured — booting with empty tools registry. Visit http:///onboard to add an agent."
694 );
695 (Vec::new(), None)
696 }
697 };
698
699 if config.mcp.enabled && !config.mcp.servers.is_empty() {
702 ::zeroclaw_log::record!(
703 INFO,
704 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
705 &format!(
706 "Gateway: initializing MCP client — {} server(s) configured",
707 config.mcp.servers.len()
708 )
709 );
710 match tools::McpRegistry::connect_all(&config.mcp.servers).await {
711 Ok(registry) => {
712 let registry = std::sync::Arc::new(registry);
713 if config.mcp.deferred_loading {
714 let deferred_set =
715 tools::DeferredMcpToolSet::from_registry(std::sync::Arc::clone(®istry))
716 .await;
717 ::zeroclaw_log::record!(
718 INFO,
719 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
720 &format!(
721 "Gateway MCP deferred: {} tool stub(s) from {} server(s)",
722 deferred_set.len(),
723 registry.server_count()
724 )
725 );
726 let activated =
727 std::sync::Arc::new(std::sync::Mutex::new(tools::ActivatedToolSet::new()));
728 tools_registry_raw.push(Box::new(tools::ToolSearchTool::new(
729 deferred_set,
730 activated,
731 )));
732 } else {
733 let names = registry.tool_names();
734 let mut registered = 0usize;
735 for name in names {
736 if let Some(def) = registry.get_tool_def(&name).await {
737 let wrapper: std::sync::Arc<dyn tools::Tool> =
738 std::sync::Arc::new(tools::McpToolWrapper::new(
739 name,
740 def,
741 std::sync::Arc::clone(®istry),
742 ));
743 if let Some(ref handle) = delegate_handle_gw {
744 handle.write().push(std::sync::Arc::clone(&wrapper));
745 }
746 tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
747 registered += 1;
748 }
749 }
750 ::zeroclaw_log::record!(
751 INFO,
752 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
753 &format!(
754 "Gateway MCP: {} tool(s) registered from {} server(s)",
755 registered,
756 registry.server_count()
757 )
758 );
759 }
760 }
761 Err(e) => {
762 ::zeroclaw_log::record!(
763 ERROR,
764 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
765 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
766 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
767 "MCP registry failed to initialize"
768 );
769 }
770 }
771 }
772
773 let tools_registry: Arc<Vec<ToolSpec>> =
774 Arc::new(tools_registry_raw.iter().map(|t| t.spec()).collect());
775
776 let cost_tracker = CostTracker::get_or_init_global(config.cost.clone(), &config.data_dir);
778
779 let event_tx = external_event_tx.unwrap_or_else(|| {
783 let (tx, _rx) = tokio::sync::broadcast::channel::<serde_json::Value>(256);
784 tx
785 });
786 let event_buffer = Arc::new(sse::EventBuffer::new(500));
787 let webhook_secret_hash: Option<Arc<str>> =
789 config.channels.webhook.values().next().and_then(|webhook| {
790 webhook.secret.as_ref().and_then(|raw_secret| {
791 let trimmed_secret = raw_secret.trim();
792 (!trimmed_secret.is_empty())
793 .then(|| Arc::<str>::from(hash_webhook_secret(trimmed_secret)))
794 })
795 });
796
797 let whatsapp_channel: Option<Arc<WhatsAppChannel>> = config
799 .channels
800 .whatsapp
801 .get("default")
802 .filter(|wa| wa.is_cloud_config())
803 .map(|wa| {
804 let alias = "default".to_string();
805 let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
806 let cfg_arc = config_state.clone();
807 let alias = alias.clone();
808 Arc::new(move || cfg_arc.read().channel_external_peers("whatsapp", &alias))
809 };
810 Arc::new(WhatsAppChannel::new(
811 wa.access_token.clone().unwrap_or_default(),
812 wa.phone_number_id.clone().unwrap_or_default(),
813 wa.verify_token.clone().unwrap_or_default(),
814 alias,
815 peer_resolver,
816 ))
817 });
818
819 let whatsapp_app_secret: Option<Arc<str>> = config
821 .channels
822 .whatsapp
823 .values()
824 .next()
825 .and_then(|wa| {
826 wa.app_secret
827 .as_deref()
828 .map(str::trim)
829 .filter(|secret| !secret.is_empty())
830 .map(ToOwned::to_owned)
831 })
832 .map(Arc::from);
833
834 let linq_channel: Option<Arc<LinqChannel>> = config.channels.linq.values().next().map(|lq| {
836 let alias = "default".to_string();
837 let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
838 let cfg_arc = config_state.clone();
839 let alias = alias.clone();
840 Arc::new(move || cfg_arc.read().channel_external_peers("linq", &alias))
841 };
842 Arc::new(LinqChannel::new(
843 lq.api_token.clone(),
844 lq.from_phone.clone(),
845 alias,
846 peer_resolver,
847 ))
848 });
849
850 let linq_signing_secret: Option<Arc<str>> = config
852 .channels
853 .linq
854 .values()
855 .next()
856 .and_then(|lq| {
857 lq.signing_secret
858 .as_deref()
859 .map(str::trim)
860 .filter(|secret| !secret.is_empty())
861 .map(ToOwned::to_owned)
862 })
863 .map(Arc::from);
864
865 let wati_channel: Option<Arc<WatiChannel>> =
867 config.channels.wati.values().next().map(|wati_cfg| {
868 let alias = "default".to_string();
869 let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
870 let cfg_arc = config_state.clone();
871 let alias = alias.clone();
872 Arc::new(move || cfg_arc.read().channel_external_peers("wati", &alias))
873 };
874 Arc::new(
875 WatiChannel::new(
876 wati_cfg.api_token.clone(),
877 wati_cfg.api_url.clone(),
878 wati_cfg.tenant_id.clone(),
879 alias,
880 peer_resolver,
881 )
882 .with_transcription(config.transcription.clone()),
883 )
884 });
885
886 let nextcloud_talk_channel: Option<Arc<NextcloudTalkChannel>> =
888 config.channels.nextcloud_talk.values().next().map(|nc| {
889 let alias = "default".to_string();
890 let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
891 let cfg_arc = config_state.clone();
892 let alias = alias.clone();
893 Arc::new(move || {
894 cfg_arc
895 .read()
896 .channel_external_peers("nextcloud_talk", &alias)
897 })
898 };
899 Arc::new(NextcloudTalkChannel::new(
900 nc.base_url.clone(),
901 nc.app_token.clone(),
902 nc.bot_name.clone().unwrap_or_default(),
903 alias,
904 peer_resolver,
905 ))
906 });
907
908 let nextcloud_talk_webhook_secret: Option<Arc<str>> = config
910 .channels
911 .nextcloud_talk
912 .get("default")
913 .and_then(|nc| {
914 nc.webhook_secret
915 .as_deref()
916 .map(str::trim)
917 .filter(|secret| !secret.is_empty())
918 .map(ToOwned::to_owned)
919 })
920 .map(Arc::from);
921
922 let gmail_push_channel: Option<Arc<GmailPushChannel>> = {
924 let active: std::collections::HashSet<String> = config
925 .agents
926 .values()
927 .filter(|a| a.enabled)
928 .flat_map(|a| a.channels.iter().map(|c| c.as_str().to_string()))
929 .collect();
930 config
931 .channels
932 .gmail_push
933 .iter()
934 .find(|(alias, _)| active.contains(&format!("gmail_push.{alias}")))
935 .map(|(alias, gp)| {
936 let alias = alias.clone();
937 let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = {
938 let cfg_arc = config_state.clone();
939 let alias = alias.clone();
940 Arc::new(move || cfg_arc.read().channel_external_peers("gmail_push", &alias))
941 };
942 Arc::new(GmailPushChannel::new(gp.clone(), alias, peer_resolver))
943 })
944 };
945
946 let session_backend: Option<Arc<dyn SessionBackend>> = if config.gateway.session_persistence {
953 match zeroclaw_infra::make_session_backend(
954 &config.data_dir,
955 &config.channels.session_backend,
956 ) {
957 Ok(backend) => {
958 ::zeroclaw_log::record!(
959 INFO,
960 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
961 &format!(
962 "Gateway session persistence enabled (backend={})",
963 config.channels.session_backend
964 )
965 );
966 if config.gateway.session_ttl_hours > 0
967 && let Ok(cleaned) = backend.cleanup_stale(config.gateway.session_ttl_hours)
968 && cleaned > 0
969 {
970 ::zeroclaw_log::record!(
971 INFO,
972 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
973 .with_attrs(::serde_json::json!({"cleaned": cleaned})),
974 "Cleaned up stale gateway sessions"
975 );
976 }
977 Some(backend)
978 }
979 Err(e) => {
980 ::zeroclaw_log::record!(
981 WARN,
982 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
983 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
984 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
985 "Session persistence disabled"
986 );
987 None
988 }
989 }
990 } else {
991 None
992 };
993
994 let pairing = Arc::new(PairingGuard::new(
996 config.gateway.require_pairing,
997 &config.gateway.paired_tokens,
998 ));
999 let rate_limit_max_keys = normalize_max_keys(
1000 config.gateway.rate_limit_max_keys,
1001 RATE_LIMIT_MAX_KEYS_DEFAULT,
1002 );
1003 let rate_limiter = Arc::new(GatewayRateLimiter::new(
1004 config.gateway.pair_rate_limit_per_minute,
1005 config.gateway.webhook_rate_limit_per_minute,
1006 rate_limit_max_keys,
1007 ));
1008 let idempotency_max_keys = normalize_max_keys(
1009 config.gateway.idempotency_max_keys,
1010 IDEMPOTENCY_MAX_KEYS_DEFAULT,
1011 );
1012 let idempotency_store = Arc::new(IdempotencyStore::new(
1013 Duration::from_secs(config.gateway.idempotency_ttl_secs.max(1)),
1014 idempotency_max_keys,
1015 ));
1016
1017 let path_prefix: Option<&str> = config
1019 .gateway
1020 .path_prefix
1021 .as_deref()
1022 .filter(|p| !p.is_empty());
1023
1024 let tunnel = zeroclaw_runtime::tunnel::create_tunnel(&config.tunnel)?;
1026 let mut tunnel_url: Option<String> = None;
1027
1028 if let Some(ref tun) = tunnel {
1029 println!("🔗 Starting {} tunnel...", tun.name());
1030 match tun.start(host, actual_port).await {
1031 Ok(url) => {
1032 println!("🌐 Tunnel active: {url}");
1033 tunnel_url = Some(url);
1034 }
1035 Err(e) => {
1036 println!("⚠️ Tunnel failed to start: {e}");
1037 println!(" Falling back to local-only mode.");
1038 }
1039 }
1040 }
1041
1042 let auto_detect_web_dist = || -> Option<std::path::PathBuf> {
1049 let mut candidates = vec![
1050 std::path::PathBuf::from("web/dist"),
1052 std::env::current_exe()
1054 .ok()
1055 .and_then(|p| p.parent().map(|d| d.join("web/dist")))
1056 .unwrap_or_default(),
1057 std::path::PathBuf::from("/zeroclaw-data/web/dist"),
1059 std::path::PathBuf::from("/usr/share/zeroclawlabs/web/dist"),
1061 ];
1062 if let Some(data_dir) = dirs_data_local() {
1064 candidates.push(data_dir.join("zeroclaw/web/dist"));
1065 }
1066 candidates
1067 .into_iter()
1068 .find(|p| !p.as_os_str().is_empty() && p.join("index.html").is_file())
1069 };
1070
1071 let web_dist_dir: Option<std::path::PathBuf> = match config
1072 .gateway
1073 .web_dist_dir
1074 .as_ref()
1075 .map(std::path::PathBuf::from)
1076 {
1077 Some(explicit) if explicit.join("index.html").is_file() => Some(explicit),
1078 Some(stale) => {
1079 ::zeroclaw_log::record!(
1080 WARN,
1081 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1082 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1083 .with_attrs(::serde_json::json!({"configured": stale.display().to_string()})),
1084 "gateway.web_dist_dir points at a path that doesn't contain index.html on \
1085 this machine; falling back to auto-detect. Update or remove the setting in \
1086 config.toml to silence this warning."
1087 );
1088 auto_detect_web_dist()
1089 }
1090 None => auto_detect_web_dist(),
1091 };
1092
1093 if let Some(ref dir) = web_dist_dir {
1094 ::zeroclaw_log::record!(
1095 INFO,
1096 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1097 &format!("Web dashboard: serving from {}", dir.display().to_string())
1098 );
1099 } else if config.gateway.web_dist_dir.is_some() {
1100 ::zeroclaw_log::record!(
1101 INFO,
1102 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1103 "Web dashboard: not available — configured gateway.web_dist_dir is missing on \
1104 this machine and no fallback location was found. Build with `cargo web build` \
1105 and point gateway.web_dist_dir at the resulting web/dist directory."
1106 );
1107 } else {
1108 ::zeroclaw_log::record!(
1109 INFO,
1110 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1111 "Web dashboard: not available — no web/dist found. Build with `cargo web build` \
1112 and point gateway.web_dist_dir at the resulting web/dist directory."
1113 );
1114 }
1115
1116 let pfx = path_prefix.unwrap_or("");
1117 println!("🦀 ZeroClaw Gateway listening on http://{display_addr}{pfx}");
1118 if let Some(ref url) = tunnel_url {
1119 println!(" 🌐 Public URL: {url}");
1120 }
1121 println!(" 🌐 Web Dashboard: http://{display_addr}{pfx}/");
1122 if let Some(code) = pairing.pairing_code() {
1123 println!();
1124 println!(" 🔐 PAIRING REQUIRED — use this one-time code:");
1125 println!(" ┌──────────────┐");
1126 println!(" │ {code} │");
1127 println!(" └──────────────┘");
1128 println!(" Send: POST {pfx}/pair with header X-Pairing-Code: {code}");
1129 } else if pairing.require_pairing() {
1130 println!(" 🔒 Pairing: ACTIVE (bearer token required)");
1131 println!(
1132 " To pair a new device: {}",
1133 format_paircode_recovery_command(host, actual_port)
1134 );
1135 println!(
1136 " Fallback: {}",
1137 format_paircode_recovery_curl(host, actual_port, pfx)
1138 );
1139 println!();
1140 } else {
1141 println!(" ⚠️ Pairing: DISABLED (all requests accepted)");
1142 println!();
1143 }
1144 println!(" POST {pfx}/pair — pair a new client (X-Pairing-Code header)");
1145 println!(" POST {pfx}/webhook — {{\"message\": \"your prompt\"}}");
1146 if whatsapp_channel.is_some() {
1147 println!(" GET {pfx}/whatsapp — Meta webhook verification");
1148 println!(" POST {pfx}/whatsapp — WhatsApp message webhook");
1149 }
1150 if linq_channel.is_some() {
1151 println!(" POST {pfx}/linq — Linq message webhook (iMessage/RCS/SMS)");
1152 }
1153 if wati_channel.is_some() {
1154 println!(" GET {pfx}/wati — WATI webhook verification");
1155 println!(" POST {pfx}/wati — WATI message webhook");
1156 }
1157 if nextcloud_talk_channel.is_some() {
1158 println!(" POST {pfx}/nextcloud-talk — Nextcloud Talk bot webhook");
1159 }
1160 println!(" GET {pfx}/api/* — REST API (bearer token required)");
1161 println!(" GET {pfx}/ws/chat — WebSocket agent chat");
1162 if config.nodes.enabled {
1163 println!(" GET {pfx}/ws/nodes — WebSocket node discovery");
1164 }
1165 println!(" GET {pfx}/health — health check");
1166 println!(" GET {pfx}/metrics — Prometheus metrics");
1167 println!(" Press Ctrl+C to stop.\n");
1168
1169 zeroclaw_runtime::health::mark_component_ok("gateway");
1170
1171 if let Some(ref hooks) = hooks {
1173 hooks.fire_gateway_start(host, actual_port).await;
1174 }
1175
1176 let broadcast_layer: Arc<dyn zeroclaw_runtime::observability::Observer> = Arc::new(
1182 sse::BroadcastObserver::new(event_tx.clone(), event_buffer.clone()),
1183 );
1184 let broadcast_hook_guard =
1185 zeroclaw_runtime::observability::set_scoped_broadcast_hook(broadcast_layer);
1186
1187 zeroclaw_log::set_broadcast_hook(event_tx.clone());
1193
1194 let state_observer: Arc<dyn zeroclaw_runtime::observability::Observer> = Arc::from(
1199 zeroclaw_runtime::observability::create_observer(&config.observability),
1200 );
1201
1202 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
1203
1204 let node_registry = Arc::new(nodes::NodeRegistry::new(config.nodes.max_nodes));
1206
1207 let device_registry = if config.gateway.require_pairing {
1209 Some(Arc::new(api_pairing::DeviceRegistry::new(&config.data_dir)))
1210 } else {
1211 None
1212 };
1213 let pending_pairings = if config.gateway.require_pairing {
1214 Some(Arc::new(api_pairing::PairingStore::new()))
1215 } else {
1216 None
1217 };
1218
1219 let state = AppState {
1220 config: config_state,
1221 model_provider,
1222 model,
1223 temperature,
1224 mem,
1225 auto_save: config.memory.auto_save,
1226 webhook_secret_hash,
1227 pairing,
1228 trust_forwarded_headers: config.gateway.trust_forwarded_headers,
1229 rate_limiter,
1230 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
1231 idempotency_store,
1232 whatsapp: whatsapp_channel,
1233 whatsapp_app_secret,
1234 linq: linq_channel,
1235 linq_signing_secret,
1236 nextcloud_talk: nextcloud_talk_channel,
1237 nextcloud_talk_webhook_secret,
1238 wati: wati_channel,
1239 gmail_push: gmail_push_channel,
1240 observer: state_observer,
1241 tools_registry,
1242 cost_tracker,
1243 event_tx,
1244 event_buffer,
1245 shutdown_tx,
1246 reload_tx,
1247 node_registry,
1248 session_backend,
1249 session_queue: Arc::new(session_queue::SessionActorQueue::new(8, 30, 600)),
1250 device_registry,
1251 pending_pairings,
1252 path_prefix: path_prefix.unwrap_or("").to_string(),
1253 web_dist_dir,
1254 canvas_store,
1255 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
1256 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
1257 #[cfg(feature = "webauthn")]
1258 webauthn: if config.security.webauthn.enabled {
1259 let secret_store = Arc::new(zeroclaw_runtime::security::SecretStore::new(
1260 &config.data_dir,
1261 true,
1262 ));
1263 let wa_config = zeroclaw_runtime::security::webauthn::WebAuthnConfig {
1264 enabled: true,
1265 rp_id: config.security.webauthn.rp_id.clone(),
1266 rp_origin: config.security.webauthn.rp_origin.clone(),
1267 rp_name: config.security.webauthn.rp_name.clone(),
1268 };
1269 Some(Arc::new(api_webauthn::WebAuthnState {
1270 manager: zeroclaw_runtime::security::webauthn::WebAuthnManager::new(
1271 wa_config,
1272 secret_store,
1273 &config.data_dir,
1274 ),
1275 pending_registrations: parking_lot::Mutex::new(std::collections::HashMap::new()),
1276 pending_authentications: parking_lot::Mutex::new(std::collections::HashMap::new()),
1277 }))
1278 } else {
1279 None
1280 },
1281 };
1282
1283 let inner = Router::new()
1285 .route("/admin/shutdown", post(handle_admin_shutdown))
1287 .route("/admin/reload", post(handle_admin_reload))
1288 .route("/admin/paircode", get(handle_admin_paircode))
1289 .route("/admin/paircode/new", post(handle_admin_paircode_new))
1290 .route("/health", get(handle_health))
1292 .route("/metrics", get(handle_metrics))
1293 .route("/pair", post(handle_pair))
1294 .route("/pair/code", get(handle_pair_code))
1295 .route("/webhook", post(handle_webhook))
1296 .route("/whatsapp", get(handle_whatsapp_verify))
1297 .route("/whatsapp", post(handle_whatsapp_message))
1298 .route("/linq", post(handle_linq_webhook))
1299 .route("/wati", get(handle_wati_verify))
1300 .route("/wati", post(handle_wati_webhook))
1301 .route("/nextcloud-talk", post(handle_nextcloud_talk_webhook))
1302 .route("/webhook/gmail", post(handle_gmail_push_webhook))
1303 .route("/hooks/claude-code", post(api::handle_claude_code_hook))
1305 .route("/api/status", get(api::handle_api_status))
1307 .route("/api/logs", get(api_logs::handle_api_logs))
1308 .route(
1309 "/api/config",
1310 get(api_config::handle_config_get)
1311 .patch(api_config::handle_patch)
1312 .options(api_config::handle_options_config),
1313 )
1314 .route(
1315 "/api/config/prop",
1316 get(api_config::handle_prop_get)
1317 .put(api_config::handle_prop_put)
1318 .delete(api_config::handle_prop_delete)
1319 .options(api_config::handle_options_prop),
1320 )
1321 .route("/api/config/list", get(api_config::handle_list))
1322 .route("/api/config/drift", get(api_config::handle_drift))
1323 .route(
1324 "/api/config/reload-status",
1325 get(api_config::handle_reload_status),
1326 )
1327 .route("/api/config/templates", get(api_config::handle_templates))
1328 .route("/api/config/map-keys", get(api_config::handle_get_map_keys))
1329 .route(
1330 "/api/config/map-key",
1331 post(api_config::handle_map_key).delete(api_config::handle_delete_map_key),
1332 )
1333 .route("/api/config/rename-map-key", post(api_config::handle_rename_map_key))
1334 .route("/api/onboard/catalog", get(api_onboard::handle_catalog))
1335 .route(
1336 "/api/onboard/catalog/models",
1337 get(api_onboard::handle_catalog_models),
1338 )
1339 .route("/api/onboard/status", get(api_onboard::handle_onboard_status))
1340 .route(
1341 "/api/onboard/agent-options",
1342 get(api_onboard::handle_agent_options),
1343 )
1344 .route("/api/onboard/sections", get(api_onboard::handle_sections))
1345 .route(
1346 "/api/onboard/sections/{section}",
1347 get(api_onboard::handle_section_picker),
1348 )
1349 .route(
1350 "/api/onboard/sections/{section}/items/{key}",
1351 post(api_onboard::handle_section_select),
1352 )
1353 .route("/api/personality", get(api_personality::handle_index))
1354 .route(
1355 "/api/personality/templates",
1356 get(api_personality::handle_templates),
1357 )
1358 .route(
1359 "/api/personality/{filename}",
1360 get(api_personality::handle_get).put(api_personality::handle_put),
1361 )
1362 .route("/api/browse", get(api_browse::handle_browse))
1363 .route("/api/browse/mkdir", post(api_browse::handle_browse_mkdir))
1364 .route("/api/browse/rmdir", delete(api_browse::handle_browse_rmdir))
1365 .route(
1366 "/api/agents/{alias}/workspace/list",
1367 get(api_browse::handle_agent_workspace_list),
1368 )
1369 .route(
1370 "/api/agents/{alias}/workspace/read",
1371 get(api_browse::handle_agent_workspace_read),
1372 )
1373 .route(
1374 "/api/agents/{alias}/workspace/path",
1375 delete(api_browse::handle_agent_workspace_delete),
1376 )
1377 .route(
1378 "/api/agents/{alias}/workspace/move",
1379 post(api_browse::handle_agent_workspace_move),
1380 )
1381 .route(
1382 "/api/agents/{alias}/workspace/mkdir",
1383 post(api_browse::handle_agent_workspace_mkdir),
1384 )
1385 .route("/api/skills/bundles", get(api_skills::handle_list_bundles))
1386 .route(
1387 "/api/skills/bundles/{alias}/skills",
1388 get(api_skills::handle_list_skills).post(api_skills::handle_create_skill),
1389 )
1390 .route(
1391 "/api/skills/bundles/{alias}/skills/{name}",
1392 get(api_skills::handle_read_skill)
1393 .put(api_skills::handle_write_skill)
1394 .delete(api_skills::handle_delete_skill),
1395 )
1396 .route("/api/config/init", post(api_config::handle_init))
1397 .route("/api/config/migrate", post(api_config::handle_migrate))
1398 .route("/api/openapi.json", get(openapi::handle_openapi_json))
1399 .route("/api/docs", get(openapi::handle_docs))
1400 .route("/api/tools", get(api::handle_api_tools))
1401 .route("/api/cron", get(api::handle_api_cron_list))
1402 .route("/api/cron", post(api::handle_api_cron_add))
1403 .route(
1404 "/api/cron/settings",
1405 get(api::handle_api_cron_settings_get).patch(api::handle_api_cron_settings_patch),
1406 )
1407 .route(
1408 "/api/cron/{id}",
1409 delete(api::handle_api_cron_delete).patch(api::handle_api_cron_patch),
1410 )
1411 .route("/api/cron/{id}/runs", get(api::handle_api_cron_runs))
1412 .route("/api/integrations", get(api::handle_api_integrations))
1416 .route(
1417 "/api/integrations/settings",
1418 get(api::handle_api_integrations_settings),
1419 )
1420 .route(
1421 "/api/doctor",
1422 get(api::handle_api_doctor).post(api::handle_api_doctor),
1423 )
1424 .route("/api/memory", get(api::handle_api_memory_list))
1425 .route("/api/memory", post(api::handle_api_memory_store))
1426 .route("/api/memory/{key}", delete(api::handle_api_memory_delete))
1427 .route("/api/cost", get(api::handle_api_cost))
1428 .route("/api/cli-tools", get(api::handle_api_cli_tools))
1429 .route("/api/channels", get(api::handle_api_channels))
1430 .route("/api/health", get(api::handle_api_health))
1431 .route("/api/sessions", get(api::handle_api_sessions_list))
1432 .route("/api/sessions/running", get(api::handle_api_sessions_running))
1433 .route(
1434 "/api/sessions/{id}/messages",
1435 get(api::handle_api_session_messages).post(api::handle_api_session_message_post),
1436 )
1437 .route("/api/sessions/{id}", delete(api::handle_api_session_delete).put(api::handle_api_session_rename))
1438 .route("/api/sessions/{id}/state", get(api::handle_api_session_state))
1439 .route("/api/sessions/{id}/abort", post(api::handle_api_session_abort))
1440 .route("/api/pairing/initiate", post(api_pairing::initiate_pairing))
1442 .route("/api/pair", post(api_pairing::submit_pairing_enhanced))
1443 .route("/api/devices", get(api_pairing::list_devices))
1444 .route(
1445 "/api/devices/me/capabilities",
1446 post(api_pairing::update_my_capabilities),
1447 )
1448 .route("/api/devices/{id}", delete(api_pairing::revoke_device))
1449 .route(
1450 "/api/devices/{id}/token/rotate",
1451 post(api_pairing::rotate_token),
1452 )
1453 .route("/api/canvas", get(canvas::handle_canvas_list))
1455 .route(
1456 "/api/canvas/{id}",
1457 get(canvas::handle_canvas_get)
1458 .post(canvas::handle_canvas_post)
1459 .delete(canvas::handle_canvas_clear),
1460 )
1461 .route(
1462 "/api/canvas/{id}/history",
1463 get(canvas::handle_canvas_history),
1464 );
1465
1466 #[cfg(feature = "webauthn")]
1468 let inner = inner
1469 .route(
1470 "/api/webauthn/register/start",
1471 post(api_webauthn::handle_register_start),
1472 )
1473 .route(
1474 "/api/webauthn/register/finish",
1475 post(api_webauthn::handle_register_finish),
1476 )
1477 .route(
1478 "/api/webauthn/auth/start",
1479 post(api_webauthn::handle_auth_start),
1480 )
1481 .route(
1482 "/api/webauthn/auth/finish",
1483 post(api_webauthn::handle_auth_finish),
1484 )
1485 .route(
1486 "/api/webauthn/credentials",
1487 get(api_webauthn::handle_list_credentials),
1488 )
1489 .route(
1490 "/api/webauthn/credentials/{id}",
1491 delete(api_webauthn::handle_delete_credential),
1492 );
1493
1494 #[cfg(feature = "plugins-wasm")]
1496 let inner = inner.route(
1497 "/api/plugins",
1498 get(api_plugins::plugin_routes::list_plugins),
1499 );
1500
1501 let inner = inner
1502 .route("/api/events", get(sse::handle_sse_events))
1504 .route("/api/events/history", get(sse::handle_events_history))
1505 .route("/acp", get(acp::handle_ws_acp))
1507 .route("/ws/chat", get(ws::handle_ws_chat))
1509 .route("/ws/canvas/{id}", get(canvas::handle_ws_canvas))
1511 .route("/ws/nodes", get(nodes::handle_ws_nodes))
1513 .route("/_app/{*path}", get(static_files::handle_static))
1515 .fallback(get(static_files::handle_spa_fallback))
1517 .with_state(state.clone())
1518 .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1519 .layer(TimeoutLayer::with_status_code(
1520 StatusCode::REQUEST_TIMEOUT,
1521 Duration::from_secs(gateway_request_timeout_secs(&config.gateway)),
1522 ));
1523
1524 let cron_run_router: Router = Router::new()
1529 .route("/api/cron/{id}/run", post(api::handle_api_cron_run))
1530 .with_state(state)
1531 .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1532 .layer(TimeoutLayer::with_status_code(
1533 StatusCode::REQUEST_TIMEOUT,
1534 Duration::from_secs(gateway_long_running_request_timeout_secs(&config.gateway)),
1535 ));
1536
1537 let inner = inner.merge(cron_run_router);
1538
1539 let app = if let Some(prefix) = path_prefix {
1543 let redirect_target = prefix.to_string();
1544 Router::new().nest(prefix, inner).route(
1545 &format!("{prefix}/"),
1546 get(|| async move { axum::response::Redirect::permanent(&redirect_target) }),
1547 )
1548 } else {
1549 inner
1550 };
1551
1552 let tls_acceptor = match &config.gateway.tls {
1554 Some(tls_cfg) if tls_cfg.enabled => {
1555 let has_mtls = tls_cfg.client_auth.as_ref().is_some_and(|ca| ca.enabled);
1556 if has_mtls {
1557 ::zeroclaw_log::record!(
1558 INFO,
1559 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1560 "TLS enabled with mutual TLS (mTLS) client verification"
1561 );
1562 } else {
1563 ::zeroclaw_log::record!(
1564 INFO,
1565 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1566 "TLS enabled (no client certificate requirement)"
1567 );
1568 }
1569 Some(tls::build_tls_acceptor(tls_cfg)?)
1570 }
1571 _ => None,
1572 };
1573
1574 if let Some(tls_acceptor) = tls_acceptor {
1575 let app = app.into_make_service_with_connect_info::<SocketAddr>();
1577 let mut app = app;
1578
1579 let mut shutdown_signal = shutdown_rx;
1580 loop {
1581 tokio::select! {
1582 conn = listener.accept() => {
1583 let (tcp_stream, remote_addr) = conn?;
1584 let tls_acceptor = tls_acceptor.clone();
1585 let svc = tower::MakeService::<
1586 SocketAddr,
1587 hyper::Request<hyper::body::Incoming>,
1588 >::make_service(&mut app, remote_addr)
1589 .await
1590 .expect("infallible make_service");
1591
1592 tokio::spawn(async move {
1593 let tls_stream = match tls_acceptor.accept(tcp_stream).await {
1594 Ok(s) => s,
1595 Err(e) => {
1596 ::zeroclaw_log::record!(DEBUG, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"error": format!("{}", e), "remote_addr": remote_addr})), "TLS handshake failed from");
1597 return;
1598 }
1599 };
1600 let io = hyper_util::rt::TokioIo::new(tls_stream);
1601 let hyper_svc = hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
1602 let mut svc = svc.clone();
1603 async move {
1604 tower::Service::call(&mut svc, req).await
1605 }
1606 });
1607 if let Err(e) = hyper_util::server::conn::auto::Builder::new(
1608 hyper_util::rt::TokioExecutor::new(),
1609 )
1610 .serve_connection(io, hyper_svc)
1611 .await
1612 {
1613 ::zeroclaw_log::record!(DEBUG, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"error": format!("{}", e), "remote_addr": remote_addr})), "connection error from");
1614 }
1615 });
1616 }
1617 _ = shutdown_signal.changed() => {
1618 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "ZeroClaw Gateway shutting down");
1619 break;
1620 }
1621 }
1622 }
1623 } else {
1624 axum::serve(
1626 listener,
1627 app.into_make_service_with_connect_info::<SocketAddr>(),
1628 )
1629 .with_graceful_shutdown(async move {
1630 let _ = shutdown_rx.changed().await;
1631 ::zeroclaw_log::record!(
1632 INFO,
1633 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1634 "ZeroClaw Gateway shutting down"
1635 );
1636 })
1637 .await?;
1638 }
1639
1640 drop(broadcast_hook_guard);
1641
1642 Ok(())
1643}
1644
1645fn format_paircode_recovery_command(host: &str, port: u16) -> String {
1646 let mut cmd = format!("zeroclaw gateway get-paircode --new --port {port}");
1647 if let Some(host_arg) = paircode_recovery_host_arg(host) {
1648 cmd.push_str(" --host ");
1649 cmd.push_str(host_arg);
1650 }
1651 cmd
1652}
1653
1654fn paircode_recovery_host_arg(host: &str) -> Option<&str> {
1655 match host {
1656 "127.0.0.1" | "localhost" | "::1" | "0.0.0.0" | "::" => None,
1657 _ => Some(host),
1658 }
1659}
1660
1661fn format_paircode_recovery_curl(host: &str, port: u16, path_prefix: &str) -> String {
1662 format!("curl -s -X POST http://{host}:{port}{path_prefix}/admin/paircode/new")
1663}
1664
1665async fn handle_health(State(state): State<AppState>) -> impl IntoResponse {
1671 let body = serde_json::json!({
1672 "status": "ok",
1673 "paired": state.pairing.is_paired(),
1674 "require_pairing": state.pairing.require_pairing(),
1675 "runtime": zeroclaw_runtime::health::snapshot_json(),
1676 });
1677 Json(body)
1678}
1679
1680const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
1682
1683fn prometheus_disabled_hint() -> String {
1684 String::from(
1685 "# Prometheus backend not enabled. Set [observability] backend = \"prometheus\" in config.\n",
1686 )
1687}
1688
1689#[cfg(feature = "observability-prometheus")]
1690fn prometheus_observer_from_state(
1691 observer: &dyn zeroclaw_runtime::observability::Observer,
1692) -> Option<&zeroclaw_runtime::observability::PrometheusObserver> {
1693 observer
1697 .as_any()
1698 .downcast_ref::<zeroclaw_runtime::observability::PrometheusObserver>()
1699}
1700
1701async fn handle_metrics(State(state): State<AppState>) -> impl IntoResponse {
1703 let body = {
1704 #[cfg(feature = "observability-prometheus")]
1705 {
1706 if let Some(prom) = prometheus_observer_from_state(state.observer.as_ref()) {
1707 prom.encode()
1708 } else {
1709 prometheus_disabled_hint()
1710 }
1711 }
1712 #[cfg(not(feature = "observability-prometheus"))]
1713 {
1714 let _ = &state;
1715 prometheus_disabled_hint()
1716 }
1717 };
1718
1719 (
1720 StatusCode::OK,
1721 [(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)],
1722 body,
1723 )
1724}
1725
1726#[axum::debug_handler]
1728async fn handle_pair(
1729 State(state): State<AppState>,
1730 ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
1731 headers: HeaderMap,
1732) -> impl IntoResponse {
1733 let rate_key =
1734 client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
1735 if !state.rate_limiter.allow_pair(&rate_key) {
1736 ::zeroclaw_log::record!(
1737 WARN,
1738 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1739 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1740 "/pair rate limit exceeded"
1741 );
1742 let err = serde_json::json!({
1743 "error": "Too many pairing requests. Please retry later.",
1744 "retry_after": RATE_LIMIT_WINDOW_SECS,
1745 });
1746 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1747 }
1748
1749 if let Err(e) = state.auth_limiter.check_rate_limit(&rate_key) {
1751 ::zeroclaw_log::record!(
1752 WARN,
1753 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1754 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1755 .with_attrs(::serde_json::json!({"rate_key": rate_key})),
1756 "pairing auth rate limit exceeded"
1757 );
1758 let err = serde_json::json!({
1759 "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
1760 "retry_after": e.retry_after_secs,
1761 });
1762 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1763 }
1764
1765 let code = headers
1766 .get("X-Pairing-Code")
1767 .and_then(|v| v.to_str().ok())
1768 .unwrap_or("");
1769
1770 match state.pairing.try_pair(code, &rate_key).await {
1771 Ok(Some(token)) => {
1772 ::zeroclaw_log::record!(
1773 INFO,
1774 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1775 "new client paired successfully"
1776 );
1777 if let Err(err) =
1778 Box::pin(persist_pairing_tokens(state.config.clone(), &state.pairing)).await
1779 {
1780 ::zeroclaw_log::record!(
1781 ERROR,
1782 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1783 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1784 .with_attrs(::serde_json::json!({"error": format!("{}", err)})),
1785 "pairing succeeded but token persistence failed"
1786 );
1787 let body = serde_json::json!({
1788 "paired": true,
1789 "persisted": false,
1790 "token": token,
1791 "message": "Paired for this process, but failed to persist token to config.toml. Check config path and write permissions.",
1792 });
1793 return (StatusCode::OK, Json(body));
1794 }
1795
1796 let body = serde_json::json!({
1797 "paired": true,
1798 "persisted": true,
1799 "token": token,
1800 "message": "Save this token — use it as Authorization: Bearer <token>"
1801 });
1802 (StatusCode::OK, Json(body))
1803 }
1804 Ok(None) => {
1805 state.auth_limiter.record_attempt(&rate_key);
1806 ::zeroclaw_log::record!(
1807 WARN,
1808 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1809 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1810 "pairing attempt with invalid code"
1811 );
1812 let err = serde_json::json!({"error": "Invalid pairing code"});
1813 (StatusCode::FORBIDDEN, Json(err))
1814 }
1815 Err(lockout_secs) => {
1816 ::zeroclaw_log::record!(
1817 WARN,
1818 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1819 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1820 .with_attrs(::serde_json::json!({"lockout_secs": lockout_secs})),
1821 "pairing locked out; too many failed attempts"
1822 );
1823 let err = serde_json::json!({
1824 "error": format!("Too many failed attempts. Try again in {lockout_secs}s."),
1825 "retry_after": lockout_secs
1826 });
1827 (StatusCode::TOO_MANY_REQUESTS, Json(err))
1828 }
1829 }
1830}
1831
1832async fn persist_pairing_tokens(config: Arc<RwLock<Config>>, pairing: &PairingGuard) -> Result<()> {
1833 let paired_tokens = pairing.tokens();
1834 let mut updated_cfg = { config.read().clone() };
1837 updated_cfg.gateway.paired_tokens = paired_tokens;
1838 updated_cfg.mark_dirty("gateway.paired-tokens");
1839 updated_cfg
1840 .save_dirty()
1841 .await
1842 .context("Failed to persist paired tokens to config.toml")?;
1843
1844 *config.write() = updated_cfg;
1846 Ok(())
1847}
1848
1849struct GatewayChatOutcome {
1854 response: String,
1855 input_tokens: Option<u64>,
1856 output_tokens: Option<u64>,
1857 cost_usd: Option<f64>,
1858}
1859
1860fn needs_onboarding_for(model: &str) -> Option<anyhow::Error> {
1867 if model.trim().is_empty() {
1868 ::zeroclaw_log::record!(
1869 WARN,
1870 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
1871 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
1872 "gateway dispatch refused: no model configured (browser onboarding incomplete)"
1873 );
1874 Some(anyhow::Error::msg(
1875 "needs_onboarding: gateway has no model configured. Complete \
1876 browser onboarding at /onboard, or set [providers.models.<type>.<alias>] \
1877 model = \"...\" before sending messages.",
1878 ))
1879 } else {
1880 None
1881 }
1882}
1883
1884fn is_needs_onboarding_err(e: &anyhow::Error) -> bool {
1889 e.to_string().contains("needs_onboarding")
1890}
1891
1892fn needs_onboarding_channel_reply() -> String {
1898 i18n::get_required_cli_string("channel-needs-onboarding-reply")
1899}
1900
1901async fn run_gateway_chat_with_tools(
1903 state: &AppState,
1904 message: &str,
1905 session_id: Option<&str>,
1906) -> anyhow::Result<GatewayChatOutcome> {
1907 if let Some(err) = needs_onboarding_for(&state.model) {
1908 return Err(err);
1909 }
1910
1911 #[cfg(test)]
1916 {
1917 let _ = session_id;
1918 let response = state
1919 .model_provider
1920 .chat_with_system(None, message, &state.model, state.temperature)
1921 .await?;
1922 Ok(GatewayChatOutcome {
1923 response,
1924 input_tokens: None,
1925 output_tokens: None,
1926 cost_usd: None,
1927 })
1928 }
1929
1930 #[cfg(not(test))]
1931 {
1932 let config = state.config.read().clone();
1933 let agent_alias = config
1938 .agents
1939 .keys()
1940 .find(|k| k.as_str() == "default")
1941 .or_else(|| {
1942 config
1943 .agents
1944 .iter()
1945 .find(|(_, a)| a.enabled)
1946 .map(|(alias, _)| alias)
1947 })
1948 .ok_or_else(|| {
1949 ::zeroclaw_log::record!(
1950 WARN,
1951 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
1952 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
1953 "webhook chat rejected: no configured [agents.<alias>] entry"
1954 );
1955 anyhow::Error::msg(
1956 "webhook chat requires at least one configured [agents.<alias>] entry",
1957 )
1958 })?
1959 .clone();
1960
1961 let cost_tracking_context = state.cost_tracker.as_ref().map(|tracker| {
1972 let pricing: zeroclaw_runtime::agent::cost::ModelProviderPricing = config
1973 .providers
1974 .models
1975 .iter_entries()
1976 .filter(|(_, _, base)| !base.pricing.is_empty())
1977 .map(|(type_k, alias_k, base)| {
1978 (format!("{type_k}.{alias_k}"), base.pricing.clone())
1979 })
1980 .collect();
1981 zeroclaw_runtime::agent::cost::ToolLoopCostTrackingContext::new(
1982 tracker.clone(),
1983 std::sync::Arc::new(pricing),
1984 )
1985 .with_agent_alias(&agent_alias)
1986 });
1987 let captured_usage = cost_tracking_context
1988 .as_ref()
1989 .map(|ctx| ctx.turn_usage.clone());
1990 let response = Box::pin(
1991 zeroclaw_runtime::agent::cost::TOOL_LOOP_COST_TRACKING_CONTEXT.scope(
1992 cost_tracking_context,
1993 zeroclaw_runtime::agent::process_message(config, &agent_alias, message, session_id),
1994 ),
1995 )
1996 .await?;
1997 let usage = captured_usage
1998 .map(|cell| *cell.lock())
1999 .filter(|u| u.input_tokens > 0 || u.output_tokens > 0);
2000 let (input_tokens, output_tokens, cost_usd) = match usage {
2001 Some(u) => (
2002 Some(u.input_tokens),
2003 Some(u.output_tokens),
2004 Some(u.cost_usd),
2005 ),
2006 None => (None, None, None),
2007 };
2008 Ok(GatewayChatOutcome {
2009 response,
2010 input_tokens,
2011 output_tokens,
2012 cost_usd,
2013 })
2014 }
2015}
2016
2017#[derive(serde::Deserialize)]
2019pub struct WebhookBody {
2020 pub message: String,
2021}
2022
2023async fn handle_webhook(
2025 State(state): State<AppState>,
2026 ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
2027 headers: HeaderMap,
2028 body: Result<Json<WebhookBody>, axum::extract::rejection::JsonRejection>,
2029) -> impl IntoResponse {
2030 let rate_key =
2031 client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
2032 if !state.rate_limiter.allow_webhook(&rate_key) {
2033 ::zeroclaw_log::record!(
2034 WARN,
2035 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2036 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2037 "/webhook rate limit exceeded"
2038 );
2039 let err = serde_json::json!({
2040 "error": "Too many webhook requests. Please retry later.",
2041 "retry_after": RATE_LIMIT_WINDOW_SECS,
2042 });
2043 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
2044 }
2045
2046 if state.pairing.require_pairing() {
2048 if let Err(e) = state.auth_limiter.check_rate_limit(&rate_key) {
2049 ::zeroclaw_log::record!(
2050 WARN,
2051 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2052 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2053 .with_attrs(::serde_json::json!({"rate_key": rate_key})),
2054 "webhook: auth rate limit exceeded for"
2055 );
2056 let err = serde_json::json!({
2057 "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
2058 "retry_after": e.retry_after_secs,
2059 });
2060 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
2061 }
2062 let auth = headers
2063 .get(header::AUTHORIZATION)
2064 .and_then(|v| v.to_str().ok())
2065 .unwrap_or("");
2066 let token = auth.strip_prefix("Bearer ").unwrap_or("");
2067 if !state.pairing.is_authenticated(token) {
2068 state.auth_limiter.record_attempt(&rate_key);
2069 ::zeroclaw_log::record!(
2070 WARN,
2071 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2072 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2073 "webhook: rejected — not paired / invalid bearer token"
2074 );
2075 let err = serde_json::json!({
2076 "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer <token>"
2077 });
2078 return (StatusCode::UNAUTHORIZED, Json(err));
2079 }
2080 }
2081
2082 if let Some(ref secret_hash) = state.webhook_secret_hash {
2084 let header_hash = headers
2085 .get("X-Webhook-Secret")
2086 .and_then(|v| v.to_str().ok())
2087 .map(str::trim)
2088 .filter(|value| !value.is_empty())
2089 .map(hash_webhook_secret);
2090 match header_hash {
2091 Some(val) if constant_time_eq(&val, secret_hash.as_ref()) => {}
2092 _ => {
2093 ::zeroclaw_log::record!(
2094 WARN,
2095 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2096 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2097 "webhook: rejected request — invalid or missing X-Webhook-Secret"
2098 );
2099 let err = serde_json::json!({"error": "Unauthorized — invalid or missing X-Webhook-Secret header"});
2100 return (StatusCode::UNAUTHORIZED, Json(err));
2101 }
2102 }
2103 }
2104
2105 let Json(webhook_body) = match body {
2107 Ok(b) => b,
2108 Err(e) => {
2109 ::zeroclaw_log::record!(
2110 WARN,
2111 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2112 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2113 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2114 "webhook JSON parse error"
2115 );
2116 let err = serde_json::json!({
2117 "error": "Invalid JSON body. Expected: {\"message\": \"...\"}"
2118 });
2119 return (StatusCode::BAD_REQUEST, Json(err));
2120 }
2121 };
2122
2123 if let Some(idempotency_key) = headers
2125 .get("X-Idempotency-Key")
2126 .and_then(|v| v.to_str().ok())
2127 .map(str::trim)
2128 .filter(|value| !value.is_empty())
2129 && !state.idempotency_store.record_if_new(idempotency_key)
2130 {
2131 ::zeroclaw_log::record!(
2132 INFO,
2133 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2134 .with_attrs(::serde_json::json!({"idempotency_key": idempotency_key})),
2135 "webhook duplicate ignored"
2136 );
2137 let body = serde_json::json!({
2138 "status": "duplicate",
2139 "idempotent": true,
2140 "message": "Request already processed for this idempotency key"
2141 });
2142 return (StatusCode::OK, Json(body));
2143 }
2144
2145 let message = &webhook_body.message;
2146 let session_id = webhook_session_id(&headers);
2147
2148 if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(message) {
2149 let key = webhook_memory_key();
2150 let _ = state
2151 .mem
2152 .store(
2153 &key,
2154 message,
2155 MemoryCategory::Conversation,
2156 session_id.as_deref(),
2157 )
2158 .await;
2159 }
2160
2161 let provider_label = state
2162 .config
2163 .read()
2164 .first_model_provider_type()
2165 .unwrap_or("unknown")
2166 .to_string();
2167 let model_label = state.model.clone();
2168 let started_at = Instant::now();
2169
2170 state.observer.record_event(
2171 &zeroclaw_runtime::observability::ObserverEvent::AgentStart {
2172 model_provider: provider_label.clone(),
2173 model: model_label.clone(),
2174 },
2175 );
2176 state.observer.record_event(
2177 &zeroclaw_runtime::observability::ObserverEvent::LlmRequest {
2178 model_provider: provider_label.clone(),
2179 model: model_label.clone(),
2180 messages_count: 1,
2181 },
2182 );
2183
2184 match run_gateway_chat_with_tools(&state, message, session_id.as_deref()).await {
2185 Ok(GatewayChatOutcome {
2186 response,
2187 input_tokens,
2188 output_tokens,
2189 cost_usd,
2190 }) => {
2191 let duration = started_at.elapsed();
2192 let tokens_used = input_tokens
2197 .zip(output_tokens)
2198 .map(|(i, o)| i + o)
2199 .or(input_tokens)
2200 .or(output_tokens);
2201 state.observer.record_event(
2202 &zeroclaw_runtime::observability::ObserverEvent::LlmResponse {
2203 model_provider: provider_label.clone(),
2204 model: model_label.clone(),
2205 duration,
2206 success: true,
2207 error_message: None,
2208 input_tokens: None,
2209 output_tokens: None,
2210 },
2211 );
2212 state.observer.record_metric(
2213 &zeroclaw_runtime::observability::traits::ObserverMetric::RequestLatency(duration),
2214 );
2215 state.observer.record_event(
2216 &zeroclaw_runtime::observability::ObserverEvent::AgentEnd {
2217 model_provider: provider_label,
2218 model: model_label,
2219 duration,
2220 tokens_used,
2221 cost_usd,
2222 },
2223 );
2224
2225 let body = serde_json::json!({"response": response, "model": state.model});
2226 (StatusCode::OK, Json(body))
2227 }
2228 Err(e) => {
2229 let duration = started_at.elapsed();
2230 let sanitized = zeroclaw_providers::sanitize_api_error(&e.to_string());
2231
2232 state.observer.record_event(
2233 &zeroclaw_runtime::observability::ObserverEvent::LlmResponse {
2234 model_provider: provider_label.clone(),
2235 model: model_label.clone(),
2236 duration,
2237 success: false,
2238 error_message: Some(sanitized.clone()),
2239 input_tokens: None,
2240 output_tokens: None,
2241 },
2242 );
2243 state.observer.record_metric(
2244 &zeroclaw_runtime::observability::traits::ObserverMetric::RequestLatency(duration),
2245 );
2246 state
2247 .observer
2248 .record_event(&zeroclaw_runtime::observability::ObserverEvent::Error {
2249 component: "gateway".to_string(),
2250 message: sanitized.clone(),
2251 });
2252 state.observer.record_event(
2253 &zeroclaw_runtime::observability::ObserverEvent::AgentEnd {
2254 model_provider: provider_label,
2255 model: model_label,
2256 duration,
2257 tokens_used: None,
2258 cost_usd: None,
2259 },
2260 );
2261
2262 if is_needs_onboarding_err(&e) {
2263 ::zeroclaw_log::record!(
2264 WARN,
2265 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2266 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2267 "Webhook chat refused: gateway has no model configured; \
2268 visit /onboard"
2269 );
2270 let body = serde_json::json!({
2271 "error": "needs_onboarding",
2272 "url": "/onboard"
2273 });
2274 (StatusCode::SERVICE_UNAVAILABLE, Json(body))
2275 } else {
2276 ::zeroclaw_log::record!(
2277 ERROR,
2278 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2279 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2280 .with_attrs(::serde_json::json!({"error": sanitized})),
2281 "webhook model_provider error"
2282 );
2283 let err = serde_json::json!({"error": "LLM request failed"});
2284 (StatusCode::INTERNAL_SERVER_ERROR, Json(err))
2285 }
2286 }
2287 }
2288}
2289
2290#[derive(serde::Deserialize)]
2292pub struct WhatsAppVerifyQuery {
2293 #[serde(rename = "hub.mode")]
2294 pub mode: Option<String>,
2295 #[serde(rename = "hub.verify_token")]
2296 pub verify_token: Option<String>,
2297 #[serde(rename = "hub.challenge")]
2298 pub challenge: Option<String>,
2299}
2300
2301async fn handle_whatsapp_verify(
2303 State(state): State<AppState>,
2304 Query(params): Query<WhatsAppVerifyQuery>,
2305) -> impl IntoResponse {
2306 let Some(ref wa) = state.whatsapp else {
2307 return (StatusCode::NOT_FOUND, "WhatsApp not configured".to_string());
2308 };
2309
2310 let token_matches = params
2312 .verify_token
2313 .as_deref()
2314 .is_some_and(|t| constant_time_eq(t, wa.verify_token()));
2315 if params.mode.as_deref() == Some("subscribe") && token_matches {
2316 if let Some(ch) = params.challenge {
2317 ::zeroclaw_log::record!(
2318 INFO,
2319 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2320 .with_attrs(::serde_json::json!({"channel": "whatsapp"})),
2321 "webhook verified successfully"
2322 );
2323 return (StatusCode::OK, ch);
2324 }
2325 return (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string());
2326 }
2327
2328 ::zeroclaw_log::record!(
2329 WARN,
2330 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2331 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2332 .with_attrs(::serde_json::json!({"channel": "whatsapp"})),
2333 "webhook verification failed — token mismatch"
2334 );
2335 (StatusCode::FORBIDDEN, "Forbidden".to_string())
2336}
2337
2338pub fn verify_whatsapp_signature(app_secret: &str, body: &[u8], signature_header: &str) -> bool {
2342 use hmac::{Hmac, Mac};
2343 use sha2::Sha256;
2344
2345 let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
2347 return false;
2348 };
2349
2350 let Ok(expected) = hex::decode(hex_sig) else {
2352 return false;
2353 };
2354
2355 let Ok(mut mac) = Hmac::<Sha256>::new_from_slice(app_secret.as_bytes()) else {
2357 return false;
2358 };
2359 mac.update(body);
2360
2361 mac.verify_slice(&expected).is_ok()
2363}
2364
2365async fn handle_whatsapp_message(
2367 State(state): State<AppState>,
2368 headers: HeaderMap,
2369 body: Bytes,
2370) -> impl IntoResponse {
2371 let Some(ref wa) = state.whatsapp else {
2372 return (
2373 StatusCode::NOT_FOUND,
2374 Json(serde_json::json!({"error": "WhatsApp not configured"})),
2375 );
2376 };
2377
2378 if let Some(ref app_secret) = state.whatsapp_app_secret {
2380 let signature = headers
2381 .get("X-Hub-Signature-256")
2382 .and_then(|v| v.to_str().ok())
2383 .unwrap_or("");
2384
2385 if !verify_whatsapp_signature(app_secret, &body, signature) {
2386 ::zeroclaw_log::record!(
2387 WARN,
2388 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2389 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2390 .with_attrs(::serde_json::json!({"channel": "whatsapp"})),
2391 &format!(
2392 "webhook signature verification failed (signature: {})",
2393 if signature.is_empty() {
2394 "missing"
2395 } else {
2396 "invalid"
2397 }
2398 )
2399 );
2400 return (
2401 StatusCode::UNAUTHORIZED,
2402 Json(serde_json::json!({"error": "Invalid signature"})),
2403 );
2404 }
2405 }
2406
2407 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2409 return (
2410 StatusCode::BAD_REQUEST,
2411 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2412 );
2413 };
2414
2415 let messages = wa.parse_webhook_payload(&payload);
2417
2418 if messages.is_empty() {
2419 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2421 }
2422
2423 for msg in &messages {
2425 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"channel": "whatsapp", "sender": msg.sender, "content": msg.content})), "inbound webhook message");
2426
2427 if let Some((token, response)) = zeroclaw_channels::util::parse_approval_reply(&msg.content)
2429 {
2430 let mut map = wa.pending_approvals().lock().await;
2431 if let Some(sender) = map.remove(&token) {
2432 let _ = sender.send(response);
2433 continue;
2434 }
2435 }
2436
2437 let session_id = sender_session_id("whatsapp", msg);
2438
2439 if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(&msg.content) {
2441 let key = whatsapp_memory_key(msg);
2442 let _ = state
2443 .mem
2444 .store(
2445 &key,
2446 &msg.content,
2447 MemoryCategory::Conversation,
2448 Some(&session_id),
2449 )
2450 .await;
2451 }
2452
2453 match Box::pin(run_gateway_chat_with_tools(
2454 &state,
2455 &msg.content,
2456 Some(&session_id),
2457 ))
2458 .await
2459 {
2460 Ok(GatewayChatOutcome { response, .. }) => {
2461 if let Err(e) = wa
2463 .send(&SendMessage::new(response, &msg.reply_target))
2464 .await
2465 {
2466 ::zeroclaw_log::record!(
2467 ERROR,
2468 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2469 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2470 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2471 "Failed to send WhatsApp reply"
2472 );
2473 }
2474 }
2475 Err(e) => {
2476 let reply = if is_needs_onboarding_err(&e) {
2477 ::zeroclaw_log::record!(
2478 WARN,
2479 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2480 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2481 "WhatsApp chat refused: gateway has no model configured; \
2482 visit /onboard"
2483 );
2484 needs_onboarding_channel_reply()
2485 } else {
2486 ::zeroclaw_log::record!(
2487 ERROR,
2488 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2489 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2490 .with_attrs(
2491 ::serde_json::json!({"channel": "whatsapp", "error": format!("{}", e)})
2492 ),
2493 "LLM error"
2494 );
2495 "Sorry, I couldn't process your message right now.".to_string()
2496 };
2497 let _ = wa.send(&SendMessage::new(reply, &msg.reply_target)).await;
2498 }
2499 }
2500 }
2501
2502 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2504}
2505
2506async fn handle_linq_webhook(
2508 State(state): State<AppState>,
2509 headers: HeaderMap,
2510 body: Bytes,
2511) -> impl IntoResponse {
2512 let Some(ref linq) = state.linq else {
2513 return (
2514 StatusCode::NOT_FOUND,
2515 Json(serde_json::json!({"error": "Linq not configured"})),
2516 );
2517 };
2518
2519 let body_str = String::from_utf8_lossy(&body);
2520
2521 if let Some(ref signing_secret) = state.linq_signing_secret {
2523 let timestamp = headers
2524 .get("X-Webhook-Timestamp")
2525 .and_then(|v| v.to_str().ok())
2526 .unwrap_or("");
2527
2528 let signature = headers
2529 .get("X-Webhook-Signature")
2530 .and_then(|v| v.to_str().ok())
2531 .unwrap_or("");
2532
2533 if !zeroclaw_channels::linq::verify_linq_signature(
2534 signing_secret,
2535 &body_str,
2536 timestamp,
2537 signature,
2538 ) {
2539 ::zeroclaw_log::record!(
2540 WARN,
2541 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2542 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2543 &format!(
2544 "Linq webhook signature verification failed (signature: {})",
2545 if signature.is_empty() {
2546 "missing"
2547 } else {
2548 "invalid"
2549 }
2550 )
2551 );
2552 return (
2553 StatusCode::UNAUTHORIZED,
2554 Json(serde_json::json!({"error": "Invalid signature"})),
2555 );
2556 }
2557 }
2558
2559 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2561 return (
2562 StatusCode::BAD_REQUEST,
2563 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2564 );
2565 };
2566
2567 let messages = linq.parse_webhook_payload(&payload);
2569
2570 if messages.is_empty() {
2571 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2573 }
2574
2575 for msg in &messages {
2577 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"channel": "linq", "sender": msg.sender, "content": msg.content})), "inbound webhook message");
2578 let session_id = sender_session_id("linq", msg);
2579
2580 if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(&msg.content) {
2582 let key = linq_memory_key(msg);
2583 let _ = state
2584 .mem
2585 .store(
2586 &key,
2587 &msg.content,
2588 MemoryCategory::Conversation,
2589 Some(&session_id),
2590 )
2591 .await;
2592 }
2593
2594 match Box::pin(run_gateway_chat_with_tools(
2596 &state,
2597 &msg.content,
2598 Some(&session_id),
2599 ))
2600 .await
2601 {
2602 Ok(GatewayChatOutcome { response, .. }) => {
2603 if let Err(e) = linq
2605 .send(&SendMessage::new(response, &msg.reply_target))
2606 .await
2607 {
2608 ::zeroclaw_log::record!(
2609 ERROR,
2610 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2611 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2612 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2613 "Failed to send Linq reply"
2614 );
2615 }
2616 }
2617 Err(e) => {
2618 let reply = if is_needs_onboarding_err(&e) {
2619 ::zeroclaw_log::record!(
2620 WARN,
2621 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2622 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2623 "Linq chat refused: gateway has no model configured; \
2624 visit /onboard"
2625 );
2626 needs_onboarding_channel_reply()
2627 } else {
2628 ::zeroclaw_log::record!(
2629 ERROR,
2630 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2631 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2632 .with_attrs(
2633 ::serde_json::json!({"channel": "linq", "error": format!("{}", e)})
2634 ),
2635 "LLM error"
2636 );
2637 "Sorry, I couldn't process your message right now.".to_string()
2638 };
2639 let _ = linq.send(&SendMessage::new(reply, &msg.reply_target)).await;
2640 }
2641 }
2642 }
2643
2644 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2646}
2647
2648async fn handle_wati_verify(
2650 State(state): State<AppState>,
2651 Query(params): Query<WatiVerifyQuery>,
2652) -> impl IntoResponse {
2653 if state.wati.is_none() {
2654 return (StatusCode::NOT_FOUND, "WATI not configured".to_string());
2655 }
2656
2657 if let Some(challenge) = params.challenge {
2659 ::zeroclaw_log::record!(
2660 INFO,
2661 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2662 .with_attrs(::serde_json::json!({"channel": "wati"})),
2663 "webhook verified successfully"
2664 );
2665 return (StatusCode::OK, challenge);
2666 }
2667
2668 (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string())
2669}
2670
2671#[derive(Debug, serde::Deserialize)]
2672pub struct WatiVerifyQuery {
2673 #[serde(rename = "hub.challenge")]
2674 pub challenge: Option<String>,
2675}
2676
2677async fn handle_wati_webhook(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
2679 let Some(ref wati) = state.wati else {
2680 return (
2681 StatusCode::NOT_FOUND,
2682 Json(serde_json::json!({"error": "WATI not configured"})),
2683 );
2684 };
2685
2686 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2688 return (
2689 StatusCode::BAD_REQUEST,
2690 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2691 );
2692 };
2693
2694 let msg_type = payload.get("type").and_then(|v| v.as_str()).unwrap_or("");
2696
2697 let messages = if matches!(msg_type, "audio" | "voice") {
2698 if let Some(transcript) = wati.try_transcribe_audio(&payload).await {
2700 wati.parse_audio_as_message(&payload, transcript)
2701 } else {
2702 vec![]
2703 }
2704 } else {
2705 wati.parse_webhook_payload(&payload)
2706 };
2707
2708 if messages.is_empty() {
2709 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2710 }
2711
2712 for msg in &messages {
2714 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"channel": "wati", "sender": msg.sender, "content": msg.content})), "inbound webhook message");
2715 let session_id = sender_session_id("wati", msg);
2716
2717 if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(&msg.content) {
2719 let key = wati_memory_key(msg);
2720 let _ = state
2721 .mem
2722 .store(
2723 &key,
2724 &msg.content,
2725 MemoryCategory::Conversation,
2726 Some(&session_id),
2727 )
2728 .await;
2729 }
2730
2731 match Box::pin(run_gateway_chat_with_tools(
2733 &state,
2734 &msg.content,
2735 Some(&session_id),
2736 ))
2737 .await
2738 {
2739 Ok(GatewayChatOutcome { response, .. }) => {
2740 if let Err(e) = wati
2742 .send(&SendMessage::new(response, &msg.reply_target))
2743 .await
2744 {
2745 ::zeroclaw_log::record!(
2746 ERROR,
2747 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2748 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2749 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2750 "Failed to send WATI reply"
2751 );
2752 }
2753 }
2754 Err(e) => {
2755 let reply = if is_needs_onboarding_err(&e) {
2756 ::zeroclaw_log::record!(
2757 WARN,
2758 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2759 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2760 "WATI chat refused: gateway has no model configured; \
2761 visit /onboard"
2762 );
2763 needs_onboarding_channel_reply()
2764 } else {
2765 ::zeroclaw_log::record!(
2766 ERROR,
2767 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
2768 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2769 .with_attrs(
2770 ::serde_json::json!({"channel": "wati", "error": format!("{}", e)})
2771 ),
2772 "LLM error"
2773 );
2774 "Sorry, I couldn't process your message right now.".to_string()
2775 };
2776 let _ = wati.send(&SendMessage::new(reply, &msg.reply_target)).await;
2777 }
2778 }
2779 }
2780
2781 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2783}
2784
2785async fn handle_nextcloud_talk_webhook(
2787 State(state): State<AppState>,
2788 headers: HeaderMap,
2789 body: Bytes,
2790) -> impl IntoResponse {
2791 let Some(ref nextcloud_talk) = state.nextcloud_talk else {
2792 return (
2793 StatusCode::NOT_FOUND,
2794 Json(serde_json::json!({"error": "Nextcloud Talk not configured"})),
2795 );
2796 };
2797
2798 let body_str = String::from_utf8_lossy(&body);
2799
2800 if let Some(ref webhook_secret) = state.nextcloud_talk_webhook_secret {
2802 let random = headers
2803 .get("X-Nextcloud-Talk-Random")
2804 .and_then(|v| v.to_str().ok())
2805 .unwrap_or("");
2806
2807 let signature = headers
2808 .get("X-Nextcloud-Talk-Signature")
2809 .and_then(|v| v.to_str().ok())
2810 .unwrap_or("");
2811
2812 if !zeroclaw_channels::nextcloud_talk::verify_nextcloud_talk_signature(
2813 webhook_secret,
2814 random,
2815 &body_str,
2816 signature,
2817 ) {
2818 ::zeroclaw_log::record!(
2819 WARN,
2820 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2821 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2822 &format!(
2823 "Nextcloud Talk webhook signature verification failed (signature: {})",
2824 if signature.is_empty() {
2825 "missing"
2826 } else {
2827 "invalid"
2828 }
2829 )
2830 );
2831 return (
2832 StatusCode::UNAUTHORIZED,
2833 Json(serde_json::json!({"error": "Invalid signature"})),
2834 );
2835 }
2836 }
2837
2838 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2840 return (
2841 StatusCode::BAD_REQUEST,
2842 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2843 );
2844 };
2845
2846 let messages = nextcloud_talk.parse_webhook_payload(&payload);
2848 if messages.is_empty() {
2849 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2851 }
2852
2853 for msg in messages {
2858 let state = state.clone();
2859 let nextcloud_talk = Arc::clone(nextcloud_talk);
2860 tokio::spawn(async move {
2861 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"channel": "nextcloud_talk", "sender": msg.sender, "content": msg.content})), "inbound webhook message");
2862 let session_id = sender_session_id("nextcloud_talk", &msg);
2863
2864 if state.auto_save && !zeroclaw_memory::should_skip_autosave_content(&msg.content) {
2865 let key = nextcloud_talk_memory_key(&msg);
2866 let _ = state
2867 .mem
2868 .store(
2869 &key,
2870 &msg.content,
2871 MemoryCategory::Conversation,
2872 Some(&session_id),
2873 )
2874 .await;
2875 }
2876
2877 match Box::pin(run_gateway_chat_with_tools(
2878 &state,
2879 &msg.content,
2880 Some(&session_id),
2881 ))
2882 .await
2883 {
2884 Ok(GatewayChatOutcome { response, .. }) => {
2885 if let Err(e) = nextcloud_talk
2886 .send(&SendMessage::new(response, &msg.reply_target))
2887 .await
2888 {
2889 ::zeroclaw_log::record!(
2890 ERROR,
2891 ::zeroclaw_log::Event::new(
2892 module_path!(),
2893 ::zeroclaw_log::Action::Fail
2894 )
2895 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
2896 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
2897 "Failed to send Nextcloud Talk reply"
2898 );
2899 }
2900 }
2901 Err(e) => {
2902 let reply = if is_needs_onboarding_err(&e) {
2903 ::zeroclaw_log::record!(
2904 WARN,
2905 ::zeroclaw_log::Event::new(
2906 module_path!(),
2907 ::zeroclaw_log::Action::Note
2908 )
2909 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
2910 "Nextcloud Talk chat refused: gateway has no model configured; \
2911 visit /onboard"
2912 );
2913 needs_onboarding_channel_reply()
2914 } else {
2915 ::zeroclaw_log::record!(ERROR, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail).with_outcome(::zeroclaw_log::EventOutcome::Failure).with_attrs(::serde_json::json!({"channel": "nextcloud_talk", "error": format!("{}", e)})), "LLM error");
2916 "Sorry, I couldn't process your message right now.".to_string()
2917 };
2918 let _ = nextcloud_talk
2919 .send(&SendMessage::new(reply, &msg.reply_target))
2920 .await;
2921 }
2922 }
2923 });
2924 }
2925
2926 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2927}
2928
2929const GMAIL_WEBHOOK_MAX_BODY: usize = 1024 * 1024;
2932
2933async fn handle_gmail_push_webhook(
2935 State(state): State<AppState>,
2936 headers: HeaderMap,
2937 body: Bytes,
2938) -> impl IntoResponse {
2939 let Some(ref gmail_push) = state.gmail_push else {
2940 return (
2941 StatusCode::NOT_FOUND,
2942 Json(serde_json::json!({"error": "Gmail push not configured"})),
2943 );
2944 };
2945
2946 if body.len() > GMAIL_WEBHOOK_MAX_BODY {
2948 return (
2949 StatusCode::PAYLOAD_TOO_LARGE,
2950 Json(serde_json::json!({"error": "Request body too large"})),
2951 );
2952 }
2953
2954 let secret = gmail_push.config.webhook_secret.clone();
2956 if !secret.is_empty() {
2957 let provided = headers
2958 .get(axum::http::header::AUTHORIZATION)
2959 .and_then(|v| v.to_str().ok())
2960 .and_then(|auth| auth.strip_prefix("Bearer "))
2961 .unwrap_or("");
2962
2963 if provided != secret {
2964 ::zeroclaw_log::record!(
2965 WARN,
2966 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2967 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2968 .with_attrs(::serde_json::json!({"channel": "gmail_push"})),
2969 "webhook: unauthorized request"
2970 );
2971 return (
2972 StatusCode::UNAUTHORIZED,
2973 Json(serde_json::json!({"error": "Unauthorized"})),
2974 );
2975 }
2976 }
2977
2978 let body_str = String::from_utf8_lossy(&body);
2979 let envelope: zeroclaw_channels::gmail_push::PubSubEnvelope =
2980 match serde_json::from_str(&body_str) {
2981 Ok(e) => e,
2982 Err(e) => {
2983 ::zeroclaw_log::record!(
2984 WARN,
2985 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
2986 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
2987 .with_attrs(
2988 ::serde_json::json!({"error": format!("{}", e), "channel": "gmail_push"})
2989 ),
2990 "webhook: invalid payload"
2991 );
2992 return (
2993 StatusCode::BAD_REQUEST,
2994 Json(serde_json::json!({"error": "Invalid Pub/Sub envelope"})),
2995 );
2996 }
2997 };
2998
2999 let channel = Arc::clone(gmail_push);
3001 tokio::spawn(async move {
3002 if let Err(e) = channel.handle_notification(&envelope).await {
3003 ::zeroclaw_log::record!(
3004 ERROR,
3005 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
3006 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
3007 .with_attrs(
3008 ::serde_json::json!({"channel": "gmail_push", "error": format!("{}", e)})
3009 ),
3010 "push notification processing failed"
3011 );
3012 }
3013 });
3014
3015 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
3017}
3018
3019#[derive(serde::Serialize)]
3025struct AdminResponse {
3026 success: bool,
3027 message: String,
3028}
3029
3030fn require_localhost(peer: &SocketAddr) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
3032 if peer.ip().is_loopback() {
3033 Ok(())
3034 } else {
3035 Err((
3036 StatusCode::FORBIDDEN,
3037 Json(serde_json::json!({
3038 "error": "Admin endpoints are restricted to localhost"
3039 })),
3040 ))
3041 }
3042}
3043
3044async fn handle_admin_shutdown(
3046 State(state): State<AppState>,
3047 ConnectInfo(peer): ConnectInfo<SocketAddr>,
3048) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
3049 require_localhost(&peer)?;
3050 ::zeroclaw_log::record!(
3051 INFO,
3052 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
3053 "admin shutdown request received; initiating graceful shutdown"
3054 );
3055
3056 let body = AdminResponse {
3057 success: true,
3058 message: "Gateway shutdown initiated".to_string(),
3059 };
3060
3061 let _ = state.shutdown_tx.send(true);
3062
3063 Ok((StatusCode::OK, Json(body)))
3064}
3065
3066async fn handle_admin_reload(
3083 State(state): State<AppState>,
3084 ConnectInfo(peer): ConnectInfo<SocketAddr>,
3085) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
3086 require_localhost(&peer)?;
3087
3088 let Some(reload_tx) = state.reload_tx.clone() else {
3089 return Err((
3090 StatusCode::SERVICE_UNAVAILABLE,
3091 Json(serde_json::json!({
3092 "error": "no daemon supervisor — running as standalone gateway. \
3093 Restart the process to pick up config changes."
3094 })),
3095 ));
3096 };
3097
3098 ::zeroclaw_log::record!(
3099 INFO,
3100 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
3101 "admin reload request received"
3102 );
3103 state
3108 .pending_reload
3109 .store(false, std::sync::atomic::Ordering::Relaxed);
3110 let shutdown_tx = state.shutdown_tx.clone();
3123 tokio::spawn(async move {
3125 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
3126 let _ = shutdown_tx.send(true);
3128 let _ = reload_tx.send(true);
3130 });
3131
3132 Ok((
3133 StatusCode::OK,
3134 Json(AdminResponse {
3135 success: true,
3136 message: "Daemon reload initiated".to_string(),
3137 }),
3138 ))
3139}
3140
3141async fn handle_admin_paircode(
3143 State(state): State<AppState>,
3144 ConnectInfo(peer): ConnectInfo<SocketAddr>,
3145) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
3146 require_localhost(&peer)?;
3147 let code = state.pairing.pairing_code();
3148
3149 let body = if let Some(c) = code {
3150 serde_json::json!({
3151 "success": true,
3152 "pairing_required": state.pairing.require_pairing(),
3153 "pairing_code": c,
3154 "message": "Use this one-time code to pair"
3155 })
3156 } else {
3157 serde_json::json!({
3158 "success": true,
3159 "pairing_required": state.pairing.require_pairing(),
3160 "pairing_code": null,
3161 "message": if state.pairing.require_pairing() {
3162 "Pairing is active but no new code available (already paired or code expired)"
3163 } else {
3164 "Pairing is disabled for this gateway"
3165 }
3166 })
3167 };
3168
3169 Ok((StatusCode::OK, Json(body)))
3170}
3171
3172async fn handle_admin_paircode_new(
3174 State(state): State<AppState>,
3175 ConnectInfo(peer): ConnectInfo<SocketAddr>,
3176) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
3177 require_localhost(&peer)?;
3178 match state.pairing.generate_new_pairing_code() {
3179 Some(code) => {
3180 ::zeroclaw_log::record!(
3181 INFO,
3182 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
3183 "new pairing code generated via admin endpoint"
3184 );
3185 let body = serde_json::json!({
3186 "success": true,
3187 "pairing_required": state.pairing.require_pairing(),
3188 "pairing_code": code,
3189 "message": "New pairing code generated — use this one-time code to pair"
3190 });
3191 Ok((StatusCode::OK, Json(body)))
3192 }
3193 None => {
3194 let body = serde_json::json!({
3195 "success": false,
3196 "pairing_required": false,
3197 "pairing_code": null,
3198 "message": "Pairing is disabled for this gateway"
3199 });
3200 Ok((StatusCode::BAD_REQUEST, Json(body)))
3201 }
3202 }
3203}
3204
3205async fn handle_pair_code(State(state): State<AppState>) -> impl IntoResponse {
3213 let require = state.pairing.require_pairing();
3214 let is_paired = state.pairing.is_paired();
3215
3216 let code = if require && !is_paired {
3218 state.pairing.pairing_code()
3219 } else {
3220 None
3221 };
3222
3223 let body = serde_json::json!({
3224 "success": true,
3225 "pairing_required": require,
3226 "pairing_code": code,
3227 });
3228
3229 (StatusCode::OK, Json(body))
3230}
3231
3232#[cfg(test)]
3233mod tests {
3234 use super::*;
3235 use async_trait::async_trait;
3236 use axum::http::HeaderValue;
3237 use axum::response::IntoResponse;
3238 use http_body_util::BodyExt;
3239 use parking_lot::{Mutex, RwLock};
3240 use std::sync::atomic::{AtomicUsize, Ordering};
3241 use zeroclaw_api::channel::ChannelMessage;
3242 use zeroclaw_memory::{Memory, MemoryCategory, MemoryEntry};
3243 use zeroclaw_providers::ModelProvider;
3244
3245 fn generate_test_secret() -> String {
3247 let bytes: [u8; 32] = rand::random();
3248 hex::encode(bytes)
3249 }
3250
3251 #[test]
3252 fn security_body_limit_is_64kb() {
3253 assert_eq!(MAX_BODY_SIZE, 65_536);
3254 }
3255
3256 #[test]
3257 fn security_timeout_default_is_30_seconds() {
3258 assert_eq!(REQUEST_TIMEOUT_SECS, 30);
3259 }
3260
3261 #[test]
3262 fn gateway_timeout_uses_typed_config_default() {
3263 let cfg = zeroclaw_config::schema::GatewayConfig::default();
3264 assert_eq!(gateway_request_timeout_secs(&cfg), 30);
3265 }
3266
3267 #[test]
3268 fn paircode_recovery_command_includes_alternate_port() {
3269 assert_eq!(
3270 format_paircode_recovery_command("127.0.0.1", 42617),
3271 "zeroclaw gateway get-paircode --new --port 42617"
3272 );
3273 }
3274
3275 #[test]
3276 fn paircode_recovery_command_includes_specific_host_when_needed() {
3277 assert_eq!(
3278 format_paircode_recovery_command("192.168.1.20", 42617),
3279 "zeroclaw gateway get-paircode --new --port 42617 --host 192.168.1.20"
3280 );
3281 }
3282
3283 #[test]
3284 fn paircode_recovery_curl_targets_running_instance() {
3285 assert_eq!(
3286 format_paircode_recovery_curl("127.0.0.1", 42617, ""),
3287 "curl -s -X POST http://127.0.0.1:42617/admin/paircode/new"
3288 );
3289 }
3290
3291 #[test]
3292 fn paircode_recovery_curl_preserves_path_prefix() {
3293 assert_eq!(
3294 format_paircode_recovery_curl("127.0.0.1", 42617, "/gw"),
3295 "curl -s -X POST http://127.0.0.1:42617/gw/admin/paircode/new"
3296 );
3297 }
3298
3299 #[test]
3300 fn long_running_request_timeout_default_is_ten_minutes() {
3301 assert_eq!(LONG_RUNNING_REQUEST_TIMEOUT_SECS, 600);
3302 }
3303
3304 #[test]
3305 fn long_running_request_timeout_uses_typed_config_default() {
3306 let cfg = zeroclaw_config::schema::GatewayConfig::default();
3307 assert_eq!(gateway_long_running_request_timeout_secs(&cfg), 600);
3308 }
3309
3310 #[test]
3311 fn webhook_body_requires_message_field() {
3312 let valid = r#"{"message": "hello"}"#;
3313 let parsed: Result<WebhookBody, _> = serde_json::from_str(valid);
3314 assert!(parsed.is_ok());
3315 assert_eq!(parsed.unwrap().message, "hello");
3316
3317 let missing = r#"{"other": "field"}"#;
3318 let parsed: Result<WebhookBody, _> = serde_json::from_str(missing);
3319 assert!(parsed.is_err());
3320 }
3321
3322 #[test]
3323 fn whatsapp_query_fields_are_optional() {
3324 let q = WhatsAppVerifyQuery {
3325 mode: None,
3326 verify_token: None,
3327 challenge: None,
3328 };
3329 assert!(q.mode.is_none());
3330 }
3331
3332 #[test]
3333 fn app_state_is_clone() {
3334 fn assert_clone<T: Clone>() {}
3335 assert_clone::<AppState>();
3336 }
3337
3338 #[tokio::test]
3345 async fn run_gateway_starts_with_zero_agents() {
3346 let config = Config::default();
3349 assert!(
3350 config.agents.is_empty(),
3351 "regression assumes default Config has no agents",
3352 );
3353
3354 let handle =
3360 tokio::spawn(
3361 async move { run_gateway("127.0.0.1", 0, config, None, None, None).await },
3362 );
3363
3364 match tokio::time::timeout(
3365 std::time::Duration::from_millis(750),
3366 &mut Box::pin(async {
3367 let _ = tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3372 }),
3373 )
3374 .await
3375 {
3376 Ok(()) => {}
3377 Err(_) => panic!("test setup timed out before checking gateway state"),
3378 }
3379
3380 if handle.is_finished() {
3384 let result = handle.await.expect("task did not panic");
3385 panic!(
3386 "gateway exited during boot with zero agents — must stay up for reload/onboard: {:?}",
3387 result
3388 );
3389 }
3390 handle.abort();
3391 }
3392
3393 #[tokio::test]
3402 async fn run_gateway_starts_with_unresolved_agent_risk_profile() {
3403 use zeroclaw_config::schema::AliasedAgentConfig;
3404
3405 let mut config = Config::default();
3406 let agent = AliasedAgentConfig {
3409 enabled: true,
3410 risk_profile: "definitely_not_configured".to_string(),
3411 ..AliasedAgentConfig::default()
3412 };
3413 config.agents.insert("fake123".to_string(), agent);
3414
3415 let handle =
3416 tokio::spawn(
3417 async move { run_gateway("127.0.0.1", 0, config, None, None, None).await },
3418 );
3419
3420 match tokio::time::timeout(
3421 std::time::Duration::from_millis(750),
3422 &mut Box::pin(async {
3423 let _ = tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3424 }),
3425 )
3426 .await
3427 {
3428 Ok(()) => {}
3429 Err(_) => panic!("test setup timed out before checking gateway state"),
3430 }
3431
3432 if handle.is_finished() {
3433 let result = handle.await.expect("task did not panic");
3434 panic!(
3435 "gateway exited during boot when agent.risk_profile was unresolved \
3436 — must stay up so operator can fix via /admin/reload or /onboard: {:?}",
3437 result
3438 );
3439 }
3440 handle.abort();
3441 }
3442
3443 #[tokio::test]
3444 async fn metrics_endpoint_returns_hint_when_prometheus_is_disabled() {
3445 let state = AppState {
3446 config: Arc::new(RwLock::new(Config::default())),
3447 model_provider: Arc::new(MockModelProvider::default()),
3448 model: "test-model".into(),
3449 temperature: None,
3450 mem: Arc::new(MockMemory),
3451 auto_save: false,
3452 webhook_secret_hash: None,
3453 pairing: Arc::new(PairingGuard::new(false, &[])),
3454 trust_forwarded_headers: false,
3455 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3456 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3457 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3458 whatsapp: None,
3459 whatsapp_app_secret: None,
3460 linq: None,
3461 linq_signing_secret: None,
3462 nextcloud_talk: None,
3463 nextcloud_talk_webhook_secret: None,
3464 wati: None,
3465 gmail_push: None,
3466 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
3467 tools_registry: Arc::new(Vec::new()),
3468 cost_tracker: None,
3469 event_tx: tokio::sync::broadcast::channel(16).0,
3470 event_buffer: Arc::new(sse::EventBuffer::new(16)),
3471 shutdown_tx: tokio::sync::watch::channel(false).0,
3472 reload_tx: None,
3473 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3474 path_prefix: String::new(),
3475 web_dist_dir: None,
3476 session_backend: None,
3477 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
3478 8, 30, 600,
3479 )),
3480 device_registry: None,
3481 pending_pairings: None,
3482 canvas_store: CanvasStore::new(),
3483 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
3484 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
3485 #[cfg(feature = "webauthn")]
3486 webauthn: None,
3487 };
3488
3489 let response = handle_metrics(State(state)).await.into_response();
3490 assert_eq!(response.status(), StatusCode::OK);
3491 assert_eq!(
3492 response
3493 .headers()
3494 .get(header::CONTENT_TYPE)
3495 .and_then(|value| value.to_str().ok()),
3496 Some(PROMETHEUS_CONTENT_TYPE)
3497 );
3498
3499 let body = response.into_body().collect().await.unwrap().to_bytes();
3500 let text = String::from_utf8(body.to_vec()).unwrap();
3501 assert!(text.contains("Prometheus backend not enabled"));
3502 }
3503
3504 #[cfg(feature = "observability-prometheus")]
3505 #[tokio::test]
3506 async fn metrics_endpoint_renders_prometheus_output() {
3507 let event_tx = tokio::sync::broadcast::channel(16).0;
3508 let prom = zeroclaw_runtime::observability::PrometheusObserver::new();
3509 zeroclaw_runtime::observability::Observer::record_event(
3510 &prom,
3511 &zeroclaw_runtime::observability::ObserverEvent::HeartbeatTick,
3512 );
3513
3514 let observer: Arc<dyn zeroclaw_runtime::observability::Observer> = Arc::new(prom);
3515 let state = AppState {
3516 config: Arc::new(RwLock::new(Config::default())),
3517 model_provider: Arc::new(MockModelProvider::default()),
3518 model: "test-model".into(),
3519 temperature: None,
3520 mem: Arc::new(MockMemory),
3521 auto_save: false,
3522 webhook_secret_hash: None,
3523 pairing: Arc::new(PairingGuard::new(false, &[])),
3524 trust_forwarded_headers: false,
3525 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3526 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3527 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3528 whatsapp: None,
3529 whatsapp_app_secret: None,
3530 linq: None,
3531 linq_signing_secret: None,
3532 nextcloud_talk: None,
3533 nextcloud_talk_webhook_secret: None,
3534 wati: None,
3535 gmail_push: None,
3536 observer,
3537 tools_registry: Arc::new(Vec::new()),
3538 cost_tracker: None,
3539 event_tx,
3540 event_buffer: Arc::new(sse::EventBuffer::new(16)),
3541 shutdown_tx: tokio::sync::watch::channel(false).0,
3542 reload_tx: None,
3543 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3544 path_prefix: String::new(),
3545 web_dist_dir: None,
3546 session_backend: None,
3547 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
3548 8, 30, 600,
3549 )),
3550 device_registry: None,
3551 pending_pairings: None,
3552 canvas_store: CanvasStore::new(),
3553 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
3554 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
3555 #[cfg(feature = "webauthn")]
3556 webauthn: None,
3557 };
3558
3559 let response = handle_metrics(State(state)).await.into_response();
3560 assert_eq!(response.status(), StatusCode::OK);
3561
3562 let body = response.into_body().collect().await.unwrap().to_bytes();
3563 let text = String::from_utf8(body.to_vec()).unwrap();
3564 assert!(text.contains("zeroclaw_heartbeat_ticks_total 1"));
3565 }
3566
3567 #[test]
3568 fn gateway_rate_limiter_blocks_after_limit() {
3569 let limiter = GatewayRateLimiter::new(2, 2, 100);
3570 assert!(limiter.allow_pair("127.0.0.1"));
3571 assert!(limiter.allow_pair("127.0.0.1"));
3572 assert!(!limiter.allow_pair("127.0.0.1"));
3573 }
3574
3575 #[test]
3576 fn rate_limiter_sweep_removes_stale_entries() {
3577 let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 100);
3578 assert!(limiter.allow("ip-1"));
3580 assert!(limiter.allow("ip-2"));
3581 assert!(limiter.allow("ip-3"));
3582
3583 {
3584 let guard = limiter.requests.lock();
3585 assert_eq!(guard.0.len(), 3);
3586 }
3587
3588 {
3590 let mut guard = limiter.requests.lock();
3591 guard.1 = Instant::now()
3592 .checked_sub(Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS + 1))
3593 .unwrap();
3594 guard.0.get_mut("ip-2").unwrap().clear();
3596 guard.0.get_mut("ip-3").unwrap().clear();
3597 }
3598
3599 assert!(limiter.allow("ip-1"));
3601
3602 {
3603 let guard = limiter.requests.lock();
3604 assert_eq!(guard.0.len(), 1, "Stale entries should have been swept");
3605 assert!(guard.0.contains_key("ip-1"));
3606 }
3607 }
3608
3609 #[test]
3610 fn rate_limiter_zero_limit_always_allows() {
3611 let limiter = SlidingWindowRateLimiter::new(0, Duration::from_secs(60), 10);
3612 for _ in 0..100 {
3613 assert!(limiter.allow("any-key"));
3614 }
3615 }
3616
3617 #[test]
3618 fn idempotency_store_rejects_duplicate_key() {
3619 let store = IdempotencyStore::new(Duration::from_secs(30), 10);
3620 assert!(store.record_if_new("req-1"));
3621 assert!(!store.record_if_new("req-1"));
3622 assert!(store.record_if_new("req-2"));
3623 }
3624
3625 #[test]
3626 fn rate_limiter_bounded_cardinality_evicts_oldest_key() {
3627 let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 2);
3628 assert!(limiter.allow("ip-1"));
3629 assert!(limiter.allow("ip-2"));
3630 assert!(limiter.allow("ip-3"));
3631
3632 let guard = limiter.requests.lock();
3633 assert_eq!(guard.0.len(), 2);
3634 assert!(guard.0.contains_key("ip-2"));
3635 assert!(guard.0.contains_key("ip-3"));
3636 }
3637
3638 #[test]
3639 fn idempotency_store_bounded_cardinality_evicts_oldest_key() {
3640 let store = IdempotencyStore::new(Duration::from_secs(300), 2);
3641 assert!(store.record_if_new("k1"));
3642 std::thread::sleep(Duration::from_millis(2));
3643 assert!(store.record_if_new("k2"));
3644 std::thread::sleep(Duration::from_millis(2));
3645 assert!(store.record_if_new("k3"));
3646
3647 let keys = store.keys.lock();
3648 assert_eq!(keys.len(), 2);
3649 assert!(!keys.contains_key("k1"));
3650 assert!(keys.contains_key("k2"));
3651 assert!(keys.contains_key("k3"));
3652 }
3653
3654 #[test]
3655 fn client_key_defaults_to_peer_addr_when_untrusted_proxy_mode() {
3656 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3657 let mut headers = HeaderMap::new();
3658 headers.insert(
3659 "X-Forwarded-For",
3660 HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
3661 );
3662
3663 let key = client_key_from_request(Some(peer), &headers, false);
3664 assert_eq!(key, "10.0.0.5");
3665 }
3666
3667 #[test]
3668 fn client_key_uses_forwarded_ip_only_in_trusted_proxy_mode() {
3669 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3670 let mut headers = HeaderMap::new();
3671 headers.insert(
3672 "X-Forwarded-For",
3673 HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
3674 );
3675
3676 let key = client_key_from_request(Some(peer), &headers, true);
3677 assert_eq!(key, "198.51.100.10");
3678 }
3679
3680 #[test]
3681 fn client_key_falls_back_to_peer_when_forwarded_header_invalid() {
3682 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3683 let mut headers = HeaderMap::new();
3684 headers.insert("X-Forwarded-For", HeaderValue::from_static("garbage-value"));
3685
3686 let key = client_key_from_request(Some(peer), &headers, true);
3687 assert_eq!(key, "10.0.0.5");
3688 }
3689
3690 #[test]
3691 fn normalize_max_keys_uses_fallback_for_zero() {
3692 assert_eq!(normalize_max_keys(0, 10_000), 10_000);
3693 assert_eq!(normalize_max_keys(0, 0), 1);
3694 }
3695
3696 #[test]
3697 fn normalize_max_keys_preserves_nonzero_values() {
3698 assert_eq!(normalize_max_keys(2_048, 10_000), 2_048);
3699 assert_eq!(normalize_max_keys(1, 10_000), 1);
3700 }
3701
3702 #[tokio::test]
3703 async fn persist_pairing_tokens_writes_config_tokens() {
3704 let temp = tempfile::tempdir().unwrap();
3705 let config_path = temp.path().join("config.toml");
3706 let workspace_path = temp.path().join("workspace");
3707
3708 let config = Config {
3709 config_path: config_path.clone(),
3710 data_dir: workspace_path,
3711 ..Default::default()
3712 };
3713 config.save().await.unwrap();
3714
3715 let guard = PairingGuard::new(true, &[]);
3716 let code = guard.pairing_code().unwrap();
3717 let token = guard.try_pair(&code, "test_client").await.unwrap().unwrap();
3718 assert!(guard.is_authenticated(&token));
3719
3720 let shared_config = Arc::new(RwLock::new(config));
3721 Box::pin(persist_pairing_tokens(shared_config.clone(), &guard))
3722 .await
3723 .unwrap();
3724
3725 let plaintext = {
3727 let in_memory = shared_config.read();
3728 assert_eq!(in_memory.gateway.paired_tokens.len(), 1);
3729 in_memory.gateway.paired_tokens[0].clone()
3730 };
3731 assert_eq!(plaintext.len(), 64);
3732 assert!(plaintext.chars().all(|c: char| c.is_ascii_hexdigit()));
3733
3734 let saved = tokio::fs::read_to_string(config_path).await.unwrap();
3736 let raw_parsed: Config = toml::from_str(&saved).unwrap();
3737 assert_eq!(raw_parsed.gateway.paired_tokens.len(), 1);
3738 let on_disk = &raw_parsed.gateway.paired_tokens[0];
3739 assert!(
3740 zeroclaw_runtime::security::SecretStore::is_encrypted(on_disk),
3741 "paired_token should be encrypted on disk"
3742 );
3743 }
3744
3745 #[test]
3746 fn webhook_memory_key_is_unique() {
3747 let key1 = webhook_memory_key();
3748 let key2 = webhook_memory_key();
3749
3750 assert!(key1.starts_with("webhook_msg_"));
3751 assert!(key2.starts_with("webhook_msg_"));
3752 assert_ne!(key1, key2);
3753 }
3754
3755 #[test]
3756 fn webhook_session_id_accepts_valid() {
3757 let mut headers = HeaderMap::new();
3758 headers.insert("X-Session-Id", HeaderValue::from_static("abc-DEF_123.foo"));
3759 assert_eq!(webhook_session_id(&headers), Some("abc-DEF_123.foo".into()));
3760 }
3761
3762 #[test]
3763 fn webhook_session_id_trims_whitespace() {
3764 let mut headers = HeaderMap::new();
3765 headers.insert("X-Session-Id", HeaderValue::from_static(" my-session "));
3766 assert_eq!(webhook_session_id(&headers), Some("my-session".into()));
3767 }
3768
3769 #[test]
3770 fn webhook_session_id_rejects_empty() {
3771 let mut headers = HeaderMap::new();
3772 headers.insert("X-Session-Id", HeaderValue::from_static(""));
3773 assert_eq!(webhook_session_id(&headers), None);
3774
3775 headers.insert("X-Session-Id", HeaderValue::from_static(" "));
3776 assert_eq!(webhook_session_id(&headers), None);
3777 }
3778
3779 #[test]
3780 fn webhook_session_id_rejects_missing() {
3781 let headers = HeaderMap::new();
3782 assert_eq!(webhook_session_id(&headers), None);
3783 }
3784
3785 #[test]
3786 fn webhook_session_id_rejects_oversized() {
3787 let mut headers = HeaderMap::new();
3788 let long = "a".repeat(129);
3789 headers.insert("X-Session-Id", HeaderValue::from_str(&long).unwrap());
3790 assert_eq!(webhook_session_id(&headers), None);
3791
3792 let at_limit = "b".repeat(128);
3793 headers.insert("X-Session-Id", HeaderValue::from_str(&at_limit).unwrap());
3794 assert!(webhook_session_id(&headers).is_some());
3795 }
3796
3797 #[test]
3798 fn webhook_session_id_rejects_invalid_chars() {
3799 let mut headers = HeaderMap::new();
3800 for bad in &[
3801 "has/slash",
3802 "has:colon",
3803 "has space",
3804 "has@at",
3805 "emoji\u{1f600}",
3806 ] {
3807 if let Ok(val) = HeaderValue::from_str(bad) {
3808 headers.insert("X-Session-Id", val);
3809 assert_eq!(webhook_session_id(&headers), None, "should reject: {bad}");
3810 }
3811 }
3812 }
3813
3814 #[test]
3815 fn whatsapp_memory_key_includes_sender_and_message_id() {
3816 let msg = ChannelMessage {
3817 id: "wamid-123".into(),
3818 sender: "+1234567890".into(),
3819 reply_target: "+1234567890".into(),
3820 content: "hello".into(),
3821 channel: "whatsapp".into(),
3822 channel_alias: None,
3823 timestamp: 1,
3824 thread_ts: None,
3825 interruption_scope_id: None,
3826 attachments: vec![],
3827 subject: None,
3828 };
3829
3830 let key = whatsapp_memory_key(&msg);
3831 assert_eq!(key, "whatsapp_+1234567890_wamid-123");
3832 }
3833
3834 #[derive(Default)]
3835 struct MockMemory;
3836
3837 #[async_trait]
3838 impl Memory for MockMemory {
3839 fn name(&self) -> &str {
3840 "mock"
3841 }
3842
3843 async fn store(
3844 &self,
3845 _key: &str,
3846 _content: &str,
3847 _category: MemoryCategory,
3848 _session_id: Option<&str>,
3849 ) -> anyhow::Result<()> {
3850 Ok(())
3851 }
3852
3853 async fn recall(
3854 &self,
3855 _query: &str,
3856 _limit: usize,
3857 _session_id: Option<&str>,
3858 _since: Option<&str>,
3859 _until: Option<&str>,
3860 ) -> anyhow::Result<Vec<MemoryEntry>> {
3861 Ok(Vec::new())
3862 }
3863
3864 async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3865 Ok(None)
3866 }
3867
3868 async fn list(
3869 &self,
3870 _category: Option<&MemoryCategory>,
3871 _session_id: Option<&str>,
3872 ) -> anyhow::Result<Vec<MemoryEntry>> {
3873 Ok(Vec::new())
3874 }
3875
3876 async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
3877 Ok(false)
3878 }
3879
3880 async fn forget_for_agent(&self, _key: &str, _agent_id: &str) -> anyhow::Result<bool> {
3881 Ok(false)
3882 }
3883
3884 async fn count(&self) -> anyhow::Result<usize> {
3885 Ok(0)
3886 }
3887
3888 async fn health_check(&self) -> bool {
3889 true
3890 }
3891
3892 async fn store_with_agent(
3893 &self,
3894 _key: &str,
3895 _content: &str,
3896 _category: MemoryCategory,
3897 _session_id: Option<&str>,
3898 _namespace: Option<&str>,
3899 _importance: Option<f64>,
3900 _agent_id: Option<&str>,
3901 ) -> anyhow::Result<()> {
3902 Ok(())
3903 }
3904
3905 async fn recall_for_agents(
3906 &self,
3907 _allowed_agent_ids: &[&str],
3908 _query: &str,
3909 _limit: usize,
3910 _session_id: Option<&str>,
3911 _since: Option<&str>,
3912 _until: Option<&str>,
3913 ) -> anyhow::Result<Vec<MemoryEntry>> {
3914 Ok(Vec::new())
3915 }
3916 }
3917 impl ::zeroclaw_api::attribution::Attributable for MockMemory {
3918 fn role(&self) -> ::zeroclaw_api::attribution::Role {
3919 ::zeroclaw_api::attribution::Role::Memory(
3920 ::zeroclaw_api::attribution::MemoryKind::InMemory,
3921 )
3922 }
3923 fn alias(&self) -> &str {
3924 "MockMemory"
3925 }
3926 }
3927
3928 #[derive(Default)]
3929 struct MockModelProvider {
3930 calls: AtomicUsize,
3931 }
3932
3933 #[async_trait]
3934 impl ModelProvider for MockModelProvider {
3935 async fn chat_with_system(
3936 &self,
3937 _system_prompt: Option<&str>,
3938 _message: &str,
3939 _model: &str,
3940 _temperature: Option<f64>,
3941 ) -> anyhow::Result<String> {
3942 self.calls.fetch_add(1, Ordering::SeqCst);
3943 Ok("ok".into())
3944 }
3945 }
3946 impl ::zeroclaw_api::attribution::Attributable for MockModelProvider {
3947 fn role(&self) -> ::zeroclaw_api::attribution::Role {
3948 ::zeroclaw_api::attribution::Role::Provider(
3949 ::zeroclaw_api::attribution::ProviderKind::Model(
3950 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
3951 ),
3952 )
3953 }
3954 fn alias(&self) -> &str {
3955 "MockModelProvider"
3956 }
3957 }
3958
3959 #[derive(Default)]
3960 struct TrackingMemory {
3961 keys: Mutex<Vec<String>>,
3962 }
3963
3964 #[async_trait]
3965 impl Memory for TrackingMemory {
3966 fn name(&self) -> &str {
3967 "tracking"
3968 }
3969
3970 async fn store(
3971 &self,
3972 key: &str,
3973 _content: &str,
3974 _category: MemoryCategory,
3975 _session_id: Option<&str>,
3976 ) -> anyhow::Result<()> {
3977 self.keys.lock().push(key.to_string());
3978 Ok(())
3979 }
3980
3981 async fn recall(
3982 &self,
3983 _query: &str,
3984 _limit: usize,
3985 _session_id: Option<&str>,
3986 _since: Option<&str>,
3987 _until: Option<&str>,
3988 ) -> anyhow::Result<Vec<MemoryEntry>> {
3989 Ok(Vec::new())
3990 }
3991
3992 async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3993 Ok(None)
3994 }
3995
3996 async fn list(
3997 &self,
3998 _category: Option<&MemoryCategory>,
3999 _session_id: Option<&str>,
4000 ) -> anyhow::Result<Vec<MemoryEntry>> {
4001 Ok(Vec::new())
4002 }
4003
4004 async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
4005 Ok(false)
4006 }
4007
4008 async fn forget_for_agent(&self, _key: &str, _agent_id: &str) -> anyhow::Result<bool> {
4009 Ok(false)
4010 }
4011
4012 async fn count(&self) -> anyhow::Result<usize> {
4013 let size = self.keys.lock().len();
4014 Ok(size)
4015 }
4016
4017 async fn health_check(&self) -> bool {
4018 true
4019 }
4020
4021 async fn store_with_agent(
4022 &self,
4023 key: &str,
4024 content: &str,
4025 category: MemoryCategory,
4026 session_id: Option<&str>,
4027 _namespace: Option<&str>,
4028 _importance: Option<f64>,
4029 _agent_id: Option<&str>,
4030 ) -> anyhow::Result<()> {
4031 self.store(key, content, category, session_id).await
4032 }
4033
4034 async fn recall_for_agents(
4035 &self,
4036 _allowed_agent_ids: &[&str],
4037 _query: &str,
4038 _limit: usize,
4039 _session_id: Option<&str>,
4040 _since: Option<&str>,
4041 _until: Option<&str>,
4042 ) -> anyhow::Result<Vec<MemoryEntry>> {
4043 Ok(Vec::new())
4044 }
4045 }
4046 impl ::zeroclaw_api::attribution::Attributable for TrackingMemory {
4047 fn role(&self) -> ::zeroclaw_api::attribution::Role {
4048 ::zeroclaw_api::attribution::Role::Memory(
4049 ::zeroclaw_api::attribution::MemoryKind::InMemory,
4050 )
4051 }
4052 fn alias(&self) -> &str {
4053 "TrackingMemory"
4054 }
4055 }
4056
4057 fn test_connect_info() -> ConnectInfo<SocketAddr> {
4058 ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 30_300)))
4059 }
4060
4061 #[tokio::test]
4062 async fn webhook_idempotency_skips_duplicate_provider_calls() {
4063 let provider_impl = Arc::new(MockModelProvider::default());
4064 let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
4065 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
4066
4067 let state = AppState {
4068 config: Arc::new(RwLock::new(Config::default())),
4069 model_provider,
4070 model: "test-model".into(),
4071 temperature: None,
4072 mem: memory,
4073 auto_save: false,
4074 webhook_secret_hash: None,
4075 pairing: Arc::new(PairingGuard::new(false, &[])),
4076 trust_forwarded_headers: false,
4077 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4078 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4079 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4080 whatsapp: None,
4081 whatsapp_app_secret: None,
4082 linq: None,
4083 linq_signing_secret: None,
4084 nextcloud_talk: None,
4085 nextcloud_talk_webhook_secret: None,
4086 wati: None,
4087 gmail_push: None,
4088 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4089 tools_registry: Arc::new(Vec::new()),
4090 cost_tracker: None,
4091 event_tx: tokio::sync::broadcast::channel(16).0,
4092 event_buffer: Arc::new(sse::EventBuffer::new(16)),
4093 shutdown_tx: tokio::sync::watch::channel(false).0,
4094 reload_tx: None,
4095 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4096 path_prefix: String::new(),
4097 web_dist_dir: None,
4098 session_backend: None,
4099 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4100 8, 30, 600,
4101 )),
4102 device_registry: None,
4103 pending_pairings: None,
4104 canvas_store: CanvasStore::new(),
4105 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4106 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4107 #[cfg(feature = "webauthn")]
4108 webauthn: None,
4109 };
4110
4111 let mut headers = HeaderMap::new();
4112 headers.insert("X-Idempotency-Key", HeaderValue::from_static("abc-123"));
4113
4114 let body = Ok(Json(WebhookBody {
4115 message: "hello".into(),
4116 }));
4117 let first = handle_webhook(
4118 State(state.clone()),
4119 test_connect_info(),
4120 headers.clone(),
4121 body,
4122 )
4123 .await
4124 .into_response();
4125 assert_eq!(first.status(), StatusCode::OK);
4126
4127 let body = Ok(Json(WebhookBody {
4128 message: "hello".into(),
4129 }));
4130 let second = handle_webhook(State(state), test_connect_info(), headers, body)
4131 .await
4132 .into_response();
4133 assert_eq!(second.status(), StatusCode::OK);
4134
4135 let payload = second.into_body().collect().await.unwrap().to_bytes();
4136 let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
4137 assert_eq!(parsed["status"], "duplicate");
4138 assert_eq!(parsed["idempotent"], true);
4139 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
4140 }
4141
4142 #[tokio::test]
4143 async fn webhook_autosave_stores_distinct_keys_per_request() {
4144 let provider_impl = Arc::new(MockModelProvider::default());
4145 let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
4146
4147 let tracking_impl = Arc::new(TrackingMemory::default());
4148 let memory: Arc<dyn Memory> = tracking_impl.clone();
4149
4150 let state = AppState {
4151 config: Arc::new(RwLock::new(Config::default())),
4152 model_provider,
4153 model: "test-model".into(),
4154 temperature: None,
4155 mem: memory,
4156 auto_save: true,
4157 webhook_secret_hash: None,
4158 pairing: Arc::new(PairingGuard::new(false, &[])),
4159 trust_forwarded_headers: false,
4160 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4161 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4162 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4163 whatsapp: None,
4164 whatsapp_app_secret: None,
4165 linq: None,
4166 linq_signing_secret: None,
4167 nextcloud_talk: None,
4168 nextcloud_talk_webhook_secret: None,
4169 wati: None,
4170 gmail_push: None,
4171 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4172 tools_registry: Arc::new(Vec::new()),
4173 cost_tracker: None,
4174 event_tx: tokio::sync::broadcast::channel(16).0,
4175 event_buffer: Arc::new(sse::EventBuffer::new(16)),
4176 shutdown_tx: tokio::sync::watch::channel(false).0,
4177 reload_tx: None,
4178 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4179 path_prefix: String::new(),
4180 web_dist_dir: None,
4181 session_backend: None,
4182 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4183 8, 30, 600,
4184 )),
4185 device_registry: None,
4186 pending_pairings: None,
4187 canvas_store: CanvasStore::new(),
4188 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4189 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4190 #[cfg(feature = "webauthn")]
4191 webauthn: None,
4192 };
4193
4194 let headers = HeaderMap::new();
4195
4196 let body1 = Ok(Json(WebhookBody {
4197 message: "hello one".into(),
4198 }));
4199 let first = handle_webhook(
4200 State(state.clone()),
4201 test_connect_info(),
4202 headers.clone(),
4203 body1,
4204 )
4205 .await
4206 .into_response();
4207 assert_eq!(first.status(), StatusCode::OK);
4208
4209 let body2 = Ok(Json(WebhookBody {
4210 message: "hello two".into(),
4211 }));
4212 let second = handle_webhook(State(state), test_connect_info(), headers, body2)
4213 .await
4214 .into_response();
4215 assert_eq!(second.status(), StatusCode::OK);
4216
4217 let keys = tracking_impl.keys.lock().clone();
4218 assert_eq!(keys.len(), 2);
4219 assert_ne!(keys[0], keys[1]);
4220 assert!(keys[0].starts_with("webhook_msg_"));
4221 assert!(keys[1].starts_with("webhook_msg_"));
4222 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 2);
4223 }
4224
4225 #[test]
4226 fn webhook_secret_hash_is_deterministic_and_nonempty() {
4227 let secret_a = generate_test_secret();
4228 let secret_b = generate_test_secret();
4229 let one = hash_webhook_secret(&secret_a);
4230 let two = hash_webhook_secret(&secret_a);
4231 let other = hash_webhook_secret(&secret_b);
4232
4233 assert_eq!(one, two);
4234 assert_ne!(one, other);
4235 assert_eq!(one.len(), 64);
4236 }
4237
4238 #[tokio::test]
4239 async fn webhook_secret_hash_rejects_missing_header() {
4240 let provider_impl = Arc::new(MockModelProvider::default());
4241 let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
4242 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
4243 let secret = generate_test_secret();
4244
4245 let state = AppState {
4246 config: Arc::new(RwLock::new(Config::default())),
4247 model_provider,
4248 model: "test-model".into(),
4249 temperature: None,
4250 mem: memory,
4251 auto_save: false,
4252 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
4253 pairing: Arc::new(PairingGuard::new(false, &[])),
4254 trust_forwarded_headers: false,
4255 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4256 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4257 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4258 whatsapp: None,
4259 whatsapp_app_secret: None,
4260 linq: None,
4261 linq_signing_secret: None,
4262 nextcloud_talk: None,
4263 nextcloud_talk_webhook_secret: None,
4264 wati: None,
4265 gmail_push: None,
4266 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4267 tools_registry: Arc::new(Vec::new()),
4268 cost_tracker: None,
4269 event_tx: tokio::sync::broadcast::channel(16).0,
4270 event_buffer: Arc::new(sse::EventBuffer::new(16)),
4271 shutdown_tx: tokio::sync::watch::channel(false).0,
4272 reload_tx: None,
4273 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4274 path_prefix: String::new(),
4275 web_dist_dir: None,
4276 session_backend: None,
4277 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4278 8, 30, 600,
4279 )),
4280 device_registry: None,
4281 pending_pairings: None,
4282 canvas_store: CanvasStore::new(),
4283 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4284 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4285 #[cfg(feature = "webauthn")]
4286 webauthn: None,
4287 };
4288
4289 let response = handle_webhook(
4290 State(state),
4291 test_connect_info(),
4292 HeaderMap::new(),
4293 Ok(Json(WebhookBody {
4294 message: "hello".into(),
4295 })),
4296 )
4297 .await
4298 .into_response();
4299
4300 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
4301 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
4302 }
4303
4304 #[tokio::test]
4305 async fn webhook_secret_hash_rejects_invalid_header() {
4306 let provider_impl = Arc::new(MockModelProvider::default());
4307 let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
4308 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
4309 let valid_secret = generate_test_secret();
4310 let wrong_secret = generate_test_secret();
4311
4312 let state = AppState {
4313 config: Arc::new(RwLock::new(Config::default())),
4314 model_provider,
4315 model: "test-model".into(),
4316 temperature: None,
4317 mem: memory,
4318 auto_save: false,
4319 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&valid_secret))),
4320 pairing: Arc::new(PairingGuard::new(false, &[])),
4321 trust_forwarded_headers: false,
4322 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4323 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4324 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4325 whatsapp: None,
4326 whatsapp_app_secret: None,
4327 linq: None,
4328 linq_signing_secret: None,
4329 nextcloud_talk: None,
4330 nextcloud_talk_webhook_secret: None,
4331 wati: None,
4332 gmail_push: None,
4333 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4334 tools_registry: Arc::new(Vec::new()),
4335 cost_tracker: None,
4336 event_tx: tokio::sync::broadcast::channel(16).0,
4337 event_buffer: Arc::new(sse::EventBuffer::new(16)),
4338 shutdown_tx: tokio::sync::watch::channel(false).0,
4339 reload_tx: None,
4340 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4341 path_prefix: String::new(),
4342 web_dist_dir: None,
4343 session_backend: None,
4344 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4345 8, 30, 600,
4346 )),
4347 device_registry: None,
4348 pending_pairings: None,
4349 canvas_store: CanvasStore::new(),
4350 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4351 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4352 #[cfg(feature = "webauthn")]
4353 webauthn: None,
4354 };
4355
4356 let mut headers = HeaderMap::new();
4357 headers.insert(
4358 "X-Webhook-Secret",
4359 HeaderValue::from_str(&wrong_secret).unwrap(),
4360 );
4361
4362 let response = handle_webhook(
4363 State(state),
4364 test_connect_info(),
4365 headers,
4366 Ok(Json(WebhookBody {
4367 message: "hello".into(),
4368 })),
4369 )
4370 .await
4371 .into_response();
4372
4373 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
4374 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
4375 }
4376
4377 #[tokio::test]
4378 async fn webhook_secret_hash_accepts_valid_header() {
4379 let provider_impl = Arc::new(MockModelProvider::default());
4380 let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
4381 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
4382 let secret = generate_test_secret();
4383
4384 let state = AppState {
4385 config: Arc::new(RwLock::new(Config::default())),
4386 model_provider,
4387 model: "test-model".into(),
4388 temperature: None,
4389 mem: memory,
4390 auto_save: false,
4391 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
4392 pairing: Arc::new(PairingGuard::new(false, &[])),
4393 trust_forwarded_headers: false,
4394 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4395 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4396 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4397 whatsapp: None,
4398 whatsapp_app_secret: None,
4399 linq: None,
4400 linq_signing_secret: None,
4401 nextcloud_talk: None,
4402 nextcloud_talk_webhook_secret: None,
4403 wati: None,
4404 gmail_push: None,
4405 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4406 tools_registry: Arc::new(Vec::new()),
4407 cost_tracker: None,
4408 event_tx: tokio::sync::broadcast::channel(16).0,
4409 event_buffer: Arc::new(sse::EventBuffer::new(16)),
4410 shutdown_tx: tokio::sync::watch::channel(false).0,
4411 reload_tx: None,
4412 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4413 path_prefix: String::new(),
4414 web_dist_dir: None,
4415 session_backend: None,
4416 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4417 8, 30, 600,
4418 )),
4419 device_registry: None,
4420 pending_pairings: None,
4421 canvas_store: CanvasStore::new(),
4422 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4423 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4424 #[cfg(feature = "webauthn")]
4425 webauthn: None,
4426 };
4427
4428 let mut headers = HeaderMap::new();
4429 headers.insert("X-Webhook-Secret", HeaderValue::from_str(&secret).unwrap());
4430
4431 let response = handle_webhook(
4432 State(state),
4433 test_connect_info(),
4434 headers,
4435 Ok(Json(WebhookBody {
4436 message: "hello".into(),
4437 })),
4438 )
4439 .await
4440 .into_response();
4441
4442 assert_eq!(response.status(), StatusCode::OK);
4443 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
4444 }
4445
4446 fn compute_nextcloud_signature_hex(secret: &str, random: &str, body: &str) -> String {
4447 use hmac::{Hmac, Mac};
4448 use sha2::Sha256;
4449
4450 let payload = format!("{random}{body}");
4451 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
4452 mac.update(payload.as_bytes());
4453 hex::encode(mac.finalize().into_bytes())
4454 }
4455
4456 #[tokio::test]
4457 async fn nextcloud_talk_webhook_returns_not_found_when_not_configured() {
4458 let model_provider: Arc<dyn ModelProvider> = Arc::new(MockModelProvider::default());
4459 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
4460
4461 let state = AppState {
4462 config: Arc::new(RwLock::new(Config::default())),
4463 model_provider,
4464 model: "test-model".into(),
4465 temperature: None,
4466 mem: memory,
4467 auto_save: false,
4468 webhook_secret_hash: None,
4469 pairing: Arc::new(PairingGuard::new(false, &[])),
4470 trust_forwarded_headers: false,
4471 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4472 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4473 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4474 whatsapp: None,
4475 whatsapp_app_secret: None,
4476 linq: None,
4477 linq_signing_secret: None,
4478 nextcloud_talk: None,
4479 nextcloud_talk_webhook_secret: None,
4480 wati: None,
4481 gmail_push: None,
4482 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4483 tools_registry: Arc::new(Vec::new()),
4484 cost_tracker: None,
4485 event_tx: tokio::sync::broadcast::channel(16).0,
4486 event_buffer: Arc::new(sse::EventBuffer::new(16)),
4487 shutdown_tx: tokio::sync::watch::channel(false).0,
4488 reload_tx: None,
4489 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4490 path_prefix: String::new(),
4491 web_dist_dir: None,
4492 session_backend: None,
4493 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4494 8, 30, 600,
4495 )),
4496 device_registry: None,
4497 pending_pairings: None,
4498 canvas_store: CanvasStore::new(),
4499 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4500 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4501 #[cfg(feature = "webauthn")]
4502 webauthn: None,
4503 };
4504
4505 let response = Box::pin(handle_nextcloud_talk_webhook(
4506 State(state),
4507 HeaderMap::new(),
4508 Bytes::from_static(br#"{"type":"message"}"#),
4509 ))
4510 .await
4511 .into_response();
4512
4513 assert_eq!(response.status(), StatusCode::NOT_FOUND);
4514 }
4515
4516 #[tokio::test]
4517 async fn nextcloud_talk_webhook_rejects_invalid_signature() {
4518 let provider_impl = Arc::new(MockModelProvider::default());
4519 let model_provider: Arc<dyn ModelProvider> = provider_impl.clone();
4520 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
4521
4522 let alias = "nextcloud_talk_test_alias";
4523 let peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync> = Arc::new(Vec::new);
4524 let channel = Arc::new(NextcloudTalkChannel::new(
4525 "https://cloud.example.com".into(),
4526 "app-token".into(),
4527 String::new(),
4528 alias,
4529 peer_resolver,
4530 ));
4531
4532 let secret = "nextcloud-test-secret";
4533 let random = "seed-value";
4534 let body = r#"{"type":"message","object":{"token":"room-token"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#;
4535 let _valid_signature = compute_nextcloud_signature_hex(secret, random, body);
4536 let invalid_signature = "deadbeef";
4537
4538 let state = AppState {
4539 config: Arc::new(RwLock::new(Config::default())),
4540 model_provider,
4541 model: "test-model".into(),
4542 temperature: None,
4543 mem: memory,
4544 auto_save: false,
4545 webhook_secret_hash: None,
4546 pairing: Arc::new(PairingGuard::new(false, &[])),
4547 trust_forwarded_headers: false,
4548 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4549 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4550 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4551 whatsapp: None,
4552 whatsapp_app_secret: None,
4553 linq: None,
4554 linq_signing_secret: None,
4555 nextcloud_talk: Some(channel),
4556 nextcloud_talk_webhook_secret: Some(Arc::from(secret)),
4557 wati: None,
4558 gmail_push: None,
4559 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4560 tools_registry: Arc::new(Vec::new()),
4561 cost_tracker: None,
4562 event_tx: tokio::sync::broadcast::channel(16).0,
4563 event_buffer: Arc::new(sse::EventBuffer::new(16)),
4564 shutdown_tx: tokio::sync::watch::channel(false).0,
4565 reload_tx: None,
4566 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4567 path_prefix: String::new(),
4568 web_dist_dir: None,
4569 session_backend: None,
4570 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4571 8, 30, 600,
4572 )),
4573 device_registry: None,
4574 pending_pairings: None,
4575 canvas_store: CanvasStore::new(),
4576 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4577 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4578 #[cfg(feature = "webauthn")]
4579 webauthn: None,
4580 };
4581
4582 let mut headers = HeaderMap::new();
4583 headers.insert(
4584 "X-Nextcloud-Talk-Random",
4585 HeaderValue::from_str(random).unwrap(),
4586 );
4587 headers.insert(
4588 "X-Nextcloud-Talk-Signature",
4589 HeaderValue::from_str(invalid_signature).unwrap(),
4590 );
4591
4592 let response = Box::pin(handle_nextcloud_talk_webhook(
4593 State(state),
4594 headers,
4595 Bytes::from(body),
4596 ))
4597 .await
4598 .into_response();
4599 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
4600 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
4601 }
4602
4603 #[derive(Default)]
4607 struct SlowProvider {
4608 calls: AtomicUsize,
4609 started_tx: Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
4610 }
4611
4612 #[async_trait]
4613 impl ModelProvider for SlowProvider {
4614 async fn chat_with_system(
4615 &self,
4616 _system_prompt: Option<&str>,
4617 _message: &str,
4618 _model: &str,
4619 _temperature: Option<f64>,
4620 ) -> anyhow::Result<String> {
4621 self.calls.fetch_add(1, Ordering::SeqCst);
4622 if let Some(tx) = self.started_tx.lock().take() {
4623 let _ = tx.send(());
4624 }
4625 tokio::time::sleep(Duration::from_secs(30)).await;
4626 Ok("slow ok".into())
4627 }
4628 }
4629 impl ::zeroclaw_api::attribution::Attributable for SlowProvider {
4630 fn role(&self) -> ::zeroclaw_api::attribution::Role {
4631 ::zeroclaw_api::attribution::Role::Provider(
4632 ::zeroclaw_api::attribution::ProviderKind::Model(
4633 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
4634 ),
4635 )
4636 }
4637 fn alias(&self) -> &str {
4638 "SlowProvider"
4639 }
4640 }
4641
4642 #[tokio::test]
4643 async fn nextcloud_talk_webhook_returns_before_llm_call_completes() {
4644 let (started_tx, started_rx) = tokio::sync::oneshot::channel();
4645 let provider_impl = Arc::new(SlowProvider {
4646 calls: AtomicUsize::new(0),
4647 started_tx: Mutex::new(Some(started_tx)),
4648 });
4649 let provider: Arc<dyn ModelProvider> = provider_impl.clone();
4650 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
4651
4652 let channel = Arc::new(NextcloudTalkChannel::new(
4653 "https://cloud.example.com".into(),
4654 "app-token".into(),
4655 String::new(),
4656 "default",
4657 Arc::new(|| vec!["*".to_string()]),
4658 ));
4659
4660 let body = r#"{"type":"message","object":{"token":"room-token"},"actor":{"id":"user_a","name":"User A"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#;
4661
4662 let state = AppState {
4663 config: Arc::new(RwLock::new(Config::default())),
4664 model_provider: provider,
4665 model: "test-model".into(),
4666 temperature: None,
4667 mem: memory,
4668 auto_save: false,
4669 webhook_secret_hash: None,
4670 pairing: Arc::new(PairingGuard::new(false, &[])),
4671 trust_forwarded_headers: false,
4672 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
4673 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
4674 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
4675 whatsapp: None,
4676 whatsapp_app_secret: None,
4677 linq: None,
4678 linq_signing_secret: None,
4679 nextcloud_talk: Some(channel),
4680 nextcloud_talk_webhook_secret: None,
4681 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
4682 wati: None,
4683 gmail_push: None,
4684 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
4685 tools_registry: Arc::new(Vec::new()),
4686 cost_tracker: None,
4687 event_tx: tokio::sync::broadcast::channel(16).0,
4688 event_buffer: Arc::new(sse::EventBuffer::new(16)),
4689 shutdown_tx: tokio::sync::watch::channel(false).0,
4690 reload_tx: None,
4691 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
4692 path_prefix: String::new(),
4693 web_dist_dir: None,
4694 session_backend: None,
4695 session_queue: std::sync::Arc::new(crate::session_queue::SessionActorQueue::new(
4696 8, 30, 600,
4697 )),
4698 device_registry: None,
4699 pending_pairings: None,
4700 canvas_store: CanvasStore::new(),
4701 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
4702 #[cfg(feature = "webauthn")]
4703 webauthn: None,
4704 };
4705
4706 let start = std::time::Instant::now();
4707 let response = tokio::time::timeout(
4708 Duration::from_secs(2),
4709 Box::pin(handle_nextcloud_talk_webhook(
4710 State(state),
4711 HeaderMap::new(),
4712 Bytes::from(body),
4713 )),
4714 )
4715 .await
4716 .expect("webhook must return before 2s deadline (regression #6156)")
4717 .into_response();
4718
4719 let elapsed = start.elapsed();
4720 assert_eq!(response.status(), StatusCode::OK);
4721 assert!(
4722 elapsed < Duration::from_secs(2),
4723 "handler returned after {elapsed:?}; expected fast return for #6156"
4724 );
4725
4726 tokio::time::timeout(Duration::from_secs(2), started_rx)
4729 .await
4730 .expect("spawned LLM call did not start within 2s")
4731 .expect("started_tx sender was dropped");
4732 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
4733 }
4734
4735 fn compute_whatsapp_signature_hex(secret: &str, body: &[u8]) -> String {
4740 use hmac::{Hmac, Mac};
4741 use sha2::Sha256;
4742
4743 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
4744 mac.update(body);
4745 hex::encode(mac.finalize().into_bytes())
4746 }
4747
4748 fn compute_whatsapp_signature_header(secret: &str, body: &[u8]) -> String {
4749 format!("sha256={}", compute_whatsapp_signature_hex(secret, body))
4750 }
4751
4752 #[test]
4753 fn whatsapp_signature_valid() {
4754 let app_secret = generate_test_secret();
4755 let body = b"test body content";
4756
4757 let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4758
4759 assert!(verify_whatsapp_signature(
4760 &app_secret,
4761 body,
4762 &signature_header
4763 ));
4764 }
4765
4766 #[test]
4767 fn whatsapp_signature_invalid_wrong_secret() {
4768 let app_secret = generate_test_secret();
4769 let wrong_secret = generate_test_secret();
4770 let body = b"test body content";
4771
4772 let signature_header = compute_whatsapp_signature_header(&wrong_secret, body);
4773
4774 assert!(!verify_whatsapp_signature(
4775 &app_secret,
4776 body,
4777 &signature_header
4778 ));
4779 }
4780
4781 #[test]
4782 fn whatsapp_signature_invalid_wrong_body() {
4783 let app_secret = generate_test_secret();
4784 let original_body = b"original body";
4785 let tampered_body = b"tampered body";
4786
4787 let signature_header = compute_whatsapp_signature_header(&app_secret, original_body);
4788
4789 assert!(!verify_whatsapp_signature(
4791 &app_secret,
4792 tampered_body,
4793 &signature_header
4794 ));
4795 }
4796
4797 #[test]
4798 fn whatsapp_signature_missing_prefix() {
4799 let app_secret = generate_test_secret();
4800 let body = b"test body";
4801
4802 let signature_header = "abc123def456";
4804
4805 assert!(!verify_whatsapp_signature(
4806 &app_secret,
4807 body,
4808 signature_header
4809 ));
4810 }
4811
4812 #[test]
4813 fn whatsapp_signature_empty_header() {
4814 let app_secret = generate_test_secret();
4815 let body = b"test body";
4816
4817 assert!(!verify_whatsapp_signature(&app_secret, body, ""));
4818 }
4819
4820 #[test]
4821 fn whatsapp_signature_invalid_hex() {
4822 let app_secret = generate_test_secret();
4823 let body = b"test body";
4824
4825 let signature_header = "sha256=not_valid_hex_zzz";
4827
4828 assert!(!verify_whatsapp_signature(
4829 &app_secret,
4830 body,
4831 signature_header
4832 ));
4833 }
4834
4835 #[test]
4836 fn whatsapp_signature_empty_body() {
4837 let app_secret = generate_test_secret();
4838 let body = b"";
4839
4840 let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4841
4842 assert!(verify_whatsapp_signature(
4843 &app_secret,
4844 body,
4845 &signature_header
4846 ));
4847 }
4848
4849 #[test]
4850 fn whatsapp_signature_unicode_body() {
4851 let app_secret = generate_test_secret();
4852 let body = "Hello 🦀 World".as_bytes();
4853
4854 let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4855
4856 assert!(verify_whatsapp_signature(
4857 &app_secret,
4858 body,
4859 &signature_header
4860 ));
4861 }
4862
4863 #[test]
4864 fn whatsapp_signature_json_payload() {
4865 let app_secret = generate_test_secret();
4866 let body = br#"{"entry":[{"changes":[{"value":{"messages":[{"from":"1234567890","text":{"body":"Hello"}}]}}]}]}"#;
4867
4868 let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4869
4870 assert!(verify_whatsapp_signature(
4871 &app_secret,
4872 body,
4873 &signature_header
4874 ));
4875 }
4876
4877 #[test]
4878 fn whatsapp_signature_case_sensitive_prefix() {
4879 let app_secret = generate_test_secret();
4880 let body = b"test body";
4881
4882 let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
4883
4884 let wrong_prefix = format!("SHA256={hex_sig}");
4886 assert!(!verify_whatsapp_signature(&app_secret, body, &wrong_prefix));
4887
4888 let correct_prefix = format!("sha256={hex_sig}");
4890 assert!(verify_whatsapp_signature(
4891 &app_secret,
4892 body,
4893 &correct_prefix
4894 ));
4895 }
4896
4897 #[test]
4898 fn whatsapp_signature_truncated_hex() {
4899 let app_secret = generate_test_secret();
4900 let body = b"test body";
4901
4902 let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
4903 let truncated = &hex_sig[..32]; let signature_header = format!("sha256={truncated}");
4905
4906 assert!(!verify_whatsapp_signature(
4907 &app_secret,
4908 body,
4909 &signature_header
4910 ));
4911 }
4912
4913 #[test]
4914 fn whatsapp_signature_extra_bytes() {
4915 let app_secret = generate_test_secret();
4916 let body = b"test body";
4917
4918 let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
4919 let extended = format!("{hex_sig}deadbeef");
4920 let signature_header = format!("sha256={extended}");
4921
4922 assert!(!verify_whatsapp_signature(
4923 &app_secret,
4924 body,
4925 &signature_header
4926 ));
4927 }
4928
4929 #[test]
4934 fn idempotency_store_allows_different_keys() {
4935 let store = IdempotencyStore::new(Duration::from_secs(60), 100);
4936 assert!(store.record_if_new("key-a"));
4937 assert!(store.record_if_new("key-b"));
4938 assert!(store.record_if_new("key-c"));
4939 assert!(store.record_if_new("key-d"));
4940 }
4941
4942 #[test]
4943 fn idempotency_store_max_keys_clamped_to_one() {
4944 let store = IdempotencyStore::new(Duration::from_secs(60), 0);
4945 assert!(store.record_if_new("only-key"));
4946 assert!(!store.record_if_new("only-key"));
4947 }
4948
4949 #[test]
4950 fn idempotency_store_rapid_duplicate_rejected() {
4951 let store = IdempotencyStore::new(Duration::from_secs(300), 100);
4952 assert!(store.record_if_new("rapid"));
4953 assert!(!store.record_if_new("rapid"));
4954 }
4955
4956 #[test]
4957 fn idempotency_store_accepts_after_ttl_expires() {
4958 let store = IdempotencyStore::new(Duration::from_millis(1), 100);
4959 assert!(store.record_if_new("ttl-key"));
4960 std::thread::sleep(Duration::from_millis(10));
4961 assert!(store.record_if_new("ttl-key"));
4962 }
4963
4964 #[test]
4965 fn idempotency_store_eviction_preserves_newest() {
4966 let store = IdempotencyStore::new(Duration::from_secs(300), 1);
4967 assert!(store.record_if_new("old-key"));
4968 std::thread::sleep(Duration::from_millis(2));
4969 assert!(store.record_if_new("new-key"));
4970
4971 let keys = store.keys.lock();
4972 assert_eq!(keys.len(), 1);
4973 assert!(!keys.contains_key("old-key"));
4974 assert!(keys.contains_key("new-key"));
4975 }
4976
4977 #[test]
4978 fn rate_limiter_allows_after_window_expires() {
4979 let window = Duration::from_millis(50);
4980 let limiter = SlidingWindowRateLimiter::new(2, window, 100);
4981 assert!(limiter.allow("ip-1"));
4982 assert!(limiter.allow("ip-1"));
4983 assert!(!limiter.allow("ip-1")); std::thread::sleep(Duration::from_millis(60));
4987
4988 assert!(limiter.allow("ip-1"));
4990 }
4991
4992 #[test]
4993 fn rate_limiter_independent_keys_tracked_separately() {
4994 let limiter = SlidingWindowRateLimiter::new(2, Duration::from_secs(60), 100);
4995 assert!(limiter.allow("ip-1"));
4996 assert!(limiter.allow("ip-1"));
4997 assert!(!limiter.allow("ip-1")); assert!(limiter.allow("ip-2"));
5001 assert!(limiter.allow("ip-2"));
5002 assert!(!limiter.allow("ip-2")); }
5004
5005 #[test]
5006 fn rate_limiter_exact_boundary_at_max_keys() {
5007 let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 3);
5008 assert!(limiter.allow("ip-1"));
5009 assert!(limiter.allow("ip-2"));
5010 assert!(limiter.allow("ip-3"));
5011 assert!(limiter.allow("ip-4")); let guard = limiter.requests.lock();
5015 assert_eq!(guard.0.len(), 3);
5016 assert!(
5017 !guard.0.contains_key("ip-1"),
5018 "ip-1 should have been evicted"
5019 );
5020 assert!(guard.0.contains_key("ip-2"));
5021 assert!(guard.0.contains_key("ip-3"));
5022 assert!(guard.0.contains_key("ip-4"));
5023 }
5024
5025 #[test]
5026 fn gateway_rate_limiter_pair_and_webhook_are_independent() {
5027 let limiter = GatewayRateLimiter::new(2, 3, 100);
5028
5029 assert!(limiter.allow_pair("ip-1"));
5031 assert!(limiter.allow_pair("ip-1"));
5032 assert!(!limiter.allow_pair("ip-1")); assert!(limiter.allow_webhook("ip-1"));
5036 assert!(limiter.allow_webhook("ip-1"));
5037 assert!(limiter.allow_webhook("ip-1"));
5038 assert!(!limiter.allow_webhook("ip-1")); }
5040
5041 #[test]
5042 fn rate_limiter_single_key_max_allows_one_request() {
5043 let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 1);
5044 assert!(limiter.allow("ip-1"));
5045 assert!(limiter.allow("ip-2")); let guard = limiter.requests.lock();
5048 assert_eq!(guard.0.len(), 1);
5049 assert!(guard.0.contains_key("ip-2"));
5050 assert!(!guard.0.contains_key("ip-1"));
5051 }
5052
5053 #[test]
5054 fn rate_limiter_concurrent_access_safe() {
5055 use std::sync::Arc;
5056
5057 let limiter = Arc::new(SlidingWindowRateLimiter::new(
5058 1000,
5059 Duration::from_secs(60),
5060 1000,
5061 ));
5062 let mut handles = Vec::new();
5063
5064 for i in 0..10 {
5065 let limiter = limiter.clone();
5066 handles.push(std::thread::spawn(move || {
5067 for j in 0..100 {
5068 limiter.allow(&format!("thread-{i}-req-{j}"));
5069 }
5070 }));
5071 }
5072
5073 for handle in handles {
5074 handle.join().unwrap();
5075 }
5076
5077 let guard = limiter.requests.lock();
5079 assert!(guard.0.len() <= 1000, "should respect max_keys");
5080 }
5081
5082 #[test]
5083 fn idempotency_store_concurrent_access_safe() {
5084 use std::sync::Arc;
5085
5086 let store = Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000));
5087 let mut handles = Vec::new();
5088
5089 for i in 0..10 {
5090 let store = store.clone();
5091 handles.push(std::thread::spawn(move || {
5092 for j in 0..100 {
5093 store.record_if_new(&format!("thread-{i}-key-{j}"));
5094 }
5095 }));
5096 }
5097
5098 for handle in handles {
5099 handle.join().unwrap();
5100 }
5101
5102 let keys = store.keys.lock();
5103 assert!(keys.len() <= 1000, "should respect max_keys");
5104 }
5105
5106 #[test]
5107 fn rate_limiter_rapid_burst_then_cooldown() {
5108 let limiter = SlidingWindowRateLimiter::new(5, Duration::from_millis(50), 100);
5109
5110 for _ in 0..5 {
5112 assert!(limiter.allow("burst-ip"));
5113 }
5114 assert!(!limiter.allow("burst-ip")); std::thread::sleep(Duration::from_millis(60));
5118
5119 assert!(limiter.allow("burst-ip"));
5121 }
5122
5123 #[test]
5124 fn require_localhost_accepts_ipv4_loopback() {
5125 let peer = SocketAddr::from(([127, 0, 0, 1], 12345));
5126 assert!(require_localhost(&peer).is_ok());
5127 }
5128
5129 #[test]
5130 fn require_localhost_accepts_ipv6_loopback() {
5131 let peer = SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, 12345));
5132 assert!(require_localhost(&peer).is_ok());
5133 }
5134
5135 #[test]
5136 fn require_localhost_rejects_non_loopback_ipv4() {
5137 let peer = SocketAddr::from(([192, 168, 1, 100], 12345));
5138 let err = require_localhost(&peer).unwrap_err();
5139 assert_eq!(err.0, StatusCode::FORBIDDEN);
5140 }
5141
5142 #[test]
5143 fn require_localhost_rejects_non_loopback_ipv6() {
5144 let peer = SocketAddr::from((
5145 std::net::Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1),
5146 12345,
5147 ));
5148 let err = require_localhost(&peer).unwrap_err();
5149 assert_eq!(err.0, StatusCode::FORBIDDEN);
5150 }
5151
5152 #[test]
5153 fn needs_onboarding_for_flags_empty_model() {
5154 let err =
5155 needs_onboarding_for("").expect("empty model must produce a needs_onboarding error");
5156 let msg = err.to_string();
5157 assert!(
5158 msg.contains("needs_onboarding"),
5159 "error must carry the needs_onboarding marker for callers to map to 503; got: {msg}"
5160 );
5161 assert!(
5162 msg.contains("/onboard"),
5163 "error must point the user at /onboard; got: {msg}"
5164 );
5165 }
5166
5167 #[test]
5168 fn needs_onboarding_for_flags_whitespace_only_model() {
5169 assert!(
5170 needs_onboarding_for(" ").is_some(),
5171 "whitespace-only model must be treated as empty"
5172 );
5173 assert!(
5174 needs_onboarding_for("\n\t ").is_some(),
5175 "tabs and newlines count as empty too"
5176 );
5177 }
5178
5179 #[test]
5180 fn needs_onboarding_for_passes_real_model() {
5181 assert!(
5182 needs_onboarding_for("anthropic/claude-sonnet-4").is_none(),
5183 "a real model id must not be flagged"
5184 );
5185 assert!(
5186 needs_onboarding_for(" gpt-4 ").is_none(),
5187 "leading/trailing whitespace around a real model id must not be flagged"
5188 );
5189 }
5190
5191 #[test]
5192 fn is_needs_onboarding_err_detects_marker_from_helper() {
5193 let err = needs_onboarding_for("").expect("empty model produces marker");
5194 assert!(
5195 is_needs_onboarding_err(&err),
5196 "the marker emitted by needs_onboarding_for must be detected"
5197 );
5198 }
5199
5200 #[test]
5201 fn is_needs_onboarding_err_ignores_unrelated_errors() {
5202 let err = anyhow::Error::msg("upstream timeout: provider returned 504");
5203 assert!(
5204 !is_needs_onboarding_err(&err),
5205 "unrelated errors must not be misclassified as needs_onboarding"
5206 );
5207 let err = anyhow::Error::msg("invalid api key");
5208 assert!(!is_needs_onboarding_err(&err));
5209 }
5210
5211 #[test]
5212 fn is_needs_onboarding_err_detects_via_substring() {
5213 let err =
5217 anyhow::Error::msg("provider call failed").context("needs_onboarding: empty model");
5218 assert!(is_needs_onboarding_err(&err));
5219 }
5220
5221 #[test]
5222 fn needs_onboarding_channel_reply_resolves_via_fluent() {
5223 let reply = needs_onboarding_channel_reply();
5230 assert!(
5231 !reply.starts_with('{') && !reply.ends_with('}'),
5232 "fluent missing-key fallback leaked into channel reply: {reply:?}"
5233 );
5234 assert!(
5235 reply.to_lowercase().contains("onboarding"),
5236 "channel reply must mention onboarding so users know what's missing: {reply:?}"
5237 );
5238 }
5239}