1use super::AppState;
6use axum::{
7 extract::{Path, Query, State},
8 http::{HeaderMap, StatusCode, header},
9 response::{IntoResponse, Json},
10};
11use serde::{Deserialize, Serialize};
12use zeroclaw_config::schema::{ChannelAliasInfo, Config};
13use zeroclaw_memory::MemoryEntry;
14
/// Maximum number of characters of a memory entry's `content` returned by the
/// memory API. Longer content is cut down by `truncate_with_ellipsis_total_chars`
/// so that the result, including a trailing `...`, is exactly this many characters.
const MEMORY_API_CONTENT_MAX_CHARS: usize = 4096;
16
17fn extract_bearer_token(headers: &HeaderMap) -> Option<&str> {
21 headers
22 .get(header::AUTHORIZATION)
23 .and_then(|v| v.to_str().ok())
24 .and_then(|auth| auth.strip_prefix("Bearer "))
25}
26
27pub(super) fn require_auth(
29 state: &AppState,
30 headers: &HeaderMap,
31) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
32 if !state.pairing.require_pairing() {
33 return Ok(());
34 }
35
36 let token = extract_bearer_token(headers).unwrap_or("");
37 if state.pairing.is_authenticated(token) {
38 Ok(())
39 } else {
40 Err((
41 StatusCode::UNAUTHORIZED,
42 Json(serde_json::json!({
43 "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer <token>"
44 })),
45 ))
46 }
47}
48
49#[derive(Deserialize)]
52pub struct MemoryQuery {
53 pub query: Option<String>,
54 pub category: Option<String>,
55 pub since: Option<String>,
57 pub until: Option<String>,
59 #[serde(default)]
64 pub agent: Option<String>,
65}
66
67#[derive(Deserialize)]
68pub struct MemoryStoreBody {
69 pub key: String,
70 pub content: String,
71 pub category: Option<String>,
72 #[serde(default)]
75 pub agent: Option<String>,
76}
77
78#[derive(Deserialize)]
79pub struct MemoryDeleteQuery {
80 #[serde(default)]
83 pub agent: Option<String>,
84}
85
86#[derive(Deserialize)]
87pub struct CronRunsQuery {
88 pub limit: Option<u32>,
89}
90
/// JSON body for creating a cron job.
#[derive(Deserialize)]
pub struct CronAddBody {
    /// Alias of the agent that owns the job; must name a configured agent.
    pub agent: String,
    /// Optional job name, forwarded to the cron runtime.
    pub name: Option<String>,
    /// Cron expression; validated together with `tz` before the job is created.
    pub schedule: String,
    /// Optional timezone. A blank value is rejected with `400 Bad Request`.
    pub tz: Option<String>,
    /// Shell command; required (non-blank) for shell jobs.
    pub command: Option<String>,
    /// `"agent"` selects an agent job. When absent, the job is an agent job if
    /// `prompt` is present; otherwise it is a shell job.
    pub job_type: Option<String>,
    /// Prompt; required (non-blank) for agent jobs.
    pub prompt: Option<String>,
    /// Optional delivery configuration, validated before the job is created.
    pub delivery: Option<zeroclaw_runtime::cron::DeliveryConfig>,
    /// Session target for agent jobs, parsed with `SessionTarget::parse`;
    /// the type's default is used when absent.
    pub session_target: Option<String>,
    /// Optional model, forwarded to agent jobs.
    pub model: Option<String>,
    /// Optional tool allow-list, forwarded to agent jobs.
    pub allowed_tools: Option<Vec<String>>,
    /// Whether an agent job is deleted after it runs; when absent the handler
    /// defaults to `true` only for `Schedule::At` schedules.
    pub delete_after_run: Option<bool>,
}
108
109#[derive(Deserialize)]
110pub struct CronPatchBody {
111 pub agent: String,
114 pub name: Option<String>,
115 pub schedule: Option<String>,
116 pub tz: Option<String>,
117 pub clear_tz: Option<bool>,
118 pub command: Option<String>,
119 pub prompt: Option<String>,
120}
121
/// How a cron PATCH request wants the job's timezone changed.
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so values can be logged and compared
/// in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CronTimezonePatch {
    /// Keep whatever timezone the existing schedule has.
    Preserve,
    /// Replace the timezone with the given (already trimmed, non-empty) value.
    Set(String),
    /// Remove the timezone from the schedule.
    Clear,
}
127
128fn bad_request(message: impl Into<String>) -> (StatusCode, Json<serde_json::Value>) {
129 (
130 StatusCode::BAD_REQUEST,
131 Json(serde_json::json!({ "error": message.into() })),
132 )
133}
134
135fn normalize_optional_timezone(
136 tz: Option<String>,
137) -> Result<Option<String>, (StatusCode, Json<serde_json::Value>)> {
138 match tz {
139 Some(raw) => {
140 let trimmed = raw.trim();
141 if trimmed.is_empty() {
142 Err(bad_request(
143 "tz must be a non-empty IANA timezone; use clear_tz=true to clear it",
144 ))
145 } else {
146 Ok(Some(trimmed.to_string()))
147 }
148 }
149 None => Ok(None),
150 }
151}
152
153fn parse_timezone_patch(
154 tz: Option<String>,
155 clear_tz: Option<bool>,
156) -> Result<CronTimezonePatch, (StatusCode, Json<serde_json::Value>)> {
157 let tz = normalize_optional_timezone(tz)?;
158 let clear_tz = clear_tz.unwrap_or(false);
159
160 if clear_tz && tz.is_some() {
161 return Err(bad_request("Provide either tz or clear_tz=true, not both"));
162 }
163
164 if clear_tz {
165 Ok(CronTimezonePatch::Clear)
166 } else if let Some(tz) = tz {
167 Ok(CronTimezonePatch::Set(tz))
168 } else {
169 Ok(CronTimezonePatch::Preserve)
170 }
171}
172
173fn cron_schedule_from_api(
174 expr: String,
175 tz: Option<String>,
176) -> Result<zeroclaw_runtime::cron::Schedule, (StatusCode, Json<serde_json::Value>)> {
177 let schedule = zeroclaw_runtime::cron::Schedule::Cron { expr, tz };
178 zeroclaw_runtime::cron::validate_schedule(&schedule, chrono::Utc::now())
179 .map_err(|e| bad_request(format!("Invalid cron schedule: {e}")))?;
180 Ok(schedule)
181}
182
183#[derive(Deserialize)]
184pub struct SessionMessagePostBody {
185 pub content: String,
186}
187
/// Query-string parameters for the status endpoint.
#[derive(Debug, Deserialize)]
pub struct StatusQuery {
    /// Optional agent alias. When it names a configured agent, the status
    /// response reports that agent's model provider, model, temperature and
    /// memory backend instead of the gateway-wide values.
    #[serde(default)]
    pub agent: Option<String>,
}
199
200pub async fn handle_api_status(
202 State(state): State<AppState>,
203 headers: HeaderMap,
204 Query(query): Query<StatusQuery>,
205) -> impl IntoResponse {
206 if let Err(e) = require_auth(&state, &headers) {
207 return e.into_response();
208 }
209
210 let config = state.config.read().clone();
211 let health = zeroclaw_runtime::health::snapshot();
212
213 let mut channels = serde_json::Map::new();
216 for info in config.channels_by_alias() {
217 let composite = format!("{}.{}", info.channel_type, info.alias);
218 channels.insert(composite, serde_json::Value::Bool(true));
219 }
220
221 let locale = config
222 .locale
223 .as_deref()
224 .filter(|s| !s.is_empty())
225 .map(String::from)
226 .unwrap_or_else(zeroclaw_runtime::i18n::detect_locale);
227
228 let agent_alias = query.agent.as_deref().filter(|s| !s.trim().is_empty());
233 let (model_provider, model, temperature, memory_backend) =
234 match agent_alias.and_then(|alias| config.agent(alias).map(|a| (alias, a))) {
235 Some((alias, agent)) => {
236 let provider_ref = if agent.model_provider.is_empty() {
237 None
238 } else {
239 Some(agent.model_provider.as_str().to_string())
240 };
241 let resolved = config.resolved_model_provider_for_agent(alias);
242 let model = resolved
243 .as_ref()
244 .and_then(|(_, _, cfg)| cfg.model.clone())
245 .unwrap_or_default();
246 let temperature: Option<f64> =
247 resolved.as_ref().and_then(|(_, _, cfg)| cfg.temperature);
248 let backend_kind = agent.memory.backend;
249 let backend = serde_json::to_value(backend_kind)
250 .ok()
251 .and_then(|v| v.as_str().map(String::from))
252 .unwrap_or_else(|| format!("{backend_kind:?}").to_lowercase());
253 (provider_ref, model, temperature, backend)
254 }
255 None => (
256 config
257 .providers
258 .models
259 .iter_entries()
260 .next()
261 .map(|(ty, alias, _)| format!("{ty}.{alias}")),
262 state.model.clone(),
263 state.temperature,
264 state.mem.name().to_string(),
265 ),
266 };
267
268 let process = zeroclaw_runtime::process_stats::sample();
269
270 let body = serde_json::json!({
271 "version": env!("CARGO_PKG_VERSION"),
272 "model_provider": model_provider,
273 "model": model,
274 "temperature": temperature,
275 "uptime_seconds": health.uptime_seconds,
276 "daemon_started_at": zeroclaw_runtime::health::daemon_started_at(),
277 "gateway_port": config.gateway.port,
278 "locale": locale,
279 "memory_backend": memory_backend,
280 "paired": state.pairing.is_paired(),
281 "channels": channels,
282 "health": health,
283 "agent_alias": agent_alias,
284 "process": process,
285 });
286
287 Json(body).into_response()
288}
289
290pub async fn handle_api_tools(
292 State(state): State<AppState>,
293 headers: HeaderMap,
294) -> impl IntoResponse {
295 if let Err(e) = require_auth(&state, &headers) {
296 return e.into_response();
297 }
298
299 let tools: Vec<serde_json::Value> = state
300 .tools_registry
301 .iter()
302 .map(|spec| {
303 serde_json::json!({
304 "name": spec.name,
305 "description": spec.description,
306 "parameters": spec.parameters,
307 })
308 })
309 .collect();
310
311 Json(serde_json::json!({"tools": tools})).into_response()
312}
313
314pub async fn handle_api_cron_list(
316 State(state): State<AppState>,
317 headers: HeaderMap,
318) -> impl IntoResponse {
319 if let Err(e) = require_auth(&state, &headers) {
320 return e.into_response();
321 }
322
323 let config = state.config.read().clone();
324 match zeroclaw_runtime::cron::list_jobs(&config) {
325 Ok(jobs) => Json(serde_json::json!({"jobs": jobs})).into_response(),
326 Err(e) => (
327 StatusCode::INTERNAL_SERVER_ERROR,
328 Json(serde_json::json!({"error": format!("Failed to list cron jobs: {e}")})),
329 )
330 .into_response(),
331 }
332}
333
334pub async fn handle_api_cron_add(
336 State(state): State<AppState>,
337 headers: HeaderMap,
338 Json(body): Json<CronAddBody>,
339) -> impl IntoResponse {
340 if let Err(e) = require_auth(&state, &headers) {
341 return e.into_response();
342 }
343
344 let CronAddBody {
345 agent: agent_alias,
346 name,
347 schedule,
348 tz,
349 command,
350 job_type,
351 prompt,
352 delivery,
353 session_target,
354 model,
355 allowed_tools,
356 delete_after_run,
357 } = body;
358
359 let config = state.config.read().clone();
360 if config.agent(&agent_alias).is_none() {
361 return (
362 StatusCode::BAD_REQUEST,
363 Json(serde_json::json!({"error": format!(
364 "Unknown agent {agent_alias:?} (no [agents.{agent_alias}] entry configured)"
365 )})),
366 )
367 .into_response();
368 }
369 let tz = match normalize_optional_timezone(tz) {
370 Ok(tz) => tz,
371 Err(e) => return e.into_response(),
372 };
373 let schedule = match cron_schedule_from_api(schedule, tz) {
374 Ok(schedule) => schedule,
375 Err(e) => return e.into_response(),
376 };
377 if let Err(e) = zeroclaw_runtime::cron::validate_delivery_config(delivery.as_ref()) {
378 return (
379 StatusCode::BAD_REQUEST,
380 Json(serde_json::json!({"error": format!("Failed to add cron job: {e}")})),
381 )
382 .into_response();
383 }
384
385 let is_agent =
387 matches!(job_type.as_deref(), Some("agent")) || (job_type.is_none() && prompt.is_some());
388
389 let result = if is_agent {
390 let prompt = match prompt.as_deref() {
391 Some(p) if !p.trim().is_empty() => p,
392 _ => {
393 return (
394 StatusCode::BAD_REQUEST,
395 Json(serde_json::json!({"error": "Missing 'prompt' for agent job"})),
396 )
397 .into_response();
398 }
399 };
400
401 let session_target = session_target
402 .as_deref()
403 .map(zeroclaw_runtime::cron::SessionTarget::parse)
404 .unwrap_or_default();
405
406 let default_delete = matches!(schedule, zeroclaw_runtime::cron::Schedule::At { .. });
407 let delete_after_run = delete_after_run.unwrap_or(default_delete);
408
409 zeroclaw_runtime::cron::add_agent_job(
410 &config,
411 &agent_alias,
412 name,
413 schedule,
414 prompt,
415 session_target,
416 model,
417 delivery,
418 delete_after_run,
419 allowed_tools,
420 )
421 } else {
422 let command = match command.as_deref() {
423 Some(c) if !c.trim().is_empty() => c,
424 _ => {
425 return (
426 StatusCode::BAD_REQUEST,
427 Json(serde_json::json!({"error": "Missing 'command' for shell job"})),
428 )
429 .into_response();
430 }
431 };
432
433 zeroclaw_runtime::cron::add_shell_job_with_approval(
434 &config,
435 &agent_alias,
436 name,
437 schedule,
438 command,
439 delivery,
440 false,
441 )
442 };
443
444 match result {
445 Ok(job) => Json(serde_json::json!({"status": "ok", "job": job})).into_response(),
446 Err(e) => (
447 StatusCode::INTERNAL_SERVER_ERROR,
448 Json(serde_json::json!({"error": format!("Failed to add cron job: {e}")})),
449 )
450 .into_response(),
451 }
452}
453
454pub async fn handle_api_cron_runs(
456 State(state): State<AppState>,
457 headers: HeaderMap,
458 Path(id): Path<String>,
459 Query(params): Query<CronRunsQuery>,
460) -> impl IntoResponse {
461 if let Err(e) = require_auth(&state, &headers) {
462 return e.into_response();
463 }
464
465 let limit = params.limit.unwrap_or(20).clamp(1, 100) as usize;
466 let config = state.config.read().clone();
467
468 if let Err(e) = zeroclaw_runtime::cron::get_job(&config, &id) {
470 return (
471 StatusCode::NOT_FOUND,
472 Json(serde_json::json!({"error": format!("Cron job not found: {e}")})),
473 )
474 .into_response();
475 }
476
477 match zeroclaw_runtime::cron::list_runs(&config, &id, limit) {
478 Ok(runs) => {
479 let runs_json: Vec<serde_json::Value> = runs
480 .iter()
481 .map(|r| {
482 serde_json::json!({
483 "id": r.id,
484 "job_id": r.job_id,
485 "started_at": r.started_at.to_rfc3339(),
486 "finished_at": r.finished_at.to_rfc3339(),
487 "status": r.status,
488 "output": r.output,
489 "duration_ms": r.duration_ms,
490 })
491 })
492 .collect();
493 Json(serde_json::json!({"runs": runs_json})).into_response()
494 }
495 Err(e) => (
496 StatusCode::INTERNAL_SERVER_ERROR,
497 Json(serde_json::json!({"error": format!("Failed to list cron runs: {e}")})),
498 )
499 .into_response(),
500 }
501}
502
503pub async fn handle_api_cron_run(
505 State(state): State<AppState>,
506 headers: HeaderMap,
507 Path(id): Path<String>,
508) -> impl IntoResponse {
509 if let Err(e) = require_auth(&state, &headers) {
510 return e.into_response();
511 }
512
513 let config = state.config.read().clone();
514
515 let job = match zeroclaw_runtime::cron::get_job(&config, &id) {
516 Ok(job) => job,
517 Err(e) => {
518 return (
519 StatusCode::NOT_FOUND,
520 Json(serde_json::json!({"error": format!("Cron job not found: {e}")})),
521 )
522 .into_response();
523 }
524 };
525
526 let started_at = chrono::Utc::now();
527 let (mut success, output) =
528 zeroclaw_runtime::cron::scheduler::execute_job_now(&config, &job).await;
529 let finished_at = chrono::Utc::now();
530 let duration_ms = (finished_at - started_at).num_milliseconds();
531 let outcome = zeroclaw_runtime::cron::scheduler::deliver_and_classify_run_result(
532 &config,
533 &job,
534 success,
535 output,
536 zeroclaw_runtime::cron::scheduler::CronDeliveryContext::GatewayManual,
537 )
538 .await;
539 success = outcome.success;
540
541 if let Err(e) = zeroclaw_runtime::cron::record_run(
542 &config,
543 &job.id,
544 started_at,
545 finished_at,
546 &outcome.status,
547 Some(&outcome.output),
548 duration_ms,
549 ) {
550 ::zeroclaw_log::record!(
551 WARN,
552 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
553 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
554 .with_attrs(::serde_json::json!({"job_id": job.id, "error": format!("{}", e)})),
555 "manual cron trigger: failed to persist run history"
556 );
557 }
558 if let Err(e) = zeroclaw_runtime::cron::record_last_run_with_status(
559 &config,
560 &job.id,
561 finished_at,
562 &outcome.status,
563 &outcome.output,
564 ) {
565 ::zeroclaw_log::record!(
566 WARN,
567 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
568 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
569 .with_attrs(::serde_json::json!({"job_id": job.id, "error": format!("{}", e)})),
570 "manual cron trigger: failed to update last_run state"
571 );
572 }
573
574 let _ = state.event_tx.send(serde_json::json!({
577 "type": "cron_result",
578 "job_id": job.id,
579 "success": success,
580 "output": &outcome.output,
581 "manual": true,
582 "timestamp": finished_at.to_rfc3339(),
583 }));
584
585 Json(serde_json::json!({
586 "status": &outcome.status,
587 "job_id": job.id,
588 "success": success,
589 "output": &outcome.output,
590 "duration_ms": duration_ms,
591 "started_at": started_at.to_rfc3339(),
592 "finished_at": finished_at.to_rfc3339(),
593 }))
594 .into_response()
595}
596
597pub async fn handle_api_cron_patch(
599 State(state): State<AppState>,
600 headers: HeaderMap,
601 Path(id): Path<String>,
602 Json(body): Json<CronPatchBody>,
603) -> impl IntoResponse {
604 if let Err(e) = require_auth(&state, &headers) {
605 return e.into_response();
606 }
607
608 let config = state.config.read().clone();
609 if config.agent(&body.agent).is_none() {
610 return (
611 StatusCode::BAD_REQUEST,
612 Json(serde_json::json!({"error": format!(
613 "Unknown agent {a:?} (no [agents.{a}] entry configured)",
614 a = body.agent
615 )})),
616 )
617 .into_response();
618 }
619 let agent_alias = body.agent.clone();
620 let CronPatchBody {
621 agent: _,
622 name,
623 schedule: schedule_expr,
624 tz,
625 clear_tz,
626 command,
627 prompt,
628 } = body;
629 let timezone_patch = match parse_timezone_patch(tz, clear_tz) {
630 Ok(patch) => patch,
631 Err(e) => return e.into_response(),
632 };
633
634 let existing = match zeroclaw_runtime::cron::get_job(&config, &id) {
635 Ok(j) => j,
636 Err(e) => {
637 return (
638 StatusCode::NOT_FOUND,
639 Json(serde_json::json!({"error": format!("Cron job not found: {e}")})),
640 )
641 .into_response();
642 }
643 };
644 let new_expr = schedule_expr
645 .as_deref()
646 .map(str::trim)
647 .filter(|expr| !expr.is_empty())
648 .map(str::to_string);
649 let timezone_changed = !matches!(timezone_patch, CronTimezonePatch::Preserve);
650 let schedule = if new_expr.is_some() || timezone_changed {
651 let (expr, existing_tz) = match (&existing.schedule, new_expr) {
652 (_, Some(expr)) => {
653 let existing_tz = match &existing.schedule {
654 zeroclaw_runtime::cron::Schedule::Cron { tz, .. } => tz.clone(),
655 _ => None,
656 };
657 (expr, existing_tz)
658 }
659 (zeroclaw_runtime::cron::Schedule::Cron { expr, tz }, None) => {
660 (expr.clone(), tz.clone())
661 }
662 (_, None) => {
663 return bad_request("tz can only be updated on cron schedules").into_response();
664 }
665 };
666 let tz = match timezone_patch {
667 CronTimezonePatch::Preserve => existing_tz,
668 CronTimezonePatch::Set(tz) => Some(tz),
669 CronTimezonePatch::Clear => None,
670 };
671 match cron_schedule_from_api(expr, tz) {
672 Ok(schedule) => Some(schedule),
673 Err(e) => return e.into_response(),
674 }
675 } else {
676 None
677 };
678 let is_agent = matches!(existing.job_type, zeroclaw_runtime::cron::JobType::Agent);
679 let (patch_command, patch_prompt) = if is_agent {
680 (None, command.or(prompt))
681 } else {
682 (command.or(prompt), None)
683 };
684
685 let patch = zeroclaw_runtime::cron::CronJobPatch {
686 name,
687 schedule,
688 command: patch_command,
689 prompt: patch_prompt,
690 ..zeroclaw_runtime::cron::CronJobPatch::default()
691 };
692
693 match zeroclaw_runtime::cron::update_shell_job_with_approval(
694 &config,
695 &agent_alias,
696 &id,
697 patch,
698 false,
699 ) {
700 Ok(job) => Json(serde_json::json!({"status": "ok", "job": job})).into_response(),
701 Err(e) => (
702 StatusCode::INTERNAL_SERVER_ERROR,
703 Json(serde_json::json!({"error": format!("Failed to update cron job: {e}")})),
704 )
705 .into_response(),
706 }
707}
708
709pub async fn handle_api_cron_delete(
711 State(state): State<AppState>,
712 headers: HeaderMap,
713 Path(id): Path<String>,
714) -> impl IntoResponse {
715 if let Err(e) = require_auth(&state, &headers) {
716 return e.into_response();
717 }
718
719 let config = state.config.read().clone();
720 match zeroclaw_runtime::cron::remove_job(&config, &id) {
721 Ok(()) => Json(serde_json::json!({"status": "ok"})).into_response(),
722 Err(e) => (
723 StatusCode::INTERNAL_SERVER_ERROR,
724 Json(serde_json::json!({"error": format!("Failed to remove cron job: {e}")})),
725 )
726 .into_response(),
727 }
728}
729
730pub async fn handle_api_cron_settings_get(
732 State(state): State<AppState>,
733 headers: HeaderMap,
734) -> impl IntoResponse {
735 if let Err(e) = require_auth(&state, &headers) {
736 return e.into_response();
737 }
738
739 let config = state.config.read().clone();
740 Json(serde_json::json!({
741 "enabled": config.scheduler.enabled,
742 "catch_up_on_startup": config.scheduler.catch_up_on_startup,
743 "max_run_history": config.scheduler.max_run_history,
744 }))
745 .into_response()
746}
747
748pub async fn handle_api_cron_settings_patch(
750 State(state): State<AppState>,
751 headers: HeaderMap,
752 Json(body): Json<serde_json::Value>,
753) -> impl IntoResponse {
754 if let Err(e) = require_auth(&state, &headers) {
755 return e.into_response();
756 }
757
758 let mut config = state.config.read().clone();
759
760 if let Some(v) = body.get("enabled").and_then(|v| v.as_bool()) {
761 config.scheduler.enabled = v;
762 config.mark_dirty("scheduler.enabled");
763 }
764 if let Some(v) = body.get("catch_up_on_startup").and_then(|v| v.as_bool()) {
765 config.scheduler.catch_up_on_startup = v;
766 config.mark_dirty("scheduler.catch-up-on-startup");
767 }
768 if let Some(v) = body.get("max_run_history").and_then(|v| v.as_u64()) {
769 config.scheduler.max_run_history = u32::try_from(v).unwrap_or(u32::MAX);
770 config.mark_dirty("scheduler.max-run-history");
771 }
772
773 if let Err(e) = config.save_dirty().await {
774 return (
775 StatusCode::INTERNAL_SERVER_ERROR,
776 Json(serde_json::json!({"error": format!("Failed to save config: {e}")})),
777 )
778 .into_response();
779 }
780
781 *state.config.write() = config.clone();
782
783 Json(serde_json::json!({
784 "status": "ok",
785 "enabled": config.scheduler.enabled,
786 "catch_up_on_startup": config.scheduler.catch_up_on_startup,
787 "max_run_history": config.scheduler.max_run_history,
788 }))
789 .into_response()
790}
791
792pub async fn handle_api_integrations(
794 State(state): State<AppState>,
795 headers: HeaderMap,
796) -> impl IntoResponse {
797 if let Err(e) = require_auth(&state, &headers) {
798 return e.into_response();
799 }
800
801 let config = state.config.read().clone();
802 let entries = zeroclaw_runtime::integrations::registry::all_integrations(&config);
803
804 let integrations: Vec<serde_json::Value> = entries
805 .iter()
806 .map(|entry| {
807 serde_json::json!({
808 "name": entry.name,
809 "description": entry.description,
810 "category": entry.category,
811 "status": entry.status,
812 })
813 })
814 .collect();
815
816 Json(serde_json::json!({"integrations": integrations})).into_response()
817}
818
819pub async fn handle_api_integrations_settings(
821 State(state): State<AppState>,
822 headers: HeaderMap,
823) -> impl IntoResponse {
824 if let Err(e) = require_auth(&state, &headers) {
825 return e.into_response();
826 }
827
828 let config = state.config.read().clone();
829 let entries = zeroclaw_runtime::integrations::registry::all_integrations(&config);
830
831 let mut settings = serde_json::Map::new();
832 for entry in &entries {
833 let enabled = matches!(
834 entry.status,
835 zeroclaw_runtime::integrations::IntegrationStatus::Active
836 );
837 settings.insert(
838 entry.name.clone(),
839 serde_json::json!({
840 "enabled": enabled,
841 "category": entry.category,
842 "status": entry.status,
843 }),
844 );
845 }
846
847 Json(serde_json::json!({"settings": settings})).into_response()
848}
849
850pub async fn handle_api_doctor(
852 State(state): State<AppState>,
853 headers: HeaderMap,
854) -> impl IntoResponse {
855 if let Err(e) = require_auth(&state, &headers) {
856 return e.into_response();
857 }
858
859 let config = state.config.read().clone();
860 let results = zeroclaw_runtime::doctor::diagnose(&config);
861
862 let ok_count = results
863 .iter()
864 .filter(|r| r.severity == zeroclaw_runtime::doctor::Severity::Ok)
865 .count();
866 let warn_count = results
867 .iter()
868 .filter(|r| r.severity == zeroclaw_runtime::doctor::Severity::Warn)
869 .count();
870 let error_count = results
871 .iter()
872 .filter(|r| r.severity == zeroclaw_runtime::doctor::Severity::Error)
873 .count();
874
875 Json(serde_json::json!({
876 "results": results,
877 "summary": {
878 "ok": ok_count,
879 "warnings": warn_count,
880 "errors": error_count,
881 }
882 }))
883 .into_response()
884}
885
886async fn resolve_memory_handle(
893 state: &AppState,
894 agent_alias: Option<&str>,
895) -> Result<std::sync::Arc<dyn zeroclaw_memory::Memory>, (StatusCode, Json<serde_json::Value>)> {
896 let alias = match agent_alias.map(str::trim).filter(|s| !s.is_empty()) {
897 Some(a) => a,
898 None => return Ok(state.mem.clone()),
899 };
900 let config = state.config.read().clone();
901 if config.agent(alias).is_none() {
902 return Err((
903 StatusCode::BAD_REQUEST,
904 Json(serde_json::json!({"error": format!(
905 "Unknown agent {alias:?} (no [agents.{alias}] entry configured)"
906 )})),
907 ));
908 }
909 let api_key = config
910 .resolved_model_provider_for_agent(alias)
911 .and_then(|(_, _, cfg)| cfg.api_key.clone());
912 zeroclaw_memory::create_memory_for_agent(&config, alias, api_key.as_deref())
913 .await
914 .map_err(|e| {
915 (
916 StatusCode::INTERNAL_SERVER_ERROR,
917 Json(
918 serde_json::json!({"error": format!("Failed to build per-agent memory: {e:#}")}),
919 ),
920 )
921 })
922}
923
924pub async fn handle_api_memory_list(
926 State(state): State<AppState>,
927 headers: HeaderMap,
928 Query(params): Query<MemoryQuery>,
929) -> impl IntoResponse {
930 if let Err(e) = require_auth(&state, &headers) {
931 return e.into_response();
932 }
933
934 let mem = match resolve_memory_handle(&state, params.agent.as_deref()).await {
935 Ok(m) => m,
936 Err(e) => return e.into_response(),
937 };
938
939 if params.query.is_some() || params.since.is_some() || params.until.is_some() {
941 let query = params.query.as_deref().unwrap_or("");
942 let since = params.since.as_deref();
943 let until = params.until.as_deref();
944 match mem.recall(query, 50, None, since, until).await {
950 Ok(entries) => {
951 let entries = match params.category.as_deref() {
952 Some(cat) => entries
953 .into_iter()
954 .filter(|e| e.category.to_string() == cat)
955 .collect(),
956 None => entries,
957 };
958 Json(serde_json::json!({
959 "entries": sanitize_memory_entries_for_api(entries)
960 }))
961 .into_response()
962 }
963 Err(e) => (
964 StatusCode::INTERNAL_SERVER_ERROR,
965 Json(serde_json::json!({"error": format!("Memory recall failed: {e}")})),
966 )
967 .into_response(),
968 }
969 } else {
970 let category = params.category.as_deref().map(|cat| match cat {
972 "core" => zeroclaw_memory::MemoryCategory::Core,
973 "daily" => zeroclaw_memory::MemoryCategory::Daily,
974 "conversation" => zeroclaw_memory::MemoryCategory::Conversation,
975 other => zeroclaw_memory::MemoryCategory::Custom(other.to_string()),
976 });
977
978 match mem.list(category.as_ref(), None).await {
979 Ok(entries) => Json(serde_json::json!({
980 "entries": sanitize_memory_entries_for_api(entries)
981 }))
982 .into_response(),
983 Err(e) => (
984 StatusCode::INTERNAL_SERVER_ERROR,
985 Json(serde_json::json!({"error": format!("Memory list failed: {e}")})),
986 )
987 .into_response(),
988 }
989 }
990}
991
992fn sanitize_memory_entries_for_api(entries: Vec<MemoryEntry>) -> Vec<MemoryEntry> {
993 entries
994 .into_iter()
995 .map(|mut entry| {
996 entry.content = truncate_with_ellipsis_total_chars(entry.content);
997 entry
998 })
999 .collect()
1000}
1001
1002fn truncate_with_ellipsis_total_chars(mut s: String) -> String {
1003 if s.char_indices().nth(MEMORY_API_CONTENT_MAX_CHARS).is_none() {
1004 return s;
1005 }
1006
1007 let keep_chars = MEMORY_API_CONTENT_MAX_CHARS - 3;
1008 let cut_idx = s
1009 .char_indices()
1010 .nth(keep_chars)
1011 .map(|(idx, _)| idx)
1012 .unwrap_or(s.len());
1013 s.truncate(cut_idx);
1014 s.push_str("...");
1015 s
1016}
1017
1018pub async fn handle_api_memory_store(
1020 State(state): State<AppState>,
1021 headers: HeaderMap,
1022 Json(body): Json<MemoryStoreBody>,
1023) -> impl IntoResponse {
1024 if let Err(e) = require_auth(&state, &headers) {
1025 return e.into_response();
1026 }
1027
1028 let category = body
1029 .category
1030 .as_deref()
1031 .map(|cat| match cat {
1032 "core" => zeroclaw_memory::MemoryCategory::Core,
1033 "daily" => zeroclaw_memory::MemoryCategory::Daily,
1034 "conversation" => zeroclaw_memory::MemoryCategory::Conversation,
1035 other => zeroclaw_memory::MemoryCategory::Custom(other.to_string()),
1036 })
1037 .unwrap_or(zeroclaw_memory::MemoryCategory::Core);
1038
1039 let mem = match resolve_memory_handle(&state, body.agent.as_deref()).await {
1040 Ok(m) => m,
1041 Err(e) => return e.into_response(),
1042 };
1043
1044 match mem.store(&body.key, &body.content, category, None).await {
1045 Ok(()) => Json(serde_json::json!({"status": "ok"})).into_response(),
1046 Err(e) => (
1047 StatusCode::INTERNAL_SERVER_ERROR,
1048 Json(serde_json::json!({"error": format!("Memory store failed: {e}")})),
1049 )
1050 .into_response(),
1051 }
1052}
1053
1054pub async fn handle_api_memory_delete(
1056 State(state): State<AppState>,
1057 headers: HeaderMap,
1058 Path(key): Path<String>,
1059 Query(query): Query<MemoryDeleteQuery>,
1060) -> impl IntoResponse {
1061 if let Err(e) = require_auth(&state, &headers) {
1062 return e.into_response();
1063 }
1064
1065 let mem = match resolve_memory_handle(&state, query.agent.as_deref()).await {
1066 Ok(m) => m,
1067 Err(e) => return e.into_response(),
1068 };
1069
1070 match mem.forget(&key).await {
1071 Ok(deleted) => {
1072 Json(serde_json::json!({"status": "ok", "deleted": deleted})).into_response()
1073 }
1074 Err(e) => (
1075 StatusCode::INTERNAL_SERVER_ERROR,
1076 Json(serde_json::json!({"error": format!("Memory forget failed: {e}")})),
1077 )
1078 .into_response(),
1079 }
1080}
1081
/// Query-string parameters for the cost endpoint.
#[derive(Debug, Deserialize)]
pub struct CostQuery {
    /// Optional agent alias. When non-empty, the handler returns that agent's
    /// summary and does not apply `from`/`to`.
    #[serde(default)]
    pub agent: Option<String>,
    /// Optional lower bound as an RFC 3339 timestamp. A value that fails to
    /// parse is treated as absent by the handler.
    #[serde(default)]
    pub from: Option<String>,
    /// Optional upper bound as an RFC 3339 timestamp. A value that fails to
    /// parse is treated as absent by the handler.
    #[serde(default)]
    pub to: Option<String>,
}
1096
1097pub async fn handle_api_cost(
1102 State(state): State<AppState>,
1103 headers: HeaderMap,
1104 Query(query): Query<CostQuery>,
1105) -> impl IntoResponse {
1106 if let Err(e) = require_auth(&state, &headers) {
1107 return e.into_response();
1108 }
1109
1110 let parse_bound = |s: &str| {
1111 chrono::DateTime::parse_from_rfc3339(s)
1112 .ok()
1113 .map(|d| d.with_timezone(&chrono::Utc))
1114 };
1115 let from = query.from.as_deref().and_then(parse_bound);
1116 let to = query.to.as_deref().and_then(parse_bound);
1117
1118 if let Some(ref tracker) = state.cost_tracker {
1119 let result = match query.agent.as_deref().filter(|s| !s.is_empty()) {
1120 Some(alias) => tracker.get_summary_for_agent(alias),
1121 None => tracker.get_summary_in_bounds(from, to),
1122 };
1123 match result {
1124 Ok(summary) => Json(serde_json::json!({"cost": summary})).into_response(),
1125 Err(e) => (
1126 StatusCode::INTERNAL_SERVER_ERROR,
1127 Json(serde_json::json!({"error": format!("Cost summary failed: {e}")})),
1128 )
1129 .into_response(),
1130 }
1131 } else {
1132 Json(serde_json::json!({
1133 "cost": {
1134 "session_cost_usd": 0.0,
1135 "daily_cost_usd": 0.0,
1136 "monthly_cost_usd": 0.0,
1137 "total_tokens": 0,
1138 "request_count": 0,
1139 "by_model": {},
1140 "by_agent": {},
1141 }
1142 }))
1143 .into_response()
1144 }
1145}
1146
1147pub async fn handle_api_cli_tools(
1149 State(state): State<AppState>,
1150 headers: HeaderMap,
1151) -> impl IntoResponse {
1152 if let Err(e) = require_auth(&state, &headers) {
1153 return e.into_response();
1154 }
1155
1156 let tools = match tokio::task::spawn_blocking(|| {
1159 zeroclaw_tools::cli_discovery::discover_cli_tools(&[], &[])
1160 })
1161 .await
1162 {
1163 Ok(tools) => tools,
1164 Err(e) => {
1165 ::zeroclaw_log::record!(
1168 WARN,
1169 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1170 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1171 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1172 "cli-tools discovery task failed; returning empty list"
1173 );
1174 Vec::new()
1175 }
1176 };
1177
1178 Json(serde_json::json!({"cli_tools": tools})).into_response()
1179}
1180
1181pub async fn handle_api_channels(
1183 State(state): State<AppState>,
1184 headers: HeaderMap,
1185) -> impl IntoResponse {
1186 if let Err(e) = require_auth(&state, &headers) {
1187 return e.into_response();
1188 }
1189
1190 let config = state.config.read().clone();
1191 let health = zeroclaw_runtime::health::snapshot();
1192 let channels: Vec<serde_json::Value> = config
1195 .channels_by_alias()
1196 .into_iter()
1197 .map(|info| {
1198 let composite = format!("{}.{}", info.channel_type, info.alias);
1199 let compiled_key = compiled_readiness_key_for_alias(&config, &info);
1200 let compiled = zeroclaw_channels::listing::is_channel_type_compiled(compiled_key);
1201 let readiness = channel_readiness(&config, &info, &health, &state);
1202 let (status, health_status) = if compiled {
1203 channel_readiness_summary(&readiness)
1204 } else {
1205 ("not_compiled", "unavailable")
1206 };
1207 serde_json::json!({
1208 "name": composite,
1209 "type": info.channel_type,
1210 "alias": info.alias,
1211 "owning_agent": info.owning_agent,
1212 "enabled": info.enabled,
1213 "compiled": compiled,
1214 "status": status,
1215 "message_count": 0,
1216 "last_message_at": null,
1217 "health": health_status,
1218 "readiness": readiness,
1219 })
1220 })
1221 .collect();
1222
1223 Json(serde_json::json!({ "channels": channels })).into_response()
1224}
1225
1226pub async fn handle_api_tuis(
1228 State(state): State<AppState>,
1229 headers: HeaderMap,
1230) -> impl IntoResponse {
1231 if let Err(e) = require_auth(&state, &headers) {
1232 return e.into_response();
1233 }
1234
1235 let tuis: Vec<serde_json::Value> = state
1236 .tui_registry
1237 .as_ref()
1238 .map(|r| {
1239 r.list()
1240 .into_iter()
1241 .map(|e| {
1242 serde_json::json!({
1243 "tui_id": e.tui_id,
1244 "connected_at": e.connected_at.to_rfc3339(),
1245 "peer_label": e.peer_label,
1246 "transport": e.transport,
1247 })
1248 })
1249 .collect()
1250 })
1251 .unwrap_or_default();
1252
1253 Json(serde_json::json!({ "tuis": tuis })).into_response()
1254}
1255
1256fn compiled_readiness_key_for_alias<'a>(config: &'a Config, info: &'a ChannelAliasInfo) -> &'a str {
1257 if info.channel_type == "whatsapp"
1258 && config
1259 .channels
1260 .whatsapp
1261 .get(&info.alias)
1262 .is_some_and(|whatsapp| whatsapp.backend_type() == "web")
1263 {
1264 "whatsapp-web"
1265 } else {
1266 info.channel_type.as_str()
1267 }
1268}
1269
/// Outcome of a single readiness check for a channel alias.
///
/// Serialized in snake_case (`"ready"`, `"missing"`, `"unknown"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
enum ChannelReadinessState {
    /// The checked condition is satisfied.
    Ready,
    /// The checked condition is not satisfied.
    Missing,
    /// The condition was not evaluated.
    Unknown,
}
1277
/// Maximum age, in seconds, of a component health entry's `updated_at`
/// timestamp for that entry to still count as fresh.
const CHANNEL_LISTENER_HEALTH_MAX_AGE_SECS: i64 = 30;
1279
/// Per-alias readiness report serialized into the channel listing response.
#[derive(Debug, Clone, Serialize)]
struct ChannelReadiness {
    /// Whether the channel alias is enabled.
    enabled: ChannelReadinessState,
    /// Whether the alias has an owning agent.
    bound_to_agent: ChannelReadinessState,
    /// Authentication readiness; stays `Unknown` unless a live check sets it.
    authenticated: ChannelReadinessState,
    /// Listener readiness; stays `Unknown` unless a live check sets it.
    listening: ChannelReadinessState,
    /// Human-readable actions still required before the channel is usable.
    requirements: Vec<String>,
    /// Informational remarks that are not requirements.
    notes: Vec<String>,
}
1289
1290fn channel_readiness(
1291 config: &zeroclaw_config::schema::Config,
1292 info: &zeroclaw_config::schema::ChannelAliasInfo,
1293 health: &zeroclaw_runtime::health::HealthSnapshot,
1294 state: &AppState,
1295) -> ChannelReadiness {
1296 let mut readiness = ChannelReadiness {
1297 enabled: if info.enabled {
1298 ChannelReadinessState::Ready
1299 } else {
1300 ChannelReadinessState::Missing
1301 },
1302 bound_to_agent: if info.owning_agent.is_some() {
1303 ChannelReadinessState::Ready
1304 } else {
1305 ChannelReadinessState::Missing
1306 },
1307 authenticated: ChannelReadinessState::Unknown,
1308 listening: ChannelReadinessState::Unknown,
1309 requirements: Vec::new(),
1310 notes: Vec::new(),
1311 };
1312
1313 if readiness.enabled == ChannelReadinessState::Missing {
1314 readiness
1315 .requirements
1316 .push("Enable this channel alias.".to_string());
1317 }
1318 if readiness.bound_to_agent == ChannelReadinessState::Missing {
1319 readiness
1320 .requirements
1321 .push("Bind this channel to an enabled agent.".to_string());
1322 }
1323
1324 if readiness.enabled == ChannelReadinessState::Ready
1325 && readiness.bound_to_agent == ChannelReadinessState::Ready
1326 {
1327 if info.channel_type == "webhook" {
1328 apply_webhook_readiness(config, &info.alias, health, state, &mut readiness);
1329 } else {
1330 readiness.notes.push(format!(
1331 "Live readiness is not checked for `{}` channels yet.",
1332 info.channel_type
1333 ));
1334 }
1335 }
1336
1337 readiness
1338}
1339
1340fn channel_readiness_summary(readiness: &ChannelReadiness) -> (&'static str, &'static str) {
1341 if readiness.enabled == ChannelReadinessState::Missing
1342 || readiness.bound_to_agent == ChannelReadinessState::Missing
1343 {
1344 return ("inactive", "degraded");
1345 }
1346
1347 if readiness.authenticated == ChannelReadinessState::Unknown
1348 && readiness.listening == ChannelReadinessState::Unknown
1349 {
1350 return ("unknown", "degraded");
1351 }
1352
1353 if readiness.authenticated == ChannelReadinessState::Ready
1354 && readiness.listening == ChannelReadinessState::Ready
1355 {
1356 ("active", "healthy")
1357 } else {
1358 ("error", "down")
1359 }
1360}
1361
1362fn apply_webhook_readiness(
1363 config: &zeroclaw_config::schema::Config,
1364 alias: &str,
1365 health: &zeroclaw_runtime::health::HealthSnapshot,
1366 state: &AppState,
1367 readiness: &mut ChannelReadiness,
1368) {
1369 let Some(webhook) = config.channels.webhook.get(alias) else {
1370 readiness.authenticated = ChannelReadinessState::Missing;
1371 readiness.listening = ChannelReadinessState::Missing;
1372 readiness
1373 .requirements
1374 .push("Webhook config block is missing.".to_string());
1375 return;
1376 };
1377
1378 if state.pairing.require_pairing() && !state.pairing.is_paired() {
1379 readiness.authenticated = ChannelReadinessState::Missing;
1380 readiness
1381 .requirements
1382 .push("Pair the gateway before using the webhook endpoint.".to_string());
1383 } else {
1384 readiness.authenticated = ChannelReadinessState::Ready;
1385 }
1386
1387 let component = format!("channel:webhook.{alias}");
1388 let component_health = health.components.get(&component);
1389 let component_status = component_health.map(|component| component.status.as_str());
1390 let supervised_listener_ok = component_health.is_some_and(component_health_ok_and_fresh);
1391 let listen_path = normalized_webhook_path(webhook.listen_path.as_deref());
1392
1393 if supervised_listener_ok {
1394 readiness.listening = ChannelReadinessState::Ready;
1395 } else if component_status == Some("error") {
1396 readiness.listening = ChannelReadinessState::Missing;
1397 readiness.requirements.push(format!(
1398 "Resolve the listener error for `webhook.{alias}` before using this channel."
1399 ));
1400 } else {
1401 readiness.listening = ChannelReadinessState::Missing;
1402 readiness.requirements.push(format!(
1403 "Start a channel listener for `webhook.{alias}` on port {}{}.",
1404 webhook.port, listen_path
1405 ));
1406 }
1407}
1408
1409fn component_health_ok_and_fresh(component: &zeroclaw_runtime::health::ComponentHealth) -> bool {
1410 if component.status != "ok" {
1411 return false;
1412 }
1413
1414 let Ok(updated_at) = chrono::DateTime::parse_from_rfc3339(&component.updated_at) else {
1415 return false;
1416 };
1417 let age = chrono::Utc::now().signed_duration_since(updated_at.with_timezone(&chrono::Utc));
1418 age >= chrono::Duration::zero()
1419 && age <= chrono::Duration::seconds(CHANNEL_LISTENER_HEALTH_MAX_AGE_SECS)
1420}
1421
/// Normalizes a configured webhook listen path.
///
/// Surrounding whitespace is trimmed. A missing or blank path yields
/// `"/webhook"`; any other value is returned with exactly one leading `/`
/// added when it does not already start with one.
fn normalized_webhook_path(path: Option<&str>) -> String {
    const DEFAULT_PATH: &str = "/webhook";

    match path.map(str::trim) {
        None | Some("") => DEFAULT_PATH.to_owned(),
        Some(absolute) if absolute.starts_with('/') => absolute.to_owned(),
        Some(relative) => format!("/{relative}"),
    }
}
1432
1433pub async fn handle_api_health(
1435 State(state): State<AppState>,
1436 headers: HeaderMap,
1437) -> impl IntoResponse {
1438 if let Err(e) = require_auth(&state, &headers) {
1439 return e.into_response();
1440 }
1441
1442 let snapshot = zeroclaw_runtime::health::snapshot();
1443 Json(serde_json::json!({"health": snapshot})).into_response()
1444}
1445
1446pub async fn handle_api_sessions_list(
1452 State(state): State<AppState>,
1453 headers: HeaderMap,
1454) -> impl IntoResponse {
1455 if let Err(e) = require_auth(&state, &headers) {
1456 return e.into_response();
1457 }
1458
1459 let Some(ref backend) = state.session_backend else {
1460 return Json(serde_json::json!({
1461 "sessions": [],
1462 "message": "Session persistence is disabled"
1463 }))
1464 .into_response();
1465 };
1466
1467 let config = state.config.read().clone();
1471 let all_metadata = backend.list_sessions_with_metadata();
1472 let sessions: Vec<serde_json::Value> = all_metadata
1473 .into_iter()
1474 .filter(|meta| meta.agent_alias.is_some() || meta.channel_id.is_some())
1475 .map(|meta| {
1476 let agent_alias = meta.agent_alias.clone().or_else(|| {
1480 meta.channel_id
1481 .as_deref()
1482 .and_then(|c| config.agent_for_channel(c))
1483 .map(str::to_string)
1484 });
1485 let session_id = meta
1488 .key
1489 .strip_prefix("gw_")
1490 .map(str::to_string)
1491 .unwrap_or_else(|| meta.key.clone());
1492 let mut entry = serde_json::json!({
1493 "session_id": session_id,
1496 "session_key": meta.key.clone(),
1498 "created_at": meta.created_at.to_rfc3339(),
1499 "last_activity": meta.last_activity.to_rfc3339(),
1500 "message_count": meta.message_count,
1501 "agent_alias": agent_alias,
1502 "channel_id": meta.channel_id,
1503 });
1504 if let Some(name) = meta.name {
1505 entry["name"] = serde_json::Value::String(name);
1506 }
1507 entry
1508 })
1509 .collect();
1510
1511 Json(serde_json::json!({ "sessions": sessions })).into_response()
1512}
1513
1514pub async fn handle_api_session_messages(
1516 State(state): State<AppState>,
1517 headers: HeaderMap,
1518 Path(id): Path<String>,
1519) -> impl IntoResponse {
1520 if let Err(e) = require_auth(&state, &headers) {
1521 return e.into_response();
1522 }
1523
1524 let Some(ref backend) = state.session_backend else {
1525 return Json(serde_json::json!({
1526 "session_id": id,
1527 "messages": [],
1528 "session_persistence": false,
1529 }))
1530 .into_response();
1531 };
1532
1533 let session_key = if id.starts_with("gw_") || id.contains('_') {
1537 id.clone()
1538 } else {
1539 format!("gw_{id}")
1540 };
1541 let msgs = backend.load_with_timestamps(&session_key);
1542 let messages: Vec<serde_json::Value> = msgs
1543 .into_iter()
1544 .map(|m| {
1545 serde_json::json!({
1546 "role": m.message.role,
1547 "content": m.message.content,
1548 "created_at": m.created_at.map(|dt| dt.to_rfc3339()),
1549 })
1550 })
1551 .collect();
1552
1553 Json(serde_json::json!({
1554 "session_id": id,
1555 "messages": messages,
1556 "session_persistence": true,
1557 }))
1558 .into_response()
1559}
1560
1561pub async fn handle_api_session_message_post(
1563 State(state): State<AppState>,
1564 headers: HeaderMap,
1565 Path(id): Path<String>,
1566 Json(body): Json<SessionMessagePostBody>,
1567) -> impl IntoResponse {
1568 if let Err(e) = require_auth(&state, &headers) {
1569 return e.into_response();
1570 }
1571
1572 if body.content.trim().is_empty() {
1573 return (
1574 StatusCode::BAD_REQUEST,
1575 Json(serde_json::json!({"error": "content is required"})),
1576 )
1577 .into_response();
1578 }
1579
1580 let Some(ref backend) = state.session_backend else {
1581 return (
1582 StatusCode::SERVICE_UNAVAILABLE,
1583 Json(serde_json::json!({"error": "Session persistence is disabled"})),
1584 )
1585 .into_response();
1586 };
1587
1588 let session_key = format!("gw_{id}");
1589 if !backend
1590 .list_sessions()
1591 .iter()
1592 .any(|key| key == &session_key)
1593 {
1594 return (
1595 StatusCode::NOT_FOUND,
1596 Json(serde_json::json!({"error": "Session not found"})),
1597 )
1598 .into_response();
1599 }
1600
1601 let _session_guard = match state.session_queue.acquire(&session_key).await {
1602 Ok(guard) => guard,
1603 Err(crate::session_queue::SessionQueueError::QueueFull { .. }) => {
1604 return (
1605 StatusCode::TOO_MANY_REQUESTS,
1606 Json(serde_json::json!({"error": "Session queue is full"})),
1607 )
1608 .into_response();
1609 }
1610 Err(crate::session_queue::SessionQueueError::Timeout { .. }) => {
1611 return (
1612 StatusCode::REQUEST_TIMEOUT,
1613 Json(serde_json::json!({"error": "Timed out waiting for session queue"})),
1614 )
1615 .into_response();
1616 }
1617 };
1618
1619 let message = zeroclaw_providers::ChatMessage::assistant(&body.content);
1620 if let Err(e) = backend.append(&session_key, &message) {
1621 return (
1622 StatusCode::INTERNAL_SERVER_ERROR,
1623 Json(serde_json::json!({"error": format!("Failed to append session message: {e}")})),
1624 )
1625 .into_response();
1626 }
1627
1628 let event = serde_json::json!({
1631 "type": "message",
1632 "session_id": id.clone(),
1633 "role": "assistant",
1634 "content": body.content.clone(),
1635 "source": "api",
1636 "timestamp": chrono::Utc::now().to_rfc3339(),
1637 });
1638 let _ = state.event_tx.send(event);
1639
1640 Json(serde_json::json!({
1641 "status": "ok",
1642 "session_id": id,
1643 "message": {
1644 "role": "assistant",
1645 "content": message.content,
1646 },
1647 "session_persistence": true,
1648 }))
1649 .into_response()
1650}
1651
1652pub async fn handle_api_session_delete(
1654 State(state): State<AppState>,
1655 headers: HeaderMap,
1656 Path(id): Path<String>,
1657) -> impl IntoResponse {
1658 if let Err(e) = require_auth(&state, &headers) {
1659 return e.into_response();
1660 }
1661
1662 let Some(ref backend) = state.session_backend else {
1663 return (
1664 StatusCode::NOT_FOUND,
1665 Json(serde_json::json!({"error": "Session persistence is disabled"})),
1666 )
1667 .into_response();
1668 };
1669
1670 let session_key = if id.starts_with("gw_") || id.contains('_') {
1671 id.clone()
1672 } else {
1673 format!("gw_{id}")
1674 };
1675
1676 let token = state
1683 .cancel_tokens
1684 .lock()
1685 .expect("cancel_tokens lock poisoned")
1686 .remove(&session_key);
1687 if let Some(token) = token {
1688 token.cancel();
1689 ::zeroclaw_log::record!(
1690 INFO,
1691 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1692 .with_attrs(::serde_json::json!({"session_key": session_key})),
1693 "cancelled in-flight turn for deleted session"
1694 );
1695 }
1696
1697 match backend.delete_session(&session_key) {
1698 Ok(true) => Json(serde_json::json!({"deleted": true, "session_id": id})).into_response(),
1699 Ok(false) => (
1700 StatusCode::NOT_FOUND,
1701 Json(serde_json::json!({"error": "Session not found"})),
1702 )
1703 .into_response(),
1704 Err(e) => (
1705 StatusCode::INTERNAL_SERVER_ERROR,
1706 Json(serde_json::json!({"error": format!("Failed to delete session: {e}")})),
1707 )
1708 .into_response(),
1709 }
1710}
1711
1712pub async fn handle_api_session_rename(
1714 State(state): State<AppState>,
1715 headers: HeaderMap,
1716 Path(id): Path<String>,
1717 Json(body): Json<serde_json::Value>,
1718) -> impl IntoResponse {
1719 if let Err(e) = require_auth(&state, &headers) {
1720 return e.into_response();
1721 }
1722
1723 let Some(ref backend) = state.session_backend else {
1724 return (
1725 StatusCode::NOT_FOUND,
1726 Json(serde_json::json!({"error": "Session persistence is disabled"})),
1727 )
1728 .into_response();
1729 };
1730
1731 let name = body["name"].as_str().unwrap_or("").trim();
1732 if name.is_empty() {
1733 return (
1734 StatusCode::BAD_REQUEST,
1735 Json(serde_json::json!({"error": "name is required"})),
1736 )
1737 .into_response();
1738 }
1739
1740 let session_key = format!("gw_{id}");
1741
1742 let sessions = backend.list_sessions();
1744 if !sessions.contains(&session_key) {
1745 return (
1746 StatusCode::NOT_FOUND,
1747 Json(serde_json::json!({"error": "Session not found"})),
1748 )
1749 .into_response();
1750 }
1751
1752 match backend.set_session_name(&session_key, name) {
1753 Ok(()) => Json(serde_json::json!({"session_id": id, "name": name})).into_response(),
1754 Err(e) => (
1755 StatusCode::INTERNAL_SERVER_ERROR,
1756 Json(serde_json::json!({"error": format!("Failed to rename session: {e}")})),
1757 )
1758 .into_response(),
1759 }
1760}
1761
1762pub async fn handle_api_sessions_running(
1764 State(state): State<AppState>,
1765 headers: HeaderMap,
1766) -> impl IntoResponse {
1767 if let Err(e) = require_auth(&state, &headers) {
1768 return e.into_response();
1769 }
1770
1771 let Some(ref backend) = state.session_backend else {
1772 return Json(serde_json::json!({
1773 "sessions": [],
1774 "message": "Session persistence is disabled"
1775 }))
1776 .into_response();
1777 };
1778
1779 let running = backend.list_running_sessions();
1780 let sessions: Vec<serde_json::Value> = running
1781 .into_iter()
1782 .filter_map(|meta| {
1783 let session_id = meta.key.strip_prefix("gw_")?;
1784 Some(serde_json::json!({
1785 "session_id": session_id,
1786 "created_at": meta.created_at.to_rfc3339(),
1787 "last_activity": meta.last_activity.to_rfc3339(),
1788 "message_count": meta.message_count,
1789 }))
1790 })
1791 .collect();
1792
1793 Json(serde_json::json!({ "sessions": sessions })).into_response()
1794}
1795
1796pub async fn handle_api_session_state(
1798 State(state): State<AppState>,
1799 headers: HeaderMap,
1800 Path(id): Path<String>,
1801) -> impl IntoResponse {
1802 if let Err(e) = require_auth(&state, &headers) {
1803 return e.into_response();
1804 }
1805
1806 let Some(ref backend) = state.session_backend else {
1807 return (
1808 StatusCode::NOT_FOUND,
1809 Json(serde_json::json!({"error": "Session persistence is disabled"})),
1810 )
1811 .into_response();
1812 };
1813
1814 let session_key = format!("gw_{id}");
1815 match backend.get_session_state(&session_key) {
1816 Ok(Some(ss)) => {
1817 let mut resp = serde_json::json!({
1818 "session_id": id,
1819 "state": ss.state,
1820 });
1821 if let Some(turn_id) = ss.turn_id {
1822 resp["turn_id"] = serde_json::Value::String(turn_id);
1823 }
1824 if let Some(started) = ss.turn_started_at {
1825 resp["turn_started_at"] = serde_json::Value::String(started.to_rfc3339());
1826 }
1827 Json(resp).into_response()
1828 }
1829 Ok(None) => (
1830 StatusCode::NOT_FOUND,
1831 Json(serde_json::json!({"error": "Session not found"})),
1832 )
1833 .into_response(),
1834 Err(e) => (
1835 StatusCode::INTERNAL_SERVER_ERROR,
1836 Json(serde_json::json!({"error": format!("Failed to get session state: {e}")})),
1837 )
1838 .into_response(),
1839 }
1840}
1841
1842pub async fn handle_api_session_abort(
1856 State(state): State<AppState>,
1857 headers: HeaderMap,
1858 Path(id): Path<String>,
1859) -> impl IntoResponse {
1860 if let Err(e) = require_auth(&state, &headers) {
1861 return e.into_response();
1862 }
1863
1864 let session_key = format!("gw_{id}");
1865
1866 let token = state
1869 .cancel_tokens
1870 .lock()
1871 .expect("cancel_tokens lock poisoned")
1872 .get(&session_key)
1873 .cloned();
1874
1875 if let Some(token) = token {
1876 token.cancel();
1877 ::zeroclaw_log::record!(
1878 INFO,
1879 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1880 .with_attrs(::serde_json::json!({"session_key": session_key})),
1881 "session abort requested"
1882 );
1883 Json(serde_json::json!({ "status": "aborted" })).into_response()
1884 } else {
1885 Json(serde_json::json!({ "status": "no_active_response" })).into_response()
1886 }
1887}
1888
1889pub async fn handle_claude_code_hook(
1898 State(state): State<AppState>,
1899 Json(payload): Json<zeroclaw_tools::claude_code_runner::ClaudeCodeHookEvent>,
1900) -> impl IntoResponse {
1901 let _ = &state; ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"session_id": payload.session_id, "event_type": payload.event_type, "tool_name": payload.tool_name, "summary": payload.summary})), "Claude Code hook event received");
1907
1908 Json(serde_json::json!({ "ok": true }))
1909}
1910
1911#[cfg(test)]
1912mod tests {
1913 use super::*;
1914 use crate::{AppState, GatewayRateLimiter, IdempotencyStore, nodes};
1915 use async_trait::async_trait;
1916 use axum::response::IntoResponse;
1917 use http_body_util::BodyExt;
1918 use parking_lot::RwLock;
1919 #[cfg(feature = "channel-linq")]
1920 use std::collections::HashMap;
1921 use std::sync::Arc;
1922 use std::time::Duration;
1923 use zeroclaw_infra::session_backend::SessionBackend;
1924 use zeroclaw_infra::session_store::SessionStore;
1925 use zeroclaw_memory::{Memory, MemoryCategory, MemoryEntry};
1926 use zeroclaw_providers::ModelProvider;
1927 use zeroclaw_runtime::security::pairing::PairingGuard;
1928
    /// In-memory `Memory` test double that serves a fixed list of entries.
    #[derive(Default)]
    struct MockMemory {
        /// Entries handed back by `recall` and `list`, and counted by `count`.
        entries: Vec<MemoryEntry>,
    }
1933
    /// `Memory` implementation for tests: writes are accepted and discarded,
    /// reads return the canned `entries` regardless of the arguments.
    #[async_trait]
    impl Memory for MockMemory {
        fn name(&self) -> &str {
            "mock"
        }

        /// Accepts the write without storing anything.
        async fn store(
            &self,
            _key: &str,
            _content: &str,
            _category: MemoryCategory,
            _session_id: Option<&str>,
        ) -> anyhow::Result<()> {
            Ok(())
        }

        /// Returns every canned entry; the query and filters are ignored.
        async fn recall(
            &self,
            _query: &str,
            _limit: usize,
            _session_id: Option<&str>,
            _since: Option<&str>,
            _until: Option<&str>,
        ) -> anyhow::Result<Vec<MemoryEntry>> {
            Ok(self.entries.clone())
        }

        /// Key lookups never find anything.
        async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
            Ok(None)
        }

        /// Returns every canned entry; category and session are ignored.
        async fn list(
            &self,
            _category: Option<&MemoryCategory>,
            _session_id: Option<&str>,
        ) -> anyhow::Result<Vec<MemoryEntry>> {
            Ok(self.entries.clone())
        }

        /// Always reports that nothing was removed.
        async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
            Ok(false)
        }

        /// Always reports that nothing was removed.
        async fn forget_for_agent(&self, _key: &str, _agent_id: &str) -> anyhow::Result<bool> {
            Ok(false)
        }

        async fn count(&self) -> anyhow::Result<usize> {
            Ok(self.entries.len())
        }

        async fn health_check(&self) -> bool {
            true
        }

        /// Accepts the write without storing anything.
        async fn store_with_agent(
            &self,
            _key: &str,
            _content: &str,
            _category: MemoryCategory,
            _session_id: Option<&str>,
            _namespace: Option<&str>,
            _importance: Option<f64>,
            _agent_id: Option<&str>,
        ) -> anyhow::Result<()> {
            Ok(())
        }

        /// Always returns an empty result, unlike `recall`.
        async fn recall_for_agents(
            &self,
            _allowed_agent_ids: &[&str],
            _query: &str,
            _limit: usize,
            _session_id: Option<&str>,
            _since: Option<&str>,
            _until: Option<&str>,
        ) -> anyhow::Result<Vec<MemoryEntry>> {
            Ok(Vec::new())
        }
    }
    /// Attribution for the mock: an in-memory memory backend aliased `MockMemory`.
    impl ::zeroclaw_api::attribution::Attributable for MockMemory {
        fn role(&self) -> ::zeroclaw_api::attribution::Role {
            ::zeroclaw_api::attribution::Role::Memory(
                ::zeroclaw_api::attribution::MemoryKind::InMemory,
            )
        }
        fn alias(&self) -> &str {
            "MockMemory"
        }
    }
