Skip to main content

zeroclaw_runtime/cron/
scheduler.rs

1use crate::cron::store::{RunCompletionAction, persist_run_completion_state, persist_run_result};
2use crate::cron::{
3    CronJob, DeliveryConfig, JobType, Schedule, SessionTarget, all_overdue_jobs, due_jobs,
4    next_run_for_schedule, skip_missed_run, sync_declarative_jobs,
5};
6use crate::security::SecurityPolicy;
7use anyhow::Result;
8use chrono::{DateTime, Utc};
9use futures_util::{StreamExt, stream};
10use std::process::Stdio;
11use std::sync::Arc;
12use tokio::process::Command;
13use tokio::time::{self, Duration};
14use zeroclaw_config::schema::Config;
15use zeroclaw_config::schema::{CronJobDecl, CronScheduleDecl};
16use zeroclaw_log::Instrument;
17use zeroclaw_memory::{MEMORY_CONTEXT_CLOSE, MEMORY_CONTEXT_OPEN};
18
/// Lower bound, in seconds, applied to `config.reliability.scheduler_poll_secs`
/// when building the scheduler's polling interval.
const MIN_POLL_SECONDS: u64 = 5;
/// Timeout, in seconds, for shell cron jobs.
/// NOTE(review): consumed outside this view (presumably by the shell job
/// runner) — confirm.
const SHELL_JOB_TIMEOUT_SECS: u64 = 120;
/// Health-registry component name under which scheduler liveness and
/// query failures are reported.
const SCHEDULER_COMPONENT: &str = "scheduler";
/// Cron-management tools appended to `excluded_tools` for agent cron jobs
/// that do not declare an explicit `allowed_tools` list
/// (see `cron_agent_run_security_policy`).
const CRON_AGENT_DEFAULT_EXCLUDED_TOOLS: &[&str] = &[
    "cron_add",
    "cron_update",
    "cron_remove",
    "cron_run",
    "schedule",
];

/// Type alias for the optional broadcast sender used to push cron results
/// to connected dashboard/SSE clients.
///
/// `None` disables broadcasting. Every job finished by the scheduler is sent
/// as a `cron_result` JSON event; send errors (e.g. no subscribers) are
/// ignored.
pub type EventBroadcast = Option<tokio::sync::broadcast::Sender<serde_json::Value>>;
33
/// Which code path produced a cron run. It is used to pick the log message
/// emitted when delivering that run's output fails.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CronDeliveryContext {
    /// Regular run fired by the scheduler loop.
    Scheduled,
    /// Manual run triggered through the `cron_run` tool.
    ToolManual,
    /// Manual run triggered through the gateway.
    GatewayManual,
}

