1use crate::cron::store::{RunCompletionAction, persist_run_completion_state, persist_run_result};
2use crate::cron::{
3 CronJob, DeliveryConfig, JobType, Schedule, SessionTarget, all_overdue_jobs, due_jobs,
4 next_run_for_schedule, sync_declarative_jobs,
5};
6use crate::security::SecurityPolicy;
7use anyhow::Result;
8use chrono::{DateTime, Utc};
9use futures_util::{StreamExt, stream};
10use std::process::Stdio;
11use std::sync::Arc;
12use tokio::process::Command;
13use tokio::time::{self, Duration};
14use zeroclaw_config::schema::Config;
15use zeroclaw_config::schema::{CronJobDecl, CronScheduleDecl};
16use zeroclaw_log::Instrument;
17use zeroclaw_memory::{MEMORY_CONTEXT_CLOSE, MEMORY_CONTEXT_OPEN};
18
/// Lower bound for `reliability.scheduler_poll_secs`; smaller configured values are raised to this.
const MIN_POLL_SECONDS: u64 = 5;
/// Wall-clock limit, in seconds, for a single shell cron job attempt.
const SHELL_JOB_TIMEOUT_SECS: u64 = 120;
/// Health-component name the scheduler reports its status under.
const SCHEDULER_COMPONENT: &str = "scheduler";

/// Optional broadcast sender on which a `cron_result` JSON event is published per finished job.
pub type EventBroadcast = Option<tokio::sync::broadcast::Sender<serde_json::Value>>;
26
/// Where a cron run (and therefore its delivery) was triggered from.
///
/// The only behavior attached to it is the wording of the delivery-failure log message.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum CronDeliveryContext {
    /// Run started by the scheduler loop; failures log as `Cron delivery failed`.
    Scheduled,
    /// Manual run; failures log as `cron_run delivery failed`.
    ToolManual,
    /// Manual run; failures log as `manual cron trigger delivery failed`.
    GatewayManual,
}