2024
    /// Stateless `ModelProvider` test double.
    struct MockModelProvider;
2026
    /// `ModelProvider` implementation for tests: every chat call answers `"ok"`.
    #[async_trait]
    impl ModelProvider for MockModelProvider {
        /// Ignores all arguments and returns the fixed reply `"ok"`.
        async fn chat_with_system(
            &self,
            _system_prompt: Option<&str>,
            _message: &str,
            _model: &str,
            _temperature: Option<f64>,
        ) -> anyhow::Result<String> {
            Ok("ok".to_string())
        }
    }
    /// Attribution for the mock: a custom model provider aliased `MockModelProvider`.
    impl ::zeroclaw_api::attribution::Attributable for MockModelProvider {
        fn role(&self) -> ::zeroclaw_api::attribution::Role {
            ::zeroclaw_api::attribution::Role::Provider(
                ::zeroclaw_api::attribution::ProviderKind::Model(
                    ::zeroclaw_api::attribution::ModelProviderKind::Custom,
                ),
            )
        }
        fn alias(&self) -> &str {
            "MockModelProvider"
        }
    }
2051
2052 fn with_test_agent(
2055 mut config: zeroclaw_config::schema::Config,
2056 ) -> zeroclaw_config::schema::Config {
2057 config.providers.models.openrouter.insert(
2058 "default".to_string(),
2059 zeroclaw_config::schema::OpenRouterModelProviderConfig::default(),
2060 );
2061 config.risk_profiles.insert(
2062 "test-profile".to_string(),
2063 zeroclaw_config::schema::RiskProfileConfig::default(),
2064 );
2065 config.agents.insert(
2066 "test-agent".to_string(),
2067 zeroclaw_config::schema::AliasedAgentConfig {
2068 model_provider: "openrouter.default".into(),
2069 risk_profile: "test-profile".to_string(),
2070 ..Default::default()
2071 },
2072 );
2073 config
2074 }
2075
    /// Builds an `AppState` for handler tests around the given config.
    ///
    /// Uses the mock memory and mock model provider, a pairing guard created
    /// with `PairingGuard::new(false, &[])`, and leaves every optional
    /// component (session backend, registries, cost tracker, ...) unset.
    fn test_state(config: zeroclaw_config::schema::Config) -> AppState {
        AppState {
            config: Arc::new(RwLock::new(config)),
            model_provider: Arc::new(MockModelProvider),
            model: "test-model".into(),
            temperature: None,
            mem: Arc::new(MockMemory::default()),
            memory_strategy: Arc::new(
                zeroclaw_runtime::agent::memory_strategy::DefaultMemoryStrategy::with_config(
                    Arc::new(MockMemory::default()),
                    zeroclaw_config::schema::MemoryConfig::default(),
                    std::path::PathBuf::new(),
                ),
            ),
            auto_save: false,
            webhook_secret_hash: None,
            pairing: Arc::new(PairingGuard::new(false, &[])),
            trust_forwarded_headers: false,
            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
            auth_limiter: Arc::new(crate::auth_rate_limit::AuthRateLimiter::new()),
            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
            // Channel-specific fields only exist when their feature is compiled in.
            #[cfg(feature = "channel-whatsapp-cloud")]
            whatsapp: None,
            #[cfg(feature = "channel-whatsapp-cloud")]
            whatsapp_app_secret: None,
            #[cfg(feature = "channel-linq")]
            linq: HashMap::new(),
            #[cfg(feature = "channel-linq")]
            linq_signing_secrets: HashMap::new(),
            #[cfg(feature = "channel-nextcloud")]
            nextcloud_talk: None,
            #[cfg(feature = "channel-nextcloud")]
            nextcloud_talk_webhook_secret: None,
            #[cfg(feature = "channel-wati")]
            wati: None,
            #[cfg(feature = "channel-email")]
            gmail_push: None,
            observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
            tools_registry: Arc::new(Vec::new()),
            cost_tracker: None,
            // Only the sender halves of the channels are kept.
            event_tx: tokio::sync::broadcast::channel(16).0,
            event_buffer: Arc::new(crate::sse::EventBuffer::new(16)),
            shutdown_tx: tokio::sync::watch::channel(false).0,
            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
            session_backend: None,
            session_queue: Arc::new(crate::session_queue::SessionActorQueue::new(8, 30, 600)),
            device_registry: None,
            pending_pairings: None,
            path_prefix: String::new(),
            web_dist_dir: None,
            canvas_store: zeroclaw_runtime::tools::CanvasStore::new(),
            cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
            pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            tui_registry: None,
            reload_tx: None,
            #[cfg(feature = "webauthn")]
            webauthn: None,
        }
    }