impl CronDeliveryContext {
    /// Log message for a failed delivery in this context.
    ///
    /// `best_effort` mirrors the job's `DeliveryConfig::best_effort` flag.
    /// Best-effort failures carry a `(best_effort)` suffix so they can be told
    /// apart from failures that flip the run to an error.
    fn failure_message(self, best_effort: bool) -> &'static str {
        match (self, best_effort) {
            (Self::Scheduled, true) => "Cron delivery failed (best_effort)",
            (Self::Scheduled, false) => "Cron delivery failed",
            (Self::ToolManual, true) => "cron_run delivery failed (best_effort)",
            (Self::ToolManual, false) => "cron_run delivery failed",
            (Self::GatewayManual, true) => "manual cron trigger delivery failed (best_effort)",
            (Self::GatewayManual, false) => "manual cron trigger delivery failed",
        }
    }
}
53
/// Result of classifying a finished cron run after attempting delivery.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CronDeliveryOutcome {
    /// Final success flag; a non-best-effort delivery failure clears it.
    pub success: bool,
    /// Run-history status: `"ok"`, `"degraded"` or `"error"`.
    pub status: String,
    /// Run output, with `delivery failed: <error>` appended when delivery failed.
    pub output: String,
}
59
60pub async fn deliver_and_classify_run_result(
61    config: &Config,
62    job: &CronJob,
63    mut success: bool,
64    mut output: String,
65    context: CronDeliveryContext,
66) -> CronDeliveryOutcome {
67    let mut status = if success { "ok" } else { "error" }.to_string();
68
69    if let Err(e) = deliver_if_configured(config, job, &output).await {
70        // Cron add-time accepts dangling delivery refs (the job's channel
71        // may not be provisioned yet); the loudly-logged warn here is
72        // the scheduler-side half of that contract. Manual trigger paths
73        // share this classifier so status history cannot drift again.
74        let channel = job.delivery.channel.as_deref().unwrap_or("");
75        let target = job.delivery.to.as_deref().unwrap_or("");
76        let delivery_error = e.to_string();
77
78        if job.delivery.best_effort {
79            ::zeroclaw_log::record!(
80                WARN,
81                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
82                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
83                    .with_attrs(::serde_json::json!({
84                        "job_id": job.id,
85                        "agent_alias": job.agent_alias,
86                        "channel": channel,
87                        "target": target,
88                        "error": delivery_error
89                    })),
90                context.failure_message(true)
91            );
92            if success {
93                status = "degraded".to_string();
94            }
95        } else {
96            success = false;
97            ::zeroclaw_log::record!(
98                WARN,
99                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
100                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
101                    .with_attrs(::serde_json::json!({
102                        "job_id": job.id,
103                        "agent_alias": job.agent_alias,
104                        "channel": channel,
105                        "target": target,
106                        "error": delivery_error
107                    })),
108                context.failure_message(false)
109            );
110            status = "error".to_string();
111        }
112
113        if output.trim().is_empty() {
114            output = format!("delivery failed: {delivery_error}");
115        } else {
116            output.push_str("\n\ndelivery failed: ");
117            output.push_str(&delivery_error);
118        }
119    }
120
121    CronDeliveryOutcome {
122        success,
123        status,
124        output,
125    }
126}
127
128pub async fn run(config: Config, event_tx: EventBroadcast) -> Result<()> {
129    let poll_secs = config.reliability.scheduler_poll_secs.max(MIN_POLL_SECONDS);
130    let mut interval = time::interval(Duration::from_secs(poll_secs));
131    interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
132
133    crate::health::mark_component_ok(SCHEDULER_COMPONENT);
134
135    // ── Declarative job sync: reconcile config-defined jobs with the DB.
136    let mut jobs_with_builtin = config.cron.clone();
137    if let Some(ref schedule_cron) = config.backup.schedule_cron {
138        let backup_job = CronJobDecl {
139            name: Some("Scheduled backup".to_string()),
140            job_type: "shell".to_string(),
141            schedule: CronScheduleDecl::Cron {
142                expr: schedule_cron.clone(),
143                tz: config.backup.schedule_timezone.clone(),
144            },
145            command: Some("backup create".to_string()),
146            prompt: None,
147            enabled: true,
148            model: None,
149            allowed_tools: None,
150            uses_memory: true,
151            session_target: None,
152            delivery: None,
153        };
154        ::zeroclaw_log::record!(
155            DEBUG,
156            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
157                .with_attrs(::serde_json::json!({"schedule": schedule_cron})),
158            "Synthesizing builtin backup cron job from config.backup.schedule_cron"
159        );
160        jobs_with_builtin.insert("__builtin_backup".to_string(), backup_job);
161    }
162
163    match sync_declarative_jobs(&config, &jobs_with_builtin) {
164        Ok(()) => {
165            if !jobs_with_builtin.is_empty() {
166                ::zeroclaw_log::record!(
167                    INFO,
168                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
169                        .with_attrs(::serde_json::json!({"count": jobs_with_builtin.len()})),
170                    "Synced declarative cron jobs from config"
171                );
172            }
173        }
174        Err(e) => ::zeroclaw_log::record!(
175            WARN,
176            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
177                .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
178                .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
179            "Failed to sync declarative cron jobs"
180        ),
181    }
182
183    // ── Startup catch-up: run ALL overdue jobs before entering the
184    //    normal polling loop. The regular loop is capped by `max_tasks`,
185    //    which could leave some overdue jobs waiting across many cycles
186    //    if the machine was off for a while. The catch-up phase fetches
187    //    without the `max_tasks` limit so every missed job fires once.
188    //    Controlled by `[scheduler] catch_up_on_startup` (default: true).
189    if config.scheduler.catch_up_on_startup {
190        catch_up_overdue_jobs(&config, &event_tx).await;
191    } else {
192        ::zeroclaw_log::record!(
193            INFO,
194            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
195            "Scheduler startup: catch-up disabled by config"
196        );
197        skip_missed_jobs_on_startup(&config).await;
198    }
199
200    loop {
201        interval.tick().await;
202        // Keep scheduler liveness fresh even when there are no due jobs.
203        crate::health::mark_component_ok(SCHEDULER_COMPONENT);
204
205        let jobs = match due_jobs(&config, Utc::now()) {
206            Ok(jobs) => jobs,
207            Err(e) => {
208                crate::health::mark_component_error(SCHEDULER_COMPONENT, e.to_string());
209                ::zeroclaw_log::record!(
210                    WARN,
211                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
212                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
213                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
214                    "Scheduler query failed"
215                );
216                continue;
217            }
218        };
219
220        process_due_jobs(&config, jobs, SCHEDULER_COMPONENT, &event_tx).await;
221    }
222}
223
224/// Resolve which agent owns a given cron job. Lookup order:
225///
226/// 1. The row's persisted `agent_alias` field, when it names a
227///    configured agent.
228/// 2. Reverse-resolve via `[agents.<x>].cron_jobs` (declarative path:
229///    every alias that lists the cron alias claims ownership).
230///
231/// Returns `None` when neither resolves. Callers (process_due_jobs,
232/// execute_job_now) log and skip the job rather than crashing the
233/// scheduler loop.
234fn resolve_owning_agent<'a>(config: &'a Config, job: &CronJob) -> Option<&'a str> {
235    if !job.agent_alias.is_empty()
236        && let Some((alias, _)) = config
237            .agents
238            .iter()
239            .find(|(alias, _)| alias.as_str() == job.agent_alias)
240    {
241        return Some(alias.as_str());
242    }
243    config.agent_for_cron_job(&job.id)
244}
245
246/// Fetch **all** overdue jobs (ignoring `max_tasks`) and execute them.
247///
248/// Called once at scheduler startup so that jobs missed during downtime
249/// (e.g. late boot, daemon restart) are caught up immediately.
250async fn catch_up_overdue_jobs(config: &Config, event_tx: &EventBroadcast) {
251    let now = Utc::now();
252    let jobs = match all_overdue_jobs(config, now) {
253        Ok(jobs) => jobs,
254        Err(e) => {
255            ::zeroclaw_log::record!(
256                WARN,
257                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
258                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
259                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
260                "Startup catch-up query failed"
261            );
262            return;
263        }
264    };
265
266    if jobs.is_empty() {
267        ::zeroclaw_log::record!(
268            INFO,
269            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
270            "Scheduler startup: no overdue jobs to catch up"
271        );
272        return;
273    }
274
275    ::zeroclaw_log::record!(
276        INFO,
277        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
278            .with_attrs(::serde_json::json!({"count": jobs.len()})),
279        "Scheduler startup: catching up overdue jobs"
280    );
281
282    process_due_jobs(config, jobs, SCHEDULER_COMPONENT, event_tx).await;
283
284    ::zeroclaw_log::record!(
285        INFO,
286        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
287        "Scheduler startup: catch-up complete"
288    );
289}
290
291/// Advance `next_run` for all overdue jobs without executing them.
292///
293/// Called at scheduler startup when `catch_up_on_startup` is disabled so
294/// that the normal polling loop (which selects `next_run <= now`) doesn't
295/// pick up jobs that became overdue during daemon downtime.
296///
297/// - Recurring jobs: `next_run` is advanced to the next future occurrence.
298/// - One-shot `At` jobs: disabled with a `skipped` last status.
299async fn skip_missed_jobs_on_startup(config: &Config) {
300    let now = Utc::now();
301    let jobs = match all_overdue_jobs(config, now) {
302        Ok(jobs) => jobs,
303        Err(e) => {
304            ::zeroclaw_log::record!(
305                WARN,
306                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
307                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
308                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
309                "Scheduler startup skip: query failed",
310            );
311            return;
312        }
313    };
314
315    if jobs.is_empty() {
316        ::zeroclaw_log::record!(
317            INFO,
318            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
319            "Scheduler startup skip: no overdue jobs to advance",
320        );
321        return;
322    }
323
324    let mut skipped_recurring: u64 = 0;
325    let mut skipped_oneshot: u64 = 0;
326
327    for job in &jobs {
328        let is_oneshot = matches!(job.schedule, Schedule::At { .. });
329        match skip_missed_run(config, job, now) {
330            Ok(()) => {
331                if is_oneshot {
332                    skipped_oneshot += 1;
333                } else {
334                    skipped_recurring += 1;
335                }
336            }
337            Err(e) => {
338                ::zeroclaw_log::record!(
339                    WARN,
340                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
341                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
342                        .with_attrs(::serde_json::json!({
343                            "job_id": job.id,
344                            "error": format!("{}", e),
345                        })),
346                    "Scheduler startup skip: failed to advance job",
347                );
348            }
349        }
350    }
351
352    ::zeroclaw_log::record!(
353        INFO,
354        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(
355            ::serde_json::json!({
356                "total": jobs.len(),
357                "skipped_recurring": skipped_recurring,
358                "skipped_oneshot": skipped_oneshot,
359            })
360        ),
361        "Scheduler startup skip: advanced overdue jobs without executing",
362    );
363}
364
365pub async fn execute_job_now(config: &Config, job: &CronJob) -> (bool, String) {
366    use zeroclaw_log::Instrument;
367    let Some(agent_alias) = resolve_owning_agent(config, job) else {
368        return (
369            false,
370            format!(
371                "cron job {id:?} has no owning agent; add the alias to an [agents.<x>].cron_jobs list",
372                id = job.id
373            ),
374        );
375    };
376    let agent_alias = agent_alias.to_string();
377    let security = match SecurityPolicy::for_agent(config, &agent_alias) {
378        Ok(s) => s,
379        Err(e) => return (false, format!("agent {agent_alias} risk profile: {e}")),
380    };
381    let span = zeroclaw_log::attribution_span!(job);
382    Box::pin(execute_job_with_retry(config, &security, &agent_alias, job))
383        .instrument(span)
384        .await
385}
386
387fn cron_agent_run_security_policy(base: &SecurityPolicy, job: &CronJob) -> SecurityPolicy {
388    let mut policy = base.clone();
389    if !matches!(job.job_type, JobType::Agent) || job.allowed_tools.is_some() {
390        return policy;
391    }
392
393    let excluded = policy.excluded_tools.get_or_insert_with(Vec::new);
394    for tool in CRON_AGENT_DEFAULT_EXCLUDED_TOOLS {
395        if !excluded.iter().any(|existing| existing == tool) {
396            excluded.push((*tool).to_string());
397        }
398    }
399    policy
400}
401
402async fn execute_job_with_retry(
403    config: &Config,
404    security: &SecurityPolicy,
405    agent_alias: &str,
406    job: &CronJob,
407) -> (bool, String) {
408    let mut last_output = String::new();
409    let retries = config.reliability.scheduler_retries;
410    let mut backoff_ms = config.reliability.provider_backoff_ms.max(200);
411
412    for attempt in 0..=retries {
413        let (success, output) = match job.job_type {
414            JobType::Shell => run_job_command(config, security, job).await,
415            JobType::Agent => Box::pin(run_agent_job(config, security, agent_alias, job)).await,
416        };
417        last_output = output;
418
419        if success {
420            return (true, last_output);
421        }
422
423        if last_output.starts_with("blocked by security policy:") {
424            // Deterministic policy violations are not retryable.
425            return (false, last_output);
426        }
427
428        if attempt < retries {
429            let jitter_ms = u64::from(Utc::now().timestamp_subsec_millis() % 250);
430            time::sleep(Duration::from_millis(backoff_ms + jitter_ms)).await;
431            backoff_ms = (backoff_ms.saturating_mul(2)).min(30_000);
432        }
433    }
434
435    (false, last_output)
436}
437
438async fn process_due_jobs(
439    config: &Config,
440    jobs: Vec<CronJob>,
441    component: &str,
442    event_tx: &EventBroadcast,
443) {
444    // Refresh scheduler health on every successful poll cycle, including idle cycles.
445    crate::health::mark_component_ok(component);
446
447    let max_concurrent = config.scheduler.max_concurrent.max(1);
448    let mut in_flight = stream::iter(jobs.into_iter().filter_map(|job| {
449        // Resolve owning agent per-job. Skip orphans with a warning so a
450        // mis-configured job can't take down the scheduler loop.
451        let Some(agent_alias) = resolve_owning_agent(config, &job) else {
452            ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"job_id": job.id})), "Cron job has no owning agent; add the alias to an [agents.<x>].cron_jobs list");
453            return None;
454        };
455        let agent_alias = agent_alias.to_owned();
456        let security = match SecurityPolicy::for_agent(config, &agent_alias) {
457            Ok(s) => Arc::new(s),
458            Err(e) => {
459                ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"job_id": job.id, "agent": agent_alias, "error": format!("{}", e)})), "Cron job: failed to build SecurityPolicy for owning agent");
460                return None;
461            }
462        };
463        let config = config.clone();
464        let component = component.to_owned();
465        Some(async move {
466            Box::pin(execute_and_persist_job(
467                &config,
468                security.as_ref(),
469                &agent_alias,
470                &job,
471                &component,
472            ))
473            .await
474        })
475    }))
476    .buffer_unordered(max_concurrent);
477
478    while let Some((job_id, success, output)) = in_flight.next().await {
479        if !success {
480            ::zeroclaw_log::record!(
481                WARN,
482                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
483                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
484                    .with_attrs(::serde_json::json!({"job_id": job_id, "output": output})),
485                "Scheduler job '' failed: "
486            );
487        }
488        // Broadcast cron result to dashboard/SSE clients.
489        if let Some(tx) = event_tx {
490            let _ = tx.send(serde_json::json!({
491                "type": "cron_result",
492                "job_id": job_id,
493                "success": success,
494                "output": output,
495                "timestamp": chrono::Utc::now().to_rfc3339(),
496            }));
497        }
498    }
499}
500
501async fn execute_and_persist_job(
502    config: &Config,
503    security: &SecurityPolicy,
504    agent_alias: &str,
505    job: &CronJob,
506    component: &str,
507) -> (String, bool, String) {
508    crate::health::mark_component_ok(component);
509    warn_if_high_frequency_agent_job(job);
510
511    let started_at = Utc::now();
512    let span = zeroclaw_log::attribution_span!(job);
513    let (success, output) = Box::pin(execute_job_with_retry(config, security, agent_alias, job))
514        .instrument(span)
515        .await;
516    let finished_at = Utc::now();
517    let success = Box::pin(persist_job_result(
518        config,
519        job,
520        success,
521        &output,
522        started_at,
523        finished_at,
524    ))
525    .await;
526
527    (job.id.clone(), success, output)
528}
529
/// Run an `Agent`-type cron job once on behalf of `agent_alias`.
///
/// Steps:
/// 1. Build the SubAgent context.
/// 2. Enforce `security`: read-only autonomy, rate limit, action budget.
/// 3. Optionally prepend recalled memories to the prompt.
/// 4. Run the agent with the cron-specific security overrides.
///
/// Returns `(success, output)`. Policy rejections return output prefixed with
/// `"blocked by security policy:"`, which `execute_job_with_retry` treats as
/// non-retryable. When the agent run fails, memories written under this
/// run's session key are purged on a best-effort basis (purge errors are
/// ignored).
async fn run_agent_job(
    config: &Config,
    security: &SecurityPolicy,
    agent_alias: &str,
    job: &CronJob,
) -> (bool, String) {
    // Cron is one of two SubAgent spawn sites; the other is the
    // agent-loop `spawn_subagent` tool. Both funnel through
    // `SubAgentSpawn::for_agent` so permission inheritance, tracing
    // span shape, and audit attribution stay uniform across spawn
    // sites.
    let subagent_ctx = match crate::subagent::SubAgentSpawn::for_agent(config, agent_alias)
        .and_then(|spawn| spawn.build(crate::subagent::SubAgentOverrides::default()))
    {
        Ok(ctx) => ctx,
        Err(e) => return (false, format!("subagent spawn failed: {e:#}")),
    };

    // Policy gates. Every rejection message shares the
    // "blocked by security policy:" prefix the retry loop keys on.
    if !security.can_act() {
        return (
            false,
            "blocked by security policy: autonomy is read-only".to_string(),
        );
    }

    if security.is_rate_limited() {
        return (
            false,
            "blocked by security policy: rate limit exceeded".to_string(),
        );
    }

    // NOTE(review): `record_action` appears to consume one unit of the action
    // budget per attempt (so retries also consume budget) — confirm.
    if !security.record_action() {
        return (
            false,
            "blocked by security policy: action budget exhausted".to_string(),
        );
    }
    let name = job.name.clone().unwrap_or_else(|| "cron-job".to_string());
    let prompt = job.prompt.clone().unwrap_or_default();

    // Recall relevant memories so cron jobs have context awareness.
    // Skipped when `job.uses_memory` is false (e.g. stateless digest jobs).
    // Exclude `Conversation` memories to prevent chat context from
    // leaking into scheduled executions. Routes through
    // the cron-owning agent's per-agent memory wrapper so the
    // recall is scoped to that agent's bound + allowlisted rows.
    // Memory-backend or recall errors silently yield an empty context.
    let memory_context = if !job.uses_memory {
        String::new()
    } else {
        match zeroclaw_memory::create_memory_for_agent(
            config,
            agent_alias,
            config
                .model_provider_for_agent(agent_alias)
                .and_then(|e| e.api_key.as_deref()),
        )
        .await
        {
            // NOTE(review): `5` is presumably the recall result limit; the
            // Conversation filter runs afterwards, so fewer may survive.
            Ok(mem) => match mem.recall(&prompt, 5, None, None, None).await {
                Ok(entries) if !entries.is_empty() => {
                    let ctx: String = entries
                        .iter()
                        .filter(|e| {
                            !matches!(
                                e.category,
                                zeroclaw_memory::traits::MemoryCategory::Conversation
                            )
                        })
                        .map(|e| format!("- {}: {}", e.key, e.content))
                        .collect::<Vec<_>>()
                        .join("\n");
                    if ctx.is_empty() {
                        String::new()
                    } else {
                        format!("{MEMORY_CONTEXT_OPEN}\n{ctx}\n{MEMORY_CONTEXT_CLOSE}\n\n")
                    }
                }
                _ => String::new(),
            },
            Err(_) => String::new(),
        }
    };

    // The `[cron:<id> <name>]` tag marks the prompt as cron-originated.
    let prefixed_prompt = format!("{memory_context}[cron:{} {name}] {prompt}", job.id);
    let model_override = job.model.clone();

    // Per-run config copy with memory auto-save turned off.
    // NOTE(review): presumably so cron prompts/responses are not auto-stored
    // as memories — confirm.
    let mut cron_config = config.clone();
    cron_config.memory.auto_save = false;

    // Assign a unique session ID so memories written during this run can be
    // purged atomically if the run fails (prevents snowball accumulation).
    // Doubles as the SubAgent run_id in the tracing span so a failed
    // memory purge can be correlated with its sub-run.
    let run_session_id = uuid::Uuid::new_v4().to_string();
    let session_path = std::path::PathBuf::from(format!("cron-{run_session_id}"));

    let subagent_span = zeroclaw_log::info_span!(
        "subagent",
        category = "cron",
        agent_alias = %agent_alias,
        cron_job_id = %job.id,
        run_id = %run_session_id,
        spawn_site = "cron",
    );

    // Pass the validated SubAgent context as run-time overrides so the
    // policy that came back from `SubAgentSpawn::build` reaches the
    // agent loop. Without this the loop reconstructs from config and
    // any future caller-supplied narrowing override would silently
    // collapse back to the parent's verbatim policy.
    //
    // `is_subagent: false` is explicit (not `..Default::default()`) so
    // a future refactor that flips the default can't quietly promote
    // every cron-launched agent to a depth-1 subagent — they're
    // top-level runs by design, despite riding through SubAgentSpawn.
    let run_security = cron_agent_run_security_policy(subagent_ctx.policy.as_ref(), job);
    let run_overrides = crate::agent::loop_::AgentRunOverrides {
        security: Some(Arc::new(run_security)),
        memory: None,
        is_subagent: false,
    };
    // Both session targets currently take the same execution path.
    let run_result = match job.session_target {
        SessionTarget::Main | SessionTarget::Isolated => {
            Box::pin(
                crate::agent::run(
                    cron_config,
                    agent_alias,
                    Some(prefixed_prompt),
                    None,
                    model_override,
                    config
                        .model_provider_for_agent(agent_alias)
                        .and_then(|e| e.temperature),
                    vec![],
                    false,
                    Some(session_path.clone()),
                    job.allowed_tools.clone(),
                    run_overrides,
                )
                .instrument(subagent_span),
            )
            .await
        }
    };

    match run_result {
        // An empty response still counts as success; substitute a placeholder
        // so run history is never blank.
        Ok(response) => (
            true,
            if response.trim().is_empty() {
                "agent job executed".to_string()
            } else {
                response
            },
        ),
        Err(e) => {
            // Purge memories written during this failed run so they don't
            // pollute future recall and cause context snowball. Routes
            // through the cron-owning agent's per-agent memory wrapper
            // so the purge stays scoped to the agent that wrote them.
            // Sanitize the session key so it matches what the runtime
            // writes via the orchestrator session-key sanitizer.
            // NOTE(review): the `cli:` prefix presumably mirrors how
            // `crate::agent::run` keys a session given `session_path` — confirm.
            let mem_session_key = zeroclaw_api::session_keys::sanitize_session_key(&format!(
                "cli:{}",
                session_path.display()
            ));
            if let Ok(mem) = zeroclaw_memory::create_memory_for_agent(
                config,
                agent_alias,
                config
                    .model_provider_for_agent(agent_alias)
                    .and_then(|e| e.api_key.as_deref()),
            )
            .await
            {
                let _ = mem.purge_session(&mem_session_key).await;
            }
            (false, format!("agent job failed: {e}"))
        }
    }
}
711
712async fn persist_job_result(
713    config: &Config,
714    job: &CronJob,
715    success: bool,
716    output: &str,
717    started_at: DateTime<Utc>,
718    finished_at: DateTime<Utc>,
719) -> bool {
720    let duration_ms = (finished_at - started_at).num_milliseconds();
721    let outcome = deliver_and_classify_run_result(
722        config,
723        job,
724        success,
725        output.to_string(),
726        CronDeliveryContext::Scheduled,
727    )
728    .await;
729
730    let action = if is_one_shot_auto_delete(job) && outcome.success {
731        RunCompletionAction::Delete
732    } else if matches!(job.schedule, Schedule::At { .. }) {
733        RunCompletionAction::Disable
734    } else {
735        RunCompletionAction::Reschedule
736    };
737
738    let job_state_at = Utc::now();
739    if let Err(e) = persist_run_result(
740        config,
741        job,
742        started_at,
743        finished_at,
744        job_state_at,
745        &outcome.status,
746        Some(&outcome.output),
747        duration_ms,
748        action,
749    ) {
750        ::zeroclaw_log::record!(
751            WARN,
752            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
753                .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
754                .with_attrs(::serde_json::json!({"e": e.to_string()})),
755            "Failed to persist scheduler run result: "
756        );
757
758        if action == RunCompletionAction::Delete {
759            // Best-effort fallback for the legacy behavior: a successful
760            // auto-delete one-shot should not be picked up again if the
761            // combined history+state transaction fails while inserting or
762            // pruning the run row.
763            if let Err(disable_err) = persist_run_completion_state(
764                config,
765                job,
766                job_state_at,
767                &outcome.status,
768                Some(&outcome.output),
769                RunCompletionAction::Disable,
770            ) {
771                ::zeroclaw_log::record!(
772                    WARN,
773                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
774                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
775                        .with_attrs(::serde_json::json!({"disable_err": disable_err.to_string()})),
776                    "Failed to disable one-shot cron job after history persistence failure: "
777                );
778            }
779        } else {
780            // For recurring jobs and non-delete one-shots, keep the scheduler
781            // moving even if run-history persistence fails.
782            if let Err(state_err) = persist_run_completion_state(
783                config,
784                job,
785                job_state_at,
786                &outcome.status,
787                Some(&outcome.output),
788                action,
789            ) {
790                ::zeroclaw_log::record!(
791                    WARN,
792                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
793                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
794                        .with_attrs(::serde_json::json!({"state_err": state_err.to_string()})),
795                    "Failed to update cron job state after history persistence failure: "
796                );
797            }
798        }
799    }
800
801    outcome.success
802}
803
804fn is_one_shot_auto_delete(job: &CronJob) -> bool {
805    job.delete_after_run && matches!(job.schedule, Schedule::At { .. })
806}
807
808fn is_high_frequency_agent_job(job: &CronJob) -> bool {
809    if !matches!(job.job_type, JobType::Agent) {
810        return false;
811    }
812    match &job.schedule {
813        Schedule::Every { every_ms } => *every_ms < 5 * 60 * 1000,
814        Schedule::Cron { .. } => {
815            let now = Utc::now();
816            next_run_for_schedule(&job.schedule, now)
817                .and_then(|a| next_run_for_schedule(&job.schedule, a).map(|b| (a, b)))
818                .map(|(a, b)| (b - a).num_minutes() < 5)
819                .unwrap_or(false)
820        }
821        Schedule::At { .. } => false,
822    }
823}
824
825fn warn_if_high_frequency_agent_job(job: &CronJob) {
826    if is_high_frequency_agent_job(job) {
827        ::zeroclaw_log::record!(
828            WARN,
829            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
830                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
831            &format!(
832                "Cron agent job '{}' is scheduled more frequently than every 5 minutes",
833                job.id
834            )
835        );
836    }
837}
838
839async fn deliver_if_configured(config: &Config, job: &CronJob, output: &str) -> Result<()> {
840    let delivery: &DeliveryConfig = &job.delivery;
841    if !delivery.mode.eq_ignore_ascii_case("announce") {
842        return Ok(());
843    }
844
845    let channel = delivery.channel.as_deref().ok_or_else(|| {
846        ::zeroclaw_log::record!(
847            WARN,
848            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
849                .with_outcome(::zeroclaw_log::EventOutcome::Failure)
850                .with_attrs(::serde_json::json!({"field": "channel"})),
851            "cron delivery announce refused: required field missing"
852        );
853        anyhow::Error::msg("delivery.channel is required for announce mode")
854    })?;
855    let target = delivery.to.as_deref().ok_or_else(|| {
856        ::zeroclaw_log::record!(
857            WARN,
858            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
859                .with_outcome(::zeroclaw_log::EventOutcome::Failure)
860                .with_attrs(::serde_json::json!({"field": "to"})),
861            "cron delivery announce refused: required field missing"
862        );
863        anyhow::Error::msg("delivery.to is required for announce mode")
864    })?;
865
866    deliver_announcement(
867        config,
868        channel,
869        target,
870        delivery.thread_id.as_deref(),
871        output,
872    )
873    .await
874}
875
/// Signature of the pluggable channel-delivery callback invoked by
/// `deliver_announcement`.
///
/// Every argument is owned, so the returned boxed future (implicitly
/// `'static`) never borrows from the caller. Positional arguments, in the
/// order `deliver_announcement` passes them:
/// 1. `Config` — a clone of the runtime configuration.
/// 2. `String` — the delivery channel.
/// 3. `String` — the delivery target/recipient.
/// 4. `Option<String>` — the optional thread/conversation id, propagated to
///    channels whose outbound `thread_id` is distinct from the recipient
///    (webhook).
/// 5. `String` — the job output to deliver.
///
/// The `Send + Sync` bounds allow the callback to live in the process-wide
/// `DELIVERY_FN` static.
pub type DeliveryFn = Box<
    dyn Fn(
            Config,         // runtime configuration (cloned)
            String,         // channel
            String,         // target / recipient
            Option<String>, // optional thread / conversation id
            String,         // output to deliver
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
        + Send
        + Sync,
>;
890
/// Global delivery function, injected by the binary crate at startup through
/// `register_delivery_fn`.
///
/// Being a `OnceLock`, it can be set at most once; later `set` attempts are
/// rejected. While it is unset, `deliver_announcement` logs a warning and
/// skips delivery.
static DELIVERY_FN: std::sync::OnceLock<DeliveryFn> = std::sync::OnceLock::new();
893
894/// Register the channel delivery function. Called once at startup by the binary.
895pub fn register_delivery_fn(f: DeliveryFn) {
896    let _ = DELIVERY_FN.set(f);
897}
898
899pub async fn deliver_announcement(
900    config: &Config,
901    channel: &str,
902    target: &str,
903    thread_id: Option<&str>,
904    output: &str,
905) -> Result<()> {
906    if let Some(f) = DELIVERY_FN.get() {
907        f(
908            config.clone(),
909            channel.to_string(),
910            target.to_string(),
911            thread_id.map(str::to_string),
912            output.to_string(),
913        )
914        .await
915    } else {
916        // No handler registered: this is a runtime-level state (the binary
917        // hasn't called `register_delivery_fn`), not a per-job failure.
918        // Returning `Err` here would force every announce-mode job to set
919        // `best_effort=true` just to survive a system that legitimately has
920        // no delivery wired (e.g. headless test runs, gateway-only deployments
921        // where channel orchestration lives elsewhere).
922        //
923        // We log loudly via `tracing::warn` so operators see the dropped
924        // delivery in their logs, then return `Ok(())` so `persist_job_result`
925        // records the job execution itself as successful. Operators that
926        // actively rely on delivery wire a handler at startup; absence is a
927        // configuration signal, not a delivery error.
928        ::zeroclaw_log::record!(
929            WARN,
930            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
931                .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
932                .with_attrs(::serde_json::json!({"channel": channel, "target": target})),
933            "Cron delivery skipped: no delivery handler registered \
934             (register_delivery_fn was not called by the binary)"
935        );
936        Ok(())
937    }
938}
939
940async fn run_job_command(
941    config: &Config,
942    security: &SecurityPolicy,
943    job: &CronJob,
944) -> (bool, String) {
945    run_job_command_with_timeout(
946        config,
947        security,
948        job,
949        Duration::from_secs(SHELL_JOB_TIMEOUT_SECS),
950    )
951    .await
952}
953
954async fn run_job_command_with_timeout(
955    config: &Config,
956    security: &SecurityPolicy,
957    job: &CronJob,
958    timeout: Duration,
959) -> (bool, String) {
960    if !security.can_act() {
961        return (
962            false,
963            "blocked by security policy: autonomy is read-only".to_string(),
964        );
965    }
966
967    if security.is_rate_limited() {
968        return (
969            false,
970            "blocked by security policy: rate limit exceeded".to_string(),
971        );
972    }
973
974    // Unified command validation: allowlist + risk + path checks in one call.
975    // Jobs created via the validated helpers were already checked at creation
976    // time, but we re-validate at execution time to catch policy changes and
977    // manually-edited job stores.
978    let approved = false; // scheduler runs are never pre-approved
979    if let Err(error) =
980        crate::cron::validate_shell_command_with_security(security, &job.command, approved)
981    {
982        return (false, error.to_string());
983    }
984
985    if let Some(path) = security.forbidden_path_argument(&job.command) {
986        return (
987            false,
988            format!("blocked by security policy: forbidden path argument: {path}"),
989        );
990    }
991
992    if !security.record_action() {
993        return (
994            false,
995            "blocked by security policy: action budget exhausted".to_string(),
996        );
997    }
998
999    let child = match build_cron_shell_command(&job.command, &config.data_dir) {
1000        Ok(mut cmd) => match cmd.spawn() {
1001            Ok(child) => child,
1002            Err(e) => return (false, format!("spawn error: {e}")),
1003        },
1004        Err(e) => return (false, format!("shell setup error: {e}")),
1005    };
1006
1007    match time::timeout(timeout, child.wait_with_output()).await {
1008        Ok(Ok(output)) => {
1009            let stdout = String::from_utf8_lossy(&output.stdout);
1010            let stderr = String::from_utf8_lossy(&output.stderr);
1011            let combined = format!(
1012                "status={}\nstdout:\n{}\nstderr:\n{}",
1013                output.status,
1014                stdout.trim(),
1015                stderr.trim()
1016            );
1017            (output.status.success(), combined)
1018        }
1019        Ok(Err(e)) => (false, format!("spawn error: {e}")),
1020        Err(_) => (
1021            false,
1022            format!("job timed out after {}s", timeout.as_secs_f64()),
1023        ),
1024    }
1025}
1026
1027/// Build a shell `Command` for cron job execution.
1028///
1029/// Uses `sh -c <command>` (non-login shell). On Windows, ZeroClaw users
1030/// typically have Git Bash installed which provides `sh` in PATH, and
1031/// cron commands are written with Unix shell syntax. The previous `-lc`
1032/// (login shell) flag was dropped: login shells load the full user
1033/// profile on every invocation which is slow and may cause side effects.
1034///
1035/// The command is configured with:
1036/// - `current_dir` set to the workspace
1037/// - `stdin` piped to `/dev/null` (no interactive input)
1038/// - `stdout` and `stderr` piped for capture
1039/// - `kill_on_drop(true)` for safe timeout handling
1040fn build_cron_shell_command(
1041    command: &str,
1042    workspace_dir: &std::path::Path,
1043) -> anyhow::Result<Command> {
1044    let mut cmd = Command::new("sh");
1045    cmd.arg("-c")
1046        .arg(command)
1047        .current_dir(workspace_dir)
1048        .stdin(Stdio::null())
1049        .stdout(Stdio::piped())
1050        .stderr(Stdio::piped())
1051        .kill_on_drop(true);
1052
1053    Ok(cmd)
1054}
1055
1056#[cfg(test)]
1057mod tests {
1058    use super::*;
1059    use crate::cron::{self, DeliveryConfig};
1060    use crate::security::SecurityPolicy;
1061    use chrono::{Duration as ChronoDuration, Utc};
1062    use tempfile::TempDir;
1063    use zeroclaw_config::schema::Config;
1064
1065    const TEST_AGENT: &str = "test-agent";
1066
1067    async fn test_config(tmp: &TempDir) -> Config {
1068        let mut config = Config {
1069            data_dir: tmp.path().join("data"),
1070            config_path: tmp.path().join("config.toml"),
1071            ..Config::default()
1072        };
1073        config.risk_profiles.insert(
1074            TEST_AGENT.to_string(),
1075            zeroclaw_config::schema::RiskProfileConfig::default(),
1076        );
1077        config.runtime_profiles.insert(
1078            TEST_AGENT.to_string(),
1079            zeroclaw_config::schema::RuntimeProfileConfig::default(),
1080        );
1081        config.providers.models.openrouter.insert(
1082            TEST_AGENT.to_string(),
1083            zeroclaw_config::schema::OpenRouterModelProviderConfig::default(),
1084        );
1085        config.agents.insert(
1086            TEST_AGENT.to_string(),
1087            zeroclaw_config::schema::AliasedAgentConfig {
1088                model_provider: format!("openrouter.{TEST_AGENT}").into(),
1089                risk_profile: TEST_AGENT.to_string(),
1090                runtime_profile: TEST_AGENT.to_string(),
1091                ..Default::default()
1092            },
1093        );
1094        tokio::fs::create_dir_all(&config.data_dir).await.unwrap();
1095        config
1096    }
1097
1098    fn test_security(config: &Config) -> SecurityPolicy {
1099        SecurityPolicy::for_agent(config, TEST_AGENT).expect("test-agent has resolvable profiles")
1100    }
1101
1102    fn test_job(command: &str) -> CronJob {
1103        CronJob {
1104            id: "test-job".into(),
1105            expression: "* * * * *".into(),
1106            schedule: crate::cron::Schedule::Cron {
1107                expr: "* * * * *".into(),
1108                tz: None,
1109            },
1110            command: command.into(),
1111            prompt: None,
1112            name: None,
1113            job_type: JobType::Shell,
1114            session_target: SessionTarget::Isolated,
1115            model: None,
1116            agent_alias: TEST_AGENT.into(),
1117            enabled: true,
1118            delivery: DeliveryConfig::default(),
1119            delete_after_run: false,
1120            allowed_tools: None,
1121            uses_memory: true,
1122            source: "imperative".into(),
1123            created_at: Utc::now(),
1124            next_run: Utc::now(),
1125            last_run: None,
1126            last_status: None,
1127            last_output: None,
1128        }
1129    }
1130
1131    fn unique_component(prefix: &str) -> String {
1132        format!("{prefix}-{}", uuid::Uuid::new_v4())
1133    }
1134
1135    fn agent_job_with_schedule(schedule: crate::cron::Schedule) -> CronJob {
1136        CronJob {
1137            job_type: JobType::Agent,
1138            schedule,
1139            ..test_job("echo test")
1140        }
1141    }
1142
1143    #[test]
1144    fn high_frequency_daily_cron_is_not_flagged() {
1145        // `0 6 * * *` fires once per day — must never warn regardless of when the check runs
1146        let job = agent_job_with_schedule(crate::cron::Schedule::Cron {
1147            expr: "0 6 * * *".into(),
1148            tz: Some("America/Chicago".into()),
1149        });
1150        assert!(!is_high_frequency_agent_job(&job));
1151    }
1152
1153    #[test]
1154    fn high_frequency_every_4min_cron_is_flagged() {
1155        let job = agent_job_with_schedule(crate::cron::Schedule::Cron {
1156            expr: "*/4 * * * *".into(),
1157            tz: None,
1158        });
1159        assert!(is_high_frequency_agent_job(&job));
1160    }
1161
1162    #[test]
1163    fn high_frequency_every_5min_cron_is_not_flagged() {
1164        // Exactly 5 minutes is acceptable (threshold is strictly less than 5)
1165        let job = agent_job_with_schedule(crate::cron::Schedule::Cron {
1166            expr: "*/5 * * * *".into(),
1167            tz: None,
1168        });
1169        assert!(!is_high_frequency_agent_job(&job));
1170    }
1171
1172    #[test]
1173    fn high_frequency_every_interval_below_threshold_is_flagged() {
1174        let job = agent_job_with_schedule(crate::cron::Schedule::Every {
1175            every_ms: 4 * 60 * 1000, // 4 minutes
1176        });
1177        assert!(is_high_frequency_agent_job(&job));
1178    }
1179
1180    #[test]
1181    fn high_frequency_every_interval_at_threshold_is_not_flagged() {
1182        let job = agent_job_with_schedule(crate::cron::Schedule::Every {
1183            every_ms: 5 * 60 * 1000, // exactly 5 minutes
1184        });
1185        assert!(!is_high_frequency_agent_job(&job));
1186    }
1187
1188    #[test]
1189    fn high_frequency_shell_job_is_never_flagged() {
1190        // Shell jobs are exempt regardless of frequency
1191        let job = CronJob {
1192            job_type: JobType::Shell,
1193            schedule: crate::cron::Schedule::Every {
1194                every_ms: 60 * 1000, // 1 minute
1195            },
1196            ..test_job("echo test")
1197        };
1198        assert!(!is_high_frequency_agent_job(&job));
1199    }
1200
1201    #[test]
1202    fn cron_agent_run_security_policy_excludes_scheduler_mutation_tools_by_default() {
1203        let security = SecurityPolicy::default();
1204        let mut job = test_job("");
1205        job.job_type = JobType::Agent;
1206        job.allowed_tools = None;
1207
1208        let policy = cron_agent_run_security_policy(&security, &job);
1209
1210        for tool in [
1211            "cron_add",
1212            "cron_update",
1213            "cron_remove",
1214            "cron_run",
1215            "schedule",
1216        ] {
1217            assert!(
1218                !policy.is_tool_allowed(tool),
1219                "{tool} must be excluded from default cron agent runs"
1220            );
1221        }
1222        assert!(
1223            policy.is_tool_allowed("http_request"),
1224            "non-scheduler tools remain available when the base policy is unrestricted"
1225        );
1226    }
1227
1228    #[test]
1229    fn cron_agent_run_security_policy_respects_explicit_allowed_tools() {
1230        let security = SecurityPolicy::default();
1231        let mut job = test_job("");
1232        job.job_type = JobType::Agent;
1233        job.allowed_tools = Some(vec!["cron_add".into()]);
1234
1235        let policy = cron_agent_run_security_policy(&security, &job);
1236
1237        assert!(
1238            policy.is_tool_allowed("cron_add"),
1239            "explicit cron job allowed_tools should remain the override for intentional scheduler automation"
1240        );
1241    }
1242
1243    #[tokio::test]
1244    async fn run_job_command_success() {
1245        let tmp = TempDir::new().unwrap();
1246        let config = test_config(&tmp).await;
1247        let job = test_job("echo scheduler-ok");
1248        let security = test_security(&config);
1249
1250        let (success, output) = run_job_command(&config, &security, &job).await;
1251        assert!(success);
1252        assert!(output.contains("scheduler-ok"));
1253        assert!(output.contains("status=exit status: 0"));
1254    }
1255
1256    #[tokio::test]
1257    async fn run_job_command_failure() {
1258        let tmp = TempDir::new().unwrap();
1259        let config = test_config(&tmp).await;
1260        let job = test_job("ls definitely_missing_file_for_scheduler_test");
1261        let security = test_security(&config);
1262
1263        let (success, output) = run_job_command(&config, &security, &job).await;
1264        assert!(!success);
1265        assert!(output.contains("definitely_missing_file_for_scheduler_test"));
1266        assert!(output.contains("status=exit status:"));
1267    }
1268
1269    #[tokio::test]
1270    async fn run_job_command_times_out() {
1271        let tmp = TempDir::new().unwrap();
1272        let mut config = test_config(&tmp).await;
1273        config
1274            .risk_profiles
1275            .entry(TEST_AGENT.into())
1276            .or_default()
1277            .allowed_commands = vec!["sleep".into()];
1278        let job = test_job("sleep 1");
1279        let security = test_security(&config);
1280
1281        let (success, output) =
1282            run_job_command_with_timeout(&config, &security, &job, Duration::from_millis(50)).await;
1283        assert!(!success);
1284        assert!(output.contains("job timed out after"));
1285    }
1286
1287    #[tokio::test]
1288    async fn run_job_command_blocks_disallowed_command() {
1289        let tmp = TempDir::new().unwrap();
1290        let mut config = test_config(&tmp).await;
1291        config
1292            .risk_profiles
1293            .entry(TEST_AGENT.into())
1294            .or_default()
1295            .allowed_commands = vec!["echo".into()];
1296        let job = test_job("curl https://evil.example");
1297        let security = test_security(&config);
1298
1299        let (success, output) = run_job_command(&config, &security, &job).await;
1300        assert!(!success);
1301        assert!(output.contains("blocked by security policy"));
1302        assert!(output.to_lowercase().contains("not allowed"));
1303    }
1304
1305    #[tokio::test]
1306    async fn run_job_command_blocks_forbidden_path_argument() {
1307        let tmp = TempDir::new().unwrap();
1308        let mut config = test_config(&tmp).await;
1309        config
1310            .risk_profiles
1311            .entry(TEST_AGENT.into())
1312            .or_default()
1313            .allowed_commands = vec!["cat".into()];
1314        let job = test_job("cat /etc/passwd");
1315        let security = test_security(&config);
1316
1317        let (success, output) = run_job_command(&config, &security, &job).await;
1318        assert!(!success);
1319        assert!(output.contains("blocked by security policy"));
1320        assert!(output.contains("forbidden path argument"));
1321        assert!(output.contains("/etc/passwd"));
1322    }
1323
1324    #[tokio::test]
1325    async fn run_job_command_blocks_forbidden_option_assignment_path_argument() {
1326        let tmp = TempDir::new().unwrap();
1327        let mut config = test_config(&tmp).await;
1328        config
1329            .risk_profiles
1330            .entry(TEST_AGENT.into())
1331            .or_default()
1332            .allowed_commands = vec!["grep".into()];
1333        let job = test_job("grep --file=/etc/passwd root ./src");
1334        let security = test_security(&config);
1335
1336        let (success, output) = run_job_command(&config, &security, &job).await;
1337        assert!(!success);
1338        assert!(output.contains("blocked by security policy"));
1339        assert!(output.contains("forbidden path argument"));
1340        assert!(output.contains("/etc/passwd"));
1341    }
1342
1343    #[tokio::test]
1344    async fn run_job_command_blocks_forbidden_short_option_attached_path_argument() {
1345        let tmp = TempDir::new().unwrap();
1346        let mut config = test_config(&tmp).await;
1347        config
1348            .risk_profiles
1349            .entry(TEST_AGENT.into())
1350            .or_default()
1351            .allowed_commands = vec!["grep".into()];
1352        let job = test_job("grep -f/etc/passwd root ./src");
1353        let security = test_security(&config);
1354
1355        let (success, output) = run_job_command(&config, &security, &job).await;
1356        assert!(!success);
1357        assert!(output.contains("blocked by security policy"));
1358        assert!(output.contains("forbidden path argument"));
1359        assert!(output.contains("/etc/passwd"));
1360    }
1361
1362    #[tokio::test]
1363    async fn run_job_command_blocks_tilde_user_path_argument() {
1364        let tmp = TempDir::new().unwrap();
1365        let mut config = test_config(&tmp).await;
1366        config
1367            .risk_profiles
1368            .entry(TEST_AGENT.into())
1369            .or_default()
1370            .allowed_commands = vec!["cat".into()];
1371        let job = test_job("cat ~root/.ssh/id_rsa");
1372        let security = test_security(&config);
1373
1374        let (success, output) = run_job_command(&config, &security, &job).await;
1375        assert!(!success);
1376        assert!(output.contains("blocked by security policy"));
1377        assert!(output.contains("forbidden path argument"));
1378        assert!(output.contains("~root/.ssh/id_rsa"));
1379    }
1380
1381    #[tokio::test]
1382    async fn run_job_command_blocks_input_redirection_path_bypass() {
1383        let tmp = TempDir::new().unwrap();
1384        let mut config = test_config(&tmp).await;
1385        config
1386            .risk_profiles
1387            .entry(TEST_AGENT.into())
1388            .or_default()
1389            .allowed_commands = vec!["cat".into()];
1390        let job = test_job("cat </etc/passwd");
1391        let security = test_security(&config);
1392
1393        let (success, output) = run_job_command(&config, &security, &job).await;
1394        assert!(!success);
1395        assert!(output.contains("blocked by security policy"));
1396        assert!(output.to_lowercase().contains("not allowed"));
1397    }
1398
1399    #[tokio::test]
1400    async fn run_job_command_blocks_readonly_mode() {
1401        let tmp = TempDir::new().unwrap();
1402        let mut config = test_config(&tmp).await;
1403        config
1404            .risk_profiles
1405            .entry(TEST_AGENT.into())
1406            .or_default()
1407            .level = crate::security::AutonomyLevel::ReadOnly;
1408        let job = test_job("echo should-not-run");
1409        let security = test_security(&config);
1410
1411        let (success, output) = run_job_command(&config, &security, &job).await;
1412        assert!(!success);
1413        assert!(output.contains("blocked by security policy"));
1414        assert!(output.contains("read-only"));
1415    }
1416
1417    #[tokio::test]
1418    async fn run_job_command_blocks_rate_limited() {
1419        let tmp = TempDir::new().unwrap();
1420        let mut config = test_config(&tmp).await;
1421        config
1422            .runtime_profiles
1423            .entry(TEST_AGENT.into())
1424            .or_default()
1425            .max_actions_per_hour = 0;
1426        let job = test_job("echo should-not-run");
1427        let security = test_security(&config);
1428
1429        let (success, output) = run_job_command(&config, &security, &job).await;
1430        assert!(!success);
1431        assert!(output.contains("blocked by security policy"));
1432        assert!(output.contains("rate limit exceeded"));
1433    }
1434
1435    #[tokio::test]
1436    async fn execute_job_with_retry_recovers_after_first_failure() {
1437        let tmp = TempDir::new().unwrap();
1438        let mut config = test_config(&tmp).await;
1439        config.reliability.scheduler_retries = 1;
1440        config.reliability.provider_backoff_ms = 1;
1441        config
1442            .risk_profiles
1443            .entry(TEST_AGENT.into())
1444            .or_default()
1445            .allowed_commands = vec!["sh".into()];
1446        let security = test_security(&config);
1447
1448        tokio::fs::write(
1449            config.data_dir.join("retry-once.sh"),
1450            "#!/bin/sh\nif [ -f retry-ok.flag ]; then\n  echo recovered\n  exit 0\nfi\ntouch retry-ok.flag\nexit 1\n",
1451        )
1452        .await
1453        .unwrap();
1454        let job = test_job("sh ./retry-once.sh");
1455
1456        let (success, output) = Box::pin(execute_job_with_retry(
1457            &config,
1458            &security,
1459            "test-agent",
1460            &job,
1461        ))
1462        .await;
1463        assert!(success);
1464        assert!(output.contains("recovered"));
1465    }
1466
1467    #[tokio::test]
1468    async fn execute_job_with_retry_exhausts_attempts() {
1469        let tmp = TempDir::new().unwrap();
1470        let mut config = test_config(&tmp).await;
1471        config.reliability.scheduler_retries = 1;
1472        config.reliability.provider_backoff_ms = 1;
1473        let security = test_security(&config);
1474
1475        let job = test_job("ls always_missing_for_retry_test");
1476
1477        let (success, output) = Box::pin(execute_job_with_retry(
1478            &config,
1479            &security,
1480            "test-agent",
1481            &job,
1482        ))
1483        .await;
1484        assert!(!success);
1485        assert!(output.contains("always_missing_for_retry_test"));
1486    }
1487
1488    #[tokio::test]
1489    async fn run_agent_job_returns_error_without_provider_key() {
1490        let tmp = TempDir::new().unwrap();
1491        let config = test_config(&tmp).await;
1492        let mut job = test_job("");
1493        job.job_type = JobType::Agent;
1494        job.prompt = Some("Say hello".into());
1495        let security = test_security(&config);
1496
1497        let (success, output) =
1498            Box::pin(run_agent_job(&config, &security, "test-agent", &job)).await;
1499        assert!(!success);
1500        assert!(output.contains("agent job failed:"));
1501    }
1502
1503    #[tokio::test]
1504    async fn run_agent_job_blocks_readonly_mode() {
1505        let tmp = TempDir::new().unwrap();
1506        let mut config = test_config(&tmp).await;
1507        config
1508            .risk_profiles
1509            .entry(TEST_AGENT.into())
1510            .or_default()
1511            .level = crate::security::AutonomyLevel::ReadOnly;
1512        let mut job = test_job("");
1513        job.job_type = JobType::Agent;
1514        job.prompt = Some("Say hello".into());
1515        let security = test_security(&config);
1516
1517        let (success, output) =
1518            Box::pin(run_agent_job(&config, &security, "test-agent", &job)).await;
1519        assert!(!success);
1520        assert!(output.contains("blocked by security policy"));
1521        assert!(output.contains("read-only"));
1522    }
1523
1524    #[tokio::test]
1525    async fn run_agent_job_blocks_rate_limited() {
1526        let tmp = TempDir::new().unwrap();
1527        let mut config = test_config(&tmp).await;
1528        config
1529            .runtime_profiles
1530            .entry(TEST_AGENT.into())
1531            .or_default()
1532            .max_actions_per_hour = 0;
1533        let mut job = test_job("");
1534        job.job_type = JobType::Agent;
1535        job.prompt = Some("Say hello".into());
1536        let security = test_security(&config);
1537
1538        let (success, output) =
1539            Box::pin(run_agent_job(&config, &security, "test-agent", &job)).await;
1540        assert!(!success);
1541        assert!(output.contains("blocked by security policy"));
1542        assert!(output.contains("rate limit exceeded"));
1543    }
1544
1545    #[tokio::test]
1546    async fn process_due_jobs_marks_component_ok_even_when_idle() {
1547        let tmp = TempDir::new().unwrap();
1548        let config = test_config(&tmp).await;
1549        let component = unique_component("scheduler-idle");
1550
1551        crate::health::mark_component_error(&component, "pre-existing error");
1552        process_due_jobs(&config, Vec::new(), &component, &None).await;
1553
1554        let snapshot = crate::health::snapshot_json();
1555        let entry = &snapshot["components"][component.as_str()];
1556        assert_eq!(entry["status"], "ok");
1557        assert!(entry["last_ok"].as_str().is_some());
1558        assert!(entry["last_error"].is_null());
1559    }
1560
1561    #[tokio::test]
1562    async fn process_due_jobs_failure_does_not_mark_component_unhealthy() {
1563        let tmp = TempDir::new().unwrap();
1564        let config = test_config(&tmp).await;
1565        let job = test_job("ls definitely_missing_file_for_scheduler_component_health_test");
1566        let component = unique_component("scheduler-fail");
1567
1568        crate::health::mark_component_ok(&component);
1569        process_due_jobs(&config, vec![job], &component, &None).await;
1570
1571        let snapshot = crate::health::snapshot_json();
1572        let entry = &snapshot["components"][component.as_str()];
1573        assert_eq!(entry["status"], "ok");
1574    }
1575
1576    #[tokio::test]
1577    async fn persist_job_result_records_run_and_reschedules_shell_job() {
1578        let tmp = TempDir::new().unwrap();
1579        let config = test_config(&tmp).await;
1580        let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1581        let started = Utc::now();
1582        let finished = started + ChronoDuration::milliseconds(10);
1583
1584        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1585        assert!(success);
1586
1587        let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1588        assert_eq!(runs.len(), 1);
1589        let updated = cron::get_job(&config, &job.id).unwrap();
1590        assert_eq!(updated.last_status.as_deref(), Some("ok"));
1591    }
1592
1593    #[tokio::test]
1594    async fn persist_job_result_uses_one_write_connection_for_recurring_job() {
1595        let tmp = TempDir::new().unwrap();
1596        let config = test_config(&tmp).await;
1597        let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1598        let started = Utc::now();
1599        let finished = started + ChronoDuration::milliseconds(10);
1600
1601        crate::cron::store::reset_write_connection_count_for_tests(&config);
1602        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1603
1604        assert!(success);
1605        assert_eq!(
1606            crate::cron::store::write_connection_count_for_tests(&config),
1607            1
1608        );
1609    }
1610
1611    #[tokio::test]
1612    async fn persist_job_result_prunes_run_history_and_updates_last_fields() {
1613        let tmp = TempDir::new().unwrap();
1614        let mut config = test_config(&tmp).await;
1615        config.scheduler.max_run_history = 2;
1616        let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1617        let base = Utc::now();
1618
1619        for idx in 0..3 {
1620            let started = base + ChronoDuration::seconds(idx);
1621            let finished = started + ChronoDuration::milliseconds(10);
1622            let output = format!("run-{idx}");
1623
1624            let success = persist_job_result(&config, &job, true, &output, started, finished).await;
1625            assert!(success);
1626        }
1627
1628        let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1629        assert_eq!(runs.len(), 2);
1630        assert_eq!(runs[0].output.as_deref(), Some("run-2"));
1631        assert_eq!(runs[1].output.as_deref(), Some("run-1"));
1632
1633        let updated = cron::get_job(&config, &job.id).unwrap();
1634        assert_eq!(updated.last_status.as_deref(), Some("ok"));
1635        assert_eq!(updated.last_output.as_deref(), Some("run-2"));
1636        assert!(updated.last_run.is_some());
1637    }
1638
1639    #[tokio::test]
1640    async fn persist_job_result_rolls_back_run_history_when_job_state_update_fails() {
1641        let tmp = TempDir::new().unwrap();
1642        let config = test_config(&tmp).await;
1643        let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1644        let original_next_run = job.next_run;
1645        let started = Utc::now();
1646        let finished = started + ChronoDuration::milliseconds(10);
1647
1648        let conn =
1649            rusqlite::Connection::open(config.data_dir.join("cron").join("jobs.db")).unwrap();
1650        conn.execute_batch(
1651            "CREATE TRIGGER fail_cron_job_update
1652             BEFORE UPDATE ON cron_jobs
1653             BEGIN
1654                 SELECT RAISE(ABORT, 'blocked update');
1655             END;",
1656        )
1657        .unwrap();
1658        drop(conn);
1659
1660        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1661
1662        assert!(success);
1663        assert!(cron::list_runs(&config, &job.id, 10).unwrap().is_empty());
1664
1665        let stored = cron::get_job(&config, &job.id).unwrap();
1666        assert_eq!(stored.next_run, original_next_run);
1667        assert!(stored.last_run.is_none());
1668        assert!(stored.last_status.is_none());
1669        assert!(stored.last_output.is_none());
1670    }
1671
1672    #[tokio::test]
1673    async fn persist_job_result_success_deletes_one_shot() {
1674        let tmp = TempDir::new().unwrap();
1675        let config = test_config(&tmp).await;
1676        let at = Utc::now() + ChronoDuration::minutes(10);
1677        let job = cron::add_agent_job(
1678            &config,
1679            TEST_AGENT,
1680            Some("one-shot".into()),
1681            crate::cron::Schedule::At { at },
1682            "Hello",
1683            SessionTarget::Isolated,
1684            None,
1685            None,
1686            true,
1687            None,
1688        )
1689        .unwrap();
1690        let started = Utc::now();
1691        let finished = started + ChronoDuration::milliseconds(10);
1692
1693        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1694        assert!(success);
1695        let lookup = cron::get_job(&config, &job.id);
1696        assert!(lookup.is_err());
1697    }
1698
1699    #[tokio::test]
1700    async fn persist_job_result_failure_disables_one_shot() {
1701        let tmp = TempDir::new().unwrap();
1702        let config = test_config(&tmp).await;
1703        let at = Utc::now() + ChronoDuration::minutes(10);
1704        let job = cron::add_agent_job(
1705            &config,
1706            TEST_AGENT,
1707            Some("one-shot".into()),
1708            crate::cron::Schedule::At { at },
1709            "Hello",
1710            SessionTarget::Isolated,
1711            None,
1712            None,
1713            true,
1714            None,
1715        )
1716        .unwrap();
1717        let started = Utc::now();
1718        let finished = started + ChronoDuration::milliseconds(10);
1719
1720        let success = persist_job_result(&config, &job, false, "boom", started, finished).await;
1721        assert!(!success);
1722        let updated = cron::get_job(&config, &job.id).unwrap();
1723        assert!(!updated.enabled);
1724        assert_eq!(updated.last_status.as_deref(), Some("error"));
1725    }
1726
1727    #[tokio::test]
1728    async fn persist_job_result_uses_one_write_connection_for_failed_one_shot_disable() {
1729        let tmp = TempDir::new().unwrap();
1730        let config = test_config(&tmp).await;
1731        let at = Utc::now() + ChronoDuration::minutes(10);
1732        let job = cron::add_agent_job(
1733            &config,
1734            "test-agent",
1735            Some("one-shot".into()),
1736            crate::cron::Schedule::At { at },
1737            "Hello",
1738            SessionTarget::Isolated,
1739            None,
1740            None,
1741            true,
1742            None,
1743        )
1744        .unwrap();
1745        let started = Utc::now();
1746        let finished = started + ChronoDuration::milliseconds(10);
1747
1748        crate::cron::store::reset_write_connection_count_for_tests(&config);
1749        let success = persist_job_result(&config, &job, false, "boom", started, finished).await;
1750
1751        assert!(!success);
1752        assert_eq!(
1753            crate::cron::store::write_connection_count_for_tests(&config),
1754            1
1755        );
1756    }
1757
1758    #[tokio::test]
1759    async fn persist_job_result_falls_back_to_state_update_when_history_prune_fails() {
1760        let tmp = TempDir::new().unwrap();
1761        let mut config = test_config(&tmp).await;
1762        config.scheduler.max_run_history = 1;
1763        let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1764        let original_next_run = job.next_run;
1765        let seed_started = Utc::now() - ChronoDuration::minutes(20);
1766        let seed_finished = seed_started + ChronoDuration::milliseconds(10);
1767        let started = Utc::now();
1768        let finished = started + ChronoDuration::milliseconds(10);
1769
1770        let conn =
1771            rusqlite::Connection::open(config.data_dir.join("cron").join("jobs.db")).unwrap();
1772        conn.execute(
1773            "INSERT INTO cron_runs (job_id, started_at, finished_at, status, output, duration_ms)
1774             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1775            rusqlite::params![
1776                job.id,
1777                seed_started.to_rfc3339(),
1778                seed_finished.to_rfc3339(),
1779                "seed",
1780                "seed",
1781                10,
1782            ],
1783        )
1784        .unwrap();
1785        conn.execute_batch(
1786            "CREATE TRIGGER fail_cron_run_prune
1787             BEFORE DELETE ON cron_runs
1788             BEGIN
1789                 SELECT RAISE(ABORT, 'blocked prune');
1790             END;",
1791        )
1792        .unwrap();
1793        drop(conn);
1794
1795        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1796        assert!(success);
1797
1798        let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1799        assert_eq!(runs.len(), 1);
1800        assert_eq!(runs[0].status, "seed");
1801
1802        let updated = cron::get_job(&config, &job.id).unwrap();
1803        assert_eq!(updated.last_status.as_deref(), Some("ok"));
1804        assert_eq!(updated.last_output.as_deref(), Some("ok"));
1805        assert!(updated.last_run.is_some());
1806        assert!(updated.next_run >= original_next_run);
1807    }
1808
1809    #[tokio::test]
1810    async fn persist_job_result_falls_back_to_disable_when_auto_delete_history_insert_fails() {
1811        let tmp = TempDir::new().unwrap();
1812        let config = test_config(&tmp).await;
1813        let at = Utc::now() + ChronoDuration::minutes(10);
1814        let job = cron::add_once_at(&config, "test-agent", at, "echo one-shot-shell").unwrap();
1815        assert!(job.delete_after_run);
1816        let started = Utc::now();
1817        let finished = started + ChronoDuration::milliseconds(10);
1818
1819        let conn =
1820            rusqlite::Connection::open(config.data_dir.join("cron").join("jobs.db")).unwrap();
1821        conn.execute_batch(
1822            "CREATE TRIGGER fail_cron_run_insert
1823             BEFORE INSERT ON cron_runs
1824             BEGIN
1825                 SELECT RAISE(ABORT, 'blocked insert');
1826             END;",
1827        )
1828        .unwrap();
1829        drop(conn);
1830
1831        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1832        assert!(success);
1833
1834        let updated = cron::get_job(&config, &job.id).unwrap();
1835        assert!(!updated.enabled);
1836        assert_eq!(updated.last_status.as_deref(), Some("ok"));
1837        assert_eq!(updated.last_output.as_deref(), Some("ok"));
1838        assert!(cron::list_runs(&config, &job.id, 10).unwrap().is_empty());
1839    }
1840
1841    #[tokio::test]
1842    async fn persist_job_result_success_deletes_one_shot_shell_job() {
1843        let tmp = TempDir::new().unwrap();
1844        let config = test_config(&tmp).await;
1845        let at = Utc::now() + ChronoDuration::minutes(10);
1846        let job = cron::add_once_at(&config, "test-agent", at, "echo one-shot-shell").unwrap();
1847        assert!(job.delete_after_run);
1848        let started = Utc::now();
1849        let finished = started + ChronoDuration::milliseconds(10);
1850
1851        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1852        assert!(success);
1853        let lookup = cron::get_job(&config, &job.id);
1854        assert!(lookup.is_err());
1855    }
1856
1857    #[tokio::test]
1858    async fn persist_job_result_failure_disables_one_shot_shell_job() {
1859        let tmp = TempDir::new().unwrap();
1860        let config = test_config(&tmp).await;
1861        let at = Utc::now() + ChronoDuration::minutes(10);
1862        let job = cron::add_once_at(&config, "test-agent", at, "echo one-shot-shell").unwrap();
1863        assert!(job.delete_after_run);
1864        let started = Utc::now();
1865        let finished = started + ChronoDuration::milliseconds(10);
1866
1867        let success = persist_job_result(&config, &job, false, "boom", started, finished).await;
1868        assert!(!success);
1869        let updated = cron::get_job(&config, &job.id).unwrap();
1870        assert!(!updated.enabled);
1871        assert_eq!(updated.last_status.as_deref(), Some("error"));
1872    }
1873
1874    #[tokio::test]
1875    async fn persist_job_result_delivery_stubbed_succeeds() {
1876        // Delivery is stubbed (moved to zeroclaw-channels orchestrator).
1877        // This test verifies the stub returns Ok, so persist_job_result succeeds.
1878        let tmp = TempDir::new().unwrap();
1879        let config = test_config(&tmp).await;
1880        let job = cron::add_agent_job(
1881            &config,
1882            TEST_AGENT,
1883            Some("announce-job".into()),
1884            crate::cron::Schedule::Cron {
1885                expr: "*/5 * * * *".into(),
1886                tz: None,
1887            },
1888            "deliver this",
1889            SessionTarget::Isolated,
1890            None,
1891            Some(DeliveryConfig {
1892                mode: "announce".into(),
1893                channel: Some("telegram".into()),
1894                to: Some("123456".into()),
1895                thread_id: None,
1896                best_effort: false,
1897            }),
1898            false,
1899            None,
1900        )
1901        .unwrap();
1902        let started = Utc::now();
1903        let finished = started + ChronoDuration::milliseconds(10);
1904
1905        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1906        assert!(success);
1907
1908        let updated = cron::get_job(&config, &job.id).unwrap();
1909        assert!(updated.enabled);
1910        assert_eq!(updated.last_status.as_deref(), Some("ok"));
1911
1912        let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1913        assert_eq!(runs.len(), 1);
1914        assert_eq!(runs[0].status, "ok");
1915    }
1916
1917    #[tokio::test]
1918    async fn persist_job_result_delivery_failure_best_effort_marks_degraded() {
1919        let tmp = TempDir::new().unwrap();
1920        let config = test_config(&tmp).await;
1921        register_delivery_fn(Box::new(
1922            |_config, channel, _target, _thread_id, _output| {
1923                Box::pin(async move {
1924                    if channel == "fail-delivery" {
1925                        anyhow::bail!("synthetic delivery failure");
1926                    }
1927                    Ok(())
1928                })
1929            },
1930        ));
1931        let mut job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1932        job.delivery = DeliveryConfig {
1933            mode: "announce".into(),
1934            channel: Some("fail-delivery".into()),
1935            to: Some("123456".into()),
1936            thread_id: None,
1937            best_effort: true,
1938        };
1939        let started = Utc::now();
1940        let finished = started + ChronoDuration::milliseconds(10);
1941
1942        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1943        assert!(success);
1944
1945        let updated = cron::get_job(&config, &job.id).unwrap();
1946        assert!(updated.enabled);
1947        assert_eq!(updated.last_status.as_deref(), Some("degraded"));
1948        assert!(
1949            updated
1950                .last_output
1951                .as_deref()
1952                .unwrap_or_default()
1953                .contains("delivery failed:")
1954        );
1955
1956        let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1957        assert_eq!(runs.len(), 1);
1958        assert_eq!(runs[0].status, "degraded");
1959    }
1960
1961    #[tokio::test]
1962    async fn delivery_failure_classification_preserves_empty_output_evidence() {
1963        let tmp = TempDir::new().unwrap();
1964        let config = test_config(&tmp).await;
1965        register_delivery_fn(Box::new(
1966            |_config, channel, _target, _thread_id, _output| {
1967                Box::pin(async move {
1968                    if channel == "fail-delivery" {
1969                        anyhow::bail!("synthetic delivery failure");
1970                    }
1971                    Ok(())
1972                })
1973            },
1974        ));
1975        let mut job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1976        job.delivery = DeliveryConfig {
1977            mode: "announce".into(),
1978            channel: Some("fail-delivery".into()),
1979            to: Some("123456".into()),
1980            thread_id: None,
1981            best_effort: true,
1982        };
1983
1984        let outcome = deliver_and_classify_run_result(
1985            &config,
1986            &job,
1987            true,
1988            String::new(),
1989            CronDeliveryContext::Scheduled,
1990        )
1991        .await;
1992
1993        assert!(outcome.success);
1994        assert_eq!(outcome.status, "degraded");
1995        assert!(outcome.output.starts_with("delivery failed:"));
1996    }
1997
1998    #[tokio::test]
1999    async fn persist_job_result_at_schedule_without_delete_after_run_is_disabled() {
2000        let tmp = TempDir::new().unwrap();
2001        let config = test_config(&tmp).await;
2002        let at = Utc::now() + ChronoDuration::minutes(10);
2003        let job = cron::add_agent_job(
2004            &config,
2005            TEST_AGENT,
2006            Some("at-no-autodelete".into()),
2007            crate::cron::Schedule::At { at },
2008            "Hello",
2009            SessionTarget::Isolated,
2010            None,
2011            None,
2012            false,
2013            None,
2014        )
2015        .unwrap();
2016        assert!(!job.delete_after_run);
2017
2018        let started = Utc::now();
2019        let finished = started + ChronoDuration::milliseconds(10);
2020        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
2021        assert!(success);
2022
2023        // After reschedule_after_run, At schedule jobs should be disabled
2024        // to prevent re-execution with a past next_run timestamp.
2025        let updated = cron::get_job(&config, &job.id).unwrap();
2026        assert!(
2027            !updated.enabled,
2028            "At schedule job should be disabled after execution via reschedule"
2029        );
2030        assert_eq!(updated.last_status.as_deref(), Some("ok"));
2031    }
2032
2033    #[tokio::test]
2034    async fn deliver_if_configured_handles_none_mode() {
2035        let tmp = TempDir::new().unwrap();
2036        let config = test_config(&tmp).await;
2037        let job = test_job("echo ok");
2038
2039        // Default delivery mode is not "announce", so should be a no-op.
2040        assert!(deliver_if_configured(&config, &job, "x").await.is_ok());
2041    }
2042
2043    #[tokio::test]
2044    async fn deliver_announcement_returns_ok_when_no_handler_registered() {
2045        let tmp = TempDir::new().unwrap();
2046        let config = test_config(&tmp).await;
2047        // No registered handler is a runtime-level state, not a delivery
2048        // failure. The caller (persist_job_result) should record the job
2049        // execution as successful; the missing handler is logged via
2050        // tracing::warn for operator visibility.
2051        deliver_announcement(&config, "telegram", "chat-id", None, "payload")
2052            .await
2053            .expect("missing delivery handler should be Ok with a warn log");
2054    }
2055
2056    #[test]
2057    fn build_cron_shell_command_uses_sh_non_login() {
2058        let workspace = std::env::temp_dir();
2059        let cmd = build_cron_shell_command("echo cron-test", &workspace).unwrap();
2060        let debug = format!("{cmd:?}");
2061        assert!(debug.contains("echo cron-test"));
2062        assert!(debug.contains("\"sh\""), "should use sh: {debug}");
2063        // Must NOT use login shell (-l) — login shells load full profile
2064        // and are slow/unpredictable for cron jobs.
2065        assert!(
2066            !debug.contains("\"-lc\""),
2067            "must not use login shell: {debug}"
2068        );
2069    }
2070
2071    #[tokio::test]
2072    async fn build_cron_shell_command_executes_successfully() {
2073        let workspace = std::env::temp_dir();
2074        let mut cmd = build_cron_shell_command("echo cron-ok", &workspace).unwrap();
2075        let output = cmd.output().await.unwrap();
2076        assert!(output.status.success());
2077        let stdout = String::from_utf8_lossy(&output.stdout);
2078        assert!(stdout.contains("cron-ok"));
2079    }
2080
2081    #[tokio::test]
2082    async fn catch_up_queries_all_overdue_jobs_ignoring_max_tasks() {
2083        let tmp = TempDir::new().unwrap();
2084        let mut config = test_config(&tmp).await;
2085        config.scheduler.max_tasks = 1; // limit normal polling to 1
2086
2087        // Create 3 jobs with "every minute" schedule
2088        for i in 0..3 {
2089            let _ = cron::add_job(
2090                &config,
2091                "test-agent",
2092                "* * * * *",
2093                &format!("echo catchup-{i}"),
2094            )
2095            .unwrap();
2096        }
2097
2098        // Verify normal due_jobs is limited to max_tasks=1
2099        let far_future = Utc::now() + ChronoDuration::days(1);
2100        let due = cron::due_jobs(&config, far_future).unwrap();
2101        assert_eq!(due.len(), 1, "due_jobs must respect max_tasks");
2102
2103        // all_overdue_jobs ignores the limit
2104        let overdue = cron::all_overdue_jobs(&config, far_future).unwrap();
2105        assert_eq!(overdue.len(), 3, "all_overdue_jobs must return all");
2106    }
2107
2108    // scan_and_redact_output tests moved to zeroclaw-channels orchestrator
2109
2110    // ── Broadcast / EventBroadcast tests ─────────────────────────────
2111
2112    #[tokio::test]
2113    async fn broadcast_sends_cron_result_on_success() {
2114        let tmp = TempDir::new().unwrap();
2115        let mut config = test_config(&tmp).await;
2116        let job = test_job("echo broadcast-ok");
2117        // Bind the synthetic test job to test-agent so process_due_jobs's
2118        // owning-agent lookup succeeds (jobs without an owner are skipped).
2119        config
2120            .agents
2121            .get_mut("test-agent")
2122            .unwrap()
2123            .cron_jobs
2124            .push(job.id.clone());
2125        let component = unique_component("broadcast-ok");
2126
2127        let (tx, mut rx) = tokio::sync::broadcast::channel::<serde_json::Value>(16);
2128        let event_tx: EventBroadcast = Some(tx);
2129
2130        process_due_jobs(&config, vec![job], &component, &event_tx).await;
2131
2132        let event = rx.try_recv().expect("should receive a broadcast event");
2133        assert_eq!(event["type"], "cron_result");
2134        assert_eq!(event["job_id"], "test-job");
2135        assert_eq!(event["success"], true);
2136        assert!(event["output"].as_str().unwrap().contains("broadcast-ok"));
2137        assert!(event["timestamp"].as_str().is_some());
2138    }
2139
2140    #[tokio::test]
2141    async fn broadcast_sends_cron_result_on_failure() {
2142        let tmp = TempDir::new().unwrap();
2143        let mut config = test_config(&tmp).await;
2144        let job = test_job("ls definitely_missing_file_for_broadcast_fail_test");
2145        config
2146            .agents
2147            .get_mut("test-agent")
2148            .unwrap()
2149            .cron_jobs
2150            .push(job.id.clone());
2151        let component = unique_component("broadcast-fail");
2152
2153        let (tx, mut rx) = tokio::sync::broadcast::channel::<serde_json::Value>(16);
2154        let event_tx: EventBroadcast = Some(tx);
2155
2156        process_due_jobs(&config, vec![job], &component, &event_tx).await;
2157
2158        let event = rx.try_recv().expect("should receive a broadcast event");
2159        assert_eq!(event["type"], "cron_result");
2160        assert_eq!(event["job_id"], "test-job");
2161        assert_eq!(event["success"], false);
2162        assert!(event["timestamp"].as_str().is_some());
2163    }
2164
2165    #[tokio::test]
2166    async fn broadcast_none_skips_without_error() {
2167        let tmp = TempDir::new().unwrap();
2168        let config = test_config(&tmp).await;
2169        let job = test_job("echo no-broadcast");
2170        let component = unique_component("broadcast-none");
2171
2172        // event_tx = None — should complete without panic.
2173        process_due_jobs(&config, vec![job], &component, &None).await;
2174    }
2175
2176    #[tokio::test]
2177    async fn broadcast_handles_no_subscribers() {
2178        let tmp = TempDir::new().unwrap();
2179        let config = test_config(&tmp).await;
2180        let job = test_job("echo no-subscribers");
2181        let component = unique_component("broadcast-no-sub");
2182
2183        let (tx, _) = tokio::sync::broadcast::channel::<serde_json::Value>(16);
2184        // Drop the only receiver immediately — `let _ = tx.send(...)` in
2185        // process_due_jobs must not panic when there are no subscribers.
2186        let event_tx: EventBroadcast = Some(tx);
2187
2188        process_due_jobs(&config, vec![job], &component, &event_tx).await;
2189        // If we got here without panic, the test passes.
2190    }
2191}