impl CronDeliveryContext {
    /// Returns the log message for a failed delivery in this context.
    ///
    /// When `best_effort` is true the message carries a `(best_effort)` suffix.
    fn failure_message(self, best_effort: bool) -> &'static str {
        match (self, best_effort) {
            (Self::Scheduled, true) => "Cron delivery failed (best_effort)",
            (Self::Scheduled, false) => "Cron delivery failed",
            (Self::ToolManual, true) => "cron_run delivery failed (best_effort)",
            (Self::ToolManual, false) => "cron_run delivery failed",
            (Self::GatewayManual, true) => "manual cron trigger delivery failed (best_effort)",
            (Self::GatewayManual, false) => "manual cron trigger delivery failed",
        }
    }
}
46
/// Final classification of a cron run after its output delivery was attempted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CronDeliveryOutcome {
    /// Whether the run counts as successful once delivery is taken into account.
    pub success: bool,
    /// Status label: `"ok"`, `"degraded"` or `"error"`.
    pub status: String,
    /// Run output, with a `delivery failed: …` note appended when delivery failed.
    pub output: String,
}
52
53pub async fn deliver_and_classify_run_result(
54 config: &Config,
55 job: &CronJob,
56 mut success: bool,
57 mut output: String,
58 context: CronDeliveryContext,
59) -> CronDeliveryOutcome {
60 let mut status = if success { "ok" } else { "error" }.to_string();
61
62 if let Err(e) = deliver_if_configured(config, job, &output).await {
63 let channel = job.delivery.channel.as_deref().unwrap_or("");
68 let target = job.delivery.to.as_deref().unwrap_or("");
69 let delivery_error = e.to_string();
70
71 if job.delivery.best_effort {
72 ::zeroclaw_log::record!(
73 WARN,
74 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
75 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
76 .with_attrs(::serde_json::json!({
77 "job_id": job.id,
78 "agent_alias": job.agent_alias,
79 "channel": channel,
80 "target": target,
81 "error": delivery_error
82 })),
83 context.failure_message(true)
84 );
85 if success {
86 status = "degraded".to_string();
87 }
88 } else {
89 success = false;
90 ::zeroclaw_log::record!(
91 WARN,
92 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
93 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
94 .with_attrs(::serde_json::json!({
95 "job_id": job.id,
96 "agent_alias": job.agent_alias,
97 "channel": channel,
98 "target": target,
99 "error": delivery_error
100 })),
101 context.failure_message(false)
102 );
103 status = "error".to_string();
104 }
105
106 if output.trim().is_empty() {
107 output = format!("delivery failed: {delivery_error}");
108 } else {
109 output.push_str("\n\ndelivery failed: ");
110 output.push_str(&delivery_error);
111 }
112 }
113
114 CronDeliveryOutcome {
115 success,
116 status,
117 output,
118 }
119}
120
121pub async fn run(config: Config, event_tx: EventBroadcast) -> Result<()> {
122 let poll_secs = config.reliability.scheduler_poll_secs.max(MIN_POLL_SECONDS);
123 let mut interval = time::interval(Duration::from_secs(poll_secs));
124 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
125
126 crate::health::mark_component_ok(SCHEDULER_COMPONENT);
127
128 let mut jobs_with_builtin = config.cron.clone();
130 if let Some(ref schedule_cron) = config.backup.schedule_cron {
131 let backup_job = CronJobDecl {
132 name: Some("Scheduled backup".to_string()),
133 job_type: "shell".to_string(),
134 schedule: CronScheduleDecl::Cron {
135 expr: schedule_cron.clone(),
136 tz: config.backup.schedule_timezone.clone(),
137 },
138 command: Some("backup create".to_string()),
139 prompt: None,
140 enabled: true,
141 model: None,
142 allowed_tools: None,
143 uses_memory: true,
144 session_target: None,
145 delivery: None,
146 };
147 ::zeroclaw_log::record!(
148 DEBUG,
149 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
150 .with_attrs(::serde_json::json!({"schedule": schedule_cron})),
151 "Synthesizing builtin backup cron job from config.backup.schedule_cron"
152 );
153 jobs_with_builtin.insert("__builtin_backup".to_string(), backup_job);
154 }
155
156 match sync_declarative_jobs(&config, &jobs_with_builtin) {
157 Ok(()) => {
158 if !jobs_with_builtin.is_empty() {
159 ::zeroclaw_log::record!(
160 INFO,
161 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
162 .with_attrs(::serde_json::json!({"count": jobs_with_builtin.len()})),
163 "Synced declarative cron jobs from config"
164 );
165 }
166 }
167 Err(e) => ::zeroclaw_log::record!(
168 WARN,
169 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
170 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
171 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
172 "Failed to sync declarative cron jobs"
173 ),
174 }
175
176 if config.scheduler.catch_up_on_startup {
183 catch_up_overdue_jobs(&config, &event_tx).await;
184 } else {
185 ::zeroclaw_log::record!(
186 INFO,
187 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
188 "Scheduler startup: catch-up disabled by config"
189 );
190 }
191
192 loop {
193 interval.tick().await;
194 crate::health::mark_component_ok(SCHEDULER_COMPONENT);
196
197 let jobs = match due_jobs(&config, Utc::now()) {
198 Ok(jobs) => jobs,
199 Err(e) => {
200 crate::health::mark_component_error(SCHEDULER_COMPONENT, e.to_string());
201 ::zeroclaw_log::record!(
202 WARN,
203 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
204 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
205 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
206 "Scheduler query failed"
207 );
208 continue;
209 }
210 };
211
212 process_due_jobs(&config, jobs, SCHEDULER_COMPONENT, &event_tx).await;
213 }
214}
215
216fn resolve_owning_agent<'a>(config: &'a Config, job: &CronJob) -> Option<&'a str> {
227 if !job.agent_alias.is_empty()
228 && let Some((alias, _)) = config
229 .agents
230 .iter()
231 .find(|(alias, _)| alias.as_str() == job.agent_alias)
232 {
233 return Some(alias.as_str());
234 }
235 config.agent_for_cron_job(&job.id)
236}
237
238async fn catch_up_overdue_jobs(config: &Config, event_tx: &EventBroadcast) {
243 let now = Utc::now();
244 let jobs = match all_overdue_jobs(config, now) {
245 Ok(jobs) => jobs,
246 Err(e) => {
247 ::zeroclaw_log::record!(
248 WARN,
249 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
250 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
251 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
252 "Startup catch-up query failed"
253 );
254 return;
255 }
256 };
257
258 if jobs.is_empty() {
259 ::zeroclaw_log::record!(
260 INFO,
261 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
262 "Scheduler startup: no overdue jobs to catch up"
263 );
264 return;
265 }
266
267 ::zeroclaw_log::record!(
268 INFO,
269 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
270 .with_attrs(::serde_json::json!({"count": jobs.len()})),
271 "Scheduler startup: catching up overdue jobs"
272 );
273
274 process_due_jobs(config, jobs, SCHEDULER_COMPONENT, event_tx).await;
275
276 ::zeroclaw_log::record!(
277 INFO,
278 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
279 "Scheduler startup: catch-up complete"
280 );
281}
282
283pub async fn execute_job_now(config: &Config, job: &CronJob) -> (bool, String) {
284 use zeroclaw_log::Instrument;
285 let Some(agent_alias) = resolve_owning_agent(config, job) else {
286 return (
287 false,
288 format!(
289 "cron job {id:?} has no owning agent; add the alias to an [agents.<x>].cron_jobs list",
290 id = job.id
291 ),
292 );
293 };
294 let agent_alias = agent_alias.to_string();
295 let security = match SecurityPolicy::for_agent(config, &agent_alias) {
296 Ok(s) => s,
297 Err(e) => return (false, format!("agent {agent_alias} risk profile: {e}")),
298 };
299 let span = zeroclaw_log::attribution_span!(job);
300 Box::pin(execute_job_with_retry(config, &security, &agent_alias, job))
301 .instrument(span)
302 .await
303}
304
305async fn execute_job_with_retry(
306 config: &Config,
307 security: &SecurityPolicy,
308 agent_alias: &str,
309 job: &CronJob,
310) -> (bool, String) {
311 let mut last_output = String::new();
312 let retries = config.reliability.scheduler_retries;
313 let mut backoff_ms = config.reliability.provider_backoff_ms.max(200);
314
315 for attempt in 0..=retries {
316 let (success, output) = match job.job_type {
317 JobType::Shell => run_job_command(config, security, job).await,
318 JobType::Agent => Box::pin(run_agent_job(config, security, agent_alias, job)).await,
319 };
320 last_output = output;
321
322 if success {
323 return (true, last_output);
324 }
325
326 if last_output.starts_with("blocked by security policy:") {
327 return (false, last_output);
329 }
330
331 if attempt < retries {
332 let jitter_ms = u64::from(Utc::now().timestamp_subsec_millis() % 250);
333 time::sleep(Duration::from_millis(backoff_ms + jitter_ms)).await;
334 backoff_ms = (backoff_ms.saturating_mul(2)).min(30_000);
335 }
336 }
337
338 (false, last_output)
339}
340
341async fn process_due_jobs(
342 config: &Config,
343 jobs: Vec<CronJob>,
344 component: &str,
345 event_tx: &EventBroadcast,
346) {
347 crate::health::mark_component_ok(component);
349
350 let max_concurrent = config.scheduler.max_concurrent.max(1);
351 let mut in_flight = stream::iter(jobs.into_iter().filter_map(|job| {
352 let Some(agent_alias) = resolve_owning_agent(config, &job) else {
355 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"job_id": job.id})), "Cron job has no owning agent; add the alias to an [agents.<x>].cron_jobs list");
356 return None;
357 };
358 let agent_alias = agent_alias.to_owned();
359 let security = match SecurityPolicy::for_agent(config, &agent_alias) {
360 Ok(s) => Arc::new(s),
361 Err(e) => {
362 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"job_id": job.id, "agent": agent_alias, "error": format!("{}", e)})), "Cron job: failed to build SecurityPolicy for owning agent");
363 return None;
364 }
365 };
366 let config = config.clone();
367 let component = component.to_owned();
368 Some(async move {
369 Box::pin(execute_and_persist_job(
370 &config,
371 security.as_ref(),
372 &agent_alias,
373 &job,
374 &component,
375 ))
376 .await
377 })
378 }))
379 .buffer_unordered(max_concurrent);
380
381 while let Some((job_id, success, output)) = in_flight.next().await {
382 if !success {
383 ::zeroclaw_log::record!(
384 WARN,
385 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
386 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
387 .with_attrs(::serde_json::json!({"job_id": job_id, "output": output})),
388 "Scheduler job '' failed: "
389 );
390 }
391 if let Some(tx) = event_tx {
393 let _ = tx.send(serde_json::json!({
394 "type": "cron_result",
395 "job_id": job_id,
396 "success": success,
397 "output": output,
398 "timestamp": chrono::Utc::now().to_rfc3339(),
399 }));
400 }
401 }
402}
403
404async fn execute_and_persist_job(
405 config: &Config,
406 security: &SecurityPolicy,
407 agent_alias: &str,
408 job: &CronJob,
409 component: &str,
410) -> (String, bool, String) {
411 crate::health::mark_component_ok(component);
412 warn_if_high_frequency_agent_job(job);
413
414 let started_at = Utc::now();
415 let span = zeroclaw_log::attribution_span!(job);
416 let (success, output) = Box::pin(execute_job_with_retry(config, security, agent_alias, job))
417 .instrument(span)
418 .await;
419 let finished_at = Utc::now();
420 let success = Box::pin(persist_job_result(
421 config,
422 job,
423 success,
424 &output,
425 started_at,
426 finished_at,
427 ))
428 .await;
429
430 (job.id.clone(), success, output)
431}
432
/// Runs an agent-type cron job once and returns `(success, output)`.
///
/// Steps:
/// 1. Build a subagent context for `agent_alias`.
/// 2. Apply the `security` gates: autonomy, rate limit, action budget.
/// 3. When `job.uses_memory` is set, prepend recalled memory to the job prompt.
/// 4. Call `crate::agent::run` in a fresh `cron-<uuid>` session.
///
/// If the agent run fails, the memory session derived from that session path is purged
/// (best effort).
async fn run_agent_job(
    config: &Config,
    security: &SecurityPolicy,
    agent_alias: &str,
    job: &CronJob,
) -> (bool, String) {
    // NOTE(review): the subagent context is built before the policy checks below. A spawn
    // failure is therefore reported (and retried by `execute_job_with_retry`) even when
    // the policy would have blocked the job outright. Confirm this ordering is intended.
    let subagent_ctx = match crate::subagent::SubAgentSpawn::for_agent(config, agent_alias)
        .and_then(|spawn| spawn.build(crate::subagent::SubAgentOverrides::default()))
    {
        Ok(ctx) => ctx,
        Err(e) => return (false, format!("subagent spawn failed: {e:#}")),
    };

    if !security.can_act() {
        return (
            false,
            "blocked by security policy: autonomy is read-only".to_string(),
        );
    }

    if security.is_rate_limited() {
        return (
            false,
            "blocked by security policy: rate limit exceeded".to_string(),
        );
    }

    // `record_action` is reached only after the two gates above have passed.
    if !security.record_action() {
        return (
            false,
            "blocked by security policy: action budget exhausted".to_string(),
        );
    }
    let name = job.name.clone().unwrap_or_else(|| "cron-job".to_string());
    let prompt = job.prompt.clone().unwrap_or_default();

    // Optional memory preamble. A memory creation or recall error, or an empty result,
    // yields an empty string; the job still runs.
    let memory_context = if !job.uses_memory {
        String::new()
    } else {
        match zeroclaw_memory::create_memory_for_agent(
            config,
            agent_alias,
            config
                .model_provider_for_agent(agent_alias)
                .and_then(|e| e.api_key.as_deref()),
        )
        .await
        {
            // The raw prompt is the recall query. `5` is presumably the maximum number of
            // entries (confirm against `recall`). Conversation-category entries are dropped.
            Ok(mem) => match mem.recall(&prompt, 5, None, None, None).await {
                Ok(entries) if !entries.is_empty() => {
                    let ctx: String = entries
                        .iter()
                        .filter(|e| {
                            !matches!(
                                e.category,
                                zeroclaw_memory::traits::MemoryCategory::Conversation
                            )
                        })
                        .map(|e| format!("- {}: {}", e.key, e.content))
                        .collect::<Vec<_>>()
                        .join("\n");
                    if ctx.is_empty() {
                        String::new()
                    } else {
                        format!("{MEMORY_CONTEXT_OPEN}\n{ctx}\n{MEMORY_CONTEXT_CLOSE}\n\n")
                    }
                }
                _ => String::new(),
            },
            Err(_) => String::new(),
        }
    };

    // Final prompt layout: optional memory block, a `[cron:<id> <name>]` tag, then the prompt.
    let prefixed_prompt = format!("{memory_context}[cron:{} {name}] {prompt}", job.id);
    let model_override = job.model.clone();

    // The agent run gets its own config copy with memory auto-save turned off.
    let mut cron_config = config.clone();
    cron_config.memory.auto_save = false;

    // Fresh per-run session path; it is also used below to derive the memory key purged on failure.
    let run_session_id = uuid::Uuid::new_v4().to_string();
    let session_path = std::path::PathBuf::from(format!("cron-{run_session_id}"));

    let subagent_span = zeroclaw_log::info_span!(
        "subagent",
        category = "cron",
        agent_alias = %agent_alias,
        cron_job_id = %job.id,
        run_id = %run_session_id,
        spawn_site = "cron",
    );

    // NOTE(review): the agent run receives the subagent context's policy rather than the
    // `security` argument checked above. Confirm this is intended.
    let run_overrides = crate::agent::loop_::AgentRunOverrides {
        security: Some(subagent_ctx.policy.clone()),
        memory: None,
        is_subagent: false,
    };
    // Both session targets currently take the same path.
    let run_result = match job.session_target {
        SessionTarget::Main | SessionTarget::Isolated => {
            Box::pin(
                crate::agent::run(
                    cron_config,
                    agent_alias,
                    Some(prefixed_prompt),
                    None,
                    model_override,
                    config
                        .model_provider_for_agent(agent_alias)
                        .and_then(|e| e.temperature),
                    vec![],
                    false,
                    Some(session_path.clone()),
                    job.allowed_tools.clone(),
                    run_overrides,
                )
                .instrument(subagent_span),
            )
            .await
        }
    };

    match run_result {
        // A blank response is replaced with a fixed placeholder so the run output is never empty.
        Ok(response) => (
            true,
            if response.trim().is_empty() {
                "agent job executed".to_string()
            } else {
                response
            },
        ),
        Err(e) => {
            // Best-effort purge of the memory session keyed on `cli:<session path>`;
            // errors from creating the memory handle or purging are ignored.
            let mem_session_key = zeroclaw_api::session_keys::sanitize_session_key(&format!(
                "cli:{}",
                session_path.display()
            ));
            if let Ok(mem) = zeroclaw_memory::create_memory_for_agent(
                config,
                agent_alias,
                config
                    .model_provider_for_agent(agent_alias)
                    .and_then(|e| e.api_key.as_deref()),
            )
            .await
            {
                let _ = mem.purge_session(&mem_session_key).await;
            }
            (false, format!("agent job failed: {e}"))
        }
    }
}
613
614async fn persist_job_result(
615 config: &Config,
616 job: &CronJob,
617 success: bool,
618 output: &str,
619 started_at: DateTime<Utc>,
620 finished_at: DateTime<Utc>,
621) -> bool {
622 let duration_ms = (finished_at - started_at).num_milliseconds();
623 let outcome = deliver_and_classify_run_result(
624 config,
625 job,
626 success,
627 output.to_string(),
628 CronDeliveryContext::Scheduled,
629 )
630 .await;
631
632 let action = if is_one_shot_auto_delete(job) && outcome.success {
633 RunCompletionAction::Delete
634 } else if matches!(job.schedule, Schedule::At { .. }) {
635 RunCompletionAction::Disable
636 } else {
637 RunCompletionAction::Reschedule
638 };
639
640 let job_state_at = Utc::now();
641 if let Err(e) = persist_run_result(
642 config,
643 job,
644 started_at,
645 finished_at,
646 job_state_at,
647 &outcome.status,
648 Some(&outcome.output),
649 duration_ms,
650 action,
651 ) {
652 ::zeroclaw_log::record!(
653 WARN,
654 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
655 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
656 .with_attrs(::serde_json::json!({"e": e.to_string()})),
657 "Failed to persist scheduler run result: "
658 );
659
660 if action == RunCompletionAction::Delete {
661 if let Err(disable_err) = persist_run_completion_state(
666 config,
667 job,
668 job_state_at,
669 &outcome.status,
670 Some(&outcome.output),
671 RunCompletionAction::Disable,
672 ) {
673 ::zeroclaw_log::record!(
674 WARN,
675 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
676 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
677 .with_attrs(::serde_json::json!({"disable_err": disable_err.to_string()})),
678 "Failed to disable one-shot cron job after history persistence failure: "
679 );
680 }
681 } else {
682 if let Err(state_err) = persist_run_completion_state(
685 config,
686 job,
687 job_state_at,
688 &outcome.status,
689 Some(&outcome.output),
690 action,
691 ) {
692 ::zeroclaw_log::record!(
693 WARN,
694 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
695 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
696 .with_attrs(::serde_json::json!({"state_err": state_err.to_string()})),
697 "Failed to update cron job state after history persistence failure: "
698 );
699 }
700 }
701 }
702
703 outcome.success
704}
705
706fn is_one_shot_auto_delete(job: &CronJob) -> bool {
707 job.delete_after_run && matches!(job.schedule, Schedule::At { .. })
708}
709
710fn is_high_frequency_agent_job(job: &CronJob) -> bool {
711 if !matches!(job.job_type, JobType::Agent) {
712 return false;
713 }
714 match &job.schedule {
715 Schedule::Every { every_ms } => *every_ms < 5 * 60 * 1000,
716 Schedule::Cron { .. } => {
717 let now = Utc::now();
718 next_run_for_schedule(&job.schedule, now)
719 .and_then(|a| next_run_for_schedule(&job.schedule, a).map(|b| (a, b)))
720 .map(|(a, b)| (b - a).num_minutes() < 5)
721 .unwrap_or(false)
722 }
723 Schedule::At { .. } => false,
724 }
725}
726
727fn warn_if_high_frequency_agent_job(job: &CronJob) {
728 if is_high_frequency_agent_job(job) {
729 ::zeroclaw_log::record!(
730 WARN,
731 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
732 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
733 &format!(
734 "Cron agent job '{}' is scheduled more frequently than every 5 minutes",
735 job.id
736 )
737 );
738 }
739}
740
741async fn deliver_if_configured(config: &Config, job: &CronJob, output: &str) -> Result<()> {
742 let delivery: &DeliveryConfig = &job.delivery;
743 if !delivery.mode.eq_ignore_ascii_case("announce") {
744 return Ok(());
745 }
746
747 let channel = delivery.channel.as_deref().ok_or_else(|| {
748 ::zeroclaw_log::record!(
749 WARN,
750 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
751 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
752 .with_attrs(::serde_json::json!({"field": "channel"})),
753 "cron delivery announce refused: required field missing"
754 );
755 anyhow::Error::msg("delivery.channel is required for announce mode")
756 })?;
757 let target = delivery.to.as_deref().ok_or_else(|| {
758 ::zeroclaw_log::record!(
759 WARN,
760 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
761 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
762 .with_attrs(::serde_json::json!({"field": "to"})),
763 "cron delivery announce refused: required field missing"
764 );
765 anyhow::Error::msg("delivery.to is required for announce mode")
766 })?;
767
768 deliver_announcement(
769 config,
770 channel,
771 target,
772 delivery.thread_id.as_deref(),
773 output,
774 )
775 .await
776}
777
778pub type DeliveryFn = Box<
782 dyn Fn(
783 Config,
784 String,
785 String,
786 Option<String>,
787 String,
788 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
789 + Send
790 + Sync,
791>;
792
793static DELIVERY_FN: std::sync::OnceLock<DeliveryFn> = std::sync::OnceLock::new();
795
796pub fn register_delivery_fn(f: DeliveryFn) {
798 let _ = DELIVERY_FN.set(f);
799}
800
801pub async fn deliver_announcement(
802 config: &Config,
803 channel: &str,
804 target: &str,
805 thread_id: Option<&str>,
806 output: &str,
807) -> Result<()> {
808 if let Some(f) = DELIVERY_FN.get() {
809 f(
810 config.clone(),
811 channel.to_string(),
812 target.to_string(),
813 thread_id.map(str::to_string),
814 output.to_string(),
815 )
816 .await
817 } else {
818 ::zeroclaw_log::record!(
831 WARN,
832 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
833 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
834 .with_attrs(::serde_json::json!({"channel": channel, "target": target})),
835 "Cron delivery skipped: no delivery handler registered \
836 (register_delivery_fn was not called by the binary)"
837 );
838 Ok(())
839 }
840}
841
842async fn run_job_command(
843 config: &Config,
844 security: &SecurityPolicy,
845 job: &CronJob,
846) -> (bool, String) {
847 run_job_command_with_timeout(
848 config,
849 security,
850 job,
851 Duration::from_secs(SHELL_JOB_TIMEOUT_SECS),
852 )
853 .await
854}
855
856async fn run_job_command_with_timeout(
857 config: &Config,
858 security: &SecurityPolicy,
859 job: &CronJob,
860 timeout: Duration,
861) -> (bool, String) {
862 if !security.can_act() {
863 return (
864 false,
865 "blocked by security policy: autonomy is read-only".to_string(),
866 );
867 }
868
869 if security.is_rate_limited() {
870 return (
871 false,
872 "blocked by security policy: rate limit exceeded".to_string(),
873 );
874 }
875
876 let approved = false; if let Err(error) =
882 crate::cron::validate_shell_command_with_security(security, &job.command, approved)
883 {
884 return (false, error.to_string());
885 }
886
887 if let Some(path) = security.forbidden_path_argument(&job.command) {
888 return (
889 false,
890 format!("blocked by security policy: forbidden path argument: {path}"),
891 );
892 }
893
894 if !security.record_action() {
895 return (
896 false,
897 "blocked by security policy: action budget exhausted".to_string(),
898 );
899 }
900
901 let child = match build_cron_shell_command(&job.command, &config.data_dir) {
902 Ok(mut cmd) => match cmd.spawn() {
903 Ok(child) => child,
904 Err(e) => return (false, format!("spawn error: {e}")),
905 },
906 Err(e) => return (false, format!("shell setup error: {e}")),
907 };
908
909 match time::timeout(timeout, child.wait_with_output()).await {
910 Ok(Ok(output)) => {
911 let stdout = String::from_utf8_lossy(&output.stdout);
912 let stderr = String::from_utf8_lossy(&output.stderr);
913 let combined = format!(
914 "status={}\nstdout:\n{}\nstderr:\n{}",
915 output.status,
916 stdout.trim(),
917 stderr.trim()
918 );
919 (output.status.success(), combined)
920 }
921 Ok(Err(e)) => (false, format!("spawn error: {e}")),
922 Err(_) => (
923 false,
924 format!("job timed out after {}s", timeout.as_secs_f64()),
925 ),
926 }
927}
928
929fn build_cron_shell_command(
943 command: &str,
944 workspace_dir: &std::path::Path,
945) -> anyhow::Result<Command> {
946 let mut cmd = Command::new("sh");
947 cmd.arg("-c")
948 .arg(command)
949 .current_dir(workspace_dir)
950 .stdin(Stdio::null())
951 .stdout(Stdio::piped())
952 .stderr(Stdio::piped())
953 .kill_on_drop(true);
954
955 Ok(cmd)
956}
957
958#[cfg(test)]
959mod tests {
960 use super::*;
961 use crate::cron::{self, DeliveryConfig};
962 use crate::security::SecurityPolicy;
963 use chrono::{Duration as ChronoDuration, Utc};
964 use tempfile::TempDir;
965 use zeroclaw_config::schema::Config;
966
967 const TEST_AGENT: &str = "test-agent";
968
969 async fn test_config(tmp: &TempDir) -> Config {
970 let mut config = Config {
971 data_dir: tmp.path().join("data"),
972 config_path: tmp.path().join("config.toml"),
973 ..Config::default()
974 };
975 config.risk_profiles.insert(
976 TEST_AGENT.to_string(),
977 zeroclaw_config::schema::RiskProfileConfig::default(),
978 );
979 config.runtime_profiles.insert(
980 TEST_AGENT.to_string(),
981 zeroclaw_config::schema::RuntimeProfileConfig::default(),
982 );
983 config.providers.models.openrouter.insert(
984 TEST_AGENT.to_string(),
985 zeroclaw_config::schema::OpenRouterModelProviderConfig::default(),
986 );
987 config.agents.insert(
988 TEST_AGENT.to_string(),
989 zeroclaw_config::schema::AliasedAgentConfig {
990 model_provider: format!("openrouter.{TEST_AGENT}").into(),
991 risk_profile: TEST_AGENT.to_string(),
992 runtime_profile: TEST_AGENT.to_string(),
993 ..Default::default()
994 },
995 );
996 tokio::fs::create_dir_all(&config.data_dir).await.unwrap();
997 config
998 }
999
1000 fn test_security(config: &Config) -> SecurityPolicy {
1001 SecurityPolicy::for_agent(config, TEST_AGENT).expect("test-agent has resolvable profiles")
1002 }
1003
1004 fn test_job(command: &str) -> CronJob {
1005 CronJob {
1006 id: "test-job".into(),
1007 expression: "* * * * *".into(),
1008 schedule: crate::cron::Schedule::Cron {
1009 expr: "* * * * *".into(),
1010 tz: None,
1011 },
1012 command: command.into(),
1013 prompt: None,
1014 name: None,
1015 job_type: JobType::Shell,
1016 session_target: SessionTarget::Isolated,
1017 model: None,
1018 agent_alias: TEST_AGENT.into(),
1019 enabled: true,
1020 delivery: DeliveryConfig::default(),
1021 delete_after_run: false,
1022 allowed_tools: None,
1023 uses_memory: true,
1024 source: "imperative".into(),
1025 created_at: Utc::now(),
1026 next_run: Utc::now(),
1027 last_run: None,
1028 last_status: None,
1029 last_output: None,
1030 }
1031 }
1032
1033 fn unique_component(prefix: &str) -> String {
1034 format!("{prefix}-{}", uuid::Uuid::new_v4())
1035 }
1036
1037 fn agent_job_with_schedule(schedule: crate::cron::Schedule) -> CronJob {
1038 CronJob {
1039 job_type: JobType::Agent,
1040 schedule,
1041 ..test_job("echo test")
1042 }
1043 }
1044
1045 #[test]
1046 fn high_frequency_daily_cron_is_not_flagged() {
1047 let job = agent_job_with_schedule(crate::cron::Schedule::Cron {
1049 expr: "0 6 * * *".into(),
1050 tz: Some("America/Chicago".into()),
1051 });
1052 assert!(!is_high_frequency_agent_job(&job));
1053 }
1054
1055 #[test]
1056 fn high_frequency_every_4min_cron_is_flagged() {
1057 let job = agent_job_with_schedule(crate::cron::Schedule::Cron {
1058 expr: "*/4 * * * *".into(),
1059 tz: None,
1060 });
1061 assert!(is_high_frequency_agent_job(&job));
1062 }
1063
1064 #[test]
1065 fn high_frequency_every_5min_cron_is_not_flagged() {
1066 let job = agent_job_with_schedule(crate::cron::Schedule::Cron {
1068 expr: "*/5 * * * *".into(),
1069 tz: None,
1070 });
1071 assert!(!is_high_frequency_agent_job(&job));
1072 }
1073
1074 #[test]
1075 fn high_frequency_every_interval_below_threshold_is_flagged() {
1076 let job = agent_job_with_schedule(crate::cron::Schedule::Every {
1077 every_ms: 4 * 60 * 1000, });
1079 assert!(is_high_frequency_agent_job(&job));
1080 }
1081
1082 #[test]
1083 fn high_frequency_every_interval_at_threshold_is_not_flagged() {
1084 let job = agent_job_with_schedule(crate::cron::Schedule::Every {
1085 every_ms: 5 * 60 * 1000, });
1087 assert!(!is_high_frequency_agent_job(&job));
1088 }
1089
1090 #[test]
1091 fn high_frequency_shell_job_is_never_flagged() {
1092 let job = CronJob {
1094 job_type: JobType::Shell,
1095 schedule: crate::cron::Schedule::Every {
1096 every_ms: 60 * 1000, },
1098 ..test_job("echo test")
1099 };
1100 assert!(!is_high_frequency_agent_job(&job));
1101 }
1102
1103 #[tokio::test]
1104 async fn run_job_command_success() {
1105 let tmp = TempDir::new().unwrap();
1106 let config = test_config(&tmp).await;
1107 let job = test_job("echo scheduler-ok");
1108 let security = test_security(&config);
1109
1110 let (success, output) = run_job_command(&config, &security, &job).await;
1111 assert!(success);
1112 assert!(output.contains("scheduler-ok"));
1113 assert!(output.contains("status=exit status: 0"));
1114 }
1115
1116 #[tokio::test]
1117 async fn run_job_command_failure() {
1118 let tmp = TempDir::new().unwrap();
1119 let config = test_config(&tmp).await;
1120 let job = test_job("ls definitely_missing_file_for_scheduler_test");
1121 let security = test_security(&config);
1122
1123 let (success, output) = run_job_command(&config, &security, &job).await;
1124 assert!(!success);
1125 assert!(output.contains("definitely_missing_file_for_scheduler_test"));
1126 assert!(output.contains("status=exit status:"));
1127 }
1128
1129 #[tokio::test]
1130 async fn run_job_command_times_out() {
1131 let tmp = TempDir::new().unwrap();
1132 let mut config = test_config(&tmp).await;
1133 config
1134 .risk_profiles
1135 .entry(TEST_AGENT.into())
1136 .or_default()
1137 .allowed_commands = vec!["sleep".into()];
1138 let job = test_job("sleep 1");
1139 let security = test_security(&config);
1140
1141 let (success, output) =
1142 run_job_command_with_timeout(&config, &security, &job, Duration::from_millis(50)).await;
1143 assert!(!success);
1144 assert!(output.contains("job timed out after"));
1145 }
1146
1147 #[tokio::test]
1148 async fn run_job_command_blocks_disallowed_command() {
1149 let tmp = TempDir::new().unwrap();
1150 let mut config = test_config(&tmp).await;
1151 config
1152 .risk_profiles
1153 .entry(TEST_AGENT.into())
1154 .or_default()
1155 .allowed_commands = vec!["echo".into()];
1156 let job = test_job("curl https://evil.example");
1157 let security = test_security(&config);
1158
1159 let (success, output) = run_job_command(&config, &security, &job).await;
1160 assert!(!success);
1161 assert!(output.contains("blocked by security policy"));
1162 assert!(output.to_lowercase().contains("not allowed"));
1163 }
1164
1165 #[tokio::test]
1166 async fn run_job_command_blocks_forbidden_path_argument() {
1167 let tmp = TempDir::new().unwrap();
1168 let mut config = test_config(&tmp).await;
1169 config
1170 .risk_profiles
1171 .entry(TEST_AGENT.into())
1172 .or_default()
1173 .allowed_commands = vec!["cat".into()];
1174 let job = test_job("cat /etc/passwd");
1175 let security = test_security(&config);
1176
1177 let (success, output) = run_job_command(&config, &security, &job).await;
1178 assert!(!success);
1179 assert!(output.contains("blocked by security policy"));
1180 assert!(output.contains("forbidden path argument"));
1181 assert!(output.contains("/etc/passwd"));
1182 }
1183
1184 #[tokio::test]
1185 async fn run_job_command_blocks_forbidden_option_assignment_path_argument() {
1186 let tmp = TempDir::new().unwrap();
1187 let mut config = test_config(&tmp).await;
1188 config
1189 .risk_profiles
1190 .entry(TEST_AGENT.into())
1191 .or_default()
1192 .allowed_commands = vec!["grep".into()];
1193 let job = test_job("grep --file=/etc/passwd root ./src");
1194 let security = test_security(&config);
1195
1196 let (success, output) = run_job_command(&config, &security, &job).await;
1197 assert!(!success);
1198 assert!(output.contains("blocked by security policy"));
1199 assert!(output.contains("forbidden path argument"));
1200 assert!(output.contains("/etc/passwd"));
1201 }
1202
1203 #[tokio::test]
1204 async fn run_job_command_blocks_forbidden_short_option_attached_path_argument() {
1205 let tmp = TempDir::new().unwrap();
1206 let mut config = test_config(&tmp).await;
1207 config
1208 .risk_profiles
1209 .entry(TEST_AGENT.into())
1210 .or_default()
1211 .allowed_commands = vec!["grep".into()];
1212 let job = test_job("grep -f/etc/passwd root ./src");
1213 let security = test_security(&config);
1214
1215 let (success, output) = run_job_command(&config, &security, &job).await;
1216 assert!(!success);
1217 assert!(output.contains("blocked by security policy"));
1218 assert!(output.contains("forbidden path argument"));
1219 assert!(output.contains("/etc/passwd"));
1220 }
1221
1222 #[tokio::test]
1223 async fn run_job_command_blocks_tilde_user_path_argument() {
1224 let tmp = TempDir::new().unwrap();
1225 let mut config = test_config(&tmp).await;
1226 config
1227 .risk_profiles
1228 .entry(TEST_AGENT.into())
1229 .or_default()
1230 .allowed_commands = vec!["cat".into()];
1231 let job = test_job("cat ~root/.ssh/id_rsa");
1232 let security = test_security(&config);
1233
1234 let (success, output) = run_job_command(&config, &security, &job).await;
1235 assert!(!success);
1236 assert!(output.contains("blocked by security policy"));
1237 assert!(output.contains("forbidden path argument"));
1238 assert!(output.contains("~root/.ssh/id_rsa"));
1239 }
1240
1241 #[tokio::test]
1242 async fn run_job_command_blocks_input_redirection_path_bypass() {
1243 let tmp = TempDir::new().unwrap();
1244 let mut config = test_config(&tmp).await;
1245 config
1246 .risk_profiles
1247 .entry(TEST_AGENT.into())
1248 .or_default()
1249 .allowed_commands = vec!["cat".into()];
1250 let job = test_job("cat </etc/passwd");
1251 let security = test_security(&config);
1252
1253 let (success, output) = run_job_command(&config, &security, &job).await;
1254 assert!(!success);
1255 assert!(output.contains("blocked by security policy"));
1256 assert!(output.to_lowercase().contains("not allowed"));
1257 }
1258
1259 #[tokio::test]
1260 async fn run_job_command_blocks_readonly_mode() {
1261 let tmp = TempDir::new().unwrap();
1262 let mut config = test_config(&tmp).await;
1263 config
1264 .risk_profiles
1265 .entry(TEST_AGENT.into())
1266 .or_default()
1267 .level = crate::security::AutonomyLevel::ReadOnly;
1268 let job = test_job("echo should-not-run");
1269 let security = test_security(&config);
1270
1271 let (success, output) = run_job_command(&config, &security, &job).await;
1272 assert!(!success);
1273 assert!(output.contains("blocked by security policy"));
1274 assert!(output.contains("read-only"));
1275 }
1276
1277 #[tokio::test]
1278 async fn run_job_command_blocks_rate_limited() {
1279 let tmp = TempDir::new().unwrap();
1280 let mut config = test_config(&tmp).await;
1281 config
1282 .runtime_profiles
1283 .entry(TEST_AGENT.into())
1284 .or_default()
1285 .max_actions_per_hour = 0;
1286 let job = test_job("echo should-not-run");
1287 let security = test_security(&config);
1288
1289 let (success, output) = run_job_command(&config, &security, &job).await;
1290 assert!(!success);
1291 assert!(output.contains("blocked by security policy"));
1292 assert!(output.contains("rate limit exceeded"));
1293 }
1294
1295 #[tokio::test]
1296 async fn execute_job_with_retry_recovers_after_first_failure() {
1297 let tmp = TempDir::new().unwrap();
1298 let mut config = test_config(&tmp).await;
1299 config.reliability.scheduler_retries = 1;
1300 config.reliability.provider_backoff_ms = 1;
1301 config
1302 .risk_profiles
1303 .entry(TEST_AGENT.into())
1304 .or_default()
1305 .allowed_commands = vec!["sh".into()];
1306 let security = test_security(&config);
1307
1308 tokio::fs::write(
1309 config.data_dir.join("retry-once.sh"),
1310 "#!/bin/sh\nif [ -f retry-ok.flag ]; then\n echo recovered\n exit 0\nfi\ntouch retry-ok.flag\nexit 1\n",
1311 )
1312 .await
1313 .unwrap();
1314 let job = test_job("sh ./retry-once.sh");
1315
1316 let (success, output) = Box::pin(execute_job_with_retry(
1317 &config,
1318 &security,
1319 "test-agent",
1320 &job,
1321 ))
1322 .await;
1323 assert!(success);
1324 assert!(output.contains("recovered"));
1325 }
1326
1327 #[tokio::test]
1328 async fn execute_job_with_retry_exhausts_attempts() {
1329 let tmp = TempDir::new().unwrap();
1330 let mut config = test_config(&tmp).await;
1331 config.reliability.scheduler_retries = 1;
1332 config.reliability.provider_backoff_ms = 1;
1333 let security = test_security(&config);
1334
1335 let job = test_job("ls always_missing_for_retry_test");
1336
1337 let (success, output) = Box::pin(execute_job_with_retry(
1338 &config,
1339 &security,
1340 "test-agent",
1341 &job,
1342 ))
1343 .await;
1344 assert!(!success);
1345 assert!(output.contains("always_missing_for_retry_test"));
1346 }
1347
1348 #[tokio::test]
1349 async fn run_agent_job_returns_error_without_provider_key() {
1350 let tmp = TempDir::new().unwrap();
1351 let config = test_config(&tmp).await;
1352 let mut job = test_job("");
1353 job.job_type = JobType::Agent;
1354 job.prompt = Some("Say hello".into());
1355 let security = test_security(&config);
1356
1357 let (success, output) =
1358 Box::pin(run_agent_job(&config, &security, "test-agent", &job)).await;
1359 assert!(!success);
1360 assert!(output.contains("agent job failed:"));
1361 }
1362
1363 #[tokio::test]
1364 async fn run_agent_job_blocks_readonly_mode() {
1365 let tmp = TempDir::new().unwrap();
1366 let mut config = test_config(&tmp).await;
1367 config
1368 .risk_profiles
1369 .entry(TEST_AGENT.into())
1370 .or_default()
1371 .level = crate::security::AutonomyLevel::ReadOnly;
1372 let mut job = test_job("");
1373 job.job_type = JobType::Agent;
1374 job.prompt = Some("Say hello".into());
1375 let security = test_security(&config);
1376
1377 let (success, output) =
1378 Box::pin(run_agent_job(&config, &security, "test-agent", &job)).await;
1379 assert!(!success);
1380 assert!(output.contains("blocked by security policy"));
1381 assert!(output.contains("read-only"));
1382 }
1383
1384 #[tokio::test]
1385 async fn run_agent_job_blocks_rate_limited() {
1386 let tmp = TempDir::new().unwrap();
1387 let mut config = test_config(&tmp).await;
1388 config
1389 .runtime_profiles
1390 .entry(TEST_AGENT.into())
1391 .or_default()
1392 .max_actions_per_hour = 0;
1393 let mut job = test_job("");
1394 job.job_type = JobType::Agent;
1395 job.prompt = Some("Say hello".into());
1396 let security = test_security(&config);
1397
1398 let (success, output) =
1399 Box::pin(run_agent_job(&config, &security, "test-agent", &job)).await;
1400 assert!(!success);
1401 assert!(output.contains("blocked by security policy"));
1402 assert!(output.contains("rate limit exceeded"));
1403 }
1404
1405 #[tokio::test]
1406 async fn process_due_jobs_marks_component_ok_even_when_idle() {
1407 let tmp = TempDir::new().unwrap();
1408 let config = test_config(&tmp).await;
1409 let component = unique_component("scheduler-idle");
1410
1411 crate::health::mark_component_error(&component, "pre-existing error");
1412 process_due_jobs(&config, Vec::new(), &component, &None).await;
1413
1414 let snapshot = crate::health::snapshot_json();
1415 let entry = &snapshot["components"][component.as_str()];
1416 assert_eq!(entry["status"], "ok");
1417 assert!(entry["last_ok"].as_str().is_some());
1418 assert!(entry["last_error"].is_null());
1419 }
1420
1421 #[tokio::test]
1422 async fn process_due_jobs_failure_does_not_mark_component_unhealthy() {
1423 let tmp = TempDir::new().unwrap();
1424 let config = test_config(&tmp).await;
1425 let job = test_job("ls definitely_missing_file_for_scheduler_component_health_test");
1426 let component = unique_component("scheduler-fail");
1427
1428 crate::health::mark_component_ok(&component);
1429 process_due_jobs(&config, vec![job], &component, &None).await;
1430
1431 let snapshot = crate::health::snapshot_json();
1432 let entry = &snapshot["components"][component.as_str()];
1433 assert_eq!(entry["status"], "ok");
1434 }
1435
1436 #[tokio::test]
1437 async fn persist_job_result_records_run_and_reschedules_shell_job() {
1438 let tmp = TempDir::new().unwrap();
1439 let config = test_config(&tmp).await;
1440 let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1441 let started = Utc::now();
1442 let finished = started + ChronoDuration::milliseconds(10);
1443
1444 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1445 assert!(success);
1446
1447 let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1448 assert_eq!(runs.len(), 1);
1449 let updated = cron::get_job(&config, &job.id).unwrap();
1450 assert_eq!(updated.last_status.as_deref(), Some("ok"));
1451 }
1452
1453 #[tokio::test]
1454 async fn persist_job_result_uses_one_write_connection_for_recurring_job() {
1455 let tmp = TempDir::new().unwrap();
1456 let config = test_config(&tmp).await;
1457 let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1458 let started = Utc::now();
1459 let finished = started + ChronoDuration::milliseconds(10);
1460
1461 crate::cron::store::reset_write_connection_count_for_tests(&config);
1462 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1463
1464 assert!(success);
1465 assert_eq!(
1466 crate::cron::store::write_connection_count_for_tests(&config),
1467 1
1468 );
1469 }
1470
1471 #[tokio::test]
1472 async fn persist_job_result_prunes_run_history_and_updates_last_fields() {
1473 let tmp = TempDir::new().unwrap();
1474 let mut config = test_config(&tmp).await;
1475 config.scheduler.max_run_history = 2;
1476 let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1477 let base = Utc::now();
1478
1479 for idx in 0..3 {
1480 let started = base + ChronoDuration::seconds(idx);
1481 let finished = started + ChronoDuration::milliseconds(10);
1482 let output = format!("run-{idx}");
1483
1484 let success = persist_job_result(&config, &job, true, &output, started, finished).await;
1485 assert!(success);
1486 }
1487
1488 let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1489 assert_eq!(runs.len(), 2);
1490 assert_eq!(runs[0].output.as_deref(), Some("run-2"));
1491 assert_eq!(runs[1].output.as_deref(), Some("run-1"));
1492
1493 let updated = cron::get_job(&config, &job.id).unwrap();
1494 assert_eq!(updated.last_status.as_deref(), Some("ok"));
1495 assert_eq!(updated.last_output.as_deref(), Some("run-2"));
1496 assert!(updated.last_run.is_some());
1497 }
1498
1499 #[tokio::test]
1500 async fn persist_job_result_rolls_back_run_history_when_job_state_update_fails() {
1501 let tmp = TempDir::new().unwrap();
1502 let config = test_config(&tmp).await;
1503 let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1504 let original_next_run = job.next_run;
1505 let started = Utc::now();
1506 let finished = started + ChronoDuration::milliseconds(10);
1507
1508 let conn =
1509 rusqlite::Connection::open(config.data_dir.join("cron").join("jobs.db")).unwrap();
1510 conn.execute_batch(
1511 "CREATE TRIGGER fail_cron_job_update
1512 BEFORE UPDATE ON cron_jobs
1513 BEGIN
1514 SELECT RAISE(ABORT, 'blocked update');
1515 END;",
1516 )
1517 .unwrap();
1518 drop(conn);
1519
1520 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1521
1522 assert!(success);
1523 assert!(cron::list_runs(&config, &job.id, 10).unwrap().is_empty());
1524
1525 let stored = cron::get_job(&config, &job.id).unwrap();
1526 assert_eq!(stored.next_run, original_next_run);
1527 assert!(stored.last_run.is_none());
1528 assert!(stored.last_status.is_none());
1529 assert!(stored.last_output.is_none());
1530 }
1531
1532 #[tokio::test]
1533 async fn persist_job_result_success_deletes_one_shot() {
1534 let tmp = TempDir::new().unwrap();
1535 let config = test_config(&tmp).await;
1536 let at = Utc::now() + ChronoDuration::minutes(10);
1537 let job = cron::add_agent_job(
1538 &config,
1539 TEST_AGENT,
1540 Some("one-shot".into()),
1541 crate::cron::Schedule::At { at },
1542 "Hello",
1543 SessionTarget::Isolated,
1544 None,
1545 None,
1546 true,
1547 None,
1548 )
1549 .unwrap();
1550 let started = Utc::now();
1551 let finished = started + ChronoDuration::milliseconds(10);
1552
1553 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1554 assert!(success);
1555 let lookup = cron::get_job(&config, &job.id);
1556 assert!(lookup.is_err());
1557 }
1558
1559 #[tokio::test]
1560 async fn persist_job_result_failure_disables_one_shot() {
1561 let tmp = TempDir::new().unwrap();
1562 let config = test_config(&tmp).await;
1563 let at = Utc::now() + ChronoDuration::minutes(10);
1564 let job = cron::add_agent_job(
1565 &config,
1566 TEST_AGENT,
1567 Some("one-shot".into()),
1568 crate::cron::Schedule::At { at },
1569 "Hello",
1570 SessionTarget::Isolated,
1571 None,
1572 None,
1573 true,
1574 None,
1575 )
1576 .unwrap();
1577 let started = Utc::now();
1578 let finished = started + ChronoDuration::milliseconds(10);
1579
1580 let success = persist_job_result(&config, &job, false, "boom", started, finished).await;
1581 assert!(!success);
1582 let updated = cron::get_job(&config, &job.id).unwrap();
1583 assert!(!updated.enabled);
1584 assert_eq!(updated.last_status.as_deref(), Some("error"));
1585 }
1586
1587 #[tokio::test]
1588 async fn persist_job_result_uses_one_write_connection_for_failed_one_shot_disable() {
1589 let tmp = TempDir::new().unwrap();
1590 let config = test_config(&tmp).await;
1591 let at = Utc::now() + ChronoDuration::minutes(10);
1592 let job = cron::add_agent_job(
1593 &config,
1594 "test-agent",
1595 Some("one-shot".into()),
1596 crate::cron::Schedule::At { at },
1597 "Hello",
1598 SessionTarget::Isolated,
1599 None,
1600 None,
1601 true,
1602 None,
1603 )
1604 .unwrap();
1605 let started = Utc::now();
1606 let finished = started + ChronoDuration::milliseconds(10);
1607
1608 crate::cron::store::reset_write_connection_count_for_tests(&config);
1609 let success = persist_job_result(&config, &job, false, "boom", started, finished).await;
1610
1611 assert!(!success);
1612 assert_eq!(
1613 crate::cron::store::write_connection_count_for_tests(&config),
1614 1
1615 );
1616 }
1617
1618 #[tokio::test]
1619 async fn persist_job_result_falls_back_to_state_update_when_history_prune_fails() {
1620 let tmp = TempDir::new().unwrap();
1621 let mut config = test_config(&tmp).await;
1622 config.scheduler.max_run_history = 1;
1623 let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1624 let original_next_run = job.next_run;
1625 let seed_started = Utc::now() - ChronoDuration::minutes(20);
1626 let seed_finished = seed_started + ChronoDuration::milliseconds(10);
1627 let started = Utc::now();
1628 let finished = started + ChronoDuration::milliseconds(10);
1629
1630 let conn =
1631 rusqlite::Connection::open(config.data_dir.join("cron").join("jobs.db")).unwrap();
1632 conn.execute(
1633 "INSERT INTO cron_runs (job_id, started_at, finished_at, status, output, duration_ms)
1634 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1635 rusqlite::params![
1636 job.id,
1637 seed_started.to_rfc3339(),
1638 seed_finished.to_rfc3339(),
1639 "seed",
1640 "seed",
1641 10,
1642 ],
1643 )
1644 .unwrap();
1645 conn.execute_batch(
1646 "CREATE TRIGGER fail_cron_run_prune
1647 BEFORE DELETE ON cron_runs
1648 BEGIN
1649 SELECT RAISE(ABORT, 'blocked prune');
1650 END;",
1651 )
1652 .unwrap();
1653 drop(conn);
1654
1655 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1656 assert!(success);
1657
1658 let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1659 assert_eq!(runs.len(), 1);
1660 assert_eq!(runs[0].status, "seed");
1661
1662 let updated = cron::get_job(&config, &job.id).unwrap();
1663 assert_eq!(updated.last_status.as_deref(), Some("ok"));
1664 assert_eq!(updated.last_output.as_deref(), Some("ok"));
1665 assert!(updated.last_run.is_some());
1666 assert!(updated.next_run >= original_next_run);
1667 }
1668
1669 #[tokio::test]
1670 async fn persist_job_result_falls_back_to_disable_when_auto_delete_history_insert_fails() {
1671 let tmp = TempDir::new().unwrap();
1672 let config = test_config(&tmp).await;
1673 let at = Utc::now() + ChronoDuration::minutes(10);
1674 let job = cron::add_once_at(&config, "test-agent", at, "echo one-shot-shell").unwrap();
1675 assert!(job.delete_after_run);
1676 let started = Utc::now();
1677 let finished = started + ChronoDuration::milliseconds(10);
1678
1679 let conn =
1680 rusqlite::Connection::open(config.data_dir.join("cron").join("jobs.db")).unwrap();
1681 conn.execute_batch(
1682 "CREATE TRIGGER fail_cron_run_insert
1683 BEFORE INSERT ON cron_runs
1684 BEGIN
1685 SELECT RAISE(ABORT, 'blocked insert');
1686 END;",
1687 )
1688 .unwrap();
1689 drop(conn);
1690
1691 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1692 assert!(success);
1693
1694 let updated = cron::get_job(&config, &job.id).unwrap();
1695 assert!(!updated.enabled);
1696 assert_eq!(updated.last_status.as_deref(), Some("ok"));
1697 assert_eq!(updated.last_output.as_deref(), Some("ok"));
1698 assert!(cron::list_runs(&config, &job.id, 10).unwrap().is_empty());
1699 }
1700
1701 #[tokio::test]
1702 async fn persist_job_result_success_deletes_one_shot_shell_job() {
1703 let tmp = TempDir::new().unwrap();
1704 let config = test_config(&tmp).await;
1705 let at = Utc::now() + ChronoDuration::minutes(10);
1706 let job = cron::add_once_at(&config, "test-agent", at, "echo one-shot-shell").unwrap();
1707 assert!(job.delete_after_run);
1708 let started = Utc::now();
1709 let finished = started + ChronoDuration::milliseconds(10);
1710
1711 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1712 assert!(success);
1713 let lookup = cron::get_job(&config, &job.id);
1714 assert!(lookup.is_err());
1715 }
1716
1717 #[tokio::test]
1718 async fn persist_job_result_failure_disables_one_shot_shell_job() {
1719 let tmp = TempDir::new().unwrap();
1720 let config = test_config(&tmp).await;
1721 let at = Utc::now() + ChronoDuration::minutes(10);
1722 let job = cron::add_once_at(&config, "test-agent", at, "echo one-shot-shell").unwrap();
1723 assert!(job.delete_after_run);
1724 let started = Utc::now();
1725 let finished = started + ChronoDuration::milliseconds(10);
1726
1727 let success = persist_job_result(&config, &job, false, "boom", started, finished).await;
1728 assert!(!success);
1729 let updated = cron::get_job(&config, &job.id).unwrap();
1730 assert!(!updated.enabled);
1731 assert_eq!(updated.last_status.as_deref(), Some("error"));
1732 }
1733
1734 #[tokio::test]
1735 async fn persist_job_result_delivery_stubbed_succeeds() {
1736 let tmp = TempDir::new().unwrap();
1739 let config = test_config(&tmp).await;
1740 let job = cron::add_agent_job(
1741 &config,
1742 TEST_AGENT,
1743 Some("announce-job".into()),
1744 crate::cron::Schedule::Cron {
1745 expr: "*/5 * * * *".into(),
1746 tz: None,
1747 },
1748 "deliver this",
1749 SessionTarget::Isolated,
1750 None,
1751 Some(DeliveryConfig {
1752 mode: "announce".into(),
1753 channel: Some("telegram".into()),
1754 to: Some("123456".into()),
1755 thread_id: None,
1756 best_effort: false,
1757 }),
1758 false,
1759 None,
1760 )
1761 .unwrap();
1762 let started = Utc::now();
1763 let finished = started + ChronoDuration::milliseconds(10);
1764
1765 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1766 assert!(success);
1767
1768 let updated = cron::get_job(&config, &job.id).unwrap();
1769 assert!(updated.enabled);
1770 assert_eq!(updated.last_status.as_deref(), Some("ok"));
1771
1772 let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1773 assert_eq!(runs.len(), 1);
1774 assert_eq!(runs[0].status, "ok");
1775 }
1776
1777 #[tokio::test]
1778 async fn persist_job_result_delivery_failure_best_effort_marks_degraded() {
1779 let tmp = TempDir::new().unwrap();
1780 let config = test_config(&tmp).await;
1781 register_delivery_fn(Box::new(
1782 |_config, channel, _target, _thread_id, _output| {
1783 Box::pin(async move {
1784 if channel == "fail-delivery" {
1785 anyhow::bail!("synthetic delivery failure");
1786 }
1787 Ok(())
1788 })
1789 },
1790 ));
1791 let mut job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1792 job.delivery = DeliveryConfig {
1793 mode: "announce".into(),
1794 channel: Some("fail-delivery".into()),
1795 to: Some("123456".into()),
1796 thread_id: None,
1797 best_effort: true,
1798 };
1799 let started = Utc::now();
1800 let finished = started + ChronoDuration::milliseconds(10);
1801
1802 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1803 assert!(success);
1804
1805 let updated = cron::get_job(&config, &job.id).unwrap();
1806 assert!(updated.enabled);
1807 assert_eq!(updated.last_status.as_deref(), Some("degraded"));
1808 assert!(
1809 updated
1810 .last_output
1811 .as_deref()
1812 .unwrap_or_default()
1813 .contains("delivery failed:")
1814 );
1815
1816 let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1817 assert_eq!(runs.len(), 1);
1818 assert_eq!(runs[0].status, "degraded");
1819 }
1820
1821 #[tokio::test]
1822 async fn delivery_failure_classification_preserves_empty_output_evidence() {
1823 let tmp = TempDir::new().unwrap();
1824 let config = test_config(&tmp).await;
1825 register_delivery_fn(Box::new(
1826 |_config, channel, _target, _thread_id, _output| {
1827 Box::pin(async move {
1828 if channel == "fail-delivery" {
1829 anyhow::bail!("synthetic delivery failure");
1830 }
1831 Ok(())
1832 })
1833 },
1834 ));
1835 let mut job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1836 job.delivery = DeliveryConfig {
1837 mode: "announce".into(),
1838 channel: Some("fail-delivery".into()),
1839 to: Some("123456".into()),
1840 thread_id: None,
1841 best_effort: true,
1842 };
1843
1844 let outcome = deliver_and_classify_run_result(
1845 &config,
1846 &job,
1847 true,
1848 String::new(),
1849 CronDeliveryContext::Scheduled,
1850 )
1851 .await;
1852
1853 assert!(outcome.success);
1854 assert_eq!(outcome.status, "degraded");
1855 assert!(outcome.output.starts_with("delivery failed:"));
1856 }
1857
1858 #[tokio::test]
1859 async fn persist_job_result_at_schedule_without_delete_after_run_is_disabled() {
1860 let tmp = TempDir::new().unwrap();
1861 let config = test_config(&tmp).await;
1862 let at = Utc::now() + ChronoDuration::minutes(10);
1863 let job = cron::add_agent_job(
1864 &config,
1865 TEST_AGENT,
1866 Some("at-no-autodelete".into()),
1867 crate::cron::Schedule::At { at },
1868 "Hello",
1869 SessionTarget::Isolated,
1870 None,
1871 None,
1872 false,
1873 None,
1874 )
1875 .unwrap();
1876 assert!(!job.delete_after_run);
1877
1878 let started = Utc::now();
1879 let finished = started + ChronoDuration::milliseconds(10);
1880 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1881 assert!(success);
1882
1883 let updated = cron::get_job(&config, &job.id).unwrap();
1886 assert!(
1887 !updated.enabled,
1888 "At schedule job should be disabled after execution via reschedule"
1889 );
1890 assert_eq!(updated.last_status.as_deref(), Some("ok"));
1891 }
1892
1893 #[tokio::test]
1894 async fn deliver_if_configured_handles_none_mode() {
1895 let tmp = TempDir::new().unwrap();
1896 let config = test_config(&tmp).await;
1897 let job = test_job("echo ok");
1898
1899 assert!(deliver_if_configured(&config, &job, "x").await.is_ok());
1901 }
1902
1903 #[tokio::test]
1904 async fn deliver_announcement_returns_ok_when_no_handler_registered() {
1905 let tmp = TempDir::new().unwrap();
1906 let config = test_config(&tmp).await;
1907 deliver_announcement(&config, "telegram", "chat-id", None, "payload")
1912 .await
1913 .expect("missing delivery handler should be Ok with a warn log");
1914 }
1915
1916 #[test]
1917 fn build_cron_shell_command_uses_sh_non_login() {
1918 let workspace = std::env::temp_dir();
1919 let cmd = build_cron_shell_command("echo cron-test", &workspace).unwrap();
1920 let debug = format!("{cmd:?}");
1921 assert!(debug.contains("echo cron-test"));
1922 assert!(debug.contains("\"sh\""), "should use sh: {debug}");
1923 assert!(
1926 !debug.contains("\"-lc\""),
1927 "must not use login shell: {debug}"
1928 );
1929 }
1930
1931 #[tokio::test]
1932 async fn build_cron_shell_command_executes_successfully() {
1933 let workspace = std::env::temp_dir();
1934 let mut cmd = build_cron_shell_command("echo cron-ok", &workspace).unwrap();
1935 let output = cmd.output().await.unwrap();
1936 assert!(output.status.success());
1937 let stdout = String::from_utf8_lossy(&output.stdout);
1938 assert!(stdout.contains("cron-ok"));
1939 }
1940
1941 #[tokio::test]
1942 async fn catch_up_queries_all_overdue_jobs_ignoring_max_tasks() {
1943 let tmp = TempDir::new().unwrap();
1944 let mut config = test_config(&tmp).await;
1945 config.scheduler.max_tasks = 1; for i in 0..3 {
1949 let _ = cron::add_job(
1950 &config,
1951 "test-agent",
1952 "* * * * *",
1953 &format!("echo catchup-{i}"),
1954 )
1955 .unwrap();
1956 }
1957
1958 let far_future = Utc::now() + ChronoDuration::days(1);
1960 let due = cron::due_jobs(&config, far_future).unwrap();
1961 assert_eq!(due.len(), 1, "due_jobs must respect max_tasks");
1962
1963 let overdue = cron::all_overdue_jobs(&config, far_future).unwrap();
1965 assert_eq!(overdue.len(), 3, "all_overdue_jobs must return all");
1966 }
1967
1968 #[tokio::test]
1973 async fn broadcast_sends_cron_result_on_success() {
1974 let tmp = TempDir::new().unwrap();
1975 let mut config = test_config(&tmp).await;
1976 let job = test_job("echo broadcast-ok");
1977 config
1980 .agents
1981 .get_mut("test-agent")
1982 .unwrap()
1983 .cron_jobs
1984 .push(job.id.clone());
1985 let component = unique_component("broadcast-ok");
1986
1987 let (tx, mut rx) = tokio::sync::broadcast::channel::<serde_json::Value>(16);
1988 let event_tx: EventBroadcast = Some(tx);
1989
1990 process_due_jobs(&config, vec![job], &component, &event_tx).await;
1991
1992 let event = rx.try_recv().expect("should receive a broadcast event");
1993 assert_eq!(event["type"], "cron_result");
1994 assert_eq!(event["job_id"], "test-job");
1995 assert_eq!(event["success"], true);
1996 assert!(event["output"].as_str().unwrap().contains("broadcast-ok"));
1997 assert!(event["timestamp"].as_str().is_some());
1998 }
1999
2000 #[tokio::test]
2001 async fn broadcast_sends_cron_result_on_failure() {
2002 let tmp = TempDir::new().unwrap();
2003 let mut config = test_config(&tmp).await;
2004 let job = test_job("ls definitely_missing_file_for_broadcast_fail_test");
2005 config
2006 .agents
2007 .get_mut("test-agent")
2008 .unwrap()
2009 .cron_jobs
2010 .push(job.id.clone());
2011 let component = unique_component("broadcast-fail");
2012
2013 let (tx, mut rx) = tokio::sync::broadcast::channel::<serde_json::Value>(16);
2014 let event_tx: EventBroadcast = Some(tx);
2015
2016 process_due_jobs(&config, vec![job], &component, &event_tx).await;
2017
2018 let event = rx.try_recv().expect("should receive a broadcast event");
2019 assert_eq!(event["type"], "cron_result");
2020 assert_eq!(event["job_id"], "test-job");
2021 assert_eq!(event["success"], false);
2022 assert!(event["timestamp"].as_str().is_some());
2023 }
2024
2025 #[tokio::test]
2026 async fn broadcast_none_skips_without_error() {
2027 let tmp = TempDir::new().unwrap();
2028 let config = test_config(&tmp).await;
2029 let job = test_job("echo no-broadcast");
2030 let component = unique_component("broadcast-none");
2031
2032 process_due_jobs(&config, vec![job], &component, &None).await;
2034 }
2035
2036 #[tokio::test]
2037 async fn broadcast_handles_no_subscribers() {
2038 let tmp = TempDir::new().unwrap();
2039 let config = test_config(&tmp).await;
2040 let job = test_job("echo no-subscribers");
2041 let component = unique_component("broadcast-no-sub");
2042
2043 let (tx, _) = tokio::sync::broadcast::channel::<serde_json::Value>(16);
2044 let event_tx: EventBroadcast = Some(tx);
2047
2048 process_due_jobs(&config, vec![job], &component, &event_tx).await;
2049 }
2051}