1use super::AppState;
6use axum::{
7 extract::{Path, Query, State},
8 http::{HeaderMap, StatusCode, header},
9 response::{IntoResponse, Json},
10};
11use serde::{Deserialize, Serialize};
12
13fn extract_bearer_token(headers: &HeaderMap) -> Option<&str> {
17 headers
18 .get(header::AUTHORIZATION)
19 .and_then(|v| v.to_str().ok())
20 .and_then(|auth| auth.strip_prefix("Bearer "))
21}
22
23pub(super) fn require_auth(
25 state: &AppState,
26 headers: &HeaderMap,
27) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
28 if !state.pairing.require_pairing() {
29 return Ok(());
30 }
31
32 let token = extract_bearer_token(headers).unwrap_or("");
33 if state.pairing.is_authenticated(token) {
34 Ok(())
35 } else {
36 Err((
37 StatusCode::UNAUTHORIZED,
38 Json(serde_json::json!({
39 "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer <token>"
40 })),
41 ))
42 }
43}
44
45#[derive(Deserialize)]
48pub struct MemoryQuery {
49 pub query: Option<String>,
50 pub category: Option<String>,
51 pub since: Option<String>,
53 pub until: Option<String>,
55 #[serde(default)]
60 pub agent: Option<String>,
61}
62
63#[derive(Deserialize)]
64pub struct MemoryStoreBody {
65 pub key: String,
66 pub content: String,
67 pub category: Option<String>,
68 #[serde(default)]
71 pub agent: Option<String>,
72}
73
74#[derive(Deserialize)]
75pub struct MemoryDeleteQuery {
76 #[serde(default)]
79 pub agent: Option<String>,
80}
81
82#[derive(Deserialize)]
83pub struct CronRunsQuery {
84 pub limit: Option<u32>,
85}
86
87#[derive(Deserialize)]
88pub struct CronAddBody {
89 pub agent: String,
92 pub name: Option<String>,
93 pub schedule: String,
94 pub tz: Option<String>,
95 pub command: Option<String>,
96 pub job_type: Option<String>,
97 pub prompt: Option<String>,
98 pub delivery: Option<zeroclaw_runtime::cron::DeliveryConfig>,
99 pub session_target: Option<String>,
100 pub model: Option<String>,
101 pub allowed_tools: Option<Vec<String>>,
102 pub delete_after_run: Option<bool>,
103}
104
105#[derive(Deserialize)]
106pub struct CronPatchBody {
107 pub agent: String,
110 pub name: Option<String>,
111 pub schedule: Option<String>,
112 pub tz: Option<String>,
113 pub clear_tz: Option<bool>,
114 pub command: Option<String>,
115 pub prompt: Option<String>,
116}
117
/// What a cron PATCH request asks to do with the job's timezone.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CronTimezonePatch {
    /// Leave the stored timezone untouched.
    Preserve,
    /// Replace the timezone with the given value.
    Set(String),
    /// Remove the timezone.
    Clear,
}
123
124fn bad_request(message: impl Into<String>) -> (StatusCode, Json<serde_json::Value>) {
125 (
126 StatusCode::BAD_REQUEST,
127 Json(serde_json::json!({ "error": message.into() })),
128 )
129}
130
131fn normalize_optional_timezone(
132 tz: Option<String>,
133) -> Result<Option<String>, (StatusCode, Json<serde_json::Value>)> {
134 match tz {
135 Some(raw) => {
136 let trimmed = raw.trim();
137 if trimmed.is_empty() {
138 Err(bad_request(
139 "tz must be a non-empty IANA timezone; use clear_tz=true to clear it",
140 ))
141 } else {
142 Ok(Some(trimmed.to_string()))
143 }
144 }
145 None => Ok(None),
146 }
147}
148
149fn parse_timezone_patch(
150 tz: Option<String>,
151 clear_tz: Option<bool>,
152) -> Result<CronTimezonePatch, (StatusCode, Json<serde_json::Value>)> {
153 let tz = normalize_optional_timezone(tz)?;
154 let clear_tz = clear_tz.unwrap_or(false);
155
156 if clear_tz && tz.is_some() {
157 return Err(bad_request("Provide either tz or clear_tz=true, not both"));
158 }
159
160 if clear_tz {
161 Ok(CronTimezonePatch::Clear)
162 } else if let Some(tz) = tz {
163 Ok(CronTimezonePatch::Set(tz))
164 } else {
165 Ok(CronTimezonePatch::Preserve)
166 }
167}
168
169fn cron_schedule_from_api(
170 expr: String,
171 tz: Option<String>,
172) -> Result<zeroclaw_runtime::cron::Schedule, (StatusCode, Json<serde_json::Value>)> {
173 let schedule = zeroclaw_runtime::cron::Schedule::Cron { expr, tz };
174 zeroclaw_runtime::cron::validate_schedule(&schedule, chrono::Utc::now())
175 .map_err(|e| bad_request(format!("Invalid cron schedule: {e}")))?;
176 Ok(schedule)
177}
178
179#[derive(Deserialize)]
180pub struct SessionMessagePostBody {
181 pub content: String,
182}
183
184#[derive(Debug, Deserialize)]
191pub struct StatusQuery {
192 #[serde(default)]
193 pub agent: Option<String>,
194}
195
196pub async fn handle_api_status(
198 State(state): State<AppState>,
199 headers: HeaderMap,
200 Query(query): Query<StatusQuery>,
201) -> impl IntoResponse {
202 if let Err(e) = require_auth(&state, &headers) {
203 return e.into_response();
204 }
205
206 let config = state.config.read().clone();
207 let health = zeroclaw_runtime::health::snapshot();
208
209 let mut channels = serde_json::Map::new();
213 for info in config.channels_by_alias() {
214 let composite = format!("{}.{}", info.channel_type, info.alias);
215 channels.insert(composite, serde_json::Value::Bool(true));
216 }
217
218 let locale = config
219 .locale
220 .as_deref()
221 .filter(|s| !s.is_empty())
222 .map(String::from)
223 .unwrap_or_else(zeroclaw_runtime::i18n::detect_locale);
224
225 let agent_alias = query.agent.as_deref().filter(|s| !s.trim().is_empty());
230 let (model_provider, model, temperature, memory_backend) =
231 match agent_alias.and_then(|alias| config.agent(alias).map(|a| (alias, a))) {
232 Some((alias, agent)) => {
233 let provider_ref = if agent.model_provider.is_empty() {
234 None
235 } else {
236 Some(agent.model_provider.as_str().to_string())
237 };
238 let resolved = config.resolved_model_provider_for_agent(alias);
239 let model = resolved
240 .as_ref()
241 .and_then(|(_, _, cfg)| cfg.model.clone())
242 .unwrap_or_default();
243 let temperature: Option<f64> =
244 resolved.as_ref().and_then(|(_, _, cfg)| cfg.temperature);
245 let backend_kind = agent.memory.backend;
246 let backend = serde_json::to_value(backend_kind)
247 .ok()
248 .and_then(|v| v.as_str().map(String::from))
249 .unwrap_or_else(|| format!("{backend_kind:?}").to_lowercase());
250 (provider_ref, model, temperature, backend)
251 }
252 None => (
253 config.first_model_provider_alias(),
254 state.model.clone(),
255 state.temperature,
256 state.mem.name().to_string(),
257 ),
258 };
259
260 let process = zeroclaw_runtime::process_stats::sample();
261
262 let body = serde_json::json!({
263 "version": env!("CARGO_PKG_VERSION"),
264 "model_provider": model_provider,
265 "model": model,
266 "temperature": temperature,
267 "uptime_seconds": health.uptime_seconds,
268 "daemon_started_at": zeroclaw_runtime::health::daemon_started_at(),
269 "gateway_port": config.gateway.port,
270 "locale": locale,
271 "memory_backend": memory_backend,
272 "paired": state.pairing.is_paired(),
273 "channels": channels,
274 "health": health,
275 "agent_alias": agent_alias,
276 "process": process,
277 });
278
279 Json(body).into_response()
280}
281
282pub async fn handle_api_tools(
284 State(state): State<AppState>,
285 headers: HeaderMap,
286) -> impl IntoResponse {
287 if let Err(e) = require_auth(&state, &headers) {
288 return e.into_response();
289 }
290
291 let tools: Vec<serde_json::Value> = state
292 .tools_registry
293 .iter()
294 .map(|spec| {
295 serde_json::json!({
296 "name": spec.name,
297 "description": spec.description,
298 "parameters": spec.parameters,
299 })
300 })
301 .collect();
302
303 Json(serde_json::json!({"tools": tools})).into_response()
304}
305
306pub async fn handle_api_cron_list(
308 State(state): State<AppState>,
309 headers: HeaderMap,
310) -> impl IntoResponse {
311 if let Err(e) = require_auth(&state, &headers) {
312 return e.into_response();
313 }
314
315 let config = state.config.read().clone();
316 match zeroclaw_runtime::cron::list_jobs(&config) {
317 Ok(jobs) => Json(serde_json::json!({"jobs": jobs})).into_response(),
318 Err(e) => (
319 StatusCode::INTERNAL_SERVER_ERROR,
320 Json(serde_json::json!({"error": format!("Failed to list cron jobs: {e}")})),
321 )
322 .into_response(),
323 }
324}
325
326pub async fn handle_api_cron_add(
328 State(state): State<AppState>,
329 headers: HeaderMap,
330 Json(body): Json<CronAddBody>,
331) -> impl IntoResponse {
332 if let Err(e) = require_auth(&state, &headers) {
333 return e.into_response();
334 }
335
336 let CronAddBody {
337 agent: agent_alias,
338 name,
339 schedule,
340 tz,
341 command,
342 job_type,
343 prompt,
344 delivery,
345 session_target,
346 model,
347 allowed_tools,
348 delete_after_run,
349 } = body;
350
351 let config = state.config.read().clone();
352 if config.agent(&agent_alias).is_none() {
353 return (
354 StatusCode::BAD_REQUEST,
355 Json(serde_json::json!({"error": format!(
356 "Unknown agent {agent_alias:?} (no [agents.{agent_alias}] entry configured)"
357 )})),
358 )
359 .into_response();
360 }
361 let tz = match normalize_optional_timezone(tz) {
362 Ok(tz) => tz,
363 Err(e) => return e.into_response(),
364 };
365 let schedule = match cron_schedule_from_api(schedule, tz) {
366 Ok(schedule) => schedule,
367 Err(e) => return e.into_response(),
368 };
369 if let Err(e) = zeroclaw_runtime::cron::validate_delivery_config(delivery.as_ref()) {
370 return (
371 StatusCode::BAD_REQUEST,
372 Json(serde_json::json!({"error": format!("Failed to add cron job: {e}")})),
373 )
374 .into_response();
375 }
376
377 let is_agent =
379 matches!(job_type.as_deref(), Some("agent")) || (job_type.is_none() && prompt.is_some());
380
381 let result = if is_agent {
382 let prompt = match prompt.as_deref() {
383 Some(p) if !p.trim().is_empty() => p,
384 _ => {
385 return (
386 StatusCode::BAD_REQUEST,
387 Json(serde_json::json!({"error": "Missing 'prompt' for agent job"})),
388 )
389 .into_response();
390 }
391 };
392
393 let session_target = session_target
394 .as_deref()
395 .map(zeroclaw_runtime::cron::SessionTarget::parse)
396 .unwrap_or_default();
397
398 let default_delete = matches!(schedule, zeroclaw_runtime::cron::Schedule::At { .. });
399 let delete_after_run = delete_after_run.unwrap_or(default_delete);
400
401 zeroclaw_runtime::cron::add_agent_job(
402 &config,
403 &agent_alias,
404 name,
405 schedule,
406 prompt,
407 session_target,
408 model,
409 delivery,
410 delete_after_run,
411 allowed_tools,
412 )
413 } else {
414 let command = match command.as_deref() {
415 Some(c) if !c.trim().is_empty() => c,
416 _ => {
417 return (
418 StatusCode::BAD_REQUEST,
419 Json(serde_json::json!({"error": "Missing 'command' for shell job"})),
420 )
421 .into_response();
422 }
423 };
424
425 zeroclaw_runtime::cron::add_shell_job_with_approval(
426 &config,
427 &agent_alias,
428 name,
429 schedule,
430 command,
431 delivery,
432 false,
433 )
434 };
435
436 match result {
437 Ok(job) => Json(serde_json::json!({"status": "ok", "job": job})).into_response(),
438 Err(e) => (
439 StatusCode::INTERNAL_SERVER_ERROR,
440 Json(serde_json::json!({"error": format!("Failed to add cron job: {e}")})),
441 )
442 .into_response(),
443 }
444}
445
446pub async fn handle_api_cron_runs(
448 State(state): State<AppState>,
449 headers: HeaderMap,
450 Path(id): Path<String>,
451 Query(params): Query<CronRunsQuery>,
452) -> impl IntoResponse {
453 if let Err(e) = require_auth(&state, &headers) {
454 return e.into_response();
455 }
456
457 let limit = params.limit.unwrap_or(20).clamp(1, 100) as usize;
458 let config = state.config.read().clone();
459
460 if let Err(e) = zeroclaw_runtime::cron::get_job(&config, &id) {
462 return (
463 StatusCode::NOT_FOUND,
464 Json(serde_json::json!({"error": format!("Cron job not found: {e}")})),
465 )
466 .into_response();
467 }
468
469 match zeroclaw_runtime::cron::list_runs(&config, &id, limit) {
470 Ok(runs) => {
471 let runs_json: Vec<serde_json::Value> = runs
472 .iter()
473 .map(|r| {
474 serde_json::json!({
475 "id": r.id,
476 "job_id": r.job_id,
477 "started_at": r.started_at.to_rfc3339(),
478 "finished_at": r.finished_at.to_rfc3339(),
479 "status": r.status,
480 "output": r.output,
481 "duration_ms": r.duration_ms,
482 })
483 })
484 .collect();
485 Json(serde_json::json!({"runs": runs_json})).into_response()
486 }
487 Err(e) => (
488 StatusCode::INTERNAL_SERVER_ERROR,
489 Json(serde_json::json!({"error": format!("Failed to list cron runs: {e}")})),
490 )
491 .into_response(),
492 }
493}
494
495pub async fn handle_api_cron_run(
497 State(state): State<AppState>,
498 headers: HeaderMap,
499 Path(id): Path<String>,
500) -> impl IntoResponse {
501 if let Err(e) = require_auth(&state, &headers) {
502 return e.into_response();
503 }
504
505 let config = state.config.read().clone();
506
507 let job = match zeroclaw_runtime::cron::get_job(&config, &id) {
508 Ok(job) => job,
509 Err(e) => {
510 return (
511 StatusCode::NOT_FOUND,
512 Json(serde_json::json!({"error": format!("Cron job not found: {e}")})),
513 )
514 .into_response();
515 }
516 };
517
518 let started_at = chrono::Utc::now();
519 let (mut success, output) =
520 zeroclaw_runtime::cron::scheduler::execute_job_now(&config, &job).await;
521 let finished_at = chrono::Utc::now();
522 let duration_ms = (finished_at - started_at).num_milliseconds();
523 let outcome = zeroclaw_runtime::cron::scheduler::deliver_and_classify_run_result(
524 &config,
525 &job,
526 success,
527 output,
528 zeroclaw_runtime::cron::scheduler::CronDeliveryContext::GatewayManual,
529 )
530 .await;
531 success = outcome.success;
532
533 if let Err(e) = zeroclaw_runtime::cron::record_run(
534 &config,
535 &job.id,
536 started_at,
537 finished_at,
538 &outcome.status,
539 Some(&outcome.output),
540 duration_ms,
541 ) {
542 ::zeroclaw_log::record!(
543 WARN,
544 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
545 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
546 .with_attrs(::serde_json::json!({"job_id": job.id, "error": format!("{}", e)})),
547 "manual cron trigger: failed to persist run history"
548 );
549 }
550 if let Err(e) = zeroclaw_runtime::cron::record_last_run_with_status(
551 &config,
552 &job.id,
553 finished_at,
554 &outcome.status,
555 &outcome.output,
556 ) {
557 ::zeroclaw_log::record!(
558 WARN,
559 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
560 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
561 .with_attrs(::serde_json::json!({"job_id": job.id, "error": format!("{}", e)})),
562 "manual cron trigger: failed to update last_run state"
563 );
564 }
565
566 let _ = state.event_tx.send(serde_json::json!({
569 "type": "cron_result",
570 "job_id": job.id,
571 "success": success,
572 "output": &outcome.output,
573 "manual": true,
574 "timestamp": finished_at.to_rfc3339(),
575 }));
576
577 Json(serde_json::json!({
578 "status": &outcome.status,
579 "job_id": job.id,
580 "success": success,
581 "output": &outcome.output,
582 "duration_ms": duration_ms,
583 "started_at": started_at.to_rfc3339(),
584 "finished_at": finished_at.to_rfc3339(),
585 }))
586 .into_response()
587}
588
589pub async fn handle_api_cron_patch(
591 State(state): State<AppState>,
592 headers: HeaderMap,
593 Path(id): Path<String>,
594 Json(body): Json<CronPatchBody>,
595) -> impl IntoResponse {
596 if let Err(e) = require_auth(&state, &headers) {
597 return e.into_response();
598 }
599
600 let config = state.config.read().clone();
601 if config.agent(&body.agent).is_none() {
602 return (
603 StatusCode::BAD_REQUEST,
604 Json(serde_json::json!({"error": format!(
605 "Unknown agent {a:?} (no [agents.{a}] entry configured)",
606 a = body.agent
607 )})),
608 )
609 .into_response();
610 }
611 let agent_alias = body.agent.clone();
612 let CronPatchBody {
613 agent: _,
614 name,
615 schedule: schedule_expr,
616 tz,
617 clear_tz,
618 command,
619 prompt,
620 } = body;
621 let timezone_patch = match parse_timezone_patch(tz, clear_tz) {
622 Ok(patch) => patch,
623 Err(e) => return e.into_response(),
624 };
625
626 let existing = match zeroclaw_runtime::cron::get_job(&config, &id) {
627 Ok(j) => j,
628 Err(e) => {
629 return (
630 StatusCode::NOT_FOUND,
631 Json(serde_json::json!({"error": format!("Cron job not found: {e}")})),
632 )
633 .into_response();
634 }
635 };
636 let new_expr = schedule_expr
637 .as_deref()
638 .map(str::trim)
639 .filter(|expr| !expr.is_empty())
640 .map(str::to_string);
641 let timezone_changed = !matches!(timezone_patch, CronTimezonePatch::Preserve);
642 let schedule = if new_expr.is_some() || timezone_changed {
643 let (expr, existing_tz) = match (&existing.schedule, new_expr) {
644 (_, Some(expr)) => {
645 let existing_tz = match &existing.schedule {
646 zeroclaw_runtime::cron::Schedule::Cron { tz, .. } => tz.clone(),
647 _ => None,
648 };
649 (expr, existing_tz)
650 }
651 (zeroclaw_runtime::cron::Schedule::Cron { expr, tz }, None) => {
652 (expr.clone(), tz.clone())
653 }
654 (_, None) => {
655 return bad_request("tz can only be updated on cron schedules").into_response();
656 }
657 };
658 let tz = match timezone_patch {
659 CronTimezonePatch::Preserve => existing_tz,
660 CronTimezonePatch::Set(tz) => Some(tz),
661 CronTimezonePatch::Clear => None,
662 };
663 match cron_schedule_from_api(expr, tz) {
664 Ok(schedule) => Some(schedule),
665 Err(e) => return e.into_response(),
666 }
667 } else {
668 None
669 };
670 let is_agent = matches!(existing.job_type, zeroclaw_runtime::cron::JobType::Agent);
671 let (patch_command, patch_prompt) = if is_agent {
672 (None, command.or(prompt))
673 } else {
674 (command.or(prompt), None)
675 };
676
677 let patch = zeroclaw_runtime::cron::CronJobPatch {
678 name,
679 schedule,
680 command: patch_command,
681 prompt: patch_prompt,
682 ..zeroclaw_runtime::cron::CronJobPatch::default()
683 };
684
685 match zeroclaw_runtime::cron::update_shell_job_with_approval(
686 &config,
687 &agent_alias,
688 &id,
689 patch,
690 false,
691 ) {
692 Ok(job) => Json(serde_json::json!({"status": "ok", "job": job})).into_response(),
693 Err(e) => (
694 StatusCode::INTERNAL_SERVER_ERROR,
695 Json(serde_json::json!({"error": format!("Failed to update cron job: {e}")})),
696 )
697 .into_response(),
698 }
699}
700
701pub async fn handle_api_cron_delete(
703 State(state): State<AppState>,
704 headers: HeaderMap,
705 Path(id): Path<String>,
706) -> impl IntoResponse {
707 if let Err(e) = require_auth(&state, &headers) {
708 return e.into_response();
709 }
710
711 let config = state.config.read().clone();
712 match zeroclaw_runtime::cron::remove_job(&config, &id) {
713 Ok(()) => Json(serde_json::json!({"status": "ok"})).into_response(),
714 Err(e) => (
715 StatusCode::INTERNAL_SERVER_ERROR,
716 Json(serde_json::json!({"error": format!("Failed to remove cron job: {e}")})),
717 )
718 .into_response(),
719 }
720}
721
722pub async fn handle_api_cron_settings_get(
724 State(state): State<AppState>,
725 headers: HeaderMap,
726) -> impl IntoResponse {
727 if let Err(e) = require_auth(&state, &headers) {
728 return e.into_response();
729 }
730
731 let config = state.config.read().clone();
732 Json(serde_json::json!({
733 "enabled": config.scheduler.enabled,
734 "catch_up_on_startup": config.scheduler.catch_up_on_startup,
735 "max_run_history": config.scheduler.max_run_history,
736 }))
737 .into_response()
738}
739
740pub async fn handle_api_cron_settings_patch(
742 State(state): State<AppState>,
743 headers: HeaderMap,
744 Json(body): Json<serde_json::Value>,
745) -> impl IntoResponse {
746 if let Err(e) = require_auth(&state, &headers) {
747 return e.into_response();
748 }
749
750 let mut config = state.config.read().clone();
751
752 if let Some(v) = body.get("enabled").and_then(|v| v.as_bool()) {
753 config.scheduler.enabled = v;
754 config.mark_dirty("scheduler.enabled");
755 }
756 if let Some(v) = body.get("catch_up_on_startup").and_then(|v| v.as_bool()) {
757 config.scheduler.catch_up_on_startup = v;
758 config.mark_dirty("scheduler.catch-up-on-startup");
759 }
760 if let Some(v) = body.get("max_run_history").and_then(|v| v.as_u64()) {
761 config.scheduler.max_run_history = u32::try_from(v).unwrap_or(u32::MAX);
762 config.mark_dirty("scheduler.max-run-history");
763 }
764
765 if let Err(e) = config.save_dirty().await {
766 return (
767 StatusCode::INTERNAL_SERVER_ERROR,
768 Json(serde_json::json!({"error": format!("Failed to save config: {e}")})),
769 )
770 .into_response();
771 }
772
773 *state.config.write() = config.clone();
774
775 Json(serde_json::json!({
776 "status": "ok",
777 "enabled": config.scheduler.enabled,
778 "catch_up_on_startup": config.scheduler.catch_up_on_startup,
779 "max_run_history": config.scheduler.max_run_history,
780 }))
781 .into_response()
782}
783
784pub async fn handle_api_integrations(
786 State(state): State<AppState>,
787 headers: HeaderMap,
788) -> impl IntoResponse {
789 if let Err(e) = require_auth(&state, &headers) {
790 return e.into_response();
791 }
792
793 let config = state.config.read().clone();
794 let entries = zeroclaw_runtime::integrations::registry::all_integrations(&config);
795
796 let integrations: Vec<serde_json::Value> = entries
797 .iter()
798 .map(|entry| {
799 serde_json::json!({
800 "name": entry.name,
801 "description": entry.description,
802 "category": entry.category,
803 "status": entry.status,
804 })
805 })
806 .collect();
807
808 Json(serde_json::json!({"integrations": integrations})).into_response()
809}
810
811pub async fn handle_api_integrations_settings(
813 State(state): State<AppState>,
814 headers: HeaderMap,
815) -> impl IntoResponse {
816 if let Err(e) = require_auth(&state, &headers) {
817 return e.into_response();
818 }
819
820 let config = state.config.read().clone();
821 let entries = zeroclaw_runtime::integrations::registry::all_integrations(&config);
822
823 let mut settings = serde_json::Map::new();
824 for entry in &entries {
825 let enabled = matches!(
826 entry.status,
827 zeroclaw_runtime::integrations::IntegrationStatus::Active
828 );
829 settings.insert(
830 entry.name.clone(),
831 serde_json::json!({
832 "enabled": enabled,
833 "category": entry.category,
834 "status": entry.status,
835 }),
836 );
837 }
838
839 Json(serde_json::json!({"settings": settings})).into_response()
840}
841
842pub async fn handle_api_doctor(
844 State(state): State<AppState>,
845 headers: HeaderMap,
846) -> impl IntoResponse {
847 if let Err(e) = require_auth(&state, &headers) {
848 return e.into_response();
849 }
850
851 let config = state.config.read().clone();
852 let results = zeroclaw_runtime::doctor::diagnose(&config);
853
854 let ok_count = results
855 .iter()
856 .filter(|r| r.severity == zeroclaw_runtime::doctor::Severity::Ok)
857 .count();
858 let warn_count = results
859 .iter()
860 .filter(|r| r.severity == zeroclaw_runtime::doctor::Severity::Warn)
861 .count();
862 let error_count = results
863 .iter()
864 .filter(|r| r.severity == zeroclaw_runtime::doctor::Severity::Error)
865 .count();
866
867 Json(serde_json::json!({
868 "results": results,
869 "summary": {
870 "ok": ok_count,
871 "warnings": warn_count,
872 "errors": error_count,
873 }
874 }))
875 .into_response()
876}
877
878async fn resolve_memory_handle(
885 state: &AppState,
886 agent_alias: Option<&str>,
887) -> Result<std::sync::Arc<dyn zeroclaw_memory::Memory>, (StatusCode, Json<serde_json::Value>)> {
888 let alias = match agent_alias.map(str::trim).filter(|s| !s.is_empty()) {
889 Some(a) => a,
890 None => return Ok(state.mem.clone()),
891 };
892 let config = state.config.read().clone();
893 if config.agent(alias).is_none() {
894 return Err((
895 StatusCode::BAD_REQUEST,
896 Json(serde_json::json!({"error": format!(
897 "Unknown agent {alias:?} (no [agents.{alias}] entry configured)"
898 )})),
899 ));
900 }
901 let api_key = config
902 .resolved_model_provider_for_agent(alias)
903 .and_then(|(_, _, cfg)| cfg.api_key.clone());
904 zeroclaw_memory::create_memory_for_agent(&config, alias, api_key.as_deref())
905 .await
906 .map_err(|e| {
907 (
908 StatusCode::INTERNAL_SERVER_ERROR,
909 Json(
910 serde_json::json!({"error": format!("Failed to build per-agent memory: {e:#}")}),
911 ),
912 )
913 })
914}
915
916pub async fn handle_api_memory_list(
918 State(state): State<AppState>,
919 headers: HeaderMap,
920 Query(params): Query<MemoryQuery>,
921) -> impl IntoResponse {
922 if let Err(e) = require_auth(&state, &headers) {
923 return e.into_response();
924 }
925
926 let mem = match resolve_memory_handle(&state, params.agent.as_deref()).await {
927 Ok(m) => m,
928 Err(e) => return e.into_response(),
929 };
930
931 if params.query.is_some() || params.since.is_some() || params.until.is_some() {
933 let query = params.query.as_deref().unwrap_or("");
934 let since = params.since.as_deref();
935 let until = params.until.as_deref();
936 match mem.recall(query, 50, None, since, until).await {
942 Ok(entries) => {
943 let entries = match params.category.as_deref() {
944 Some(cat) => entries
945 .into_iter()
946 .filter(|e| e.category.to_string() == cat)
947 .collect(),
948 None => entries,
949 };
950 Json(serde_json::json!({"entries": entries})).into_response()
951 }
952 Err(e) => (
953 StatusCode::INTERNAL_SERVER_ERROR,
954 Json(serde_json::json!({"error": format!("Memory recall failed: {e}")})),
955 )
956 .into_response(),
957 }
958 } else {
959 let category = params.category.as_deref().map(|cat| match cat {
961 "core" => zeroclaw_memory::MemoryCategory::Core,
962 "daily" => zeroclaw_memory::MemoryCategory::Daily,
963 "conversation" => zeroclaw_memory::MemoryCategory::Conversation,
964 other => zeroclaw_memory::MemoryCategory::Custom(other.to_string()),
965 });
966
967 match mem.list(category.as_ref(), None).await {
968 Ok(entries) => Json(serde_json::json!({"entries": entries})).into_response(),
969 Err(e) => (
970 StatusCode::INTERNAL_SERVER_ERROR,
971 Json(serde_json::json!({"error": format!("Memory list failed: {e}")})),
972 )
973 .into_response(),
974 }
975 }
976}
977
978pub async fn handle_api_memory_store(
980 State(state): State<AppState>,
981 headers: HeaderMap,
982 Json(body): Json<MemoryStoreBody>,
983) -> impl IntoResponse {
984 if let Err(e) = require_auth(&state, &headers) {
985 return e.into_response();
986 }
987
988 let category = body
989 .category
990 .as_deref()
991 .map(|cat| match cat {
992 "core" => zeroclaw_memory::MemoryCategory::Core,
993 "daily" => zeroclaw_memory::MemoryCategory::Daily,
994 "conversation" => zeroclaw_memory::MemoryCategory::Conversation,
995 other => zeroclaw_memory::MemoryCategory::Custom(other.to_string()),
996 })
997 .unwrap_or(zeroclaw_memory::MemoryCategory::Core);
998
999 let mem = match resolve_memory_handle(&state, body.agent.as_deref()).await {
1000 Ok(m) => m,
1001 Err(e) => return e.into_response(),
1002 };
1003
1004 match mem.store(&body.key, &body.content, category, None).await {
1005 Ok(()) => Json(serde_json::json!({"status": "ok"})).into_response(),
1006 Err(e) => (
1007 StatusCode::INTERNAL_SERVER_ERROR,
1008 Json(serde_json::json!({"error": format!("Memory store failed: {e}")})),
1009 )
1010 .into_response(),
1011 }
1012}
1013
1014pub async fn handle_api_memory_delete(
1016 State(state): State<AppState>,
1017 headers: HeaderMap,
1018 Path(key): Path<String>,
1019 Query(query): Query<MemoryDeleteQuery>,
1020) -> impl IntoResponse {
1021 if let Err(e) = require_auth(&state, &headers) {
1022 return e.into_response();
1023 }
1024
1025 let mem = match resolve_memory_handle(&state, query.agent.as_deref()).await {
1026 Ok(m) => m,
1027 Err(e) => return e.into_response(),
1028 };
1029
1030 match mem.forget(&key).await {
1031 Ok(deleted) => {
1032 Json(serde_json::json!({"status": "ok", "deleted": deleted})).into_response()
1033 }
1034 Err(e) => (
1035 StatusCode::INTERNAL_SERVER_ERROR,
1036 Json(serde_json::json!({"error": format!("Memory forget failed: {e}")})),
1037 )
1038 .into_response(),
1039 }
1040}
1041
1042#[derive(Debug, Deserialize)]
1045pub struct CostQuery {
1046 #[serde(default)]
1047 pub agent: Option<String>,
1048 #[serde(default)]
1052 pub from: Option<String>,
1053 #[serde(default)]
1054 pub to: Option<String>,
1055}
1056
1057pub async fn handle_api_cost(
1062 State(state): State<AppState>,
1063 headers: HeaderMap,
1064 Query(query): Query<CostQuery>,
1065) -> impl IntoResponse {
1066 if let Err(e) = require_auth(&state, &headers) {
1067 return e.into_response();
1068 }
1069
1070 let parse_bound = |s: &str| {
1071 chrono::DateTime::parse_from_rfc3339(s)
1072 .ok()
1073 .map(|d| d.with_timezone(&chrono::Utc))
1074 };
1075 let from = query.from.as_deref().and_then(parse_bound);
1076 let to = query.to.as_deref().and_then(parse_bound);
1077
1078 if let Some(ref tracker) = state.cost_tracker {
1079 let result = match query.agent.as_deref().filter(|s| !s.is_empty()) {
1080 Some(alias) => tracker.get_summary_for_agent(alias),
1081 None => tracker.get_summary_in_bounds(from, to),
1082 };
1083 match result {
1084 Ok(summary) => Json(serde_json::json!({"cost": summary})).into_response(),
1085 Err(e) => (
1086 StatusCode::INTERNAL_SERVER_ERROR,
1087 Json(serde_json::json!({"error": format!("Cost summary failed: {e}")})),
1088 )
1089 .into_response(),
1090 }
1091 } else {
1092 Json(serde_json::json!({
1093 "cost": {
1094 "session_cost_usd": 0.0,
1095 "daily_cost_usd": 0.0,
1096 "monthly_cost_usd": 0.0,
1097 "total_tokens": 0,
1098 "request_count": 0,
1099 "by_model": {},
1100 "by_agent": {},
1101 }
1102 }))
1103 .into_response()
1104 }
1105}
1106
1107pub async fn handle_api_cli_tools(
1109 State(state): State<AppState>,
1110 headers: HeaderMap,
1111) -> impl IntoResponse {
1112 if let Err(e) = require_auth(&state, &headers) {
1113 return e.into_response();
1114 }
1115
1116 let tools = zeroclaw_tools::cli_discovery::discover_cli_tools(&[], &[]);
1117
1118 Json(serde_json::json!({"cli_tools": tools})).into_response()
1119}
1120
1121pub async fn handle_api_channels(
1123 State(state): State<AppState>,
1124 headers: HeaderMap,
1125) -> impl IntoResponse {
1126 if let Err(e) = require_auth(&state, &headers) {
1127 return e.into_response();
1128 }
1129
1130 let config = state.config.read().clone();
1131 let health = zeroclaw_runtime::health::snapshot();
1132 let channels: Vec<serde_json::Value> = config
1135 .channels_by_alias()
1136 .into_iter()
1137 .map(|info| {
1138 let composite = format!("{}.{}", info.channel_type, info.alias);
1139 let readiness = channel_readiness(&config, &info, &health, &state);
1140 let (status, health_status) = channel_readiness_summary(&readiness);
1141 serde_json::json!({
1142 "name": composite,
1143 "type": info.channel_type,
1144 "alias": info.alias,
1145 "owning_agent": info.owning_agent,
1146 "enabled": info.enabled,
1147 "status": status,
1148 "message_count": 0,
1149 "last_message_at": null,
1150 "health": health_status,
1151 "readiness": readiness,
1152 })
1153 })
1154 .collect();
1155
1156 Json(serde_json::json!({ "channels": channels })).into_response()
1157}
1158
1159#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
1160#[serde(rename_all = "snake_case")]
1161enum ChannelReadinessState {
1162 Ready,
1163 Missing,
1164 Unknown,
1165}
1166
/// Age limit, in seconds, for channel listener health data.
// NOTE(review): presumably health data older than this is treated as stale;
// confirm at the use site.
const CHANNEL_LISTENER_HEALTH_MAX_AGE_SECS: i64 = 30;
1168
1169#[derive(Debug, Clone, Serialize)]
1170struct ChannelReadiness {
1171 enabled: ChannelReadinessState,
1172 bound_to_agent: ChannelReadinessState,
1173 authenticated: ChannelReadinessState,
1174 listening: ChannelReadinessState,
1175 requirements: Vec<String>,
1176 notes: Vec<String>,
1177}
1178
1179fn channel_readiness(
1180 config: &zeroclaw_config::schema::Config,
1181 info: &zeroclaw_config::schema::ChannelAliasInfo,
1182 health: &zeroclaw_runtime::health::HealthSnapshot,
1183 state: &AppState,
1184) -> ChannelReadiness {
1185 let mut readiness = ChannelReadiness {
1186 enabled: if info.enabled {
1187 ChannelReadinessState::Ready
1188 } else {
1189 ChannelReadinessState::Missing
1190 },
1191 bound_to_agent: if info.owning_agent.is_some() {
1192 ChannelReadinessState::Ready
1193 } else {
1194 ChannelReadinessState::Missing
1195 },
1196 authenticated: ChannelReadinessState::Unknown,
1197 listening: ChannelReadinessState::Unknown,
1198 requirements: Vec::new(),
1199 notes: Vec::new(),
1200 };
1201
1202 if readiness.enabled == ChannelReadinessState::Missing {
1203 readiness
1204 .requirements
1205 .push("Enable this channel alias.".to_string());
1206 }
1207 if readiness.bound_to_agent == ChannelReadinessState::Missing {
1208 readiness
1209 .requirements
1210 .push("Bind this channel to an enabled agent.".to_string());
1211 }
1212
1213 if readiness.enabled == ChannelReadinessState::Ready
1214 && readiness.bound_to_agent == ChannelReadinessState::Ready
1215 {
1216 if info.channel_type == "webhook" {
1217 apply_webhook_readiness(config, &info.alias, health, state, &mut readiness);
1218 } else {
1219 readiness.notes.push(format!(
1220 "Live readiness is not checked for `{}` channels yet.",
1221 info.channel_type
1222 ));
1223 }
1224 }
1225
1226 readiness
1227}
1228
1229fn channel_readiness_summary(readiness: &ChannelReadiness) -> (&'static str, &'static str) {
1230 if readiness.enabled == ChannelReadinessState::Missing
1231 || readiness.bound_to_agent == ChannelReadinessState::Missing
1232 {
1233 return ("inactive", "degraded");
1234 }
1235
1236 if readiness.authenticated == ChannelReadinessState::Unknown
1237 && readiness.listening == ChannelReadinessState::Unknown
1238 {
1239 return ("unknown", "degraded");
1240 }
1241
1242 if readiness.authenticated == ChannelReadinessState::Ready
1243 && readiness.listening == ChannelReadinessState::Ready
1244 {
1245 ("active", "healthy")
1246 } else {
1247 ("error", "down")
1248 }
1249}
1250
1251fn apply_webhook_readiness(
1252 config: &zeroclaw_config::schema::Config,
1253 alias: &str,
1254 health: &zeroclaw_runtime::health::HealthSnapshot,
1255 state: &AppState,
1256 readiness: &mut ChannelReadiness,
1257) {
1258 let Some(webhook) = config.channels.webhook.get(alias) else {
1259 readiness.authenticated = ChannelReadinessState::Missing;
1260 readiness.listening = ChannelReadinessState::Missing;
1261 readiness
1262 .requirements
1263 .push("Webhook config block is missing.".to_string());
1264 return;
1265 };
1266
1267 if state.pairing.require_pairing() && !state.pairing.is_paired() {
1268 readiness.authenticated = ChannelReadinessState::Missing;
1269 readiness
1270 .requirements
1271 .push("Pair the gateway before using the webhook endpoint.".to_string());
1272 } else {
1273 readiness.authenticated = ChannelReadinessState::Ready;
1274 }
1275
1276 let component = format!("channel:webhook.{alias}");
1277 let component_health = health.components.get(&component);
1278 let component_status = component_health.map(|component| component.status.as_str());
1279 let supervised_listener_ok = component_health.is_some_and(component_health_ok_and_fresh);
1280 let listen_path = normalized_webhook_path(webhook.listen_path.as_deref());
1281
1282 if supervised_listener_ok {
1283 readiness.listening = ChannelReadinessState::Ready;
1284 } else if component_status == Some("error") {
1285 readiness.listening = ChannelReadinessState::Missing;
1286 readiness.requirements.push(format!(
1287 "Resolve the listener error for `webhook.{alias}` before using this channel."
1288 ));
1289 } else {
1290 readiness.listening = ChannelReadinessState::Missing;
1291 readiness.requirements.push(format!(
1292 "Start a channel listener for `webhook.{alias}` on port {}{}.",
1293 webhook.port, listen_path
1294 ));
1295 }
1296}
1297
1298fn component_health_ok_and_fresh(component: &zeroclaw_runtime::health::ComponentHealth) -> bool {
1299 if component.status != "ok" {
1300 return false;
1301 }
1302
1303 let Ok(updated_at) = chrono::DateTime::parse_from_rfc3339(&component.updated_at) else {
1304 return false;
1305 };
1306 let age = chrono::Utc::now().signed_duration_since(updated_at.with_timezone(&chrono::Utc));
1307 age >= chrono::Duration::zero()
1308 && age <= chrono::Duration::seconds(CHANNEL_LISTENER_HEALTH_MAX_AGE_SECS)
1309}
1310
/// Normalizes a configured webhook listen path.
///
/// Surrounding whitespace is trimmed. A missing or blank path becomes
/// `/webhook`; any other path is returned with a leading `/`, which is added
/// when absent.
fn normalized_webhook_path(path: Option<&str>) -> String {
    const DEFAULT_PATH: &str = "/webhook";

    match path.map(str::trim) {
        None | Some("") => DEFAULT_PATH.to_owned(),
        Some(route) if route.starts_with('/') => route.to_owned(),
        Some(route) => format!("/{route}"),
    }
}
1321
1322pub async fn handle_api_health(
1324 State(state): State<AppState>,
1325 headers: HeaderMap,
1326) -> impl IntoResponse {
1327 if let Err(e) = require_auth(&state, &headers) {
1328 return e.into_response();
1329 }
1330
1331 let snapshot = zeroclaw_runtime::health::snapshot();
1332 Json(serde_json::json!({"health": snapshot})).into_response()
1333}
1334
1335pub async fn handle_api_sessions_list(
1341 State(state): State<AppState>,
1342 headers: HeaderMap,
1343) -> impl IntoResponse {
1344 if let Err(e) = require_auth(&state, &headers) {
1345 return e.into_response();
1346 }
1347
1348 let Some(ref backend) = state.session_backend else {
1349 return Json(serde_json::json!({
1350 "sessions": [],
1351 "message": "Session persistence is disabled"
1352 }))
1353 .into_response();
1354 };
1355
1356 let config = state.config.read().clone();
1360 let all_metadata = backend.list_sessions_with_metadata();
1361 let sessions: Vec<serde_json::Value> = all_metadata
1362 .into_iter()
1363 .filter(|meta| meta.agent_alias.is_some() || meta.channel_id.is_some())
1364 .map(|meta| {
1365 let agent_alias = meta.agent_alias.clone().or_else(|| {
1369 meta.channel_id
1370 .as_deref()
1371 .and_then(|c| config.agent_for_channel(c))
1372 .map(str::to_string)
1373 });
1374 let session_id = meta
1377 .key
1378 .strip_prefix("gw_")
1379 .map(str::to_string)
1380 .unwrap_or_else(|| meta.key.clone());
1381 let mut entry = serde_json::json!({
1382 "session_id": session_id,
1385 "session_key": meta.key.clone(),
1387 "created_at": meta.created_at.to_rfc3339(),
1388 "last_activity": meta.last_activity.to_rfc3339(),
1389 "message_count": meta.message_count,
1390 "agent_alias": agent_alias,
1391 "channel_id": meta.channel_id,
1392 });
1393 if let Some(name) = meta.name {
1394 entry["name"] = serde_json::Value::String(name);
1395 }
1396 entry
1397 })
1398 .collect();
1399
1400 Json(serde_json::json!({ "sessions": sessions })).into_response()
1401}
1402
1403pub async fn handle_api_session_messages(
1405 State(state): State<AppState>,
1406 headers: HeaderMap,
1407 Path(id): Path<String>,
1408) -> impl IntoResponse {
1409 if let Err(e) = require_auth(&state, &headers) {
1410 return e.into_response();
1411 }
1412
1413 let Some(ref backend) = state.session_backend else {
1414 return Json(serde_json::json!({
1415 "session_id": id,
1416 "messages": [],
1417 "session_persistence": false,
1418 }))
1419 .into_response();
1420 };
1421
1422 let session_key = if id.starts_with("gw_") || id.contains('_') {
1426 id.clone()
1427 } else {
1428 format!("gw_{id}")
1429 };
1430 let msgs = backend.load_with_timestamps(&session_key);
1431 let messages: Vec<serde_json::Value> = msgs
1432 .into_iter()
1433 .map(|m| {
1434 serde_json::json!({
1435 "role": m.message.role,
1436 "content": m.message.content,
1437 "created_at": m.created_at.map(|dt| dt.to_rfc3339()),
1438 })
1439 })
1440 .collect();
1441
1442 Json(serde_json::json!({
1443 "session_id": id,
1444 "messages": messages,
1445 "session_persistence": true,
1446 }))
1447 .into_response()
1448}
1449
1450pub async fn handle_api_session_message_post(
1452 State(state): State<AppState>,
1453 headers: HeaderMap,
1454 Path(id): Path<String>,
1455 Json(body): Json<SessionMessagePostBody>,
1456) -> impl IntoResponse {
1457 if let Err(e) = require_auth(&state, &headers) {
1458 return e.into_response();
1459 }
1460
1461 if body.content.trim().is_empty() {
1462 return (
1463 StatusCode::BAD_REQUEST,
1464 Json(serde_json::json!({"error": "content is required"})),
1465 )
1466 .into_response();
1467 }
1468
1469 let Some(ref backend) = state.session_backend else {
1470 return (
1471 StatusCode::SERVICE_UNAVAILABLE,
1472 Json(serde_json::json!({"error": "Session persistence is disabled"})),
1473 )
1474 .into_response();
1475 };
1476
1477 let session_key = format!("gw_{id}");
1478 if !backend
1479 .list_sessions()
1480 .iter()
1481 .any(|key| key == &session_key)
1482 {
1483 return (
1484 StatusCode::NOT_FOUND,
1485 Json(serde_json::json!({"error": "Session not found"})),
1486 )
1487 .into_response();
1488 }
1489
1490 let _session_guard = match state.session_queue.acquire(&session_key).await {
1491 Ok(guard) => guard,
1492 Err(crate::session_queue::SessionQueueError::QueueFull { .. }) => {
1493 return (
1494 StatusCode::TOO_MANY_REQUESTS,
1495 Json(serde_json::json!({"error": "Session queue is full"})),
1496 )
1497 .into_response();
1498 }
1499 Err(crate::session_queue::SessionQueueError::Timeout { .. }) => {
1500 return (
1501 StatusCode::REQUEST_TIMEOUT,
1502 Json(serde_json::json!({"error": "Timed out waiting for session queue"})),
1503 )
1504 .into_response();
1505 }
1506 };
1507
1508 let message = zeroclaw_providers::ChatMessage::assistant(&body.content);
1509 if let Err(e) = backend.append(&session_key, &message) {
1510 return (
1511 StatusCode::INTERNAL_SERVER_ERROR,
1512 Json(serde_json::json!({"error": format!("Failed to append session message: {e}")})),
1513 )
1514 .into_response();
1515 }
1516
1517 let event = serde_json::json!({
1520 "type": "message",
1521 "session_id": id.clone(),
1522 "role": "assistant",
1523 "content": body.content.clone(),
1524 "source": "api",
1525 "timestamp": chrono::Utc::now().to_rfc3339(),
1526 });
1527 let _ = state.event_tx.send(event);
1528
1529 Json(serde_json::json!({
1530 "status": "ok",
1531 "session_id": id,
1532 "message": {
1533 "role": "assistant",
1534 "content": message.content,
1535 },
1536 "session_persistence": true,
1537 }))
1538 .into_response()
1539}
1540
1541pub async fn handle_api_session_delete(
1543 State(state): State<AppState>,
1544 headers: HeaderMap,
1545 Path(id): Path<String>,
1546) -> impl IntoResponse {
1547 if let Err(e) = require_auth(&state, &headers) {
1548 return e.into_response();
1549 }
1550
1551 let Some(ref backend) = state.session_backend else {
1552 return (
1553 StatusCode::NOT_FOUND,
1554 Json(serde_json::json!({"error": "Session persistence is disabled"})),
1555 )
1556 .into_response();
1557 };
1558
1559 let session_key = if id.starts_with("gw_") || id.contains('_') {
1560 id.clone()
1561 } else {
1562 format!("gw_{id}")
1563 };
1564
1565 let token = state
1572 .cancel_tokens
1573 .lock()
1574 .expect("cancel_tokens lock poisoned")
1575 .remove(&session_key);
1576 if let Some(token) = token {
1577 token.cancel();
1578 ::zeroclaw_log::record!(
1579 INFO,
1580 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1581 .with_attrs(::serde_json::json!({"session_key": session_key})),
1582 "cancelled in-flight turn for deleted session"
1583 );
1584 }
1585
1586 match backend.delete_session(&session_key) {
1587 Ok(true) => Json(serde_json::json!({"deleted": true, "session_id": id})).into_response(),
1588 Ok(false) => (
1589 StatusCode::NOT_FOUND,
1590 Json(serde_json::json!({"error": "Session not found"})),
1591 )
1592 .into_response(),
1593 Err(e) => (
1594 StatusCode::INTERNAL_SERVER_ERROR,
1595 Json(serde_json::json!({"error": format!("Failed to delete session: {e}")})),
1596 )
1597 .into_response(),
1598 }
1599}
1600
1601pub async fn handle_api_session_rename(
1603 State(state): State<AppState>,
1604 headers: HeaderMap,
1605 Path(id): Path<String>,
1606 Json(body): Json<serde_json::Value>,
1607) -> impl IntoResponse {
1608 if let Err(e) = require_auth(&state, &headers) {
1609 return e.into_response();
1610 }
1611
1612 let Some(ref backend) = state.session_backend else {
1613 return (
1614 StatusCode::NOT_FOUND,
1615 Json(serde_json::json!({"error": "Session persistence is disabled"})),
1616 )
1617 .into_response();
1618 };
1619
1620 let name = body["name"].as_str().unwrap_or("").trim();
1621 if name.is_empty() {
1622 return (
1623 StatusCode::BAD_REQUEST,
1624 Json(serde_json::json!({"error": "name is required"})),
1625 )
1626 .into_response();
1627 }
1628
1629 let session_key = format!("gw_{id}");
1630
1631 let sessions = backend.list_sessions();
1633 if !sessions.contains(&session_key) {
1634 return (
1635 StatusCode::NOT_FOUND,
1636 Json(serde_json::json!({"error": "Session not found"})),
1637 )
1638 .into_response();
1639 }
1640
1641 match backend.set_session_name(&session_key, name) {
1642 Ok(()) => Json(serde_json::json!({"session_id": id, "name": name})).into_response(),
1643 Err(e) => (
1644 StatusCode::INTERNAL_SERVER_ERROR,
1645 Json(serde_json::json!({"error": format!("Failed to rename session: {e}")})),
1646 )
1647 .into_response(),
1648 }
1649}
1650
1651pub async fn handle_api_sessions_running(
1653 State(state): State<AppState>,
1654 headers: HeaderMap,
1655) -> impl IntoResponse {
1656 if let Err(e) = require_auth(&state, &headers) {
1657 return e.into_response();
1658 }
1659
1660 let Some(ref backend) = state.session_backend else {
1661 return Json(serde_json::json!({
1662 "sessions": [],
1663 "message": "Session persistence is disabled"
1664 }))
1665 .into_response();
1666 };
1667
1668 let running = backend.list_running_sessions();
1669 let sessions: Vec<serde_json::Value> = running
1670 .into_iter()
1671 .filter_map(|meta| {
1672 let session_id = meta.key.strip_prefix("gw_")?;
1673 Some(serde_json::json!({
1674 "session_id": session_id,
1675 "created_at": meta.created_at.to_rfc3339(),
1676 "last_activity": meta.last_activity.to_rfc3339(),
1677 "message_count": meta.message_count,
1678 }))
1679 })
1680 .collect();
1681
1682 Json(serde_json::json!({ "sessions": sessions })).into_response()
1683}
1684
1685pub async fn handle_api_session_state(
1687 State(state): State<AppState>,
1688 headers: HeaderMap,
1689 Path(id): Path<String>,
1690) -> impl IntoResponse {
1691 if let Err(e) = require_auth(&state, &headers) {
1692 return e.into_response();
1693 }
1694
1695 let Some(ref backend) = state.session_backend else {
1696 return (
1697 StatusCode::NOT_FOUND,
1698 Json(serde_json::json!({"error": "Session persistence is disabled"})),
1699 )
1700 .into_response();
1701 };
1702
1703 let session_key = format!("gw_{id}");
1704 match backend.get_session_state(&session_key) {
1705 Ok(Some(ss)) => {
1706 let mut resp = serde_json::json!({
1707 "session_id": id,
1708 "state": ss.state,
1709 });
1710 if let Some(turn_id) = ss.turn_id {
1711 resp["turn_id"] = serde_json::Value::String(turn_id);
1712 }
1713 if let Some(started) = ss.turn_started_at {
1714 resp["turn_started_at"] = serde_json::Value::String(started.to_rfc3339());
1715 }
1716 Json(resp).into_response()
1717 }
1718 Ok(None) => (
1719 StatusCode::NOT_FOUND,
1720 Json(serde_json::json!({"error": "Session not found"})),
1721 )
1722 .into_response(),
1723 Err(e) => (
1724 StatusCode::INTERNAL_SERVER_ERROR,
1725 Json(serde_json::json!({"error": format!("Failed to get session state: {e}")})),
1726 )
1727 .into_response(),
1728 }
1729}
1730
1731pub async fn handle_api_session_abort(
1745 State(state): State<AppState>,
1746 headers: HeaderMap,
1747 Path(id): Path<String>,
1748) -> impl IntoResponse {
1749 if let Err(e) = require_auth(&state, &headers) {
1750 return e.into_response();
1751 }
1752
1753 let session_key = format!("gw_{id}");
1754
1755 let token = state
1758 .cancel_tokens
1759 .lock()
1760 .expect("cancel_tokens lock poisoned")
1761 .get(&session_key)
1762 .cloned();
1763
1764 if let Some(token) = token {
1765 token.cancel();
1766 ::zeroclaw_log::record!(
1767 INFO,
1768 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1769 .with_attrs(::serde_json::json!({"session_key": session_key})),
1770 "session abort requested"
1771 );
1772 Json(serde_json::json!({ "status": "aborted" })).into_response()
1773 } else {
1774 Json(serde_json::json!({ "status": "no_active_response" })).into_response()
1775 }
1776}
1777
1778pub async fn handle_claude_code_hook(
1787 State(state): State<AppState>,
1788 Json(payload): Json<zeroclaw_tools::claude_code_runner::ClaudeCodeHookEvent>,
1789) -> impl IntoResponse {
1790 let _ = &state; ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"session_id": payload.session_id, "event_type": payload.event_type, "tool_name": payload.tool_name, "summary": payload.summary})), "Claude Code hook event received");
1796
1797 Json(serde_json::json!({ "ok": true }))
1798}
1799
1800#[cfg(test)]
1801mod tests {
1802 use super::*;
1803 use crate::{AppState, GatewayRateLimiter, IdempotencyStore, nodes};
1804 use async_trait::async_trait;
1805 use axum::response::IntoResponse;
1806 use http_body_util::BodyExt;
1807 use parking_lot::RwLock;
1808 use std::sync::Arc;
1809 use std::time::Duration;
1810 use zeroclaw_infra::session_backend::SessionBackend;
1811 use zeroclaw_infra::session_store::SessionStore;
1812 use zeroclaw_memory::{Memory, MemoryCategory, MemoryEntry};
1813 use zeroclaw_providers::ModelProvider;
1814 use zeroclaw_runtime::security::pairing::PairingGuard;
1815
1816 struct MockMemory;
1817
1818 #[async_trait]
1819 impl Memory for MockMemory {
1820 fn name(&self) -> &str {
1821 "mock"
1822 }
1823
1824 async fn store(
1825 &self,
1826 _key: &str,
1827 _content: &str,
1828 _category: MemoryCategory,
1829 _session_id: Option<&str>,
1830 ) -> anyhow::Result<()> {
1831 Ok(())
1832 }
1833
1834 async fn recall(
1835 &self,
1836 _query: &str,
1837 _limit: usize,
1838 _session_id: Option<&str>,
1839 _since: Option<&str>,
1840 _until: Option<&str>,
1841 ) -> anyhow::Result<Vec<MemoryEntry>> {
1842 Ok(Vec::new())
1843 }
1844
1845 async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
1846 Ok(None)
1847 }
1848
1849 async fn list(
1850 &self,
1851 _category: Option<&MemoryCategory>,
1852 _session_id: Option<&str>,
1853 ) -> anyhow::Result<Vec<MemoryEntry>> {
1854 Ok(Vec::new())
1855 }
1856
1857 async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
1858 Ok(false)
1859 }
1860
1861 async fn forget_for_agent(&self, _key: &str, _agent_id: &str) -> anyhow::Result<bool> {
1862 Ok(false)
1863 }
1864
1865 async fn count(&self) -> anyhow::Result<usize> {
1866 Ok(0)
1867 }
1868
1869 async fn health_check(&self) -> bool {
1870 true
1871 }
1872
1873 async fn store_with_agent(
1874 &self,
1875 _key: &str,
1876 _content: &str,
1877 _category: MemoryCategory,
1878 _session_id: Option<&str>,
1879 _namespace: Option<&str>,
1880 _importance: Option<f64>,
1881 _agent_id: Option<&str>,
1882 ) -> anyhow::Result<()> {
1883 Ok(())
1884 }
1885
1886 async fn recall_for_agents(
1887 &self,
1888 _allowed_agent_ids: &[&str],
1889 _query: &str,
1890 _limit: usize,
1891 _session_id: Option<&str>,
1892 _since: Option<&str>,
1893 _until: Option<&str>,
1894 ) -> anyhow::Result<Vec<MemoryEntry>> {
1895 Ok(Vec::new())
1896 }
1897 }
1898 impl ::zeroclaw_api::attribution::Attributable for MockMemory {
1899 fn role(&self) -> ::zeroclaw_api::attribution::Role {
1900 ::zeroclaw_api::attribution::Role::Memory(
1901 ::zeroclaw_api::attribution::MemoryKind::InMemory,
1902 )
1903 }
1904 fn alias(&self) -> &str {
1905 "MockMemory"
1906 }
1907 }
1908
1909 struct MockModelProvider;
1910
1911 #[async_trait]
1912 impl ModelProvider for MockModelProvider {
1913 async fn chat_with_system(
1914 &self,
1915 _system_prompt: Option<&str>,
1916 _message: &str,
1917 _model: &str,
1918 _temperature: Option<f64>,
1919 ) -> anyhow::Result<String> {
1920 Ok("ok".to_string())
1921 }
1922 }
1923 impl ::zeroclaw_api::attribution::Attributable for MockModelProvider {
1924 fn role(&self) -> ::zeroclaw_api::attribution::Role {
1925 ::zeroclaw_api::attribution::Role::Provider(
1926 ::zeroclaw_api::attribution::ProviderKind::Model(
1927 ::zeroclaw_api::attribution::ModelProviderKind::Custom,
1928 ),
1929 )
1930 }
1931 fn alias(&self) -> &str {
1932 "MockModelProvider"
1933 }
1934 }
1935
1936 fn with_test_agent(
1939 mut config: zeroclaw_config::schema::Config,
1940 ) -> zeroclaw_config::schema::Config {
1941 config.providers.models.openrouter.insert(
1942 "default".to_string(),
1943 zeroclaw_config::schema::OpenRouterModelProviderConfig::default(),
1944 );
1945 config.risk_profiles.insert(
1946 "test-profile".to_string(),
1947 zeroclaw_config::schema::RiskProfileConfig::default(),
1948 );
1949 config.agents.insert(
1950 "test-agent".to_string(),
1951 zeroclaw_config::schema::AliasedAgentConfig {
1952 model_provider: "openrouter.default".into(),
1953 risk_profile: "test-profile".to_string(),
1954 ..Default::default()
1955 },
1956 );
1957 config
1958 }
1959
1960 fn test_state(config: zeroclaw_config::schema::Config) -> AppState {
1961 AppState {
1962 config: Arc::new(RwLock::new(config)),
1963 model_provider: Arc::new(MockModelProvider),
1964 model: "test-model".into(),
1965 temperature: None,
1966 mem: Arc::new(MockMemory),
1967 auto_save: false,
1968 webhook_secret_hash: None,
1969 pairing: Arc::new(PairingGuard::new(false, &[])),
1970 trust_forwarded_headers: false,
1971 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
1972 auth_limiter: Arc::new(crate::auth_rate_limit::AuthRateLimiter::new()),
1973 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
1974 whatsapp: None,
1975 whatsapp_app_secret: None,
1976 linq: None,
1977 linq_signing_secret: None,
1978 nextcloud_talk: None,
1979 nextcloud_talk_webhook_secret: None,
1980 wati: None,
1981 gmail_push: None,
1982 observer: Arc::new(zeroclaw_runtime::observability::NoopObserver),
1983 tools_registry: Arc::new(Vec::new()),
1984 cost_tracker: None,
1985 event_tx: tokio::sync::broadcast::channel(16).0,
1986 event_buffer: Arc::new(crate::sse::EventBuffer::new(16)),
1987 shutdown_tx: tokio::sync::watch::channel(false).0,
1988 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
1989 session_backend: None,
1990 session_queue: Arc::new(crate::session_queue::SessionActorQueue::new(8, 30, 600)),
1991 device_registry: None,
1992 pending_pairings: None,
1993 path_prefix: String::new(),
1994 web_dist_dir: None,
1995 canvas_store: zeroclaw_runtime::tools::CanvasStore::new(),
1996 cancel_tokens: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
1997 pending_reload: Arc::new(std::sync::atomic::AtomicBool::new(false)),
1998 reload_tx: None,
1999 #[cfg(feature = "webauthn")]
2000 webauthn: None,
2001 }
2002 }
2003
2004 async fn response_json(response: axum::response::Response) -> serde_json::Value {
2005 let body = response
2006 .into_body()
2007 .collect()
2008 .await
2009 .expect("response body")
2010 .to_bytes();
2011 serde_json::from_slice(&body).expect("valid json response")
2012 }
2013
2014 fn link_job_to_test_agent(state: &AppState, job_id: &str) {
2015 state
2016 .config
2017 .write()
2018 .agents
2019 .get_mut("test-agent")
2020 .expect("test-agent configured by with_test_agent")
2021 .cron_jobs
2022 .push(job_id.to_string());
2023 }
2024
2025 fn config_with_webhook(
2026 alias: &str,
2027 enabled: bool,
2028 bound: bool,
2029 port: u16,
2030 listen_path: Option<&str>,
2031 ) -> zeroclaw_config::schema::Config {
2032 let mut config = zeroclaw_config::schema::Config::default();
2033 config.gateway.port = 42617;
2034 config.gateway.require_pairing = false;
2035 config.channels.webhook.insert(
2036 alias.to_string(),
2037 zeroclaw_config::schema::WebhookConfig {
2038 enabled,
2039 port,
2040 listen_path: listen_path.map(ToString::to_string),
2041 ..Default::default()
2042 },
2043 );
2044 if bound {
2045 config.agents.insert(
2046 "rowan".to_string(),
2047 zeroclaw_config::schema::AliasedAgentConfig {
2048 channels: vec![zeroclaw_config::providers::ChannelRef::new(format!(
2049 "webhook.{alias}"
2050 ))],
2051 ..Default::default()
2052 },
2053 );
2054 }
2055 config
2056 }
2057
2058 fn config_with_telegram(alias: &str) -> zeroclaw_config::schema::Config {
2059 let mut config = zeroclaw_config::schema::Config::default();
2060 config.channels.telegram.insert(
2061 alias.to_string(),
2062 zeroclaw_config::schema::TelegramConfig {
2063 enabled: true,
2064 bot_token: "test-token".to_string(),
2065 ..Default::default()
2066 },
2067 );
2068 config.agents.insert(
2069 "rowan".to_string(),
2070 zeroclaw_config::schema::AliasedAgentConfig {
2071 channels: vec![zeroclaw_config::providers::ChannelRef::new(format!(
2072 "telegram.{alias}"
2073 ))],
2074 ..Default::default()
2075 },
2076 );
2077 config
2078 }
2079
2080 fn first_channel_info(
2081 config: &zeroclaw_config::schema::Config,
2082 ) -> zeroclaw_config::schema::ChannelAliasInfo {
2083 config
2084 .channels_by_alias()
2085 .into_iter()
2086 .next()
2087 .expect("channel alias should be present")
2088 }
2089
2090 #[test]
2091 fn channel_readiness_webhook_does_not_call_gateway_route_healthy_without_listener() {
2092 let config = config_with_webhook("default", true, true, 42617, Some("/webhook"));
2093 let state = test_state(config.clone());
2094 let health = zeroclaw_runtime::health::snapshot();
2095 let info = first_channel_info(&config);
2096 let readiness = channel_readiness(&config, &info, &health, &state);
2097
2098 assert_eq!(readiness.authenticated, ChannelReadinessState::Ready);
2099 assert_eq!(readiness.listening, ChannelReadinessState::Missing);
2100 assert_eq!(channel_readiness_summary(&readiness), ("error", "down"));
2101 assert!(
2102 readiness
2103 .requirements
2104 .iter()
2105 .any(|item| item.contains("Start a channel listener"))
2106 );
2107 }
2108
2109 #[test]
2110 fn channel_readiness_webhook_does_not_call_custom_path_healthy_without_listener() {
2111 let config = config_with_webhook("custom_path", true, true, 42632, Some("/eyrie"));
2112 let state = test_state(config.clone());
2113 let health = zeroclaw_runtime::health::snapshot();
2114 let info = first_channel_info(&config);
2115 let readiness = channel_readiness(&config, &info, &health, &state);
2116
2117 assert_eq!(readiness.authenticated, ChannelReadinessState::Ready);
2118 assert_eq!(readiness.listening, ChannelReadinessState::Missing);
2119 assert_eq!(channel_readiness_summary(&readiness), ("error", "down"));
2120 assert!(
2121 readiness
2122 .requirements
2123 .iter()
2124 .any(|item| item.contains("Start a channel listener"))
2125 );
2126 }
2127
2128 #[test]
2129 fn channel_readiness_webhook_uses_supervised_listener_health_for_custom_path() {
2130 let config = config_with_webhook("supervised", true, true, 42632, Some("/eyrie"));
2131 zeroclaw_runtime::health::mark_component_ok("channel:webhook.supervised");
2132 let state = test_state(config.clone());
2133 let health = zeroclaw_runtime::health::snapshot();
2134 let info = first_channel_info(&config);
2135 let readiness = channel_readiness(&config, &info, &health, &state);
2136
2137 assert_eq!(readiness.listening, ChannelReadinessState::Ready);
2138 assert_eq!(channel_readiness_summary(&readiness), ("active", "healthy"));
2139 }
2140
2141 #[test]
2142 fn channel_readiness_webhook_rejects_stale_listener_health() {
2143 let config = config_with_webhook("stale", true, true, 42632, Some("/eyrie"));
2144 let component = "channel:webhook.stale".to_string();
2145 let old = (chrono::Utc::now()
2146 - chrono::Duration::seconds(CHANNEL_LISTENER_HEALTH_MAX_AGE_SECS + 5))
2147 .to_rfc3339();
2148 let health = zeroclaw_runtime::health::HealthSnapshot {
2149 pid: std::process::id(),
2150 updated_at: chrono::Utc::now().to_rfc3339(),
2151 uptime_seconds: 1,
2152 components: std::collections::BTreeMap::from([(
2153 component,
2154 zeroclaw_runtime::health::ComponentHealth {
2155 status: "ok".to_string(),
2156 updated_at: old,
2157 last_ok: None,
2158 last_error: None,
2159 restart_count: 0,
2160 },
2161 )]),
2162 };
2163 let state = test_state(config.clone());
2164 let info = first_channel_info(&config);
2165 let readiness = channel_readiness(&config, &info, &health, &state);
2166
2167 assert_eq!(readiness.listening, ChannelReadinessState::Missing);
2168 assert_eq!(channel_readiness_summary(&readiness), ("error", "down"));
2169 }
2170
2171 #[test]
2172 fn channel_readiness_webhook_uses_live_pairing_guard_for_auth() {
2173 let config = config_with_webhook("paired", true, true, 42632, Some("/eyrie"));
2174 zeroclaw_runtime::health::mark_component_ok("channel:webhook.paired");
2175 let mut state = test_state(config.clone());
2176 state.pairing = Arc::new(PairingGuard::new(true, &[]));
2177 let health = zeroclaw_runtime::health::snapshot();
2178 let info = first_channel_info(&config);
2179 let readiness = channel_readiness(&config, &info, &health, &state);
2180
2181 assert_eq!(readiness.authenticated, ChannelReadinessState::Missing);
2182 assert_eq!(readiness.listening, ChannelReadinessState::Ready);
2183 assert_eq!(channel_readiness_summary(&readiness), ("error", "down"));
2184 }
2185
2186 #[test]
2187 fn channel_readiness_unchecked_channel_types_are_unknown_not_down() {
2188 let config = config_with_telegram("ops");
2189 let state = test_state(config.clone());
2190 let health = zeroclaw_runtime::health::snapshot();
2191 let info = first_channel_info(&config);
2192 let readiness = channel_readiness(&config, &info, &health, &state);
2193
2194 assert_eq!(readiness.enabled, ChannelReadinessState::Ready);
2195 assert_eq!(readiness.bound_to_agent, ChannelReadinessState::Ready);
2196 assert_eq!(readiness.authenticated, ChannelReadinessState::Unknown);
2197 assert_eq!(readiness.listening, ChannelReadinessState::Unknown);
2198 assert_eq!(
2199 channel_readiness_summary(&readiness),
2200 ("unknown", "degraded")
2201 );
2202 assert!(readiness.requirements.is_empty());
2203 assert!(
2204 readiness
2205 .notes
2206 .iter()
2207 .any(|item| item.contains("not checked"))
2208 );
2209 }
2210
2211 #[test]
2212 fn channel_readiness_orphan_channel_reports_missing_agent_binding_without_broken_health() {
2213 let config = config_with_webhook("orphan", true, false, 42617, Some("/webhook"));
2214 let state = test_state(config.clone());
2215 let health = zeroclaw_runtime::health::snapshot();
2216 let info = first_channel_info(&config);
2217 let readiness = channel_readiness(&config, &info, &health, &state);
2218
2219 assert_eq!(readiness.bound_to_agent, ChannelReadinessState::Missing);
2220 assert_eq!(readiness.listening, ChannelReadinessState::Unknown);
2221 assert_eq!(
2222 channel_readiness_summary(&readiness),
2223 ("inactive", "degraded")
2224 );
2225 assert!(
2226 readiness
2227 .requirements
2228 .iter()
2229 .any(|item| item.contains("Bind this channel"))
2230 );
2231 }
2232
2233 #[tokio::test]
2234 async fn api_channels_serializes_readiness_without_duplicate_summary_fields() {
2235 let config = config_with_telegram("ops");
2236 let state = test_state(config);
2237
2238 let response = handle_api_channels(State(state), HeaderMap::new())
2239 .await
2240 .into_response();
2241
2242 assert_eq!(response.status(), StatusCode::OK);
2243 let json = response_json(response).await;
2244 let channel = &json["channels"][0];
2245 assert_eq!(channel["name"], "telegram.ops");
2246 assert_eq!(channel["status"], "unknown");
2247 assert_eq!(channel["health"], "degraded");
2248 assert_eq!(channel["readiness"]["enabled"], "ready");
2249 assert_eq!(channel["readiness"]["authenticated"], "unknown");
2250 assert!(channel["readiness"].get("configured").is_none());
2251 assert!(channel["readiness"].get("status").is_none());
2252 assert!(channel["readiness"].get("health").is_none());
2253 }
2254
2255 fn test_state_with_session_backend(
2256 config: zeroclaw_config::schema::Config,
2257 backend: Arc<dyn SessionBackend>,
2258 ) -> AppState {
2259 let mut state = test_state(config);
2260 state.session_backend = Some(backend);
2261 state
2262 }
2263
2264 #[tokio::test]
2265 async fn session_message_post_persists_and_broadcasts_to_session() {
2266 let tmp = tempfile::TempDir::new().unwrap();
2267 let config = zeroclaw_config::schema::Config {
2268 data_dir: tmp.path().join("workspace"),
2269 config_path: tmp.path().join("config.toml"),
2270 ..zeroclaw_config::schema::Config::default()
2271 };
2272 std::fs::create_dir_all(&config.data_dir).unwrap();
2273 let backend: Arc<dyn SessionBackend> = Arc::new(SessionStore::new(tmp.path()).unwrap());
2274 backend
2275 .append(
2276 "gw_operator-1",
2277 &zeroclaw_providers::ChatMessage::assistant("existing"),
2278 )
2279 .unwrap();
2280 let state = test_state_with_session_backend(config, backend.clone());
2281 let mut rx = state.event_tx.subscribe();
2282
2283 let response = handle_api_session_message_post(
2284 State(state.clone()),
2285 HeaderMap::new(),
2286 Path("operator-1".to_string()),
2287 Json(
2288 serde_json::from_value::<SessionMessagePostBody>(serde_json::json!({
2289 "content": "deploy finished"
2290 }))
2291 .expect("body should deserialize"),
2292 ),
2293 )
2294 .await
2295 .into_response();
2296
2297 assert_eq!(response.status(), StatusCode::OK);
2298 let json = response_json(response).await;
2299 assert_eq!(json["status"], "ok");
2300 assert_eq!(json["session_id"], "operator-1");
2301 assert_eq!(json["message"]["role"], "assistant");
2302 assert_eq!(json["message"]["content"], "deploy finished");
2303 assert!(json.get("message_count").is_none());
2304
2305 let messages = backend.load("gw_operator-1");
2306 assert_eq!(messages.len(), 2);
2307 assert_eq!(messages[1].role, "assistant");
2308 assert_eq!(messages[1].content, "deploy finished");
2309
2310 let event = tokio::time::timeout(Duration::from_secs(1), rx.recv())
2311 .await
2312 .expect("broadcast event")
2313 .expect("broadcast value");
2314 assert_eq!(event["type"], "message");
2315 assert_eq!(event["session_id"], "operator-1");
2316 assert_eq!(event["role"], "assistant");
2317 assert_eq!(event["content"], "deploy finished");
2318
2319 let history = state.event_buffer.snapshot();
2320 assert!(
2321 history.is_empty(),
2322 "session-scoped chat messages stay out of global event history"
2323 );
2324 }
2325
2326 #[tokio::test]
2327 async fn session_message_post_rejects_empty_content() {
2328 let tmp = tempfile::TempDir::new().unwrap();
2329 let config = zeroclaw_config::schema::Config {
2330 data_dir: tmp.path().join("workspace"),
2331 config_path: tmp.path().join("config.toml"),
2332 ..zeroclaw_config::schema::Config::default()
2333 };
2334 std::fs::create_dir_all(&config.data_dir).unwrap();
2335 let backend: Arc<dyn SessionBackend> = Arc::new(SessionStore::new(tmp.path()).unwrap());
2336 let state = test_state_with_session_backend(config, backend);
2337
2338 let response = handle_api_session_message_post(
2339 State(state),
2340 HeaderMap::new(),
2341 Path("operator-1".to_string()),
2342 Json(
2343 serde_json::from_value::<SessionMessagePostBody>(serde_json::json!({
2344 "content": " "
2345 }))
2346 .expect("body should deserialize"),
2347 ),
2348 )
2349 .await
2350 .into_response();
2351
2352 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
2353 let json = response_json(response).await;
2354 assert_eq!(json["error"], "content is required");
2355 }
2356
2357 #[tokio::test]
2358 async fn session_message_post_rejects_unknown_session_without_creating_it() {
2359 let tmp = tempfile::TempDir::new().unwrap();
2360 let config = zeroclaw_config::schema::Config {
2361 data_dir: tmp.path().join("workspace"),
2362 config_path: tmp.path().join("config.toml"),
2363 ..zeroclaw_config::schema::Config::default()
2364 };
2365 std::fs::create_dir_all(&config.data_dir).unwrap();
2366 let backend: Arc<dyn SessionBackend> = Arc::new(SessionStore::new(tmp.path()).unwrap());
2367 let state = test_state_with_session_backend(config, backend.clone());
2368
2369 let response = handle_api_session_message_post(
2370 State(state),
2371 HeaderMap::new(),
2372 Path("operator-1".to_string()),
2373 Json(
2374 serde_json::from_value::<SessionMessagePostBody>(serde_json::json!({
2375 "content": "deploy finished"
2376 }))
2377 .expect("body should deserialize"),
2378 ),
2379 )
2380 .await
2381 .into_response();
2382
2383 assert_eq!(response.status(), StatusCode::NOT_FOUND);
2384 let json = response_json(response).await;
2385 assert_eq!(json["error"], "Session not found");
2386 assert!(backend.load("gw_operator-1").is_empty());
2387 }
2388
2389 #[tokio::test]
2390 async fn session_message_post_waits_for_session_queue_before_append() {
2391 let tmp = tempfile::TempDir::new().unwrap();
2392 let config = zeroclaw_config::schema::Config {
2393 data_dir: tmp.path().join("workspace"),
2394 config_path: tmp.path().join("config.toml"),
2395 ..zeroclaw_config::schema::Config::default()
2396 };
2397 std::fs::create_dir_all(&config.data_dir).unwrap();
2398 let backend: Arc<dyn SessionBackend> = Arc::new(SessionStore::new(tmp.path()).unwrap());
2399 backend
2400 .append(
2401 "gw_operator-1",
2402 &zeroclaw_providers::ChatMessage::assistant("existing"),
2403 )
2404 .unwrap();
2405 let state = test_state_with_session_backend(config, backend.clone());
2406 let session_guard = state.session_queue.acquire("gw_operator-1").await.unwrap();
2407
2408 let response_fut = handle_api_session_message_post(
2409 State(state),
2410 HeaderMap::new(),
2411 Path("operator-1".to_string()),
2412 Json(
2413 serde_json::from_value::<SessionMessagePostBody>(serde_json::json!({
2414 "content": "queued notification"
2415 }))
2416 .expect("body should deserialize"),
2417 ),
2418 );
2419 tokio::pin!(response_fut);
2420
2421 assert!(
2422 tokio::time::timeout(Duration::from_millis(50), &mut response_fut)
2423 .await
2424 .is_err(),
2425 "POST should wait behind the active session queue guard"
2426 );
2427 assert_eq!(backend.load("gw_operator-1").len(), 1);
2428
2429 drop(session_guard);
2430 let response = tokio::time::timeout(Duration::from_secs(1), response_fut)
2431 .await
2432 .expect("queued POST should complete")
2433 .into_response();
2434
2435 assert_eq!(response.status(), StatusCode::OK);
2436 let messages = backend.load("gw_operator-1");
2437 assert_eq!(messages.len(), 2);
2438 assert_eq!(messages[1].content, "queued notification");
2439 }
2440
2441 #[tokio::test]
2442 async fn cron_api_shell_roundtrip_includes_delivery() {
2443 let tmp = tempfile::TempDir::new().unwrap();
2444 let config = zeroclaw_config::schema::Config {
2445 data_dir: tmp.path().join("data"),
2446 config_path: tmp.path().join("config.toml"),
2447 ..zeroclaw_config::schema::Config::default()
2448 };
2449 std::fs::create_dir_all(&config.data_dir).unwrap();
2450 let state = test_state(with_test_agent(config));
2451
2452 let add_response = handle_api_cron_add(
2453 State(state.clone()),
2454 HeaderMap::new(),
2455 Json(
2456 serde_json::from_value::<CronAddBody>(serde_json::json!({
2457 "name": "test-job",
2458 "agent": "test-agent",
2459 "schedule": "*/5 * * * *",
2460 "command": "echo hello",
2461 "delivery": {
2462 "mode": "announce",
2463 "channel": "discord",
2464 "to": "1234567890",
2465 "best_effort": true
2466 }
2467 }))
2468 .expect("body should deserialize"),
2469 ),
2470 )
2471 .await
2472 .into_response();
2473
2474 let add_json = response_json(add_response).await;
2475 assert_eq!(add_json["status"], "ok");
2476 assert_eq!(add_json["job"]["delivery"]["mode"], "announce");
2477 assert_eq!(add_json["job"]["delivery"]["channel"], "discord");
2478 assert_eq!(add_json["job"]["delivery"]["to"], "1234567890");
2479
2480 let list_response = handle_api_cron_list(State(state), HeaderMap::new())
2481 .await
2482 .into_response();
2483 let list_json = response_json(list_response).await;
2484 let jobs = list_json["jobs"].as_array().expect("jobs array");
2485 assert_eq!(jobs.len(), 1);
2486 assert_eq!(jobs[0]["delivery"]["mode"], "announce");
2487 assert_eq!(jobs[0]["delivery"]["channel"], "discord");
2488 assert_eq!(jobs[0]["delivery"]["to"], "1234567890");
2489 }
2490
2491 #[tokio::test]
2492 async fn cron_api_accepts_agent_jobs() {
2493 let tmp = tempfile::TempDir::new().unwrap();
2494 let config = zeroclaw_config::schema::Config {
2495 data_dir: tmp.path().join("data"),
2496 config_path: tmp.path().join("config.toml"),
2497 ..zeroclaw_config::schema::Config::default()
2498 };
2499 std::fs::create_dir_all(&config.data_dir).unwrap();
2500 let state = test_state(with_test_agent(config));
2501
2502 let response = handle_api_cron_add(
2503 State(state.clone()),
2504 HeaderMap::new(),
2505 Json(
2506 serde_json::from_value::<CronAddBody>(serde_json::json!({
2507 "name": "agent-job",
2508 "agent": "test-agent",
2509 "schedule": "*/5 * * * *",
2510 "job_type": "agent",
2511 "command": "ignored shell command",
2512 "prompt": "summarize the latest logs"
2513 }))
2514 .expect("body should deserialize"),
2515 ),
2516 )
2517 .await
2518 .into_response();
2519
2520 let json = response_json(response).await;
2521 assert_eq!(json["status"], "ok");
2522
2523 let config = state.config.read().clone();
2524 let jobs = zeroclaw_runtime::cron::list_jobs(&config).unwrap();
2525 assert_eq!(jobs.len(), 1);
2526 assert_eq!(jobs[0].job_type, zeroclaw_runtime::cron::JobType::Agent);
2527 assert_eq!(jobs[0].prompt.as_deref(), Some("summarize the latest logs"));
2528 }
2529
2530 #[tokio::test]
2531 async fn cron_api_timezone_add_persists_explicit_timezone() {
2532 let tmp = tempfile::TempDir::new().unwrap();
2533 let config = zeroclaw_config::schema::Config {
2534 data_dir: tmp.path().join("workspace"),
2535 config_path: tmp.path().join("config.toml"),
2536 ..zeroclaw_config::schema::Config::default()
2537 };
2538 std::fs::create_dir_all(&config.data_dir).unwrap();
2539 let state = test_state(with_test_agent(config));
2540
2541 let response = handle_api_cron_add(
2542 State(state.clone()),
2543 HeaderMap::new(),
2544 Json(
2545 serde_json::from_value::<CronAddBody>(serde_json::json!({
2546 "agent": "test-agent",
2547 "name": "localized-job",
2548 "schedule": "0 9 * * *",
2549 "tz": "America/New_York",
2550 "command": "echo hello"
2551 }))
2552 .expect("body should deserialize"),
2553 ),
2554 )
2555 .await
2556 .into_response();
2557
2558 assert_eq!(response.status(), StatusCode::OK);
2559 let config = state.config.read().clone();
2560 let jobs = zeroclaw_runtime::cron::list_jobs(&config).unwrap();
2561 assert_eq!(
2562 jobs[0].schedule,
2563 zeroclaw_runtime::cron::Schedule::Cron {
2564 expr: "0 9 * * *".to_string(),
2565 tz: Some("America/New_York".to_string()),
2566 }
2567 );
2568 }
2569
2570 #[tokio::test]
2571 async fn cron_api_timezone_add_rejects_invalid_timezone_as_bad_request() {
2572 let tmp = tempfile::TempDir::new().unwrap();
2573 let config = zeroclaw_config::schema::Config {
2574 data_dir: tmp.path().join("workspace"),
2575 config_path: tmp.path().join("config.toml"),
2576 ..zeroclaw_config::schema::Config::default()
2577 };
2578 std::fs::create_dir_all(&config.data_dir).unwrap();
2579 let state = test_state(with_test_agent(config));
2580
2581 let response = handle_api_cron_add(
2582 State(state),
2583 HeaderMap::new(),
2584 Json(
2585 serde_json::from_value::<CronAddBody>(serde_json::json!({
2586 "agent": "test-agent",
2587 "name": "invalid-timezone-job",
2588 "schedule": "0 9 * * *",
2589 "tz": "Invalid/Zone",
2590 "command": "echo hello"
2591 }))
2592 .expect("body should deserialize"),
2593 ),
2594 )
2595 .await
2596 .into_response();
2597
2598 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
2599 let json = response_json(response).await;
2600 assert!(
2601 json["error"]
2602 .as_str()
2603 .unwrap_or_default()
2604 .contains("Invalid IANA timezone")
2605 );
2606 }
2607
2608 #[tokio::test]
2609 async fn cron_api_timezone_patch_schedule_preserves_existing_timezone() {
2610 let tmp = tempfile::TempDir::new().unwrap();
2611 let config = zeroclaw_config::schema::Config {
2612 data_dir: tmp.path().join("workspace"),
2613 config_path: tmp.path().join("config.toml"),
2614 ..zeroclaw_config::schema::Config::default()
2615 };
2616 std::fs::create_dir_all(&config.data_dir).unwrap();
2617 let state = test_state(with_test_agent(config));
2618 let job = zeroclaw_runtime::cron::add_shell_job_with_approval(
2619 &state.config.read().clone(),
2620 "test-agent",
2621 Some("localized-job".to_string()),
2622 zeroclaw_runtime::cron::Schedule::Cron {
2623 expr: "0 9 * * *".to_string(),
2624 tz: Some("Europe/Berlin".to_string()),
2625 },
2626 "echo hello",
2627 None,
2628 true,
2629 )
2630 .expect("job added");
2631
2632 let response = handle_api_cron_patch(
2633 State(state.clone()),
2634 HeaderMap::new(),
2635 Path(job.id.clone()),
2636 Json(
2637 serde_json::from_value::<CronPatchBody>(serde_json::json!({
2638 "agent": "test-agent",
2639 "schedule": "30 9 * * *"
2640 }))
2641 .expect("body should deserialize"),
2642 ),
2643 )
2644 .await
2645 .into_response();
2646
2647 assert_eq!(response.status(), StatusCode::OK);
2648 let updated = zeroclaw_runtime::cron::get_job(&state.config.read().clone(), &job.id)
2649 .expect("updated job");
2650 assert_eq!(
2651 updated.schedule,
2652 zeroclaw_runtime::cron::Schedule::Cron {
2653 expr: "30 9 * * *".to_string(),
2654 tz: Some("Europe/Berlin".to_string()),
2655 }
2656 );
2657 }
2658
2659 #[tokio::test]
2660 async fn cron_api_timezone_patch_replaces_timezone_when_provided() {
2661 let tmp = tempfile::TempDir::new().unwrap();
2662 let config = zeroclaw_config::schema::Config {
2663 data_dir: tmp.path().join("workspace"),
2664 config_path: tmp.path().join("config.toml"),
2665 ..zeroclaw_config::schema::Config::default()
2666 };
2667 std::fs::create_dir_all(&config.data_dir).unwrap();
2668 let state = test_state(with_test_agent(config));
2669 let job = zeroclaw_runtime::cron::add_shell_job_with_approval(
2670 &state.config.read().clone(),
2671 "test-agent",
2672 Some("localized-job".to_string()),
2673 zeroclaw_runtime::cron::Schedule::Cron {
2674 expr: "0 9 * * *".to_string(),
2675 tz: Some("America/New_York".to_string()),
2676 },
2677 "echo hello",
2678 None,
2679 true,
2680 )
2681 .expect("job added");
2682
2683 let response = handle_api_cron_patch(
2684 State(state.clone()),
2685 HeaderMap::new(),
2686 Path(job.id.clone()),
2687 Json(
2688 serde_json::from_value::<CronPatchBody>(serde_json::json!({
2689 "agent": "test-agent",
2690 "schedule": "30 9 * * *",
2691 "tz": "Asia/Tokyo"
2692 }))
2693 .expect("body should deserialize"),
2694 ),
2695 )
2696 .await
2697 .into_response();
2698
2699 assert_eq!(response.status(), StatusCode::OK);
2700 let updated = zeroclaw_runtime::cron::get_job(&state.config.read().clone(), &job.id)
2701 .expect("updated job");
2702 assert_eq!(
2703 updated.schedule,
2704 zeroclaw_runtime::cron::Schedule::Cron {
2705 expr: "30 9 * * *".to_string(),
2706 tz: Some("Asia/Tokyo".to_string()),
2707 }
2708 );
2709 }
2710
2711 #[tokio::test]
2712 async fn cron_api_timezone_patch_sets_timezone_without_schedule_change() {
2713 let tmp = tempfile::TempDir::new().unwrap();
2714 let config = zeroclaw_config::schema::Config {
2715 data_dir: tmp.path().join("workspace"),
2716 config_path: tmp.path().join("config.toml"),
2717 ..zeroclaw_config::schema::Config::default()
2718 };
2719 std::fs::create_dir_all(&config.data_dir).unwrap();
2720 let state = test_state(with_test_agent(config));
2721 let job = zeroclaw_runtime::cron::add_shell_job_with_approval(
2722 &state.config.read().clone(),
2723 "test-agent",
2724 Some("runtime-local-job".to_string()),
2725 zeroclaw_runtime::cron::Schedule::Cron {
2726 expr: "0 9 * * *".to_string(),
2727 tz: None,
2728 },
2729 "echo hello",
2730 None,
2731 true,
2732 )
2733 .expect("job added");
2734
2735 let response = handle_api_cron_patch(
2736 State(state.clone()),
2737 HeaderMap::new(),
2738 Path(job.id.clone()),
2739 Json(
2740 serde_json::from_value::<CronPatchBody>(serde_json::json!({
2741 "agent": "test-agent",
2742 "tz": "America/Chicago"
2743 }))
2744 .expect("body should deserialize"),
2745 ),
2746 )
2747 .await
2748 .into_response();
2749
2750 assert_eq!(response.status(), StatusCode::OK);
2751 let updated = zeroclaw_runtime::cron::get_job(&state.config.read().clone(), &job.id)
2752 .expect("updated job");
2753 assert_eq!(
2754 updated.schedule,
2755 zeroclaw_runtime::cron::Schedule::Cron {
2756 expr: "0 9 * * *".to_string(),
2757 tz: Some("America/Chicago".to_string()),
2758 }
2759 );
2760 }
2761
2762 #[tokio::test]
2763 async fn cron_api_timezone_patch_rejects_invalid_timezone_as_bad_request() {
2764 let tmp = tempfile::TempDir::new().unwrap();
2765 let config = zeroclaw_config::schema::Config {
2766 data_dir: tmp.path().join("workspace"),
2767 config_path: tmp.path().join("config.toml"),
2768 ..zeroclaw_config::schema::Config::default()
2769 };
2770 std::fs::create_dir_all(&config.data_dir).unwrap();
2771 let state = test_state(with_test_agent(config));
2772 let job = zeroclaw_runtime::cron::add_shell_job_with_approval(
2773 &state.config.read().clone(),
2774 "test-agent",
2775 Some("localized-job".to_string()),
2776 zeroclaw_runtime::cron::Schedule::Cron {
2777 expr: "0 9 * * *".to_string(),
2778 tz: Some("America/New_York".to_string()),
2779 },
2780 "echo hello",
2781 None,
2782 true,
2783 )
2784 .expect("job added");
2785
2786 let response = handle_api_cron_patch(
2787 State(state),
2788 HeaderMap::new(),
2789 Path(job.id),
2790 Json(
2791 serde_json::from_value::<CronPatchBody>(serde_json::json!({
2792 "agent": "test-agent",
2793 "tz": "Invalid/Zone"
2794 }))
2795 .expect("body should deserialize"),
2796 ),
2797 )
2798 .await
2799 .into_response();
2800
2801 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
2802 let json = response_json(response).await;
2803 assert!(
2804 json["error"]
2805 .as_str()
2806 .unwrap_or_default()
2807 .contains("Invalid IANA timezone")
2808 );
2809 }
2810
2811 #[tokio::test]
2812 async fn cron_api_timezone_patch_clears_timezone_with_explicit_signal() {
2813 let tmp = tempfile::TempDir::new().unwrap();
2814 let config = zeroclaw_config::schema::Config {
2815 data_dir: tmp.path().join("workspace"),
2816 config_path: tmp.path().join("config.toml"),
2817 ..zeroclaw_config::schema::Config::default()
2818 };
2819 std::fs::create_dir_all(&config.data_dir).unwrap();
2820 let state = test_state(with_test_agent(config));
2821 let job = zeroclaw_runtime::cron::add_shell_job_with_approval(
2822 &state.config.read().clone(),
2823 "test-agent",
2824 Some("localized-job".to_string()),
2825 zeroclaw_runtime::cron::Schedule::Cron {
2826 expr: "0 9 * * *".to_string(),
2827 tz: Some("America/New_York".to_string()),
2828 },
2829 "echo hello",
2830 None,
2831 true,
2832 )
2833 .expect("job added");
2834
2835 let response = handle_api_cron_patch(
2836 State(state.clone()),
2837 HeaderMap::new(),
2838 Path(job.id.clone()),
2839 Json(
2840 serde_json::from_value::<CronPatchBody>(serde_json::json!({
2841 "agent": "test-agent",
2842 "clear_tz": true
2843 }))
2844 .expect("body should deserialize"),
2845 ),
2846 )
2847 .await
2848 .into_response();
2849
2850 assert_eq!(response.status(), StatusCode::OK);
2851 let updated = zeroclaw_runtime::cron::get_job(&state.config.read().clone(), &job.id)
2852 .expect("updated job");
2853 assert_eq!(
2854 updated.schedule,
2855 zeroclaw_runtime::cron::Schedule::Cron {
2856 expr: "0 9 * * *".to_string(),
2857 tz: None,
2858 }
2859 );
2860 }
2861
2862 #[tokio::test]
2863 async fn cron_api_rejects_announce_delivery_without_target() {
2864 let tmp = tempfile::TempDir::new().unwrap();
2865 let config = zeroclaw_config::schema::Config {
2866 data_dir: tmp.path().join("data"),
2867 config_path: tmp.path().join("config.toml"),
2868 ..zeroclaw_config::schema::Config::default()
2869 };
2870 std::fs::create_dir_all(&config.data_dir).unwrap();
2871 let state = test_state(with_test_agent(config));
2872
2873 let response = handle_api_cron_add(
2874 State(state.clone()),
2875 HeaderMap::new(),
2876 Json(
2877 serde_json::from_value::<CronAddBody>(serde_json::json!({
2878 "name": "invalid-delivery-job",
2879 "agent": "test-agent",
2880 "schedule": "*/5 * * * *",
2881 "command": "echo hello",
2882 "delivery": {
2883 "mode": "announce",
2884 "channel": "discord"
2885 }
2886 }))
2887 .expect("body should deserialize"),
2888 ),
2889 )
2890 .await
2891 .into_response();
2892
2893 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
2894 let json = response_json(response).await;
2895 assert!(
2896 json["error"]
2897 .as_str()
2898 .unwrap_or_default()
2899 .contains("delivery.to is required")
2900 );
2901
2902 let config = state.config.read().clone();
2903 assert!(
2904 zeroclaw_runtime::cron::list_jobs(&config)
2905 .unwrap()
2906 .is_empty()
2907 );
2908 }
2909
2910 #[tokio::test]
2911 async fn cron_api_run_executes_shell_job_and_records_run() {
2912 let tmp = tempfile::TempDir::new().unwrap();
2913 let config = zeroclaw_config::schema::Config {
2914 data_dir: tmp.path().join("data"),
2915 config_path: tmp.path().join("config.toml"),
2916 ..zeroclaw_config::schema::Config::default()
2917 };
2918 std::fs::create_dir_all(&config.data_dir).unwrap();
2919 let state = test_state(with_test_agent(config));
2920
2921 let job = zeroclaw_runtime::cron::add_shell_job_with_approval(
2922 &state.config.read().clone(),
2923 "test-agent",
2924 None,
2925 zeroclaw_runtime::cron::Schedule::Cron {
2926 expr: "*/5 * * * *".to_string(),
2927 tz: None,
2928 },
2929 "echo hello-from-manual-trigger",
2930 None,
2931 true,
2932 )
2933 .expect("job added");
2934
2935 link_job_to_test_agent(&state, &job.id);
2938
2939 let response =
2940 handle_api_cron_run(State(state.clone()), HeaderMap::new(), Path(job.id.clone()))
2941 .await
2942 .into_response();
2943
2944 assert_eq!(response.status(), StatusCode::OK);
2945 let json = response_json(response).await;
2946 assert_eq!(json["status"], "ok");
2947 assert_eq!(json["success"], true);
2948 assert_eq!(json["job_id"], job.id);
2949 assert!(
2950 json["output"]
2951 .as_str()
2952 .unwrap_or_default()
2953 .contains("hello-from-manual-trigger")
2954 );
2955
2956 let runs = zeroclaw_runtime::cron::list_runs(&state.config.read().clone(), &job.id, 10)
2957 .expect("runs listed");
2958 assert_eq!(runs.len(), 1);
2959 assert_eq!(runs[0].status, "ok");
2960 }
2961
2962 #[tokio::test]
2963 async fn cron_api_run_records_best_effort_delivery_failure_as_degraded() {
2964 zeroclaw_runtime::cron::scheduler::register_delivery_fn(Box::new(
2965 |_config, channel, _target, _thread_id, _output| {
2966 Box::pin(async move {
2967 if channel == "fail-delivery" {
2968 anyhow::bail!("synthetic delivery failure");
2969 }
2970 Ok(())
2971 })
2972 },
2973 ));
2974
2975 let tmp = tempfile::TempDir::new().unwrap();
2976 let config = zeroclaw_config::schema::Config {
2977 data_dir: tmp.path().join("data"),
2978 config_path: tmp.path().join("config.toml"),
2979 ..zeroclaw_config::schema::Config::default()
2980 };
2981 std::fs::create_dir_all(&config.data_dir).unwrap();
2982 let state = test_state(with_test_agent(config));
2983
2984 let job = zeroclaw_runtime::cron::add_shell_job_with_approval(
2985 &state.config.read().clone(),
2986 "test-agent",
2987 None,
2988 zeroclaw_runtime::cron::Schedule::Cron {
2989 expr: "*/5 * * * *".to_string(),
2990 tz: None,
2991 },
2992 "echo hello-from-manual-trigger",
2993 Some(zeroclaw_runtime::cron::DeliveryConfig {
2994 mode: "announce".into(),
2995 channel: Some("fail-delivery".into()),
2996 to: Some("123456".into()),
2997 thread_id: None,
2998 best_effort: true,
2999 }),
3000 true,
3001 )
3002 .expect("job added");
3003 link_job_to_test_agent(&state, &job.id);
3004
3005 let response =
3006 handle_api_cron_run(State(state.clone()), HeaderMap::new(), Path(job.id.clone()))
3007 .await
3008 .into_response();
3009
3010 assert_eq!(response.status(), StatusCode::OK);
3011 let json = response_json(response).await;
3012 assert_eq!(json["status"], "degraded");
3013 assert_eq!(json["success"], true);
3014 assert!(
3015 json["output"]
3016 .as_str()
3017 .unwrap_or_default()
3018 .contains("delivery failed:")
3019 );
3020
3021 let config = state.config.read().clone();
3022 let updated = zeroclaw_runtime::cron::get_job(&config, &job.id).expect("updated job");
3023 assert_eq!(updated.last_status.as_deref(), Some("degraded"));
3024 assert!(
3025 updated
3026 .last_output
3027 .as_deref()
3028 .unwrap_or_default()
3029 .contains("delivery failed:")
3030 );
3031
3032 let runs = zeroclaw_runtime::cron::list_runs(&config, &job.id, 10).expect("runs listed");
3033 assert_eq!(runs.len(), 1);
3034 assert_eq!(runs[0].status, "degraded");
3035 assert!(
3036 runs[0]
3037 .output
3038 .as_deref()
3039 .unwrap_or_default()
3040 .contains("delivery failed:")
3041 );
3042 }
3043
3044 #[tokio::test]
3045 async fn cron_api_run_returns_not_found_for_unknown_job() {
3046 let tmp = tempfile::TempDir::new().unwrap();
3047 let config = zeroclaw_config::schema::Config {
3048 data_dir: tmp.path().join("data"),
3049 config_path: tmp.path().join("config.toml"),
3050 ..zeroclaw_config::schema::Config::default()
3051 };
3052 std::fs::create_dir_all(&config.data_dir).unwrap();
3053 let state = test_state(with_test_agent(config));
3054
3055 let response = handle_api_cron_run(
3056 State(state),
3057 HeaderMap::new(),
3058 Path("does-not-exist".to_string()),
3059 )
3060 .await
3061 .into_response();
3062
3063 assert_eq!(response.status(), StatusCode::NOT_FOUND);
3064 }
3065}