2135
2136 fn test_state_with_memory(
2137 config: zeroclaw_config::schema::Config,
2138 entries: Vec<MemoryEntry>,
2139 ) -> AppState {
2140 AppState {
2141 mem: Arc::new(MockMemory { entries }),
2142 ..test_state(config)
2143 }
2144 }
2145
2146 async fn response_json(response: axum::response::Response) -> serde_json::Value {
2147 let body = response
2148 .into_body()
2149 .collect()
2150 .await
2151 .expect("response body")
2152 .to_bytes();
2153 serde_json::from_slice(&body).expect("valid json response")
2154 }
2155
    /// Builds a conversation-category memory entry with the key `huge-memory`
    /// and the given content; all other fields are fixed test values.
    fn memory_entry_with_content(content: String) -> MemoryEntry {
        MemoryEntry {
            id: "entry-1".into(),
            key: "huge-memory".into(),
            content,
            category: MemoryCategory::Conversation,
            timestamp: "2026-04-06T00:00:00Z".into(),
            session_id: None,
            score: None,
            namespace: "default".into(),
            importance: Some(0.5),
            superseded_by: None,
            agent_alias: None,
            agent_id: None,
        }
    }
2172
2173 fn memory_content_from_response(json: &serde_json::Value) -> &str {
2174 json["entries"][0]["content"]
2175 .as_str()
2176 .expect("string content")
2177 }
2178
2179 #[test]
2180 fn truncate_memory_api_content_caps_total_chars_with_ellipsis() {
2181 let exact = "x".repeat(MEMORY_API_CONTENT_MAX_CHARS);
2182 assert_eq!(truncate_with_ellipsis_total_chars(exact.clone()), exact);
2183
2184 let short = "short memory".to_string();
2185 assert_eq!(truncate_with_ellipsis_total_chars(short.clone()), short);
2186
2187 let over = "火".repeat(MEMORY_API_CONTENT_MAX_CHARS + 1);
2188 let truncated = truncate_with_ellipsis_total_chars(over.clone());
2189 assert_eq!(truncated.chars().count(), MEMORY_API_CONTENT_MAX_CHARS);
2190 assert!(truncated.ends_with("..."));
2191 assert_ne!(truncated, over);
2192 }
2193
2194 #[tokio::test]
2195 async fn handle_api_memory_list_truncates_oversized_content() {
2196 let mut config = zeroclaw_config::schema::Config::default();
2197 config.gateway.require_pairing = false;
2198 let huge = "x".repeat(MEMORY_API_CONTENT_MAX_CHARS + 128);
2199 let state = test_state_with_memory(config, vec![memory_entry_with_content(huge.clone())]);
2200
2201 let response = handle_api_memory_list(
2202 State(state),
2203 HeaderMap::new(),
2204 Query(MemoryQuery {
2205 query: None,
2206 category: None,
2207 since: None,
2208 until: None,
2209 agent: None,
2210 }),
2211 )
2212 .await
2213 .into_response();
2214
2215 let json = response_json(response).await;
2216 let content = memory_content_from_response(&json);
2217
2218 assert_eq!(content.chars().count(), MEMORY_API_CONTENT_MAX_CHARS);
2219 assert!(content.ends_with("..."));
2220 assert_eq!(json["entries"][0]["key"], "huge-memory");
2221 assert_eq!(json["entries"][0]["category"], "conversation");
2222 assert_ne!(content, huge);
2223 }
2224
2225 #[tokio::test]
2226 async fn handle_api_memory_search_truncates_oversized_content_after_filtering() {
2227 let mut config = zeroclaw_config::schema::Config::default();
2228 config.gateway.require_pairing = false;
2229 let huge = "火".repeat(MEMORY_API_CONTENT_MAX_CHARS + 128);
2230 let state = test_state_with_memory(config, vec![memory_entry_with_content(huge.clone())]);
2231
2232 let response = handle_api_memory_list(
2233 State(state),
2234 HeaderMap::new(),
2235 Query(MemoryQuery {
2236 query: Some("huge".into()),
2237 category: Some("conversation".into()),
2238 since: None,
2239 until: None,
2240 agent: None,
2241 }),
2242 )
2243 .await
2244 .into_response();
2245
2246 let json = response_json(response).await;
2247 let content = memory_content_from_response(&json);
2248
2249 assert_eq!(content.chars().count(), MEMORY_API_CONTENT_MAX_CHARS);
2250 assert!(content.ends_with("..."));
2251 assert_ne!(content, huge);
2252 }
2253
2254 #[test]
2255 fn api_channels_readiness_key_tracks_whatsapp_backend_type() {
2256 let mut config = zeroclaw_config::schema::Config::default();
2257 config.channels.whatsapp.insert(
2258 "web".to_string(),
2259 zeroclaw_config::schema::WhatsAppConfig {
2260 enabled: true,
2261 session_path: Some("~/.zeroclaw/state/whatsapp-web/session.db".into()),
2262 ..Default::default()
2263 },
2264 );
2265 config.channels.whatsapp.insert(
2266 "cloud".to_string(),
2267 zeroclaw_config::schema::WhatsAppConfig {
2268 enabled: true,
2269 access_token: Some("token".into()),
2270 phone_number_id: Some("phone-id".into()),
2271 verify_token: Some("verify".into()),
2272 ..Default::default()
2273 },
2274 );
2275 config.channels.whatsapp.insert(
2276 "ambiguous".to_string(),
2277 zeroclaw_config::schema::WhatsAppConfig {
2278 enabled: true,
2279 access_token: Some("token".into()),
2280 phone_number_id: Some("phone-id".into()),
2281 verify_token: Some("verify".into()),
2282 session_path: Some("~/.zeroclaw/state/whatsapp-web/session.db".into()),
2283 ..Default::default()
2284 },
2285 );
2286
2287 let web = zeroclaw_config::schema::ChannelAliasInfo {
2288 channel_type: "whatsapp".to_string(),
2289 alias: "web".to_string(),
2290 owning_agent: None,
2291 enabled: true,
2292 };
2293 let cloud = zeroclaw_config::schema::ChannelAliasInfo {
2294 channel_type: "whatsapp".to_string(),
2295 alias: "cloud".to_string(),
2296 owning_agent: None,
2297 enabled: true,
2298 };
2299 let ambiguous = zeroclaw_config::schema::ChannelAliasInfo {
2300 channel_type: "whatsapp".to_string(),
2301 alias: "ambiguous".to_string(),
2302 owning_agent: None,
2303 enabled: true,
2304 };
2305 let discord = zeroclaw_config::schema::ChannelAliasInfo {
2306 channel_type: "discord".to_string(),
2307 alias: "default".to_string(),
2308 owning_agent: None,
2309 enabled: true,
2310 };
2311
2312 assert_eq!(
2313 compiled_readiness_key_for_alias(&config, &web),
2314 "whatsapp-web"
2315 );
2316 assert_eq!(
2317 compiled_readiness_key_for_alias(&config, &cloud),
2318 "whatsapp"
2319 );
2320 assert_eq!(
2321 compiled_readiness_key_for_alias(&config, &ambiguous),
2322 "whatsapp",
2323 "ambiguous WhatsApp configs follow runtime Cloud precedence"
2324 );
2325 assert_eq!(
2326 compiled_readiness_key_for_alias(&config, &discord),
2327 "discord"
2328 );
2329 }
2330
2331 #[cfg(not(feature = "channel-nextcloud"))]
2332 #[tokio::test]
2333 async fn api_channels_marks_configured_uncompiled_channel_unavailable() {
2334 let mut config = zeroclaw_config::schema::Config::default();
2335 config.channels.nextcloud_talk.insert(
2336 "default".to_string(),
2337 zeroclaw_config::schema::NextcloudTalkConfig {
2338 enabled: true,
2339 base_url: "https://cloud.example.com".to_string(),
2340 app_token: "test-token".to_string(),
2341 ..Default::default()
2342 },
2343 );
2344
2345 let response = handle_api_channels(State(test_state(config)), HeaderMap::new())
2346 .await
2347 .into_response();
2348 let json = response_json(response).await;
2349 let channels = json["channels"].as_array().expect("channels array");
2350 let nextcloud = channels
2351 .iter()
2352 .find(|channel| channel["alias"] == "default")
2353 .expect("configured channel is listed");
2354
2355 assert!(
2356 matches!(
2357 nextcloud["type"].as_str(),
2358 Some("nextcloud-talk" | "nextcloud_talk")
2359 ),
2360 "unexpected channel type: {}",
2361 nextcloud["type"]
2362 );
2363 assert_eq!(nextcloud["enabled"], true);
2364 assert_eq!(nextcloud["compiled"], false);
2365 assert_eq!(nextcloud["status"], "not_compiled");
2366 assert_eq!(nextcloud["health"], "unavailable");
2367 }
2368
2369 fn link_job_to_test_agent(state: &AppState, job_id: &str) {
2370 state
2371 .config
2372 .write()
2373 .agents
2374 .get_mut("test-agent")
2375 .expect("test-agent configured by with_test_agent")
2376 .cron_jobs
2377 .push(job_id.to_string());
2378 }
2379
2380 fn config_with_webhook(
2381 alias: &str,
2382 enabled: bool,
2383 bound: bool,
2384 port: u16,
2385 listen_path: Option<&str>,
2386 ) -> zeroclaw_config::schema::Config {
2387 let mut config = zeroclaw_config::schema::Config::default();
2388 config.gateway.port = 42617;
2389 config.gateway.require_pairing = false;
2390 config.channels.webhook.insert(
2391 alias.to_string(),
2392 zeroclaw_config::schema::WebhookConfig {
2393 enabled,
2394 port,
2395 listen_path: listen_path.map(ToString::to_string),
2396 ..Default::default()
2397 },
2398 );
2399 if bound {
2400 config.agents.insert(
2401 "rowan".to_string(),
2402 zeroclaw_config::schema::AliasedAgentConfig {
2403 channels: vec![zeroclaw_config::providers::ChannelRef::new(format!(
2404 "webhook.{alias}"
2405 ))],
2406 ..Default::default()
2407 },
2408 );
2409 }
2410 config
2411 }
2412
2413 fn config_with_telegram(alias: &str) -> zeroclaw_config::schema::Config {
2414 let mut config = zeroclaw_config::schema::Config::default();
2415 config.channels.telegram.insert(
2416 alias.to_string(),
2417 zeroclaw_config::schema::TelegramConfig {
2418 enabled: true,
2419 bot_token: "test-token".to_string(),
2420 ..Default::default()
2421 },
2422 );
2423 config.agents.insert(
2424 "rowan".to_string(),
2425 zeroclaw_config::schema::AliasedAgentConfig {
2426 channels: vec![zeroclaw_config::providers::ChannelRef::new(format!(
2427 "telegram.{alias}"
2428 ))],
2429 ..Default::default()
2430 },
2431 );
2432 config
2433 }
2434
2435 fn first_channel_info(
2436 config: &zeroclaw_config::schema::Config,
2437 ) -> zeroclaw_config::schema::ChannelAliasInfo {
2438 config
2439 .channels_by_alias()
2440 .into_iter()
2441 .next()
2442 .expect("channel alias should be present")
2443 }
2444
2445 #[test]
2446 fn channel_readiness_webhook_does_not_call_gateway_route_healthy_without_listener() {
2447 let config = config_with_webhook("default", true, true, 42617, Some("/webhook"));
2448 let state = test_state(config.clone());
2449 let health = zeroclaw_runtime::health::snapshot();
2450 let info = first_channel_info(&config);
2451 let readiness = channel_readiness(&config, &info, &health, &state);
2452
2453 assert_eq!(readiness.authenticated, ChannelReadinessState::Ready);
2454 assert_eq!(readiness.listening, ChannelReadinessState::Missing);
2455 assert_eq!(channel_readiness_summary(&readiness), ("error", "down"));
2456 assert!(
2457 readiness
2458 .requirements
2459 .iter()
2460 .any(|item| item.contains("Start a channel listener"))
2461 );
2462 }
2463
2464 #[test]
2465 fn channel_readiness_webhook_does_not_call_custom_path_healthy_without_listener() {
2466 let config = config_with_webhook("custom_path", true, true, 42632, Some("/eyrie"));
2467 let state = test_state(config.clone());
2468 let health = zeroclaw_runtime::health::snapshot();
2469 let info = first_channel_info(&config);
2470 let readiness = channel_readiness(&config, &info, &health, &state);
2471
2472 assert_eq!(readiness.authenticated, ChannelReadinessState::Ready);
2473 assert_eq!(readiness.listening, ChannelReadinessState::Missing);
2474 assert_eq!(channel_readiness_summary(&readiness), ("error", "down"));
2475 assert!(
2476 readiness
2477 .requirements
2478 .iter()
2479 .any(|item| item.contains("Start a channel listener"))
2480 );
2481 }
2482
2483 #[test]
2484 fn channel_readiness_webhook_uses_supervised_listener_health_for_custom_path() {
2485 let config = config_with_webhook("supervised", true, true, 42632, Some("/eyrie"));
2486 zeroclaw_runtime::health::mark_component_ok("channel:webhook.supervised");
2487 let state = test_state(config.clone());
2488 let health = zeroclaw_runtime::health::snapshot();
2489 let info = first_channel_info(&config);
2490 let readiness = channel_readiness(&config, &info, &health, &state);
2491
2492 assert_eq!(readiness.listening, ChannelReadinessState::Ready);
2493 assert_eq!(channel_readiness_summary(&readiness), ("active", "healthy"));
2494 }
2495
2496 #[test]
2497 fn channel_readiness_webhook_rejects_stale_listener_health() {
2498 let config = config_with_webhook("stale", true, true, 42632, Some("/eyrie"));
2499 let component = "channel:webhook.stale".to_string();
2500 let old = (chrono::Utc::now()
2501 - chrono::Duration::seconds(CHANNEL_LISTENER_HEALTH_MAX_AGE_SECS + 5))
2502 .to_rfc3339();
2503 let health = zeroclaw_runtime::health::HealthSnapshot {
2504 pid: std::process::id(),
2505 updated_at: chrono::Utc::now().to_rfc3339(),
2506 uptime_seconds: 1,
2507 components: std::collections::BTreeMap::from([(
2508 component,
2509 zeroclaw_runtime::health::ComponentHealth {
2510 status: "ok".to_string(),
2511 updated_at: old,
2512 last_ok: None,
2513 last_error: None,
2514 restart_count: 0,
2515 },
2516 )]),
2517 };
2518 let state = test_state(config.clone());
2519 let info = first_channel_info(&config);
2520 let readiness = channel_readiness(&config, &info, &health, &state);
2521
2522 assert_eq!(readiness.listening, ChannelReadinessState::Missing);
2523 assert_eq!(channel_readiness_summary(&readiness), ("error", "down"));
2524 }
2525
2526 #[test]
2527 fn channel_readiness_webhook_uses_live_pairing_guard_for_auth() {
2528 let config = config_with_webhook("paired", true, true, 42632, Some("/eyrie"));
2529 zeroclaw_runtime::health::mark_component_ok("channel:webhook.paired");
2530 let mut state = test_state(config.clone());
2531 state.pairing = Arc::new(PairingGuard::new(true, &[]));
2532 let health = zeroclaw_runtime::health::snapshot();
2533 let info = first_channel_info(&config);
2534 let readiness = channel_readiness(&config, &info, &health, &state);
2535
2536 assert_eq!(readiness.authenticated, ChannelReadinessState::Missing);
2537 assert_eq!(readiness.listening, ChannelReadinessState::Ready);
2538 assert_eq!(channel_readiness_summary(&readiness), ("error", "down"));
2539 }
2540
2541 #[test]
2542 fn channel_readiness_unchecked_channel_types_are_unknown_not_down() {
2543 let config = config_with_telegram("ops");
2544 let state = test_state(config.clone());
2545 let health = zeroclaw_runtime::health::snapshot();
2546 let info = first_channel_info(&config);
2547 let readiness = channel_readiness(&config, &info, &health, &state);
2548
2549 assert_eq!(readiness.enabled, ChannelReadinessState::Ready);
2550 assert_eq!(readiness.bound_to_agent, ChannelReadinessState::Ready);
2551 assert_eq!(readiness.authenticated, ChannelReadinessState::Unknown);
2552 assert_eq!(readiness.listening, ChannelReadinessState::Unknown);
2553 assert_eq!(
2554 channel_readiness_summary(&readiness),
2555 ("unknown", "degraded")
2556 );
2557 assert!(readiness.requirements.is_empty());
2558 assert!(
2559 readiness
2560 .notes
2561 .iter()
2562 .any(|item| item.contains("not checked"))
2563 );
2564 }
2565
2566 #[test]
2567 fn channel_readiness_orphan_channel_reports_missing_agent_binding_without_broken_health() {
2568 let config = config_with_webhook("orphan", true, false, 42617, Some("/webhook"));
2569 let state = test_state(config.clone());
2570 let health = zeroclaw_runtime::health::snapshot();
2571 let info = first_channel_info(&config);
2572 let readiness = channel_readiness(&config, &info, &health, &state);
2573
2574 assert_eq!(readiness.bound_to_agent, ChannelReadinessState::Missing);
2575 assert_eq!(readiness.listening, ChannelReadinessState::Unknown);
2576 assert_eq!(
2577 channel_readiness_summary(&readiness),
2578 ("inactive", "degraded")
2579 );
2580 assert!(
2581 readiness
2582 .requirements
2583 .iter()
2584 .any(|item| item.contains("Bind this channel"))
2585 );
2586 }
2587
2588 #[tokio::test]
2589 async fn api_channels_serializes_readiness_without_duplicate_summary_fields() {
2590 let config = config_with_webhook("ops", true, true, 42617, Some("/webhook"));
2591 let state = test_state(config);
2592
2593 let response = handle_api_channels(State(state), HeaderMap::new())
2594 .await
2595 .into_response();
2596
2597 assert_eq!(response.status(), StatusCode::OK);
2598 let json = response_json(response).await;
2599 let channel = &json["channels"][0];
2600 let webhook_compiled = zeroclaw_channels::listing::is_channel_type_compiled("webhook");
2601 assert_eq!(channel["name"], "webhook.ops");
2602 assert_eq!(channel["compiled"], webhook_compiled);
2603 if webhook_compiled {
2604 assert_eq!(channel["status"], "error");
2605 assert_eq!(channel["health"], "down");
2606 } else {
2607 assert_eq!(channel["status"], "not_compiled");
2608 assert_eq!(channel["health"], "unavailable");
2609 }
2610 assert_eq!(channel["readiness"]["enabled"], "ready");
2611 assert_eq!(channel["readiness"]["authenticated"], "ready");
2612 assert_eq!(channel["readiness"]["listening"], "missing");
2613 assert!(channel["readiness"].get("configured").is_none());
2614 assert!(channel["readiness"].get("status").is_none());
2615 assert!(channel["readiness"].get("health").is_none());
2616 }
2617
2618 fn test_state_with_session_backend(
2619 config: zeroclaw_config::schema::Config,
2620 backend: Arc<dyn SessionBackend>,
2621 ) -> AppState {
2622 let mut state = test_state(config);
2623 state.session_backend = Some(backend);
2624 state
2625 }
2626
2627 #[tokio::test]
2628 async fn session_message_post_persists_and_broadcasts_to_session() {
2629 let tmp = tempfile::TempDir::new().unwrap();
2630 let config = zeroclaw_config::schema::Config {
2631 data_dir: tmp.path().join("workspace"),
2632 config_path: tmp.path().join("config.toml"),
2633 ..zeroclaw_config::schema::Config::default()
2634 };
2635 std::fs::create_dir_all(&config.data_dir).unwrap();
2636 let backend: Arc<dyn SessionBackend> = Arc::new(SessionStore::new(tmp.path()).unwrap());
2637 backend
2638 .append(
2639 "gw_operator-1",
2640 &zeroclaw_providers::ChatMessage::assistant("existing"),
2641 )
2642 .unwrap();
2643 let state = test_state_with_session_backend(config, backend.clone());
2644 let mut rx = state.event_tx.subscribe();
2645
2646 let response = handle_api_session_message_post(
2647 State(state.clone()),
2648 HeaderMap::new(),
2649 Path("operator-1".to_string()),
2650 Json(
2651 serde_json::from_value::<SessionMessagePostBody>(serde_json::json!({
2652 "content": "deploy finished"
2653 }))
2654 .expect("body should deserialize"),
2655 ),
2656 )
2657 .await
2658 .into_response();
2659
2660 assert_eq!(response.status(), StatusCode::OK);
2661 let json = response_json(response).await;
2662 assert_eq!(json["status"], "ok");
2663 assert_eq!(json["session_id"], "operator-1");
2664 assert_eq!(json["message"]["role"], "assistant");
2665 assert_eq!(json["message"]["content"], "deploy finished");
2666 assert!(json.get("message_count").is_none());
2667
2668 let messages = backend.load("gw_operator-1");
2669 assert_eq!(messages.len(), 2);
2670 assert_eq!(messages[1].role, "assistant");
2671 assert_eq!(messages[1].content, "deploy finished");
2672
2673 let event = tokio::time::timeout(Duration::from_secs(1), rx.recv())
2674 .await
2675 .expect("broadcast event")
2676 .expect("broadcast value");
2677 assert_eq!(event["type"], "message");
2678 assert_eq!(event["session_id"], "operator-1");
2679 assert_eq!(event["role"], "assistant");
2680 assert_eq!(event["content"], "deploy finished");
2681
2682 let history = state.event_buffer.snapshot();
2683 assert!(
2684 history.is_empty(),
2685 "session-scoped chat messages stay out of global event history"
2686 );
2687 }
2688
2689 #[tokio::test]
2690 async fn session_message_post_rejects_empty_content() {
2691 let tmp = tempfile::TempDir::new().unwrap();
2692 let config = zeroclaw_config::schema::Config {
2693 data_dir: tmp.path().join("workspace"),
2694 config_path: tmp.path().join("config.toml"),
2695 ..zeroclaw_config::schema::Config::default()
2696 };
2697 std::fs::create_dir_all(&config.data_dir).unwrap();
2698 let backend: Arc<dyn SessionBackend> = Arc::new(SessionStore::new(tmp.path()).unwrap());
2699 let state = test_state_with_session_backend(config, backend);
2700
2701 let response = handle_api_session_message_post(
2702 State(state),
2703 HeaderMap::new(),
2704 Path("operator-1".to_string()),
2705 Json(
2706 serde_json::from_value::<SessionMessagePostBody>(serde_json::json!({
2707 "content": " "
2708 }))
2709 .expect("body should deserialize"),
2710 ),
2711 )
2712 .await
2713 .into_response();
2714
2715 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
2716 let json = response_json(response).await;
2717 assert_eq!(json["error"], "content is required");
2718 }
2719
2720 #[tokio::test]
2721 async fn session_message_post_rejects_unknown_session_without_creating_it() {
2722 let tmp = tempfile::TempDir::new().unwrap();
2723 let config = zeroclaw_config::schema::Config {
2724 data_dir: tmp.path().join("workspace"),
2725 config_path: tmp.path().join("config.toml"),
2726 ..zeroclaw_config::schema::Config::default()
2727 };
2728 std::fs::create_dir_all(&config.data_dir).unwrap();
2729 let backend: Arc<dyn SessionBackend> = Arc::new(SessionStore::new(tmp.path()).unwrap());
2730 let state = test_state_with_session_backend(config, backend.clone());
2731
2732 let response = handle_api_session_message_post(
2733 State(state),
2734 HeaderMap::new(),
2735 Path("operator-1".to_string()),
2736 Json(
2737 serde_json::from_value::<SessionMessagePostBody>(serde_json::json!({
2738 "content": "deploy finished"
2739 }))
2740 .expect("body should deserialize"),
2741 ),
2742 )
2743 .await
2744 .into_response();
2745
2746 assert_eq!(response.status(), StatusCode::NOT_FOUND);
2747 let json = response_json(response).await;
2748 assert_eq!(json["error"], "Session not found");
2749 assert!(backend.load("gw_operator-1").is_empty());
2750 }
2751
2752 #[tokio::test]
2753 async fn session_message_post_waits_for_session_queue_before_append() {
2754 let tmp = tempfile::TempDir::new().unwrap();
2755 let config = zeroclaw_config::schema::Config {
2756 data_dir: tmp.path().join("workspace"),
2757 config_path: tmp.path().join("config.toml"),
2758 ..zeroclaw_config::schema::Config::default()
2759 };
2760 std::fs::create_dir_all(&config.data_dir).unwrap();
2761 let backend: Arc<dyn SessionBackend> = Arc::new(SessionStore::new(tmp.path()).unwrap());
2762 backend
2763 .append(
2764 "gw_operator-1",
2765 &zeroclaw_providers::ChatMessage::assistant("existing"),
2766 )
2767 .unwrap();
2768 let state = test_state_with_session_backend(config, backend.clone());
2769 let session_guard = state.session_queue.acquire("gw_operator-1").await.unwrap();
2770
2771 let response_fut = handle_api_session_message_post(
2772 State(state),
2773 HeaderMap::new(),
2774 Path("operator-1".to_string()),
2775 Json(
2776 serde_json::from_value::<SessionMessagePostBody>(serde_json::json!({
2777 "content": "queued notification"
2778 }))
2779 .expect("body should deserialize"),
2780 ),
2781 );
2782 tokio::pin!(response_fut);
2783
2784 assert!(
2785 tokio::time::timeout(Duration::from_millis(50), &mut response_fut)
2786 .await
2787 .is_err(),
2788 "POST should wait behind the active session queue guard"
2789 );
2790 assert_eq!(backend.load("gw_operator-1").len(), 1);
2791
2792 drop(session_guard);
2793 let response = tokio::time::timeout(Duration::from_secs(1), response_fut)
2794 .await
2795 .expect("queued POST should complete")
2796 .into_response();
2797
2798 assert_eq!(response.status(), StatusCode::OK);
2799 let messages = backend.load("gw_operator-1");
2800 assert_eq!(messages.len(), 2);
2801 assert_eq!(messages[1].content, "queued notification");
2802 }
2803
2804 #[tokio::test]
2805 async fn cron_api_shell_roundtrip_includes_delivery() {
2806 let tmp = tempfile::TempDir::new().unwrap();
2807 let config = zeroclaw_config::schema::Config {
2808 data_dir: tmp.path().join("data"),
2809 config_path: tmp.path().join("config.toml"),
2810 ..zeroclaw_config::schema::Config::default()
2811 };
2812 std::fs::create_dir_all(&config.data_dir).unwrap();
2813 let state = test_state(with_test_agent(config));
2814
2815 let add_response = handle_api_cron_add(
2816 State(state.clone()),
2817 HeaderMap::new(),
2818 Json(
2819 serde_json::from_value::<CronAddBody>(serde_json::json!({
2820 "name": "test-job",
2821 "agent": "test-agent",
2822 "schedule": "*/5 * * * *",
2823 "command": "echo hello",
2824 "delivery": {
2825 "mode": "announce",
2826 "channel": "discord",
2827 "to": "1234567890",
2828 "best_effort": true
2829 }
2830 }))
2831 .expect("body should deserialize"),
2832 ),
2833 )
2834 .await
2835 .into_response();
2836
2837 let add_json = response_json(add_response).await;
2838 assert_eq!(add_json["status"], "ok");
2839 assert_eq!(add_json["job"]["delivery"]["mode"], "announce");
2840 assert_eq!(add_json["job"]["delivery"]["channel"], "discord");
2841 assert_eq!(add_json["job"]["delivery"]["to"], "1234567890");
2842
2843 let list_response = handle_api_cron_list(State(state), HeaderMap::new())
2844 .await
2845 .into_response();
2846 let list_json = response_json(list_response).await;
2847 let jobs = list_json["jobs"].as_array().expect("jobs array");
2848 assert_eq!(jobs.len(), 1);
2849 assert_eq!(jobs[0]["delivery"]["mode"], "announce");
2850 assert_eq!(jobs[0]["delivery"]["channel"], "discord");
2851 assert_eq!(jobs[0]["delivery"]["to"], "1234567890");
2852 }
2853
2854 #[tokio::test]
2855 async fn cron_api_accepts_agent_jobs() {
2856 let tmp = tempfile::TempDir::new().unwrap();
2857 let config = zeroclaw_config::schema::Config {
2858 data_dir: tmp.path().join("data"),
2859 config_path: tmp.path().join("config.toml"),
2860 ..zeroclaw_config::schema::Config::default()
2861 };
2862 std::fs::create_dir_all(&config.data_dir).unwrap();
2863 let state = test_state(with_test_agent(config));
2864
2865 let response = handle_api_cron_add(
2866 State(state.clone()),
2867 HeaderMap::new(),
2868 Json(
2869 serde_json::from_value::<CronAddBody>(serde_json::json!({
2870 "name": "agent-job",
2871 "agent": "test-agent",
2872 "schedule": "*/5 * * * *",
2873 "job_type": "agent",
2874 "command": "ignored shell command",
2875 "prompt": "summarize the latest logs"
2876 }))
2877 .expect("body should deserialize"),
2878 ),
2879 )
2880 .await
2881 .into_response();
2882
2883 let json = response_json(response).await;
2884 assert_eq!(json["status"], "ok");
2885
2886 let config = state.config.read().clone();
2887 let jobs = zeroclaw_runtime::cron::list_jobs(&config).unwrap();
2888 assert_eq!(jobs.len(), 1);
2889 assert_eq!(jobs[0].job_type, zeroclaw_runtime::cron::JobType::Agent);
2890 assert_eq!(jobs[0].prompt.as_deref(), Some("summarize the latest logs"));
2891 }
2892
2893 #[tokio::test]
2894 async fn cron_api_timezone_add_persists_explicit_timezone() {
2895 let tmp = tempfile::TempDir::new().unwrap();
2896 let config = zeroclaw_config::schema::Config {
2897 data_dir: tmp.path().join("workspace"),
2898 config_path: tmp.path().join("config.toml"),
2899 ..zeroclaw_config::schema::Config::default()
2900 };
2901 std::fs::create_dir_all(&config.data_dir).unwrap();
2902 let state = test_state(with_test_agent(config));
2903
2904 let response = handle_api_cron_add(
2905 State(state.clone()),
2906 HeaderMap::new(),
2907 Json(
2908 serde_json::from_value::<CronAddBody>(serde_json::json!({
2909 "agent": "test-agent",
2910 "name": "localized-job",
2911 "schedule": "0 9 * * *",
2912 "tz": "America/New_York",
2913 "command": "echo hello"
2914 }))
2915 .expect("body should deserialize"),
2916 ),
2917 )
2918 .await
2919 .into_response();
2920
2921 assert_eq!(response.status(), StatusCode::OK);
2922 let config = state.config.read().clone();
2923 let jobs = zeroclaw_runtime::cron::list_jobs(&config).unwrap();
2924 assert_eq!(
2925 jobs[0].schedule,
2926 zeroclaw_runtime::cron::Schedule::Cron {
2927 expr: "0 9 * * *".to_string(),
2928 tz: Some("America/New_York".to_string()),
2929 }
2930 );
2931 }
2932
2933 #[tokio::test]
2934 async fn cron_api_timezone_add_rejects_invalid_timezone_as_bad_request() {
2935 let tmp = tempfile::TempDir::new().unwrap();
2936 let config = zeroclaw_config::schema::Config {
2937 data_dir: tmp.path().join("workspace"),
2938 config_path: tmp.path().join("config.toml"),
2939 ..zeroclaw_config::schema::Config::default()
2940 };
2941 std::fs::create_dir_all(&config.data_dir).unwrap();
2942 let state = test_state(with_test_agent(config));
2943
2944 let response = handle_api_cron_add(
2945 State(state),
2946 HeaderMap::new(),
2947 Json(
2948 serde_json::from_value::<CronAddBody>(serde_json::json!({
2949 "agent": "test-agent",
2950 "name": "invalid-timezone-job",
2951 "schedule": "0 9 * * *",
2952 "tz": "Invalid/Zone",
2953 "command": "echo hello"
2954 }))
2955 .expect("body should deserialize"),
2956 ),
2957 )
2958 .await
2959 .into_response();
2960
2961 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
2962 let json = response_json(response).await;
2963 assert!(
2964 json["error"]
2965 .as_str()
2966 .unwrap_or_default()
2967 .contains("Invalid IANA timezone")
2968 );
2969 }
2970
2971 #[tokio::test]
2972 async fn cron_api_timezone_patch_schedule_preserves_existing_timezone() {
2973 let tmp = tempfile::TempDir::new().unwrap();
2974 let config = zeroclaw_config::schema::Config {
2975 data_dir: tmp.path().join("workspace"),
2976 config_path: tmp.path().join("config.toml"),
2977 ..zeroclaw_config::schema::Config::default()
2978 };
2979 std::fs::create_dir_all(&config.data_dir).unwrap();
2980 let state = test_state(with_test_agent(config));
2981 let job = zeroclaw_runtime::cron::add_shell_job_with_approval(
2982 &state.config.read().clone(),
2983 "test-agent",
2984 Some("localized-job".to_string()),
2985 zeroclaw_runtime::cron::Schedule::Cron {
2986 expr: "0 9 * * *".to_string(),
2987 tz: Some("Europe/Berlin".to_string()),
2988 },
2989 "echo hello",
2990 None,
2991 true,
2992 )
2993 .expect("job added");
2994
2995 let response = handle_api_cron_patch(
2996 State(state.clone()),
2997 HeaderMap::new(),
2998 Path(job.id.clone()),
2999 Json(
3000 serde_json::from_value::<CronPatchBody>(serde_json::json!({
3001 "agent": "test-agent",
3002 "schedule": "30 9 * * *"
3003 }))
3004 .expect("body should deserialize"),
3005 ),
3006 )
3007 .await
3008 .into_response();
3009
3010 assert_eq!(response.status(), StatusCode::OK);
3011 let updated = zeroclaw_runtime::cron::get_job(&state.config.read().clone(), &job.id)
3012 .expect("updated job");
3013 assert_eq!(
3014 updated.schedule,
3015 zeroclaw_runtime::cron::Schedule::Cron {
3016 expr: "30 9 * * *".to_string(),
3017 tz: Some("Europe/Berlin".to_string()),
3018 }
3019 );
3020 }
3021
3022 #[tokio::test]
3023 async fn cron_api_timezone_patch_replaces_timezone_when_provided() {
3024 let tmp = tempfile::TempDir::new().unwrap();
3025 let config = zeroclaw_config::schema::Config {
3026 data_dir: tmp.path().join("workspace"),
3027 config_path: tmp.path().join("config.toml"),
3028 ..zeroclaw_config::schema::Config::default()
3029 };
3030 std::fs::create_dir_all(&config.data_dir).unwrap();
3031 let state = test_state(with_test_agent(config));
3032 let job = zeroclaw_runtime::cron::add_shell_job_with_approval(
3033 &state.config.read().clone(),
3034 "test-agent",
3035 Some("localized-job".to_string()),
3036 zeroclaw_runtime::cron::Schedule::Cron {
3037 expr: "0 9 * * *".to_string(),
3038 tz: Some("America/New_York".to_string()),
3039 },
3040 "echo hello",
3041 None,
3042 true,
3043 )
3044 .expect("job added");
3045
3046 let response = handle_api_cron_patch(
3047 State(state.clone()),
3048 HeaderMap::new(),
3049 Path(job.id.clone()),
3050 Json(
3051 serde_json::from_value::<CronPatchBody>(serde_json::json!({
3052 "agent": "test-agent",
3053 "schedule": "30 9 * * *",
3054 "tz": "Asia/Tokyo"
3055 }))
3056 .expect("body should deserialize"),
3057 ),
3058 )
3059 .await
3060 .into_response();
3061
3062 assert_eq!(response.status(), StatusCode::OK);
3063 let updated = zeroclaw_runtime::cron::get_job(&state.config.read().clone(), &job.id)
3064 .expect("updated job");
3065 assert_eq!(
3066 updated.schedule,
3067 zeroclaw_runtime::cron::Schedule::Cron {
3068 expr: "30 9 * * *".to_string(),
3069 tz: Some("Asia/Tokyo".to_string()),
3070 }
3071 );
3072 }
3073
3074 #[tokio::test]
3075 async fn cron_api_timezone_patch_sets_timezone_without_schedule_change() {
3076 let tmp = tempfile::TempDir::new().unwrap();
3077 let config = zeroclaw_config::schema::Config {
3078 data_dir: tmp.path().join("workspace"),
3079 config_path: tmp.path().join("config.toml"),
3080 ..zeroclaw_config::schema::Config::default()
3081 };
3082 std::fs::create_dir_all(&config.data_dir).unwrap();
3083 let state = test_state(with_test_agent(config));
3084 let job = zeroclaw_runtime::cron::add_shell_job_with_approval(
3085 &state.config.read().clone(),
3086 "test-agent",
3087 Some("runtime-local-job".to_string()),
3088 zeroclaw_runtime::cron::Schedule::Cron {
3089 expr: "0 9 * * *".to_string(),
3090 tz: None,
3091 },
3092 "echo hello",
3093 None,
3094 true,
3095 )
3096 .expect("job added");
3097
3098 let response = handle_api_cron_patch(
3099 State(state.clone()),
3100 HeaderMap::new(),
3101 Path(job.id.clone()),
3102 Json(
3103 serde_json::from_value::<CronPatchBody>(serde_json::json!({
3104 "agent": "test-agent",
3105 "tz": "America/Chicago"
3106 }))
3107 .expect("body should deserialize"),
3108 ),
3109 )
3110 .await
3111 .into_response();
3112
3113 assert_eq!(response.status(), StatusCode::OK);
3114 let updated = zeroclaw_runtime::cron::get_job(&state.config.read().clone(), &job.id)
3115 .expect("updated job");
3116 assert_eq!(
3117 updated.schedule,
3118 zeroclaw_runtime::cron::Schedule::Cron {
3119 expr: "0 9 * * *".to_string(),
3120 tz: Some("America/Chicago".to_string()),
3121 }
3122 );
3123 }
3124
3125 #[tokio::test]
3126 async fn cron_api_timezone_patch_rejects_invalid_timezone_as_bad_request() {
3127 let tmp = tempfile::TempDir::new().unwrap();
3128 let config = zeroclaw_config::schema::Config {
3129 data_dir: tmp.path().join("workspace"),
3130 config_path: tmp.path().join("config.toml"),
3131 ..zeroclaw_config::schema::Config::default()
3132 };
3133 std::fs::create_dir_all(&config.data_dir).unwrap();
3134 let state = test_state(with_test_agent(config));
3135 let job = zeroclaw_runtime::cron::add_shell_job_with_approval(
3136 &state.config.read().clone(),
3137 "test-agent",
3138 Some("localized-job".to_string()),
3139 zeroclaw_runtime::cron::Schedule::Cron {
3140 expr: "0 9 * * *".to_string(),
3141 tz: Some("America/New_York".to_string()),
3142 },
3143 "echo hello",
3144 None,
3145 true,
3146 )
3147 .expect("job added");
3148
3149 let response = handle_api_cron_patch(
3150 State(state),
3151 HeaderMap::new(),
3152 Path(job.id),
3153 Json(
3154 serde_json::from_value::<CronPatchBody>(serde_json::json!({
3155 "agent": "test-agent",
3156 "tz": "Invalid/Zone"
3157 }))
3158 .expect("body should deserialize"),
3159 ),
3160 )
3161 .await
3162 .into_response();
3163
3164 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
3165 let json = response_json(response).await;
3166 assert!(
3167 json["error"]
3168 .as_str()
3169 .unwrap_or_default()
3170 .contains("Invalid IANA timezone")
3171 );
3172 }
3173
3174 #[tokio::test]
3175 async fn cron_api_timezone_patch_clears_timezone_with_explicit_signal() {
3176 let tmp = tempfile::TempDir::new().unwrap();
3177 let config = zeroclaw_config::schema::Config {
3178 data_dir: tmp.path().join("workspace"),
3179 config_path: tmp.path().join("config.toml"),
3180 ..zeroclaw_config::schema::Config::default()
3181 };
3182 std::fs::create_dir_all(&config.data_dir).unwrap();
3183 let state = test_state(with_test_agent(config));
3184 let job = zeroclaw_runtime::cron::add_shell_job_with_approval(
3185 &state.config.read().clone(),
3186 "test-agent",
3187 Some("localized-job".to_string()),
3188 zeroclaw_runtime::cron::Schedule::Cron {
3189 expr: "0 9 * * *".to_string(),
3190 tz: Some("America/New_York".to_string()),
3191 },
3192 "echo hello",
3193 None,
3194 true,
3195 )
3196 .expect("job added");
3197
3198 let response = handle_api_cron_patch(
3199 State(state.clone()),
3200 HeaderMap::new(),
3201 Path(job.id.clone()),
3202 Json(
3203 serde_json::from_value::<CronPatchBody>(serde_json::json!({
3204 "agent": "test-agent",
3205 "clear_tz": true
3206 }))
3207 .expect("body should deserialize"),
3208 ),
3209 )
3210 .await
3211 .into_response();
3212
3213 assert_eq!(response.status(), StatusCode::OK);
3214 let updated = zeroclaw_runtime::cron::get_job(&state.config.read().clone(), &job.id)
3215 .expect("updated job");
3216 assert_eq!(
3217 updated.schedule,
3218 zeroclaw_runtime::cron::Schedule::Cron {
3219 expr: "0 9 * * *".to_string(),
3220 tz: None,
3221 }
3222 );
3223 }
3224
3225 #[tokio::test]
3226 async fn cron_api_rejects_announce_delivery_without_target() {
3227 let tmp = tempfile::TempDir::new().unwrap();
3228 let config = zeroclaw_config::schema::Config {
3229 data_dir: tmp.path().join("data"),
3230 config_path: tmp.path().join("config.toml"),
3231 ..zeroclaw_config::schema::Config::default()
3232 };
3233 std::fs::create_dir_all(&config.data_dir).unwrap();
3234 let state = test_state(with_test_agent(config));
3235
3236 let response = handle_api_cron_add(
3237 State(state.clone()),
3238 HeaderMap::new(),
3239 Json(
3240 serde_json::from_value::<CronAddBody>(serde_json::json!({
3241 "name": "invalid-delivery-job",
3242 "agent": "test-agent",
3243 "schedule": "*/5 * * * *",
3244 "command": "echo hello",
3245 "delivery": {
3246 "mode": "announce",
3247 "channel": "discord"
3248 }
3249 }))
3250 .expect("body should deserialize"),
3251 ),
3252 )
3253 .await
3254 .into_response();
3255
3256 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
3257 let json = response_json(response).await;
3258 assert!(
3259 json["error"]
3260 .as_str()
3261 .unwrap_or_default()
3262 .contains("delivery.to is required")
3263 );
3264
3265 let config = state.config.read().clone();
3266 assert!(
3267 zeroclaw_runtime::cron::list_jobs(&config)
3268 .unwrap()
3269 .is_empty()
3270 );
3271 }
3272
3273 #[tokio::test]
3274 async fn cron_api_run_executes_shell_job_and_records_run() {
3275 let tmp = tempfile::TempDir::new().unwrap();
3276 let config = zeroclaw_config::schema::Config {
3277 data_dir: tmp.path().join("data"),
3278 config_path: tmp.path().join("config.toml"),
3279 ..zeroclaw_config::schema::Config::default()
3280 };
3281 std::fs::create_dir_all(&config.data_dir).unwrap();
3282 let state = test_state(with_test_agent(config));
3283
3284 let job = zeroclaw_runtime::cron::add_shell_job_with_approval(
3285 &state.config.read().clone(),
3286 "test-agent",
3287 None,
3288 zeroclaw_runtime::cron::Schedule::Cron {
3289 expr: "*/5 * * * *".to_string(),
3290 tz: None,
3291 },
3292 "echo hello-from-manual-trigger",
3293 None,
3294 true,
3295 )
3296 .expect("job added");
3297
3298 link_job_to_test_agent(&state, &job.id);
3301
3302 let response =
3303 handle_api_cron_run(State(state.clone()), HeaderMap::new(), Path(job.id.clone()))
3304 .await
3305 .into_response();
3306
3307 assert_eq!(response.status(), StatusCode::OK);
3308 let json = response_json(response).await;
3309 assert_eq!(json["status"], "ok");
3310 assert_eq!(json["success"], true);
3311 assert_eq!(json["job_id"], job.id);
3312 assert!(
3313 json["output"]
3314 .as_str()
3315 .unwrap_or_default()
3316 .contains("hello-from-manual-trigger")
3317 );
3318
3319 let runs = zeroclaw_runtime::cron::list_runs(&state.config.read().clone(), &job.id, 10)
3320 .expect("runs listed");
3321 assert_eq!(runs.len(), 1);
3322 assert_eq!(runs[0].status, "ok");
3323 }
3324
3325 #[tokio::test]
3326 async fn cron_api_run_records_best_effort_delivery_failure_as_degraded() {
3327 zeroclaw_runtime::cron::scheduler::register_delivery_fn(Box::new(
3328 |_config, channel, _target, _thread_id, _output| {
3329 Box::pin(async move {
3330 if channel == "fail-delivery" {
3331 anyhow::bail!("synthetic delivery failure");
3332 }
3333 Ok(())
3334 })
3335 },
3336 ));
3337
3338 let tmp = tempfile::TempDir::new().unwrap();
3339 let config = zeroclaw_config::schema::Config {
3340 data_dir: tmp.path().join("data"),
3341 config_path: tmp.path().join("config.toml"),
3342 ..zeroclaw_config::schema::Config::default()
3343 };
3344 std::fs::create_dir_all(&config.data_dir).unwrap();
3345 let state = test_state(with_test_agent(config));
3346
3347 let job = zeroclaw_runtime::cron::add_shell_job_with_approval(
3348 &state.config.read().clone(),
3349 "test-agent",
3350 None,
3351 zeroclaw_runtime::cron::Schedule::Cron {
3352 expr: "*/5 * * * *".to_string(),
3353 tz: None,
3354 },
3355 "echo hello-from-manual-trigger",
3356 Some(zeroclaw_runtime::cron::DeliveryConfig {
3357 mode: "announce".into(),
3358 channel: Some("fail-delivery".into()),
3359 to: Some("123456".into()),
3360 thread_id: None,
3361 best_effort: true,
3362 }),
3363 true,
3364 )
3365 .expect("job added");
3366 link_job_to_test_agent(&state, &job.id);
3367
3368 let response =
3369 handle_api_cron_run(State(state.clone()), HeaderMap::new(), Path(job.id.clone()))
3370 .await
3371 .into_response();
3372
3373 assert_eq!(response.status(), StatusCode::OK);
3374 let json = response_json(response).await;
3375 assert_eq!(json["status"], "degraded");
3376 assert_eq!(json["success"], true);
3377 assert!(
3378 json["output"]
3379 .as_str()
3380 .unwrap_or_default()
3381 .contains("delivery failed:")
3382 );
3383
3384 let config = state.config.read().clone();
3385 let updated = zeroclaw_runtime::cron::get_job(&config, &job.id).expect("updated job");
3386 assert_eq!(updated.last_status.as_deref(), Some("degraded"));
3387 assert!(
3388 updated
3389 .last_output
3390 .as_deref()
3391 .unwrap_or_default()
3392 .contains("delivery failed:")
3393 );
3394
3395 let runs = zeroclaw_runtime::cron::list_runs(&config, &job.id, 10).expect("runs listed");
3396 assert_eq!(runs.len(), 1);
3397 assert_eq!(runs[0].status, "degraded");
3398 assert!(
3399 runs[0]
3400 .output
3401 .as_deref()
3402 .unwrap_or_default()
3403 .contains("delivery failed:")
3404 );
3405 }
3406
3407 #[tokio::test]
3408 async fn cron_api_run_returns_not_found_for_unknown_job() {
3409 let tmp = tempfile::TempDir::new().unwrap();
3410 let config = zeroclaw_config::schema::Config {
3411 data_dir: tmp.path().join("data"),
3412 config_path: tmp.path().join("config.toml"),
3413 ..zeroclaw_config::schema::Config::default()
3414 };
3415 std::fs::create_dir_all(&config.data_dir).unwrap();
3416 let state = test_state(with_test_agent(config));
3417
3418 let response = handle_api_cron_run(
3419 State(state),
3420 HeaderMap::new(),
3421 Path("does-not-exist".to_string()),
3422 )
3423 .await
3424 .into_response();
3425
3426 assert_eq!(response.status(), StatusCode::NOT_FOUND);
3427 }
3428
3429 use crate::api_pairing::{
3436 DeviceInfo, DeviceRegistry, revoke_device, rotate_token as rotate_device_token,
3437 submit_pairing_enhanced,
3438 };
3439 use chrono::Utc;
3440
3441 async fn paired_state_with_device(tmp: &tempfile::TempDir) -> (AppState, String, String) {
3442 let data_dir = tmp.path().join("workspace");
3443 std::fs::create_dir_all(&data_dir).unwrap();
3444 let config = zeroclaw_config::schema::Config {
3445 data_dir: data_dir.clone(),
3446 config_path: tmp.path().join("config.toml"),
3447 ..zeroclaw_config::schema::Config::default()
3448 };
3449
3450 let pairing = Arc::new(PairingGuard::new(true, &[]));
3451 let code = pairing.pairing_code().unwrap();
3452 let token = pairing.try_pair(&code, "test").await.unwrap().unwrap();
3453 let token_hash = PairingGuard::token_hash(&token);
3454
3455 let registry = Arc::new(DeviceRegistry::new(&data_dir));
3456 let device_id = "dev-1".to_string();
3457 registry.register(
3458 token_hash,
3459 DeviceInfo {
3460 id: device_id.clone(),
3461 name: None,
3462 device_type: None,
3463 paired_at: Utc::now(),
3464 last_seen: Utc::now(),
3465 ip_address: None,
3466 capabilities: None,
3467 },
3468 );
3469
3470 let mut state = test_state(config);
3471 state.pairing = pairing;
3472 state.device_registry = Some(registry);
3473 (state, token, device_id)
3474 }
3475
3476 fn bearer_headers(token: &str) -> HeaderMap {
3477 let mut h = HeaderMap::new();
3478 h.insert(
3479 header::AUTHORIZATION,
3480 format!("Bearer {token}").parse().unwrap(),
3481 );
3482 h
3483 }
3484
3485 #[tokio::test]
3490 async fn rotate_token_invalidates_old_bearer_token() {
3491 let tmp = tempfile::TempDir::new().unwrap();
3492 let (state, old_token, device_id) = paired_state_with_device(&tmp).await;
3493 assert!(state.pairing.is_authenticated(&old_token));
3494
3495 let response = rotate_device_token(
3496 State(state.clone()),
3497 bearer_headers(&old_token),
3498 Path(device_id.clone()),
3499 )
3500 .await
3501 .into_response();
3502 assert_eq!(response.status(), StatusCode::OK);
3503
3504 assert!(
3505 !state.pairing.is_authenticated(&old_token),
3506 "old bearer token must not authenticate after rotate"
3507 );
3508
3509 let json = response_json(response).await;
3510 assert_eq!(json["device_id"], device_id);
3511 assert!(json["pairing_code"].is_string());
3512 }
3513
3514 #[tokio::test]
3518 async fn rotate_token_persists_revocation_to_config() {
3519 let tmp = tempfile::TempDir::new().unwrap();
3520 let (state, old_token, device_id) = paired_state_with_device(&tmp).await;
3521 let old_hash = PairingGuard::token_hash(&old_token);
3522
3523 let response = rotate_device_token(
3524 State(state.clone()),
3525 bearer_headers(&old_token),
3526 Path(device_id),
3527 )
3528 .await
3529 .into_response();
3530 assert_eq!(response.status(), StatusCode::OK);
3531
3532 let persisted = state.config.read().gateway.paired_tokens.clone();
3533 assert!(
3534 !persisted.contains(&old_hash),
3535 "revoked token hash must not remain in gateway.paired_tokens"
3536 );
3537 }
3538
3539 #[tokio::test]
3545 async fn submit_pairing_enhanced_persists_new_token() {
3546 let tmp = tempfile::TempDir::new().unwrap();
3547 let (state, _old_token, _device_id) = paired_state_with_device(&tmp).await;
3548
3549 let code = state
3550 .pairing
3551 .generate_new_pairing_code()
3552 .expect("require_pairing was enabled");
3553
3554 let response = submit_pairing_enhanced(
3555 State(state.clone()),
3556 HeaderMap::new(),
3557 Json(serde_json::json!({ "code": code, "device_name": "repaired" })),
3558 )
3559 .await
3560 .into_response();
3561 assert_eq!(response.status(), StatusCode::OK);
3562
3563 let json = response_json(response).await;
3564 assert_eq!(json["persisted"], true);
3565 let new_token = json["token"].as_str().expect("token in response");
3566 let new_hash = PairingGuard::token_hash(new_token);
3567 assert!(
3568 state
3569 .config
3570 .read()
3571 .gateway
3572 .paired_tokens
3573 .contains(&new_hash),
3574 "newly paired token hash must be persisted to gateway.paired_tokens"
3575 );
3576 }
3577
3578 #[tokio::test]
3581 async fn revoke_device_invalidates_bearer_token() {
3582 let tmp = tempfile::TempDir::new().unwrap();
3583 let (state, old_token, device_id) = paired_state_with_device(&tmp).await;
3584
3585 let response = revoke_device(
3586 State(state.clone()),
3587 bearer_headers(&old_token),
3588 Path(device_id),
3589 )
3590 .await
3591 .into_response();
3592 assert_eq!(response.status(), StatusCode::OK);
3593
3594 assert!(
3595 !state.pairing.is_authenticated(&old_token),
3596 "bearer token must not authenticate after device delete"
3597 );
3598 let old_hash = PairingGuard::token_hash(&old_token);
3599 assert!(
3600 !state
3601 .config
3602 .read()
3603 .gateway
3604 .paired_tokens
3605 .contains(&old_hash),
3606 "deleted device's token must be dropped from persisted paired_tokens"
3607 );
3608 }
3609
3610 #[tokio::test]
3611 async fn rotate_unknown_device_returns_not_found() {
3612 let tmp = tempfile::TempDir::new().unwrap();
3613 let (state, token, _) = paired_state_with_device(&tmp).await;
3614
3615 let response = rotate_device_token(
3616 State(state.clone()),
3617 bearer_headers(&token),
3618 Path("does-not-exist".into()),
3619 )
3620 .await
3621 .into_response();
3622 assert_eq!(response.status(), StatusCode::NOT_FOUND);
3623 assert!(
3624 state.pairing.is_authenticated(&token),
3625 "unknown-device rotate must not touch existing tokens"
3626 );
3627 }
3628
3629 #[tokio::test]
3635 async fn rotate_with_pending_code_revokes_but_returns_null_code() {
3636 let tmp = tempfile::TempDir::new().unwrap();
3637 let (state, token, device_id) = paired_state_with_device(&tmp).await;
3638
3639 let pending_code = state
3640 .pairing
3641 .generate_new_pairing_code()
3642 .expect("require_pairing was enabled");
3643
3644 let response = rotate_device_token(
3645 State(state.clone()),
3646 bearer_headers(&token),
3647 Path(device_id.clone()),
3648 )
3649 .await
3650 .into_response();
3651 assert_eq!(response.status(), StatusCode::OK);
3652
3653 assert!(
3654 !state.pairing.is_authenticated(&token),
3655 "old bearer token must be revoked even when a pairing code is pending"
3656 );
3657 assert_eq!(
3658 state.pairing.pairing_code().as_deref(),
3659 Some(pending_code.as_str()),
3660 "pending pairing code must survive rotate",
3661 );
3662
3663 let json = response_json(response).await;
3664 assert!(json["pairing_code"].is_null());
3665 assert_eq!(json["device_id"], device_id);
3666 }
3667
3668 #[tokio::test]
3674 async fn concurrent_rotates_do_not_both_issue_a_pairing_code() {
3675 let tmp = tempfile::TempDir::new().unwrap();
3676 let data_dir = tmp.path().join("workspace");
3677 std::fs::create_dir_all(&data_dir).unwrap();
3678 let config = zeroclaw_config::schema::Config {
3679 data_dir: data_dir.clone(),
3680 config_path: tmp.path().join("config.toml"),
3681 ..zeroclaw_config::schema::Config::default()
3682 };
3683
3684 let pairing = Arc::new(PairingGuard::new(true, &[]));
3685 let code = pairing.pairing_code().unwrap();
3686 let admin_token = pairing.try_pair(&code, "admin").await.unwrap().unwrap();
3687
3688 let registry = Arc::new(DeviceRegistry::new(&data_dir));
3689 for id in ["dev-a", "dev-b"] {
3690 let code = pairing
3692 .generate_new_pairing_code()
3693 .expect("pairing enabled");
3694 let tok = pairing.try_pair(&code, id).await.unwrap().unwrap();
3695 registry.register(
3696 PairingGuard::token_hash(&tok),
3697 DeviceInfo {
3698 id: id.to_string(),
3699 name: None,
3700 device_type: None,
3701 paired_at: Utc::now(),
3702 last_seen: Utc::now(),
3703 ip_address: None,
3704 capabilities: None,
3705 },
3706 );
3707 }
3708
3709 let mut state = test_state(config);
3710 state.pairing = pairing;
3711 state.device_registry = Some(registry);
3712
3713 let s1 = state.clone();
3714 let s2 = state.clone();
3715 let h1 = bearer_headers(&admin_token);
3716 let h2 = bearer_headers(&admin_token);
3717 let (r1, r2) = tokio::join!(
3718 async move {
3719 rotate_device_token(State(s1), h1, Path("dev-a".into()))
3720 .await
3721 .into_response()
3722 },
3723 async move {
3724 rotate_device_token(State(s2), h2, Path("dev-b".into()))
3725 .await
3726 .into_response()
3727 },
3728 );
3729
3730 assert_eq!(r1.status(), StatusCode::OK);
3731 assert_eq!(r2.status(), StatusCode::OK);
3732 let j1 = response_json(r1).await;
3733 let j2 = response_json(r2).await;
3734 let codes_issued = usize::from(j1["pairing_code"].is_string())
3735 + usize::from(j2["pairing_code"].is_string());
3736 assert_eq!(
3737 codes_issued, 1,
3738 "exactly one of two racing rotates must win the pairing slot, \
3739 got {codes_issued} (j1={j1}, j2={j2})"
3740 );
3741 }
3742}